##Assignment_Incubyte

#### Please note that there were many possible approaches to this problem; choices were made with the quality and health of the pipeline in mind. A few examples of such choices are presented below:
1. UDFs were avoided wherever possible, since they are a common performance bottleneck in a pipeline.
2. Delta architecture was used: Delta is a state-of-the-art storage layer that provides optimised, efficient storage and performance.
3. Advanced techniques such as partitioning (for predicate pushdown) and Z-ordering were used to colocate similar data for better query performance.
4. The schema of the dataframe was defined manually to avoid an extra scan to infer it, which would degrade overall data pipeline performance.
5. Delta provides many other useful features, such as ACID transactions and Time Travel.
6. Structured Streaming was used to create an end-to-end fault-tolerant data ingestion system.

In [0]:
#IMPORT NECESSARY LIBs
from pyspark.sql.types import *
from pyspark.sql.functions import *
import datetime
from delta.tables import *

In [0]:
# Explicit schema for the customer CSV files.
# Only customer_id is typed as an integer; every other column is read as a string.
# customer_name, customer_id and open_date are declared non-nullable.
manualschema = (
    StructType()
    .add('customer_name', StringType(), False)
    .add('customer_id', IntegerType(), False)
    .add('open_date', StringType(), False)
    .add('last_consulted_date', StringType(), True)
    .add('vaccination_id', StringType(), True)
    .add('dr_name', StringType(), True)
    .add('state', StringType(), True)
    .add('country', StringType(), True)
    .add('DOB', StringType(), True)
    .add('is_active', StringType(), True)
)

In [0]:
# Sanity check: batch-read the sample CSV with the manual schema and preview it.
sample_incubyte = (
    spark.read
    .format("csv")
    .schema(manualschema)
    .option('header', 'true')
    .load("dbfs:/FileStore/tables/fact_customer/Sample_Data_Incubyte.csv")
)
display(sample_incubyte)

customer_name,customer_id,open_date,last_consulted_date,vaccination_id,dr_name,state,country,DOB,is_active
Alex,123457,20101012,20121013,MVD,Paul,SA,USA,6031987,A
John,123458,20101012,20121013,MVD,Paul,TN,IND,6031987,A
Mathew,123459,20101012,20121013,MVD,Paul,WAS,PHIL,6031987,A
Matt,12345,20101012,20121013,MVD,Paul,BOS,NYC,6031987,A
Jacob,1256,20101012,20121013,MVD,Paul,VIC,AU,6031987,A


In [0]:
# Removes discrepancies due to Spark version differences: switch the session's
# time parser policy to LEGACY before any date patterns are parsed in this notebook.
# NOTE(review): per Spark's migration notes LEGACY selects the pre-3.0 parsing
# behaviour — confirm it is still required on the target runtime.
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [0]:
# Upserts into the Delta table if it exists; otherwise creates a new Delta table.
# Storage location of the customer Delta table (also used to detect whether it exists yet).
path = "/mnt/customer_data_v_1"


def upsertion(microBatchOutputDF, batchId):
  """Write one streaming micro-batch into the customer Delta table.

  Args:
    microBatchOutputDF: DataFrame holding the rows of the current micro-batch.
    batchId: Micro-batch identifier supplied by foreachBatch; not used here.

  Behaviour:
    * No Delta table at ``path`` yet: ensure the ``warehouse_incubyte`` database
      exists, then write the batch as a Delta table partitioned by ``country``
      and register it as ``warehouse_incubyte.customer_dim``.
    * Delta table already present: MERGE the batch into it. A source row matches
      a sink row when ``customer_name`` and ``customer_id`` are equal and the
      source ``open_date`` is not null. Matched rows get their descriptive
      columns refreshed (``open_date`` and the key columns are left as they are);
      unmatched rows are inserted in full.
    * In both cases the table is then optimised with Z-ordering on
      ``state`` and ``dr_name``.

  Note:
    Because ``new.open_date is not null`` is part of the match condition, a
    source row with a null ``open_date`` never matches and therefore always
    goes down the insert path.
  """
  if not DeltaTable.isDeltaTable(spark, path):
    # First batch: create the database and the partitioned Delta table.
    spark.sql('''CREATE DATABASE IF NOT EXISTS warehouse_incubyte''')
    (microBatchOutputDF.write
       .format("delta")
       .partitionBy('country')
       .mode("overwrite")
       .option('path', path)
       .saveAsTable('warehouse_incubyte.customer_dim'))
  else:
    match_condition = "((new.customer_name = sink.customer_name) and (new.customer_id = sink.customer_id) and (new.open_date is not null))"
    # Columns overwritten from the incoming row when a match is found.
    refreshed_columns = ['last_consulted_date', 'vaccination_id', 'dr_name',
                         'state', 'country', 'DOB', 'is_active']
    update_assignments = {name: 'new.' + name for name in refreshed_columns}

    (DeltaTable.forPath(spark, path).alias("sink")
       .merge(microBatchOutputDF.alias("new"), condition=match_condition)
       .whenMatchedUpdate(set=update_assignments)
       .whenNotMatchedInsertAll()
       .execute())

  # Colocates similar data, for faster queries; runs after every micro-batch.
  spark.sql("OPTIMIZE warehouse_incubyte.customer_dim ZORDER BY (state, dr_name)")
    
    
   
  




In [0]:
# Start the streaming ingestion: read CSV files from the source folder,
# parse the date columns, and hand each micro-batch to upsertion.

checkpoint_path = '/mnt/checkpoint_customer_data_v_1/'

# Source: CSV files read with the manual schema, at most one file per trigger.
customer_csv_stream = (
    spark.readStream
    .format('csv')
    .schema(manualschema)
    .option('header', 'true')
    .option('maxFilesPerTrigger', '1')
    .option('multiline', True)
    .csv("dbfs:/FileStore/tables/fact_customer/")
)

# DOB is parsed with the ddMMyyyy pattern; a 7-character value gets a '0'
# prepended first so that it lines up with the 8-character pattern.
dob_text = col('DOB')
dob_parsed = when(
    length(dob_text) == 7,
    to_timestamp(concat(lit('0'), dob_text), 'ddMMyyyy'),
).otherwise(to_timestamp(dob_text, 'ddMMyyyy'))

# Transform: turn the three date-like string columns into timestamps.
customer_typed_stream = (
    customer_csv_stream
    .withColumn('open_date', to_timestamp(col('open_date'), 'yyyyMMdd'))
    .withColumn('last_consulted_date', to_timestamp(col('last_consulted_date'), 'yyyyMMdd'))
    .withColumn("DOB", dob_parsed)
)

# Sink: each micro-batch is passed to upsertion; progress is tracked in checkpoint_path.
cust_data_stream = (
    customer_typed_stream.writeStream
    .outputMode("update")
    .option('checkpointLocation', checkpoint_path)
    .foreachBatch(upsertion)
    .start()
)
#display(cust_data_stream)

In [0]:
%sql
-- Preview the contents of the customer dimension table.
SELECT *
FROM warehouse_incubyte.customer_dim

customer_name,customer_id,open_date,last_consulted_date,vaccination_id,dr_name,state,country,DOB,is_active
Mathew,123459,2010-10-12T00:00:00.000+0000,2012-10-13T00:00:00.000+0000,MVD,Paul,WAS,PHIL,1987-03-06T00:00:00.000+0000,A
Jacob,1256,2010-10-12T00:00:00.000+0000,2012-10-13T00:00:00.000+0000,MVD,Paul,VIC,AU,1987-03-06T00:00:00.000+0000,A
Matt,12345,2010-10-12T00:00:00.000+0000,2012-10-13T00:00:00.000+0000,MVD,Paul,BOS,NYC,1987-03-06T00:00:00.000+0000,A
Alex,123457,2010-10-12T00:00:00.000+0000,2012-10-13T00:00:00.000+0000,MVD,Paul,SA,USA,1987-03-06T00:00:00.000+0000,A
John,123458,2010-10-12T00:00:00.000+0000,2012-10-13T00:00:00.000+0000,MVD,Paul,TN,IND,1987-03-06T00:00:00.000+0000,A
