// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

In [None]:
from pyspark.sql import functions as f
from delta.tables import *
import re
import json
import datetime
import time

In [None]:
# Notebook parameters. The values below are placeholders; presumably the calling
# pipeline overrides them at run time (TODO confirm how this cell is parameterised).

# Folder of the incoming file. The first four characters are stripped later to
# remove the container prefix, and the path must contain a "/v<digits>/" segment.
raw_folderpath = "raw/.../"
# Name of the incoming file inside raw_folderpath.
raw_filename = ""
# List of key column names, passed as a string and parsed with json.loads;
# used to build the merge join condition.
primary_key_cols = '["column1","column2", "etc"]'
# List of partition column names, passed as a string and parsed with json.loads.
# 'CalcYear', 'CalcMonth' and 'CalcDayOfMonth' are derived from date_partition_column.
partition_cols = '["CalcYear","column2", "etc"]'
# Column the Calc* partition columns are computed from.
date_partition_column = 'DATECOLUMNNAME'
# 'json' is read with spark.read.json; any other value is read with spark.read.load.
file_type = 'json'

In [None]:
# partition_cols arrives as a string; swap single quotes for double quotes so
# the text is valid JSON, then parse it into a Python list.
partition_cols_json = partition_cols.replace("'", '"')
partition_cols_list = json.loads(partition_cols_json)

### Set to Correct Database

In [1]:
%%sql
-- Create the staging database on first run, then make it the current database
-- for the statements that follow.
CREATE DATABASE IF NOT EXISTS staging;
USE staging;

StatementMeta(, 0, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

### Dynamically Get Storage Account Name From Linked Service

In [None]:
# Look up the linked service's properties (returned as a JSON string) and take
# the storage account name from its 'Endpoint' value: the text before the first
# '.', with the 'https://' scheme removed.
storageLinkedService = 'LS_DataLake'
storageAccount_ls = mssparkutils.credentials.getPropertiesAll(storageLinkedService)
linked_service_endpoint = json.loads(storageAccount_ls)['Endpoint']
endpoint_first_label = linked_service_endpoint.split('.')[0]
storageAccountName = endpoint_first_label.replace('https://','')

### Read Input Data

In [None]:
# Drop the first four characters of the folder path (the default value starts
# with the "raw/" container prefix), since the container is named in the URI.
raw_folderpathWithoutContainer = raw_folderpath[4:]
raw_abfss_path = f'abfss://raw@{storageAccountName}.dfs.core.windows.net/{raw_folderpathWithoutContainer}/{raw_filename}'

# JSON files use the JSON reader; every other file type goes through the
# generic loader with its default format.
raw_reader = spark.read.json if file_type == 'json' else spark.read.load
raw_data = raw_reader(raw_abfss_path)

### Add Date Partition Columns If Necessary

In [None]:
# Derived partition columns, each paired with the Spark function that extracts
# it from the date column. A column is only added when it was requested in
# the partition list.
date_part_functions = {
    'CalcYear': f.year,
    'CalcMonth': f.month,
    'CalcDayOfMonth': f.dayofmonth,
}

for part_name, part_function in date_part_functions.items():
    if part_name in partition_cols_list:
        raw_data = raw_data.withColumn(part_name, part_function(f.col(date_partition_column)))

### Set Output Path

In [None]:
# Keep the path up to and including the version folder ("<...>/v<digits>/");
# anything after the version folder is discarded.
stagingFolderPath = re.sub(r'(.*/v[0-9]+/).*', r'\1', raw_folderpathWithoutContainer)

# Extract the version token (e.g. "v1"). Fail with an explicit message instead
# of an AttributeError on None when the path carries no version folder.
version_match = re.search(r'\/(v\d+)\/', stagingFolderPath)
if version_match is None:
    raise ValueError(
        f"No '/v<number>/' version folder found in raw folder path: {raw_folderpathWithoutContainer!r}"
    )
version = version_match.group(1)

# The staging location mirrors the raw path with the version folder removed.
stagingFolderPathNoVersion = stagingFolderPath.replace(f'/{version}/','/')
stagingAbfssPath = f'abfss://staging@{storageAccountName}.dfs.core.windows.net/{stagingFolderPathNoVersion}'

### Determine if Delta Table Exists

In [None]:
# The table name is the staging folder path with its segments joined by
# underscores; the last segment (empty when the path ends in '/') is left out.
path_segments = stagingFolderPathNoVersion.split('/')
tableName = '_'.join(path_segments[:-1])

# Remember whether the table is already registered; later cells use this to
# choose between creating the table and merging into it.
tableExists = spark.catalog.tableExists(tableName)

### Add Filename, Timestamp, Version Columns

In [None]:
# Values for the audit columns: where the file sits in the raw zone and the
# driver's current time, captured once as a string.
location_in_raw = raw_folderpath + raw_filename
now = str(datetime.datetime.now())

# Stamp every row with the control file version, its raw location and the
# load timestamp.
raw_data = (
    raw_data
    .withColumn('control_file_version', f.lit(version))
    .withColumn('location_in_raw', f.lit(location_in_raw))
    .withColumn('timestamp', f.to_timestamp(f.lit(now)))
)

### If table does **not** exist, create new delta table
- Either with or without partitions

In [None]:
# First load for this table: write the data as a new Delta table registered
# under tableName, stored at the staging path.
if not tableExists:
    table_writer = raw_data.write.format("delta")

    # Decide on partitioning from the parsed list rather than the raw parameter
    # string, so "[]", "[ ]" and similar spellings all mean "no partitions".
    if partition_cols_list:
        table_writer = table_writer.partitionBy(partition_cols_list)

    table_writer.option("path", stagingAbfssPath).saveAsTable(tableName)

### Format primary key columns for merge statement

In [None]:
# convert parameter primary_key_cols from string type to list type
primary_key_cols_list = json.loads(primary_key_cols.replace("'",'"'))

# A merge needs at least one key column to match rows on; fail with a clear
# message instead of an IndexError when none were supplied.
if not primary_key_cols_list:
    raise ValueError("primary_key_cols must list at least one column to build the merge condition")

# Build the join condition: one equality per key column between the existing
# table ("current") and the incoming data ("new"), combined with AND.
mergeOn = ' AND '.join(
    f'current.{primary_key_col} = new.{primary_key_col}'
    for primary_key_col in primary_key_cols_list
)

### If Table Exists Merge New Data Into Existing Delta Table

In [None]:
# Enable automatic schema merging for Delta merges (both config keys are set,
# as in the original notebook).
spark.conf.set("spark.microsoft.delta.schema.autoMerge.enabled", "true")
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","true")

# Retry policy for the merge: total attempts and pause between them.
merge_attempts = 3
merge_retry_wait_seconds = 30

if tableExists:
    deltaTablePointer = DeltaTable.forPath(spark, stagingAbfssPath)

    # Drop Null Struct/Array Columns
    # NOTE(review): the previous check (`sum(isNull) == 0`) selected columns
    # with NO nulls, i.e. it dropped fully populated struct/array columns and
    # kept the all-null ones. It now drops only columns whose every value is null.
    array_struct_cols = [
        col_name
        for col_name, col_type in raw_data.dtypes
        if 'struct' in col_type or 'array' in col_type
    ]
    null_cols = []
    if array_struct_cols:
        # Single aggregation for all candidates: count(column) ignores nulls,
        # so 0 means the column holds no values. The total row count is taken
        # too, so that an empty input never causes columns to be dropped.
        counts_row = raw_data.agg(
            f.count(f.lit(1)),
            *[f.count(f.col(col_name)) for col_name in array_struct_cols]
        ).collect()[0]
        total_rows = counts_row[0]
        if total_rows > 0:
            null_cols = [
                col_name
                for col_name, non_null_count in zip(array_struct_cols, counts_row[1:])
                if non_null_count == 0
            ]
    raw_data = raw_data.drop(*null_cols)

    # Upsert: update rows that match on the key columns, insert the rest.
    # The merge is attempted up to merge_attempts times with a pause between
    # attempts; the error from the final attempt is propagated.
    for attempt in range(1, merge_attempts + 1):
        try:
            deltaTablePointer.alias("current").merge(
                raw_data.alias("new"), mergeOn) \
                .whenMatchedUpdateAll() \
                .whenNotMatchedInsertAll() \
                .execute()
            break
        except Exception:
            # Exception (not a bare except) so interrupts are not retried.
            if attempt == merge_attempts:
                raise
            time.sleep(merge_retry_wait_seconds)

### Exit Notebook with Staging Folderpath

In [None]:
# End the notebook run, passing the version-less staging folder path as the
# notebook's exit value.
notebook_exit_value = stagingFolderPathNoVersion
mssparkutils.notebook.exit(notebook_exit_value)