# Autoloader with upsert

* Execute Auto Loader code to incrementally ingest data from a location in Databricks (DBFS, used here to simulate cloud storage)
* Describe what happens when a new file arrives in a directory configured for Auto Loader
* Upsert rows when a new file arrives

In [0]:
# Start from a clean slate: recursively delete the table data, the JSON source
# directory and the landing zone left behind by a previous run.
dbutils.fs.rm('/mnt/dataset', recurse=True)
dbutils.fs.rm('/mnt/reqd', recurse=True)
dbutils.fs.rm('/mnt/dbfslandingzone', recurse=True)

Out[312]: True

# Create fixtures

In [0]:
%sql
-- IF EXISTS keeps this cell from failing when the table is absent,
-- e.g. on the very first run of the notebook.
drop table if exists policy

In [0]:
%sql

-- Delta table used as the demo data set; its files are stored at /mnt/dataset.
CREATE TABLE policy (
  policy  STRING,
  name    STRING,
  created DATE,
  amount  DOUBLE
)
USING DELTA
LOCATION '/mnt/dataset';

-- Seed rows, one INSERT per row.
-- Policy p0001 appears twice (ram and hrithik).
INSERT INTO policy VALUES
  ('p0001', 'ram', CAST('1908-03-15' AS DATE), 232);

INSERT INTO policy VALUES
  ('p0002', 'ravi', CAST('1991-03-15' AS DATE), 222.32);

INSERT INTO policy VALUES
  ('p0001', 'hrithik', CAST('1999-03-15' AS DATE), 13442);

INSERT INTO policy VALUES
  ('p0004', 'rajiv', CAST('1991-03-15' AS DATE), 222.32);

num_affected_rows,num_inserted_rows
1,1


In [0]:
# Load every row of the policy table and render it in the notebook.
df = spark.table("policy")
display(df)

policy,name,created,amount
p0001,hrithik,1999-03-15,13442.0
p0004,rajiv,1991-03-15,222.32
p0002,ravi,1991-03-15,222.32
p0001,ram,1908-03-15,232.0


### Helper function to create json files from delta table
* Create json file from delta table

In [0]:
# Directory that Auto Loader watches; every generated JSON data set lands here.
location='/mnt/reqd'


def createDataFile(query, fname):
    """Run ``query`` with Spark SQL and write the result as JSON under ``location``.

    Args:
        query: Spark SQL text whose result set is exported.
        fname: Name of the output path below ``location``. Spark writes a
            directory of part files at this path, not a single file, and
            overwrites whatever is already there.
    """
    result_df = spark.sql(query)
    target = f"{location}/{fname}"
    # No "header" option here: it is a CSV setting and has no effect on JSON output.
    result_df.write.format('json').mode('overwrite').save(target)


# First data set to land: the rows for hrithik and rajiv.
filename='policy1.json'
createDataFile("select * from policy where name in ('hrithik','rajiv')",filename)

In [0]:
# List the source directory to confirm what has landed so far.
display(dbutils.fs.ls('/mnt/reqd'))

path,name,size,modificationTime
dbfs:/mnt/reqd/policy1.json/,policy1.json/,0,0


In [0]:
# Read the freshly written JSON back as a batch DataFrame to check its contents.
policy1_df = spark.read.format("json").load("dbfs:/mnt/reqd/policy1.json")
display(policy1_df)

amount,created,name,policy
13442.0,1999-03-15,hrithik,p0001
222.32,1991-03-15,rajiv,p0004


### Read JSON files

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Source directory watched by Auto Loader and the Delta landing-zone path.
loc='/mnt/reqd'
dbfslanding='/mnt/dbfslandingzone'

# Explicit schema for the landed policy records; every field is read as a string.
schema = StructType([
    StructField("policy", StringType(), False),
    StructField("name", StringType(), False),
    StructField("created", StringType(), False),
    StructField("amount", StringType(), False),
])

# Incrementally discover JSON files under `loc` with Auto Loader (cloudFiles).
# The CSV-only "header" option is omitted because it does not apply to JSON.
raw_df = (
    spark.readStream.format('cloudFiles')
    .schema(schema)
    .option('cloudFiles.format', 'json')
    .option('cloudFiles.schemaEvolutionMode', 'none')
    .option('cloudFiles.schemaLocation', '/mnt/reqd/chk')
    # /mnt/reqd/chk lives inside the source directory, so without a filter its
    # files are eligible for ingestion and would surface as all-NULL rows.
    # Only pick up the *.json part files written by createDataFile.
    # NOTE(review): longer term, keep checkpoint data outside the source directory.
    .option('pathGlobFilter', '*.json')
    .load(loc)
)

from pyspark.sql.functions import input_file_name, current_timestamp

# Keep every ingested column and append two audit columns: the file each row
# came from and the time it was processed.
with_source = raw_df.withColumn("source_file", input_file_name())
transformed_df = with_source.withColumn("processing_time", current_timestamp())

In [0]:

# Initial load: append everything currently in the source directory to the
# Delta landing table, then stop (availableNow processes the backlog once).
landing_query = (
    transformed_df.writeStream
    .format("delta")  # explicit, rather than relying on the workspace default format
    .trigger(availableNow=True)
    .option("checkpointLocation", '/mnt/reqd/chk')
    .option("path", dbfslanding)
    .start()
)

# start() returns immediately; wait for the run to finish so that the following
# cells read a fully written landing table instead of racing the stream.
landing_query.awaitTermination()

Out[319]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f8fca849df0>

In [0]:
# The source format is chosen with .format(); option("format", ...) does not
# select a format and only worked because the default format happened to be Delta.
(
    spark.read.format("delta")
    .load(dbfslanding)
    .display()
)

policy,name,created,amount,source_file,processing_time
p0001,hrithik,1999-03-15,13442.0,/mnt/reqd/policy1.json/part-00000-tid-2056987125725837894-f6d84d2a-d0d6-46f1-9c9e-19fa435d1128-26511-1-c000.json,2023-04-21T13:22:38.516+0000
p0004,rajiv,1991-03-15,222.32,/mnt/reqd/policy1.json/part-00001-tid-2056987125725837894-f6d84d2a-d0d6-46f1-9c9e-19fa435d1128-26512-1-c000.json,2023-04-21T13:22:38.516+0000


### Land new file

In [0]:
# Land a second data set holding only the 'ram' row, which shares policy p0001
# with the 'hrithik' row that was ingested earlier.
createDataFile(query="select * from policy where name='ram'", fname='policy2.json')

In [0]:
# List the source directory again; policy2.json should now sit next to policy1.json.
display(dbutils.fs.ls('/mnt/reqd'))

path,name,size,modificationTime
dbfs:/mnt/reqd/chk/,chk/,0,0
dbfs:/mnt/reqd/policy1.json/,policy1.json/,0,0
dbfs:/mnt/reqd/policy2.json/,policy2.json/,0,0


In [0]:
# DeltaTable must be imported before first use: this cell runs ahead of the
# notebook's later `from delta.tables import *`, so a fresh session would
# otherwise fail here with NameError.
from delta.tables import DeltaTable

# Snapshot of the landing table before the upsert is applied.
olddf = DeltaTable.forPath(spark, dbfslanding)

display(olddf.toDF())

policy,name,created,amount,source_file,processing_time
p0001,hrithik,1999-03-15,13442.0,/mnt/reqd/policy1.json/part-00000-tid-2056987125725837894-f6d84d2a-d0d6-46f1-9c9e-19fa435d1128-26511-1-c000.json,2023-04-21T13:22:38.516+0000
p0004,rajiv,1991-03-15,222.32,/mnt/reqd/policy1.json/part-00001-tid-2056987125725837894-f6d84d2a-d0d6-46f1-9c9e-19fa435d1128-26512-1-c000.json,2023-04-21T13:22:38.516+0000


### Upsert

In [0]:
from delta.tables import *

# Handle on the landing Delta table that each micro-batch is merged into.
olddf = DeltaTable.forPath(spark, dbfslanding)


def updateDeltaTable(batchDF, batchID):
    """Upsert one micro-batch into the landing Delta table, keyed on ``policy``.

    Rows whose ``policy`` matches an existing row overwrite it; all other rows
    are inserted.

    Args:
        batchDF: DataFrame holding the rows of the current micro-batch.
        batchID: Micro-batch identifier supplied by foreachBatch (unused).

    NOTE(review): MERGE raises an error when several source rows in the same
    batch match one target row; deduplicate on ``policy`` upstream if a single
    batch can carry repeated keys.
    """
    # A NULL key can never satisfy `t.policy=s.policy`, so such rows would be
    # inserted as new junk rows on every run; drop them before merging.
    keyed_batch = batchDF.where("policy IS NOT NULL")
    (
        olddf.alias('t')
        .merge(keyed_batch.alias('s'), 't.policy=s.policy')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


# Reuse the same checkpoint as the initial load so only files that arrived
# since then are processed. No "path" option: with foreachBatch the merge
# function, not the stream sink, decides where the data is written.
upsert_query = (
    transformed_df.writeStream
    .foreachBatch(updateDeltaTable)
    .outputMode('append')
    .trigger(availableNow=True)
    .option("checkpointLocation", '/mnt/reqd/chk')
    .start()
)

# start() is asynchronous; wait for the run to complete so the merge has been
# committed before the table is read back.
upsert_query.awaitTermination()


Out[324]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f8fca5bdca0>

In [0]:
# The source format is chosen with .format(); option("format", ...) does not
# select a format and only worked because the default format happened to be Delta.
(
    spark.read.format("delta")
    .load(dbfslanding)
    .dropna()  # hide any rows that contain NULLs
    .display()
)

policy,name,created,amount,source_file,processing_time
p0004,rajiv,1991-03-15,222.32,/mnt/reqd/policy1.json/part-00001-tid-2056987125725837894-f6d84d2a-d0d6-46f1-9c9e-19fa435d1128-26512-1-c000.json,2023-04-21T13:22:38.516+0000
p0001,ram,1908-03-15,232.0,/mnt/reqd/policy2.json/part-00000-tid-3734145666152764326-cc9c207e-4f4a-43a8-98c7-20bac493e6f7-26536-1-c000.json,2023-04-21T13:23:41.728+0000


### Conclusion
* The upsert worked: the row with policy number p0001 changed from hrithik to ram, while p0004 was left untouched.