# Summary

- Notebook used to load Parquet data to Delta table

### Note: 
- Steps in this notebook can be created as a function, so they can be reused by other notebooks or by other parts of this code.

### ToDo:
-    Add Partition
-    Add LoadType Condition
-    Set Delta table log retention
-    mergeSchema = true
-    CDC
-    Z-Order
-    Optimize
-    Performance Test
-    non-ascii and special characters
-    error handling
-    add support for multiple unique constraint columns

# Widgets

In [None]:
# Declare the text widgets through which callers pass parameters to this
# notebook. Every widget is created with an empty default value and an empty
# label, in the order listed below.
# Uncomment the next line while developing to drop stale widgets first.
#dbutils.widgets.removeAll()
widgetNames = (
    "sourceParquetPath",
    "loadType",
    "targetDB",
    "targetTable",
    "targetDeltaPath",
    "uniqueConstraintColumn",
    "storageAccount",
    "storageContainer",
    "timestamp",
    "azureDirectoryId",
    "databricksAppID",  # original line had a broken string literal (`","")`)
    "databricksAppSecret",
    "databricksScope",
)
for widgetName in widgetNames:
    dbutils.widgets.text(widgetName, "", "")

#Get Parameters

In [None]:
# Read every notebook parameter from its widget.
sourceParquetPath = getArgument("sourceParquetPath")
loadType = getArgument("loadType")
targetDB = getArgument("targetDB")
targetTable = getArgument("targetTable")
targetDeltaPath = getArgument("targetDeltaPath")
uniqueConstraintColumn = getArgument("uniqueConstraintColumn")
storageAccount = getArgument("storageAccount")
storageContainer = getArgument("storageContainer")

# All three locations live in the same container of the same storage account,
# so they share one abfss:// root (ends with a trailing slash).
abfssRoot = f"abfss://{storageContainer}@{storageAccount}.dfs.core.windows.net/"
# Source Parquet data to load.
sourceParquetStorageURI = f"{abfssRoot}{sourceParquetPath}"
# Target Delta location: <targetDeltaPath>/<targetTable>.
targetDeltaStorageURI = f"{abfssRoot}{targetDeltaPath}/{targetTable}"
# Staging Delta location under the fixed "stage_delta" folder.
targetStageDeltaStorageURI = f"{abfssRoot}stage_delta/{targetTable}"

timestamp = getArgument("timestamp")
azureDirectoryId = getArgument("azureDirectoryId")
databricksAppID = getArgument("databricksAppID")
databricksAppSecret = getArgument("databricksAppSecret")
databricksScope = getArgument("databricksScope")

# Config

In [None]:
# Configure OAuth (client-credentials) access to the storage account for this
# Spark session. The client id and client secret are read from the Databricks
# secret scope `databricksScope`; `databricksAppID` and `databricksAppSecret`
# are the secret *key names*, not the secret values themselves.
accountHost = f"{storageAccount}.dfs.core.windows.net"

spark.conf.set(f"fs.azure.account.auth.type.{accountHost}", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{accountHost}",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.id.{accountHost}",
    dbutils.secrets.get(scope=databricksScope, key=databricksAppID),
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.secret.{accountHost}",
    dbutils.secrets.get(scope=databricksScope, key=databricksAppSecret),
)
# Token endpoint of the Azure AD tenant identified by `azureDirectoryId`.
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{accountHost}",
    f"https://login.microsoftonline.com/{azureDirectoryId}/oauth2/token",
)

#dbutils.fs.ls("abfss://lake@" + storageAccount + ".dfs.core.windows.net")

#Libraries

In [None]:
from delta.tables import *
from pyspark.sql.functions import *

# Validations

In [None]:
# Check if Folder & File Exist
# Check if all parameters have value

# Process Parquet to Delta

In [None]:
# Load the source Parquet data into the target Delta table.
#   - First load (no Delta table at the target path): write the data frame
#     as a new Delta table.
#   - Later loads: upsert the data frame into the existing table, matching
#     rows on `uniqueConstraintColumn`.

# Create the database only if one with the same name doesn't exist, then
# make it the current database.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {targetDB}")
spark.sql(f"USE {targetDB}")

# Load the Parquet data and stamp every row with the run's `timestamp`
# parameter, parsed by to_timestamp.
dfParquetData = (
    spark.read.format('parquet')
    .load(sourceParquetStorageURI)
    .withColumn("processed_time_stamp", to_timestamp(lit(timestamp)))
)

#display(dfParquetData.limit(10))

# Check whether a Delta table already exists at the target location.
deltaExist = DeltaTable.isDeltaTable(spark, targetDeltaStorageURI)

if not deltaExist:
    # First load: write the data frame as Delta (target).
    dfParquetData.write \
      .format('Delta') \
      .mode('overwrite') \
      .save(targetDeltaStorageURI)

# Register the table in the metastore if it isn't there yet. This runs in
# both cases: the Delta files may already exist while the table is not
# registered, and the OPTIMIZE / MERGE statements below address the table
# by name. The statement is a no-op when the table already exists.
spark.sql(f"CREATE TABLE IF NOT EXISTS {targetDB}.{targetTable} USING DELTA LOCATION '{targetDeltaStorageURI}'")

if deltaExist:
    # The merge condition is built from this column; an empty value would
    # only surface later as a malformed SQL statement.
    if not uniqueConstraintColumn:
        raise ValueError("uniqueConstraintColumn is required to merge into an existing Delta table")

    # Expose the incoming data to SQL as a temporary view (nothing is
    # written to storage here).
    dfParquetData.createOrReplaceTempView("stgTemp_"+targetTable)

    # Compact the target and Z-order it by the merge key before merging.
    spark.sql(f"OPTIMIZE {targetDB}.{targetTable} ZORDER BY ({uniqueConstraintColumn})")

    # Upsert: update rows whose key matches, insert the rest.
    mergeIntoStatement = (
        f"MERGE INTO {targetDB}.{targetTable} AS Target"
        f" USING stgTemp_{targetTable} AS Source"
        f" ON Target.{uniqueConstraintColumn}=Source.{uniqueConstraintColumn}"
        " WHEN MATCHED THEN UPDATE SET *"
        " WHEN NOT MATCHED THEN INSERT *"
    )
    spark.sql(mergeIntoStatement)
