In [0]:
def _read_widget(widget_name, label):
    """Fetch one notebook widget value, echo it next to ``label``, and return it."""
    value = dbutils.widgets.get(widget_name)
    print(label, value)
    return value


# Notebook parameters: each value comes from the widget of the same name and is
# printed as soon as it is read.
data_source = _read_widget("data_source", "Picking Data from: ")
source_format = _read_widget("source_format", "Format of file is:")
table_name = _read_widget("table_name", "Loading data into: ")
checkpoint_path = _read_widget("checkpoint_path", "Checkpoint Location:")

In [0]:
# Service-principal credentials, read from the "sandbox" secret scope in the
# order clientId, tenantId, clientSecret. The Auto Loader readers defined in
# this notebook pass them as cloudFiles.* options.
clientId, tenantId, clientSecret = (
    dbutils.secrets.get("sandbox", secret_key)
    for secret_key in ("clientId", "tenantId", "clientSecret")
)

In [0]:
def autoload_csv(data_source, source_format, table_name, checkpoint_path,
                 resource_group='sandbox-data-platform',
                 subscription_id='f6c70282-f97e-45b0-aec5-80d03b9db76b'):
    """Load files from ``data_source`` into a Delta table with Auto Loader.

    Builds a ``cloudFiles`` streaming read with file notifications enabled and
    writes it to ``table_name`` using ``trigger(once=True)``.
    ``checkpoint_path`` is used both as the Auto Loader schema location and as
    the stream's checkpoint location. The service-principal credentials are
    taken from the notebook-level ``tenantId``, ``clientId`` and
    ``clientSecret`` variables, and the session from the global ``spark``.

    Args:
        data_source: Path passed to ``load()``.
        source_format: Value for ``cloudFiles.format`` (for example ``"csv"``).
        table_name: Target table handed to ``writeStream...table()``.
        checkpoint_path: Schema location and checkpoint location.
        resource_group: Value for ``cloudFiles.resourceGroup``. Defaults to the
            value that used to be hard-coded here.
        subscription_id: Value for ``cloudFiles.subscriptionId``. Defaults to
            the value that used to be hard-coded here.

    Returns:
        Whatever ``DataStreamWriter.table`` returns -- presumably the handle of
        the started streaming query (TODO confirm for the runtime in use).
    """
    source_stream = (spark.readStream
                     .format('cloudFiles')
                     .option('cloudFiles.format', source_format)
                     # NOTE(review): Databricks documents
                     # `cloudFiles.inferColumnTypes` as the Auto Loader switch
                     # for column type inference. `inferSchema` is kept as-is so
                     # the schema produced by existing runs does not change --
                     # confirm which behaviour is intended.
                     .option('inferSchema', 'True')
                     .option('cloudFiles.schemaLocation', checkpoint_path)
                     .option('cloudFiles.resourceGroup', resource_group)
                     .option("cloudFiles.subscriptionId", subscription_id)
                     .option("cloudFiles.tenantId", tenantId)
                     .option("cloudFiles.clientId", clientId)
                     .option("cloudFiles.clientSecret", clientSecret)
                     .option("cloudFiles.useNotifications", "true")
                     .load(data_source))

    query = (source_stream.writeStream
             .trigger(once=True)
             .format("delta")
             .queryName("Fetch data from Landing to Curated")
             .option('checkpointLocation', checkpoint_path)
             # Let new source columns be added to the target table.
             .option('mergeSchema', "true")
             .table(table_name))

    return query

In [0]:
# Initial load: append everything currently available in the source location to
# the target table.
query = autoload_csv(
    data_source=data_source,
    source_format=source_format,
    table_name=table_name,
    checkpoint_path=checkpoint_path,
)

# autoload_csv configures the stream with trigger(once=True), so it stops by
# itself. Block until it has finished so that the cells below read the table
# after the load rather than while it is still running.
query.awaitTermination()

In [0]:
%sql 

-- Row count of the customers table. The table name is hard-coded here, whereas
-- the Python cells load into the table named by the table_name widget.
select count(*) from customers; 
-- select * from customers where CustomerKey = '29485'

count(1)
18148


In [0]:
from delta.tables import * 

def upsertToDelta(microBatchOutputDF, batchId):
    """Merge one micro-batch into the ``customers`` Delta table.

    Intended as a ``foreachBatch`` callback. Rows of ``microBatchOutputDF``
    whose ``CustomerKey`` matches an existing row overwrite that row; all other
    rows are inserted. ``batchId`` is accepted because ``foreachBatch`` passes
    it, but it is not used. Returns ``None``.
    """
    target = DeltaTable.forName(spark, "customers").alias("original")
    updates = microBatchOutputDF.alias("updates")

    # Match on the customer key: update every column on a hit, insert on a miss.
    merge_builder = target.merge(
        updates, "original.CustomerKey = updates.CustomerKey")
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [0]:
def autoload_upsert_csv(data_source, source_format, table_name, checkpoint_path,
                        merge_checkpoint_path="/tmp/sky/customers/_checkpoints",
                        merge_key="CustomerKey",
                        resource_group='sandbox-data-platform',
                        subscription_id='f6c70282-f97e-45b0-aec5-80d03b9db76b'):
    """Stream files from ``data_source`` and upsert them into ``table_name``.

    Builds a ``cloudFiles`` streaming read with file notifications enabled and
    merges every micro-batch into the Delta table ``table_name`` through
    ``foreachBatch``: rows whose ``merge_key`` matches an existing row replace
    it, the rest are inserted. No trigger is configured on the stream.

    The merge is done by a local callback bound to ``table_name``. Previously
    the stream used the notebook-level ``upsertToDelta`` helper, which always
    targets the ``customers`` table, so the ``table_name`` argument was ignored.

    The service-principal credentials are taken from the notebook-level
    ``tenantId``, ``clientId`` and ``clientSecret`` variables, and the session
    from the global ``spark``.

    Args:
        data_source: Path passed to ``load()``.
        source_format: Value for ``cloudFiles.format`` (for example ``"csv"``).
        table_name: Delta table that receives the merge.
        checkpoint_path: Auto Loader schema location.
        merge_checkpoint_path: Checkpoint location of this stream. Defaults to
            the path that used to be hard-coded here; it is separate from
            ``checkpoint_path``.
        merge_key: Column used to match source rows to target rows.
        resource_group: Value for ``cloudFiles.resourceGroup``.
        subscription_id: Value for ``cloudFiles.subscriptionId``.

    Returns:
        The streaming query handle returned by ``start()``.
    """
    merge_condition = "original.{0} = updates.{0}".format(merge_key)

    def _merge_batch(microBatchOutputDF, batchId):
        # Imported here so this function does not depend on another cell
        # having brought DeltaTable into scope.
        from delta.tables import DeltaTable

        target = DeltaTable.forName(spark, table_name)
        (target.alias("original")
         .merge(microBatchOutputDF.alias("updates"), merge_condition)
         .whenMatchedUpdateAll()
         .whenNotMatchedInsertAll()
         .execute())

    source_stream = (spark.readStream
                     .format('cloudFiles')
                     .option('cloudFiles.format', source_format)
                     # NOTE(review): Databricks documents
                     # `cloudFiles.inferColumnTypes` as the Auto Loader switch
                     # for column type inference. `inferSchema` is kept as-is so
                     # the inferred schema does not change -- confirm which
                     # behaviour is intended.
                     .option('inferSchema', 'True')
                     .option('cloudFiles.schemaLocation', checkpoint_path)
                     .option('cloudFiles.resourceGroup', resource_group)
                     .option("cloudFiles.subscriptionId", subscription_id)
                     .option("cloudFiles.tenantId", tenantId)
                     .option("cloudFiles.clientId", clientId)
                     .option("cloudFiles.clientSecret", clientSecret)
                     .option("cloudFiles.useNotifications", "true")
                     .load(data_source))

    query = (source_stream.writeStream
             .format("delta")
             .outputMode("append")
             .foreachBatch(_merge_batch)
             .queryName("Merge New Data")
             .option("checkpointLocation", merge_checkpoint_path)
             .start())

    return query

In [0]:
# Start the upsert stream. This rebinds `query`, so the handle returned by the
# earlier autoload_csv call is no longer reachable through this name.
query = autoload_upsert_csv(
    data_source=data_source,
    source_format=source_format,
    table_name=table_name,
    checkpoint_path=checkpoint_path,
)

In [0]:
%sql 

-- Time travel: read the customers table as of version 1 and look up the row
-- for CustomerKey '11000'.
select * from customers  VERSION AS OF 1 where CustomerKey = '11000' 


CustomerKey,Prefix,FirstName,LastName,BirthDate,MaritalStatus,Gender,EmailAddress,AnnualIncome,TotalChildren,EducationLevel,Occupation,HomeOwner,_rescued_data
11000,MR.,JON,YANG,4/8/1966,M,M,jon24@adventure-works.com,"$90,000",2,Bachelors,Professional,Y,


In [0]:
%sql 

-- Cleanup: drops the customers table. Destructive -- run only when the table
-- is meant to be removed.
drop table customers 