#### Importing required libraries and functions for building ETL pipeline

In [1]:
import pyspark
from pyspark.sql import SparkSession  #for starting spark session
from delta.pip_utils import configure_spark_with_delta_pip  
import pyspark.sql.functions as F  #data manipulation
from delta.tables import *  #reading and writing delta tables

##### Using pyspark builder to started spark session

In [2]:
builder = pyspark.sql.SparkSession.builder.appName("ETL") \
                     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
                     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
                     .config('spark.databricks.delta.properties.defaults.enableChangeDataFeed', True)  
spark = configure_spark_with_delta_pip(builder).getOrCreate()

Reading the raw data into spark dataframe

In [3]:
data = spark.read.csv('D:/datalake/source/data.csv',header='true',inferSchema='true')

In [4]:
data.count()

10

In [5]:
data.show()

+-----------+---------------+---+-----------+------------+------------+
|Customer_ID|           Name|Age|        SSN|  Occupation|Credit_Score|
+-----------+---------------+---+-----------+------------+------------+
| CUS_0x3187|           null| 39|024-52-4439|   Architect|        Poor|
| CUS_0x3fbf|Rick Rothackerj| 28|004-07-5839|     Teacher|    Standard|
| CUS_0x953c|         Langep| 34|486-85-3974|     _______|        Good|
| CUS_0x4aaa|         Jasond| 55|072-31-6145|Entrepreneur|    Standard|
| CUS_0x2947|         Deepaa| 21|615-06-7821|   Developer|    Standard|
| CUS_0x30e0|             Np| 31|612-70-8987|      Lawyer|        Good|
| CUS_0xc3b7|         Nadiaq| 34|411-51-0676|      Lawyer|        Good|
| CUS_0x1e9b|        Taylorb| 18|866-51-6415|      Writer|        Poor|
| CUS_0x3b88|          Mikex| 40|  #F%$D@*&8|   Scientist|    Standard|
| CUS_0xae51|           null| 46|082-17-4774|Entrepreneur|    Standard|
+-----------+---------------+---+-----------+------------+------

Writing the spark datafram into delta table and saving it in bronze_layer 

In [6]:
data.write.mode('overwrite').format('delta').save('D:/datalake/bronze_data')

reading the bronze data into spark and cleaning the data and saving it to silver layer

In [7]:
bronze_data = spark.read.format('delta').load('D:/datalake/bronze_data')

In [8]:
bronze_cleaned = bronze_data.withColumn('Occupation',F.regexp_replace('Occupation','_______','NaN')) \
                .withColumn('SSN',F.regexp_replace('SSN','[^0-9a-zA-Z_\-]+','')) \
                .dropDuplicates()

In [9]:
bronze_cleaned.show()

+-----------+---------------+---+-----------+------------+------------+
|Customer_ID|           Name|Age|        SSN|  Occupation|Credit_Score|
+-----------+---------------+---+-----------+------------+------------+
| CUS_0x3fbf|Rick Rothackerj| 28|004-07-5839|     Teacher|    Standard|
| CUS_0x953c|         Langep| 34|486-85-3974|         NaN|        Good|
| CUS_0xae51|           null| 46|082-17-4774|Entrepreneur|    Standard|
| CUS_0x4aaa|         Jasond| 55|072-31-6145|Entrepreneur|    Standard|
| CUS_0x3b88|          Mikex| 40|        FD8|   Scientist|    Standard|
| CUS_0xc3b7|         Nadiaq| 34|411-51-0676|      Lawyer|        Good|
| CUS_0x3187|           null| 39|024-52-4439|   Architect|        Poor|
| CUS_0x2947|         Deepaa| 21|615-06-7821|   Developer|    Standard|
| CUS_0x1e9b|        Taylorb| 18|866-51-6415|      Writer|        Poor|
| CUS_0x30e0|             Np| 31|612-70-8987|      Lawyer|        Good|
+-----------+---------------+---+-----------+------------+------

writing the cleaned data into silver layer

In [10]:
bronze_cleaned.write.mode('overwrite').format('delta').save('D:/datalake/silver_data')

Reading the new data into spark session

In [11]:
data_update =  spark.read.csv('D:/datalake/source/dataCopy.csv',header='true',inferSchema='true')

In [12]:
data_update.count()

7

In [13]:
data_update.show()

+-----------+---------------+---+-----------+-------------+------------+
|Customer_ID|           Name|Age|        SSN|   Occupation|Credit_Score|
+-----------+---------------+---+-----------+-------------+------------+
| CUS_0x3187|         vishnu| 39|024-52-4439|    Architect|        Poor|
| CUS_0x3fbf|Rick Rothackerj| 28|004-07-5839|      Teacher|    Standard|
| CUS_0xb288|      Nicolacir| 43|390-44-4596|   Journalist|    Standard|
| CUS_0x7a22|          Sabam| 49|478-48-4669|    Developer|    Standard|
| CUS_0x7a29|         vishnu| 31| 123-5677-5|Data Engineer|        Good|
|CUS_0xb2101|           Mike| 51|123-5677-51|         Mech|    Standard|
|CUS_0x7a220|          Tyson| 52|123-5677-51|         Mech|        Poor|
+-----------+---------------+---+-----------+-------------+------------+



Merging the data with bronze delta table

In [14]:
bronze_data = DeltaTable.forPath(spark,'D:/datalake/bronze_data')

In [15]:
bronze_data.alias('b').merge(
                       data_update.alias('u'),
                       "b.Customer_ID =u.Customer_ID") \
                       .whenMatchedUpdate(set={
                       "Customer_ID": "u.Customer_ID",
                       "Occupation": "u.Occupation",
                       "Name": "u.Name",
                       "Credit_Score": "u.Credit_Score"
                       }) \
                       .whenNotMatchedInsertAll() \
                       .execute()

In [16]:
bronze_cleaned.count()

15

In [17]:
bronze_cleaned.show()

+-----------+---------------+---+-----------+-------------+------------+
|Customer_ID|           Name|Age|        SSN|   Occupation|Credit_Score|
+-----------+---------------+---+-----------+-------------+------------+
| CUS_0x3fbf|Rick Rothackerj| 28|004-07-5839|      Teacher|    Standard|
| CUS_0x7a29|         vishnu| 31| 123-5677-5|Data Engineer|        Good|
| CUS_0x953c|         Langep| 34|486-85-3974|          NaN|        Good|
| CUS_0xae51|           null| 46|082-17-4774| Entrepreneur|    Standard|
| CUS_0x4aaa|         Jasond| 55|072-31-6145| Entrepreneur|    Standard|
| CUS_0x3b88|          Mikex| 40|        FD8|    Scientist|    Standard|
| CUS_0x3187|         vishnu| 39|024-52-4439|    Architect|        Poor|
|CUS_0x7a220|          Tyson| 52|123-5677-51|         Mech|        Poor|
| CUS_0xc3b7|         Nadiaq| 34|411-51-0676|       Lawyer|        Good|
|CUS_0xb2101|           Mike| 51|123-5677-51|         Mech|    Standard|
| CUS_0x2947|         Deepaa| 21|615-06-7821|    De

In [18]:
cdc = spark.read.parquet('D:\\datalake\\bronze_data\\_change_data')

In [19]:
cdc.show()

+-----------+---------------+---+-----------+-------------+------------+----------------+
|Customer_ID|           Name|Age|        SSN|   Occupation|Credit_Score|    _change_type|
+-----------+---------------+---+-----------+-------------+------------+----------------+
| CUS_0x3187|           null| 39|024-52-4439|    Architect|        Poor| update_preimage|
| CUS_0x3187|         vishnu| 39|024-52-4439|    Architect|        Poor|update_postimage|
| CUS_0x3fbf|Rick Rothackerj| 28|004-07-5839|      Teacher|    Standard| update_preimage|
| CUS_0x3fbf|Rick Rothackerj| 28|004-07-5839|      Teacher|    Standard|update_postimage|
| CUS_0x7a22|          Sabam| 49|478-48-4669|    Developer|    Standard|          insert|
|CUS_0x7a220|          Tyson| 52|123-5677-51|         Mech|        Poor|          insert|
| CUS_0x7a29|         vishnu| 31| 123-5677-5|Data Engineer|        Good|          insert|
|CUS_0xb2101|           Mike| 51|123-5677-51|         Mech|    Standard|          insert|
| CUS_0xb2