# Import Modules

In [33]:
from pyspark.sql import Row, functions as F
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from delta.tables import *
from datetime import *
import time
import pytz
import pyodbc
import pandas as pd

# Set Configurations

In [34]:
# Session-level Spark settings for this notebook.

# Configuration needs to be set to work around an issue when inserting dates
# before 1900: INT96 parquet timestamps are written in "CORRECTED" rebase mode.
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")

# NOTE(review): a threshold of -1 presumably switches off automatic broadcast
# joins (see the Spark SQL tuning docs) - confirm why this notebook needs it.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Define Functions

In [35]:
# Function to load source file to dataframe
def load_source_to_df(path, source, file_format=True):
    """Load the files under ``path`` into a Spark DataFrame.

    Args:
        path: Folder to read; passed to ``mssparkutils.fs.ls`` (for format
            detection) and to ``spark.read.load``.
        source: Name of the source system. Format detection is only
            implemented for '<source_system>'.
        file_format: Spark format name to read with. The default ``True`` is a
            sentinel meaning "derive the format from the extension of a file
            found in ``path``".

    Returns:
        The DataFrame returned by ``spark.read.load``.

    Raises:
        ValueError: If the format has to be detected but ``path`` contains no
            files, or if no usable format name is available (for example the
            default was kept for a source without format detection).
    """
    if file_format is True and source == '<source_system>':
        files = mssparkutils.fs.ls(path)
        if not files:
            raise ValueError(f"Cannot detect the file format: no files found in {path}")
        # Same pick as before: the second entry when there are several files,
        # otherwise the only one.
        sample_file = files[1] if len(files) > 1 else files[0]
        file_format = sample_file.name.split('.')[-1]

    if not isinstance(file_format, str):
        # Previously this surfaced as an opaque TypeError in the print below.
        raise ValueError(
            f"No file format available for source {source!r}: "
            "pass file_format explicitly"
        )

    print("File format is " + file_format)
    return spark.read.load(path=path, format=file_format)


In [36]:
def get_primarykeys(df_meta, table):
    """Return the primary-key field names of ``table`` as a Python list.

    Args:
        df_meta: Metadata DataFrame (SAP table DD03L) with the columns
            TABNAME, KEYFLAG, DATATYPE, POSITION and FIELDNAME.
        table: Table name to look up in TABNAME.

    Returns:
        List of FIELDNAME values flagged as key (KEYFLAG == "X") with a
        non-blank DATATYPE, ordered by POSITION.
    """
    print("Extracting primary keys.....")

    # Keep only the key fields of the requested table
    is_key_of_table = (
        (col("TABNAME") == table)
        & (col("KEYFLAG") == "X")
        & (col("DATATYPE") != " ")
    )

    # Collect the field names to the driver, in metadata position order
    key_rows = (
        df_meta
        .where(is_key_of_table)
        .orderBy(col("POSITION"))
        .select("FIELDNAME")
        .collect()
    )
    primary_keys = [key_row[0] for key_row in key_rows]

    # TODO: code is needed to remove keys which are not provided by SAP -> MANDT etc
    print("Primary keys successfully extracted.")
    return primary_keys
    

In [37]:
# Create SCD2 columns
def add_scd2(df, ts, pks):
    """Add the hash and SCD2 bookkeeping columns to ``df``.

    Added columns:
        hash: SHA-256 over all columns of ``df`` (sorted by column name).
        pk_hash: SHA-256 over the primary-key columns, in the order given.
        Record_Start_Date: ``ts`` as a timestamp (second precision).
        Record_End_Date: null timestamp.
        IsActive_Record: 'Y'.

    Args:
        df: DataFrame to extend.
        ts: datetime used as the record start date.
        pks: Primary-key column names (already sorted in the metadata).

    Returns:
        ``df`` with the five columns above appended.
    """
    print("Implementing SCD2.....")

    spark_ts_pattern = 'yyyy-MM-dd.HH:mm:ss'

    # Values are concatenated without a separator before hashing.
    # NOTE(review): changing this expression would change every stored hash.
    row_hash = F.sha2(F.concat_ws("", *sorted(df.columns)), 256)
    key_hash = F.sha2(F.concat_ws("", *pks), 256)

    # Start date is the given timestamp; the end date stays empty
    start_date = ts.strftime('%Y-%m-%d.%H:%M:%S')
    end_date = None

    df = df.withColumn('hash', row_hash)
    df = df.withColumn('pk_hash', key_hash)
    df = df.withColumn('Record_Start_Date', F.to_timestamp(F.lit(start_date), spark_ts_pattern))
    df = df.withColumn('Record_End_Date', F.to_timestamp(F.lit(end_date), spark_ts_pattern))
    df = df.withColumn('IsActive_Record', F.lit('Y'))

    print("SCD2 successfully implemented.")
    return df
    

In [38]:
# Function to check whether the delta table exists
def file_exists(path):
    """Return True when ``path`` can be listed, False when it is not found.

    A listing error whose text contains 'java.io.FileNotFoundException' is
    treated as "does not exist"; any other error is re-raised unchanged.
    """
    try:
        mssparkutils.fs.ls(path)
    except Exception as e:
        if 'java.io.FileNotFoundException' not in str(e):
            raise
        return False
    else:
        return True

In [39]:
# Function to delete folders and files recursively
def remove_folder(path):
    """Recursively delete ``path``.

    Raises:
        Exception: If ``path`` does not exist according to ``file_exists``.
    """
    if not file_exists(path):
        raise Exception("Folder doesn't exist")
    mssparkutils.fs.rm(path, True)

In [40]:
# Set dtypes
def set_dtypes(df, df_metadata, table_name):
    """Cast every column of ``df`` to the type listed for it in the metadata.

    Args:
        df: DataFrame whose columns are cast.
        df_metadata: Metadata DataFrame with the columns TABNAME, FIELDNAME
            and dtype.
        table_name: Value of TABNAME that selects the metadata rows to use.

    Returns:
        A DataFrame with the same column names and order as ``df``. A column
        found in the metadata is cast to its ``dtype``; a column without a
        metadata row is cast to 'string'.
    """
    # Fetch the metadata of this table in one Spark job instead of running
    # two jobs per column.
    metadata_rows = (
        df_metadata
        .filter(F.col('TABNAME') == table_name)
        .select('FIELDNAME', 'dtype')
        .collect()
    )

    # First row wins when a field is listed more than once, like the
    # previous collect()[0] lookup.
    dtype_by_field = {}
    for field_name, dtype in metadata_rows:
        dtype_by_field.setdefault(field_name, dtype)

    # Convert column dtypes in df based on the metadata
    return df.select(
        *(F.col(c).cast(dtype_by_field.get(c, 'string')).alias(c) for c in df.columns)
    )

In [41]:
# Function to do a full load
def write_full_load(df, destination_path, table_name, ts):
    """Write a full extract to the Delta table at ``destination_path`` (SCD2).

    When no table exists at ``destination_path`` yet, ``df`` is written as a
    new Delta table. Otherwise ``df`` is compared on ``hash`` with the
    currently active rows (``IsActive_Record = 'Y'``):

    * rows of ``df`` whose hash is not active are inserted;
    * active rows whose hash is missing from ``df`` (deleted or updated at
      the source) are closed: ``IsActive_Record = 'N'`` and
      ``Record_End_Date = ts - 1 second``.

    Args:
        df: New extract. Must carry the ``hash`` column plus the SCD2
            columns, as added by ``add_scd2``.
        destination_path: Location of the Delta table.
        table_name: Not used by this function.
        ts: datetime of the load; closed records end one second before it.

    Returns:
        None.
    """
    print("Extraction type is FULL.....")

    if file_exists(destination_path):  # Upsert into existing delta table

        # Set end date
        end_date = ts + timedelta(seconds=-1)

        # Load existing delta table to df (only the active records)
        deltaTable = DeltaTable.forPath(spark, destination_path)
        deltatable_df = deltaTable.toDF().where("IsActive_Record = 'Y'")

        # Find the records that are new or changed: their hash is not active yet
        df_toBeAdded = (
            df.alias("toBeAdded")
            .join(deltatable_df.alias("deltaTable"), on="hash", how="left_outer")
            .where("deltaTable.hash IS NULL")
            .selectExpr("toBeAdded.*")
        )

        # Find the active records that have to be closed (due to deletion or
        # update): their hash no longer occurs in the new extract
        df_toBeDeleted = (
            df.alias("toBeDeleted")
            .join(deltatable_df.alias("deltaTable"), on="hash", how="right_outer")
            .where("toBeDeleted.hash IS NULL")
            .selectExpr("deltaTable.*")
        )

        # Union the new, changed and to be closed records.
        # NOTE(review): union() pairs columns by position, so both sides must
        # expose their columns in the same order - verify after schema changes.
        df_stagedUpdates = df_toBeAdded.union(df_toBeDeleted)

        # Merge the staged updates into the delta table.
        # A staged row that matches an ACTIVE row on hash is a row to be
        # closed: it is flagged inactive and gets the end date.
        # A staged row without such a match is inserted with all its columns.
        # The match is limited to active rows on purpose: matching on hash
        # alone also hits closed historical rows, so a record that returns to
        # an earlier state would overwrite that old row instead of being
        # inserted as the new active version.
        (
            deltaTable.alias("deltaTable")
            .merge(
                df_stagedUpdates.alias("stagedUpdates"),
                "deltaTable.hash = stagedUpdates.hash AND deltaTable.IsActive_Record = 'Y'",
            )
            .whenMatchedUpdate(
                set={
                    "IsActive_Record": F.lit('N'),
                    "Record_End_Date": F.lit(end_date),
                }
            )
            .whenNotMatchedInsertAll()
            .execute()
        )

    else:  # Create new table
        df.write.mode("overwrite").format("delta").save(destination_path)

    print("Extraction completed.")

In [42]:
# Function to do a delta load
def write_delta_load(df, destination_path, table_name, ts):
    """Write a delta extract to the Delta table at ``destination_path`` (SCD2).

    ``df`` is expected to hold only new and modified records. When no table
    exists at ``destination_path`` yet, ``df`` is written as a new Delta
    table. Otherwise ``df`` is compared with the currently active rows
    (``IsActive_Record = 'Y'``) on ``pk_hash``:

    * rows of ``df`` with an unknown ``pk_hash``, or with a known ``pk_hash``
      but a different ``hash``, are inserted;
    * active rows whose ``pk_hash`` occurs in ``df`` with a different
      ``hash`` are closed: ``IsActive_Record = 'N'`` and
      ``Record_End_Date = ts - 1 second``.

    Args:
        df: Delta extract. Must carry ``hash``, ``pk_hash`` and the SCD2
            columns, as added by ``add_scd2``.
        destination_path: Location of the Delta table.
        table_name: Not used by this function.
        ts: datetime of the load; closed records end one second before it.

    Returns:
        None.
    """
    print("Extraction type is DELTA.....")

    if file_exists(destination_path):  # Upsert into existing delta table

        # Set end date
        end_date = ts + timedelta(seconds=-1)

        # Load existing delta table to df (only the active records)
        deltaTable = DeltaTable.forPath(spark, destination_path)
        deltatable_df = deltaTable.toDF().where("IsActive_Record = 'Y'")

        # New records (unknown primary key) and modified records (known
        # primary key, different content)
        df_toBeAdded = (
            df.alias("toBeAdded")
            .join(deltatable_df.alias("deltaTable"), on="pk_hash", how="left_outer")
            .where("(deltaTable.pk_hash IS NULL OR toBeAdded.hash <> deltaTable.hash)")
            .selectExpr("toBeAdded.*")
        )

        # Active records that are superseded by a modified record. Modified
        # records have primary keys that already exist in the delta table, so
        # rows with the same pk_hash but a different hash have to be closed.
        df_toBeDeleted = (
            df.alias("toBeDeleted")
            .join(deltatable_df.alias("deltaTable"), on="pk_hash", how="right_outer")
            .where("toBeDeleted.pk_hash IS NOT NULL AND (toBeDeleted.hash <> deltaTable.hash)")
            .selectExpr("deltaTable.*")
        )

        # Union the new, changed and to be closed records.
        # NOTE(review): union() pairs columns by position, so both sides must
        # expose their columns in the same order - verify after schema changes.
        df_stagedUpdates = df_toBeAdded.union(df_toBeDeleted)

        # Merge the staged updates into the delta table.
        # A staged row that matches an ACTIVE row on pk_hash and hash is a row
        # to be closed: it is flagged inactive and gets the end date.
        # A staged row without such a match is inserted with all its columns.
        # The match is limited to active rows on purpose: without it a record
        # that returns to an earlier state would match its own closed
        # historical row and update that row instead of being inserted as the
        # new active version.
        (
            deltaTable.alias("deltaTable")
            .merge(
                df_stagedUpdates.alias("stagedUpdates"),
                "deltaTable.pk_hash = stagedUpdates.pk_hash"
                " AND deltaTable.hash = stagedUpdates.hash"
                " AND deltaTable.IsActive_Record = 'Y'",
            )
            .whenMatchedUpdate(
                set={
                    "IsActive_Record": F.lit('N'),
                    "Record_End_Date": F.lit(end_date),
                }
            )
            .whenNotMatchedInsertAll()
            .execute()
        )

    else:
        # Create new table
        df.write.mode("overwrite").format("delta").save(destination_path)

    print("Extraction completed.")

# Set Variables

In [43]:
# Target environment. Hard-coded to 'td' here; intended to be passed in as a
# notebook parameter. The storage account, key vault name and metadata server
# name set further down are derived from it.
env = 'td'

In [44]:
# Storage account: test account for the 'td' environment, production otherwise
storageaccount = 'storageaccounttest' if env == 'td' else 'storageaccountprod'

# Source
source = '<source_system>'
source_container = 'raw'
source_folder = f"abfss://{source_container}@{storageaccount}.dfs.core.windows.net/{source}/data/"

# Metadata path - SAP Table DD03L (lives in the source container)
meta_path = f"abfss://{source_container}@{storageaccount}.dfs.core.windows.net/{source}/data/dd03l"

# Destination
destination_container = 'enriched'
destination_folder = f"abfss://{destination_container}@{storageaccount}.dfs.core.windows.net/{source}/data/"

# Archive
archive_container = 'backup'
archive_folder = f"abfss://{archive_container}@{storageaccount}.dfs.core.windows.net/{source}/processed/"

# Key vault
key_vault_name = f'KV-{env.upper()}DATAHUB'

# Metadata DB
meta_server = f'tcp:sql-{env}-weu-GDWH-datahub.database.windows.net'
meta_database = 'SQLDB-METADATASTOREDB'
# meta_password_secret_name = 'sqlserveradmin'

# Set timezone
timezone = pytz.timezone('Europe/Amsterdam')

# Run Process

In [45]:
# Connect to metadata DB: the SQL password is fetched from the key vault
# through the linked service, then an ODBC connection is opened.
# NOTE(review): the connection is not closed in this cell.
ls_key_vault_name = 'ls_AzureKeyVault'
meta_username = 'sqlserveradmin'
meta_password = mssparkutils.credentials.getSecret(key_vault_name, meta_username, ls_key_vault_name)
driver = '{ODBC Driver 17 for SQL Server}'
cnxn = pyodbc.connect(
    f"DRIVER={driver};SERVER={meta_server};PORT=1433;DATABASE={meta_database}"
    f";UID={meta_username};PWD={meta_password}"
)

# Read the metadata of the active tables extracted with:
# 1) SAP Table connector
tables = pd.read_sql(
    sql='''SELECT 
	[TABLE_ID] AS [ID]
	,[TABLE_NAME]
	,[SOURCE_SYSTEM]
	,[EXTRACTION_TYPE]
	,[IS_ACTIVE]
	,'SAP TABLE' AS [CONNECTOR_TYPE]
FROM [MDL].[EXTRACTION_SAPTABLE]
WHERE [IS_ACTIVE] = '1'
AND TABLE_NAME in ('EKET','EKKO','EKPO','EKES')
	''',
    # AND TABLE_ID IN (3,4,5,6) --Only for testing
    con=cnxn,
)

## Load active tables

In [48]:
# Set start Timestamp
ts_start = datetime.now(tz=timezone)

# Load DD03L (SAP field metadata) to df
# Logging start
print("-------------------------")
print(f"Metadata table: DDL03 from source system <source_system> is started at {ts_start}")
df_meta = load_source_to_df(meta_path, '<source_system>')
# Set end Timestamp
ts_end = datetime.now(tz=timezone)
print(f"Metadata table: DDL03 is finished at {ts_end}")

# Loop over tables and write them (full or delta load) as (1) delta tables with (2) SCD2 to the enriched folder
for _, row in tables.iterrows():
    table = row.TABLE_NAME
    table_lower_case = table.lower()
    source_system = row.SOURCE_SYSTEM
    extraction_type = row.EXTRACTION_TYPE

    # Reset start Timestamp; it is also used as the SCD2 record start date
    ts_start = datetime.now(tz=timezone)

    # Logging start
    print("-------------------------")
    print(f"Source table: {table} from source system {source_system} is started at {ts_start}")

    # NOTE(review): the *_folder values already end with "/", so these paths
    # contain "//". Left unchanged so existing table locations stay the same.
    source_path = f"{source_folder}/{table_lower_case}/"
    archive_path = f"{archive_folder}/{table_lower_case}/"
    destination_path = f"{destination_folder}/{table_lower_case}/"

    print("Loading source table to dataframe.....")
    df = load_source_to_df(source_path, '<source_system>')
    print("Loading successfully completed.")

    primary_keys = get_primarykeys(df_meta, table)

    df = add_scd2(df, ts_start, primary_keys)

    if extraction_type == "FULL":
        write_full_load(df, destination_path, table, ts_start)
    else:
        # extraction_type = DELTA or DELTATOKEN
        write_delta_load(df, destination_path, table, ts_start)

    print("Moving files from the source to backup.....")
    # NOTE(review): files are listed after the load, so a file that arrives in
    # between is archived without having been processed - confirm this cannot
    # happen in the scheduling setup.
    staging_files = mssparkutils.fs.ls(source_path)
    if staging_files:
        # Create the archive folder once per table instead of once per file.
        # It is created explicitly because letting mv create the directory
        # did not work.
        mssparkutils.fs.mkdirs(archive_path)
    for staged_file in staging_files:
        mssparkutils.fs.mv(staged_file.path, archive_path, False)
    print("Files successully moved.")

    # Reset end Timestamp
    ts_end = datetime.now(tz=timezone)

    # Logging end
    print(f"Source table: {table} is finished at {ts_end}")