# INCUBYTE ASSIGNMENT

The following solution is implemented using PySpark and SQL in Databricks. The idea is to stream any new changes to the sink: `readStream` watches a folder, picks up every new file uploaded to it, and streams its rows directly into the sink.
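
To make the "pick up only new files" idea concrete, here is a plain-Python sketch of what a streaming file source does conceptually. It is a simplified model, not how Spark is implemented: the hypothetical `processed` set stands in for the checkpoint, and `max_files_per_trigger` mirrors the `maxFilesPerTrigger` option used later. Files are taken in name order for determinism; Spark's file source orders by modification time (oldest first by default).

```python
import os
import tempfile


def next_batch(folder, processed, max_files_per_trigger=1):
    """Return up to max_files_per_trigger CSV files not yet recorded as processed."""
    candidates = sorted(
        name for name in os.listdir(folder)
        if name.endswith(".csv") and name not in processed
    )
    return candidates[:max_files_per_trigger]


def run_stream(folder, processed, max_files_per_trigger=1):
    """Drain all new files one micro-batch at a time and return the batches."""
    batches = []
    while True:
        batch = next_batch(folder, processed, max_files_per_trigger)
        if not batch:
            break
        batches.append(batch)
        processed.update(batch)  # "commit" the batch to the checkpoint
    return batches


with tempfile.TemporaryDirectory() as folder:
    for name in ("a.csv", "b.csv"):
        open(os.path.join(folder, name), "w").close()
    checkpoint = set()  # survives between runs, like a checkpoint directory
    first_run = run_stream(folder, checkpoint)
    # A new upload arrives later; only that file is picked up on the next run
    open(os.path.join(folder, "c.csv"), "w").close()
    second_run = run_stream(folder, checkpoint)

print(first_run)   # [['a.csv'], ['b.csv']]
print(second_run)  # [['c.csv']]
```

Because the checkpoint outlives each run, restarting the stream never reprocesses `a.csv` or `b.csv`.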

## GETTING NECESSARY IMPORTS

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import datetime
from delta.tables import *

## DEFINING SCHEMA OF THE SOURCE CSV

In [0]:
# EACH STRUCTFIELD TAKES THREE ARGUMENTS: COLUMN NAME, DATA TYPE AND NULLABLE FLAG.
schema = StructType([StructField('Customer_Name',StringType(),False),
                     StructField('Customer_Id',IntegerType(),False),
                     StructField('Open_Date',StringType(),False),
                     StructField('Last_Consulted_Date',StringType(),True),
                     StructField('Vaccination_Id',StringType(),True),
                     StructField('Dr_Name',StringType(),True),
                     StructField('State',StringType(),True),
                     StructField('Country',StringType(),True),
                     StructField('DOB',StringType(),True),
                     StructField('Is_Active',StringType(),True)
                    ])
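
As a rough illustration of what the schema does at read time, the sketch below casts each CSV field with a per-column converter and turns empty or unconvertible values into `None`, similar to how Spark's default PERMISSIVE mode nulls out malformed fields. `SCHEMA` and `parse_rows` are hypothetical names, treating empty fields as null is a simplification, and the nullable flag is only recorded, not enforced (Spark's file sources generally treat a user-supplied schema as nullable). The second record is made up to show a malformed ID.

```python
import csv
import io

# Hypothetical pure-Python mirror of the StructType above:
# (column name, converter applied to the raw text, nullable flag)
SCHEMA = [
    ("Customer_Name", str, False),
    ("Customer_Id", int, False),
    ("Open_Date", str, False),
    ("Last_Consulted_Date", str, True),
    ("Vaccination_Id", str, True),
    ("Dr_Name", str, True),
    ("State", str, True),
    ("Country", str, True),
    ("DOB", str, True),
    ("Is_Active", str, True),
]


def parse_rows(text, schema=SCHEMA):
    """Cast every field by the schema; empty or unconvertible fields become None."""
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        row = {}
        for name, convert, _nullable in schema:  # nullable is recorded, not enforced
            raw = record.get(name)
            if raw is None or raw == "":
                row[name] = None
                continue
            try:
                row[name] = convert(raw)
            except ValueError:
                row[name] = None  # similar to a malformed field in PERMISSIVE mode
        rows.append(row)
    return rows


# First record comes from the sample file; the second is a made-up malformed record
sample = (
    "Customer_Name,Customer_Id,Open_Date,Last_Consulted_Date,Vaccination_Id,"
    "Dr_Name,State,Country,DOB,Is_Active\n"
    "Mathew,123459,10/13/2012,,MVD,Paul,WAS,PHIL,6031987,A\n"
    "Bad,12x,10/12/2010,10/13/2012,MVD,Paul,SA,USA,6031987,A\n"
)
rows = parse_rows(sample)
print(rows[0]["Customer_Id"], rows[0]["Last_Consulted_Date"])  # 123459 None
print(rows[1]["Customer_Id"])  # None
```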

In [0]:
#READING THE SAMPLE FILE TO MAKE SURE THE DATA IS CORRECT
sample_incubyte = spark.read.format("csv").option('header','true').schema(schema).load("dbfs:/FileStore/shared_uploads/sample_incubyte.csv")
display(sample_incubyte)

Customer_Name,Customer_Id,Open_Date,Last_Consulted_Date,Vaccination_Id,Dr_Name,State,Country,DOB,Is_Active
Alex,123457,10/12/2010,10/13/2012,MVD,Paul,SA,USA,6031987,A
John,123458,10/12/2010,10/13/2012,MVD,Paul,TN,IND,6031987,A
Mathew,123459,10/13/2012,,MVD,Paul,WAS,PHIL,6031987,A
Matt,12345,10/12/2010,10/13/2012,MVD,Paul,BOS,NYC,6031987,A
Jacob,1256,10/12/2010,10/13/2012,MVD,Paul,VIC,AU,6031987,A


## DEFINING A UDF TO CONVERT DOB INTO A TIMESTAMP

In [0]:

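def decoding_date(d):
    """Decode a DOB such as '6031987' (DMMYYYY) or '16031987' (DDMMYYYY) into a datetime."""
    if d is None:
        return None  # keep missing DOBs as null instead of failing the whole batch
    y = d[-4:]    # last four digits: year
    m = d[-6:-4]  # two digits before the year: month
    dd = d[:-6]   # remaining leading digit(s): day
    return datetime.datetime(int(y), int(m), int(dd))

# decoding_date('6031987')  -> datetime.datetime(1987, 3, 6, 0, 0)
get_timestamp_func = udf(decoding_date, TimestampType())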
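
The slicing logic can be checked in plain Python without a cluster. The sketch below compares it with an alternative: left-pad the value to eight digits and parse it with a fixed `DDMMYYYY` format. Both helper names are hypothetical. If a UDF-free version is preferred, `to_timestamp(lpad(col('DOB'), 8, '0'), 'ddMMyyyy')` should be the corresponding Spark expression; that equivalence is an untested assumption here.

```python
from datetime import datetime


def decode_by_slicing(d):
    """Same slicing as decoding_date: last 4 chars = year, previous 2 = month, rest = day."""
    return datetime(int(d[-4:]), int(d[-6:-4]), int(d[:-6]))


def decode_by_padding(d):
    """Alternative: left-pad to 8 digits (DDMMYYYY) and parse with a fixed format."""
    return datetime.strptime(d.zfill(8), "%d%m%Y")


# One-digit and two-digit days decode identically with both approaches
for raw in ["6031987", "16031987", "1012000"]:
    assert decode_by_slicing(raw) == decode_by_padding(raw)

print(decode_by_padding("6031987"))  # 1987-03-06 00:00:00
```

The padded version fails loudly on malformed input instead of silently building a wrong date, which may be easier to debug.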

## STREAMING THE CUSTOMER DATA INTO DELTA TABLE

1. `readStream` picks up every new CSV file uploaded to the source folder.

2. `writeStream` upserts the new rows into the Delta table, which is partitioned by country.

3. Partitioning splits the table's data files into one directory per distinct value of the partition column(s).

4. The checkpoint records which files have already been processed, so a restarted stream resumes where it left off instead of reprocessing everything.

5. The resulting layout of the Delta table in the data lake is sketched below.

**File structure of customer_dim after partitioning**

    customer_dim/
        _delta_log/
        Country=USA/
            part-*.parquet
            ...
        Country=IND/
            part-*.parquet
            ...
        ...

**Partitioning keeps all customer data in a single table while letting queries that filter on `Country` read only the matching partition directories (partition pruning), which speeds up lookups.**
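
The directory layout and the pruning benefit can be modelled in a few lines of Python. This is a hypothetical sketch of Hive-style `column=value` directory naming only; it ignores the Delta transaction log and the actual Parquet files. The `(Customer_Name, Country)` pairs come from the sample file.

```python
from collections import defaultdict

# (Customer_Name, Country) pairs taken from the sample file
sample_rows = [("Alex", "USA"), ("John", "IND"), ("Mathew", "PHIL"),
               ("Matt", "NYC"), ("Jacob", "AU")]


def partition_layout(rows, root="customer_dim", column="Country"):
    """Group rows into Hive-style `column=value` directories under the table root."""
    layout = defaultdict(list)
    for name, value in rows:
        layout[f"{root}/{column}={value}"].append(name)
    return dict(layout)


def read_with_filter(layout, value, root="customer_dim", column="Country"):
    """Partition pruning: a filter on the partition column opens a single directory."""
    return layout.get(f"{root}/{column}={value}", [])


layout = partition_layout(sample_rows)
print(sorted(layout))
print(read_with_filter(layout, "USA"))  # ['Alex']
```

A query such as `WHERE Country = 'USA'` behaves like `read_with_filter`: the other four directories are never scanned.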

## Defining a function for upsert

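This function takes each micro-batch produced by the stream and upserts it into the target table: rows that match an existing customer on `Customer_Name` and `Customer_Id` are updated, and all other rows are inserted.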

In [0]:
def upsertToDelta(microBatchOutputDF, batchId):
    # Register the micro-batch as a temp view so the MERGE statement can reference it
    microBatchOutputDF.createOrReplaceTempView("updates")
    # NOTE: run the MERGE with the SparkSession that owns the micro-batch,
    # because that is the session in which the `updates` view was registered
    microBatchOutputDF._jdf.sparkSession().sql("""
        MERGE INTO customer_dim t
        USING updates s
        ON (s.Customer_Name = t.Customer_Name AND s.Customer_Id = t.Customer_Id)
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

## Creating the sink Delta table

The table is created only if it does not already exist.

In [0]:
# Create the sink table at the given path only if it does not already exist
customer_dim = (DeltaTable
                .createIfNotExists(spark)
                .addColumn("Customer_Name", StringType())
                # NOTE: the source schema reads Customer_Id as an integer, so MERGE casts this column when matching
                .addColumn("Customer_Id", "STRING")
                .addColumn("Open_Date", TimestampType())
                .addColumn("Last_Consulted_Date", TimestampType())
                .addColumn("Vaccination_Id", "STRING")
                .addColumn("Dr_Name", "STRING")
                .addColumn("State", "STRING")
                .addColumn("Country", "STRING")
                .addColumn("DOB", TimestampType())
                .addColumn("Is_Active", "STRING")
                .property("description", "table with customer data")
                # Informational table property only; the stream's real checkpoint is set on writeStream
                .property("checkpointLocation", "/mnt/customer_dim_checpoint/")
                .location("/mnt/customer_dim")
                .partitionedBy("Country")
                .execute()
               )

# Expose the Delta path as a temp view so the MERGE in upsertToDelta can target `customer_dim`
c_dim = spark.read.format('delta').load("/mnt/customer_dim")
c_dim.createOrReplaceTempView('customer_dim')
#c_dim.write.format('delta').saveAsTable('customer_dim')

## Registering the sink as a global (Hive metastore) table

In [0]:
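# Register the existing Delta path as an external metastore table named `customer`.
# Dropping an external table removes only the metastore entry, not the data at the location.
spark.sql("""
  DROP TABLE IF EXISTS customer
""")
spark.sql("""
  CREATE TABLE customer
  USING DELTA
  LOCATION '{}'
""".format('/mnt/customer_dim'))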

## Checking for data in the sink

In [0]:
%sql
SELECT * FROM customer

Customer_Name,Customer_Id,Open_Date,Last_Consulted_Date,Vaccination_Id,Dr_Name,State,Country,DOB,Is_Active


## Streaming the data

In [0]:
# Read every new CSV that lands in the upload folder, at most one file per micro-batch
customer_stream = (spark
                   .readStream
                   .option('header', 'true')
                   .option('maxFilesPerTrigger', '1')
                   .schema(schema)
                   .csv('dbfs:/FileStore/shared_uploads/')
                   # Parse the MM/dd/yyyy strings into timestamps
                   .withColumn('Open_Date', to_timestamp(col('Open_Date'), 'MM/dd/yyyy'))
                   .withColumn('Last_Consulted_Date', to_timestamp(col('Last_Consulted_Date'), 'MM/dd/yyyy'))
                   # DOB uses the DMMYYYY / DDMMYYYY encoding handled by the UDF
                   .withColumn('DOB', get_timestamp_func('DOB'))
                  )

# Upsert each micro-batch into the sink; the checkpoint records which files were processed
query = (customer_stream
         .writeStream
         .option('checkpointLocation', '/mnt/checkpoint_location/')
         .foreachBatch(upsertToDelta)
         .start()
        )
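
The `MM/dd/yyyy` conversion applied to `Open_Date` and `Last_Consulted_Date` can be sanity-checked on single values in plain Python. `parse_mmddyyyy` is a hypothetical helper that roughly mirrors Spark's non-ANSI behaviour, where missing or unparseable input becomes null instead of raising an error.

```python
from datetime import datetime


def parse_mmddyyyy(value):
    """Rough single-value mirror of parsing with pattern 'MM/dd/yyyy':
    missing or unparseable input gives None instead of an error."""
    if not value:
        return None
    try:
        return datetime.strptime(value, "%m/%d/%Y")
    except ValueError:
        return None


# Open_Date / Last_Consulted_Date values from the sample rows
print(parse_mmddyyyy("10/12/2010"))  # 2010-10-12 00:00:00
print(parse_mmddyyyy(""))            # None, like Mathew's missing Last_Consulted_Date
```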

## Checking the sink for the streamed data

In [0]:
%sql
SELECT * FROM customer

Customer_Name,Customer_Id,Open_Date,Last_Consulted_Date,Vaccination_Id,Dr_Name,State,Country,DOB,Is_Active
Matt,12345,2010-10-12T00:00:00.000+0000,2012-10-13T00:00:00.000+0000,MVD,Paul,BOS,NYC,1987-03-06T00:00:00.000+0000,A
Jacob,1256,2010-10-12T00:00:00.000+0000,2012-10-13T00:00:00.000+0000,MVD,Paul,VIC,AU,1987-03-06T00:00:00.000+0000,A
Alex,123457,2010-10-12T00:00:00.000+0000,2012-10-13T00:00:00.000+0000,MVD,Paul,SA,USA,1987-03-06T00:00:00.000+0000,A
John,123458,2010-10-12T00:00:00.000+0000,2012-10-13T00:00:00.000+0000,MVD,Paul,TN,IND,1987-03-06T00:00:00.000+0000,A
Mathew,123459,2012-10-13T00:00:00.000+0000,,MVD,Paul,WAS,PHIL,1987-03-06T00:00:00.000+0000,A


## Tracking the history of changes that have been made to the sink table

In [0]:
%sql 
DESCRIBE HISTORY customer

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
1,2021-09-10T21:04:45.000+0000,1097146032422996,sannidhya.malpani@celebaltech.com,MERGE,"Map(predicate -> ((s.`Customer_Name` = t.`Customer_Name`) AND (s.`Customer_Id` = CAST(t.`Customer_Id` AS INT))), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(2132181019947096),0910-175736-abler001847,0.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 5, executionTimeMs -> 8140, numTargetRowsInserted -> 5, scanTimeMs -> 5389, numTargetRowsUpdated -> 0, numOutputRows -> 5, numTargetChangeFilesAdded -> 0, numSourceRows -> 5, numTargetFilesRemoved -> 0, rewriteTimeMs -> 2652)",
0,2021-09-10T21:04:28.000+0000,1097146032422996,sannidhya.malpani@celebaltech.com,CREATE TABLE,"Map(isManaged -> false, description -> null, partitionBy -> [""Country""], properties -> {""description"":""table with customer data"",""checkpointlocation"":""/mnt/customer_dim_checpoint/""})",,List(2132181019947096),0910-175736-abler001847,,SnapshotIsolation,True,Map(),
