## SCD Type-2 Implemementation

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import datetime as dt

In [0]:
customer_dim_data = [

(1,'manish','arwal','india','N','2022-09-15','2022-09-25'),
(2,'vikash','patna','india','Y','2023-08-12',None),
(3,'nikita','delhi','india','Y','2023-09-10',None),
(4,'rakesh','jaipur','india','Y','2023-06-10',None),
(5,'ayush','NY','USA','Y','2023-06-10',None),
(1,'manish','gurgaon','india','Y','2022-09-25',None),
]

customer_schema= ['id','name','city','country','active','effective_start_date','effective_end_date']

customer_dim_df = spark.createDataFrame(data= customer_dim_data,schema=customer_schema)

sales_data = [

(1,1,'manish','2023-01-16','gurgaon','india',380),
(77,1,'manish','2023-03-11','bangalore','india',300),
(12,3,'nikita','2023-09-20','delhi','india',127),
(54,4,'rakesh','2023-08-10','jaipur','india',321),
(65,5,'ayush','2023-09-07','mosco','russia',765),
(89,6,'rajat','2023-08-10','jaipur','india',321)
]

sales_schema = ['sales_id', 'customer_id','customer_name', 'sales_date', 'food_delivery_address','food_delivery_country', 'food_cost']

sales_df = spark.createDataFrame(data=sales_data,schema=sales_schema)

# sales- fact table
# customer-dimention

In [0]:
# step1 compare the changes

joinedDf= customer_dim_df.join(sales_df,customer_dim_df["id"]==sales_df["customer_id"],"left")
joinedDf.show()

+---+------+-------+-------+------+--------------------+------------------+--------+-----------+-------------+----------+---------------------+---------------------+---------+
| id|  name|   city|country|active|effective_start_date|effective_end_date|sales_id|customer_id|customer_name|sales_date|food_delivery_address|food_delivery_country|food_cost|
+---+------+-------+-------+------+--------------------+------------------+--------+-----------+-------------+----------+---------------------+---------------------+---------+
|  1|manish|  arwal|  india|     N|          2022-09-15|        2022-09-25|      77|          1|       manish|2023-03-11|            bangalore|                india|      300|
|  1|manish|  arwal|  india|     N|          2022-09-15|        2022-09-25|       1|          1|       manish|2023-01-16|              gurgaon|                india|      380|
|  2|vikash|  patna|  india|     Y|          2023-08-12|              NULL|    NULL|       NULL|         NULL|      NULL

In [0]:
# step 2 filter the records
## finding out the records where cities are not same and active is Y
DF1=joinedDf.where((col("food_delivery_address") != col("city")) & (col("active")=="Y"))\
    .withColumn("active",lit("Y"))\
    .withColumn("effective_start_date", col("sales_date"))\
        .withColumn("effective_end_date",lit(None))\
            .select("customer_id",
                    "customer_name",
                    col("food_delivery_address").alias("city"),
                    "food_delivery_country",
                    "active",
                    "effective_start_date",
                    "effective_end_date")

DF1.show()

+-----------+-------------+---------+---------------------+------+--------------------+------------------+
|customer_id|customer_name|     city|food_delivery_country|active|effective_start_date|effective_end_date|
+-----------+-------------+---------+---------------------+------+--------------------+------------------+
|          1|       manish|bangalore|                india|     Y|          2023-03-11|              NULL|
|          5|        ayush|    mosco|               russia|     Y|          2023-09-07|              NULL|
+-----------+-------------+---------+---------------------+------+--------------------+------------------+



In [0]:
#updating old records
old_data=joinedDf.where((col("food_delivery_address") != col("city")) & (col("active")=="Y"))\
    .withColumn("active",lit("N"))\
        .withColumn("effective_end_date",col("sales_date"))\
            .select("customer_id",
                    "customer_name",
                    "city",
                    "food_delivery_country",
                    "active",
                    "effective_start_date",
                    "effective_end_date")

old_data.show()

+-----------+-------------+-------+---------------------+------+--------------------+------------------+
|customer_id|customer_name|   city|food_delivery_country|active|effective_start_date|effective_end_date|
+-----------+-------------+-------+---------------------+------+--------------------+------------------+
|          1|       manish|gurgaon|                india|     N|          2022-09-25|        2023-03-11|
|          5|        ayush|     NY|               russia|     N|          2023-06-10|        2023-09-07|
+-----------+-------------+-------+---------------------+------+--------------------+------------------+



In [0]:
# merging the records
Final_records=customer_dim_df.union(DF1).union(old_data)

Final_records.show()

+---+------+---------+-------+------+--------------------+------------------+
| id|  name|     city|country|active|effective_start_date|effective_end_date|
+---+------+---------+-------+------+--------------------+------------------+
|  1|manish|    arwal|  india|     N|          2022-09-15|        2022-09-25|
|  2|vikash|    patna|  india|     Y|          2023-08-12|              NULL|
|  3|nikita|    delhi|  india|     Y|          2023-09-10|              NULL|
|  4|rakesh|   jaipur|  india|     Y|          2023-06-10|              NULL|
|  5| ayush|       NY|    USA|     Y|          2023-06-10|              NULL|
|  1|manish|  gurgaon|  india|     Y|          2022-09-25|              NULL|
|  1|manish|bangalore|  india|     Y|          2023-03-11|              NULL|
|  5| ayush|    mosco| russia|     Y|          2023-09-07|              NULL|
|  1|manish|  gurgaon|  india|     N|          2022-09-25|        2023-03-11|
|  5| ayush|       NY| russia|     N|          2023-06-10|      

In [0]:
#Deleting duplicate old records 

windowSpec= Window.partitionBy("id","active").orderBy(col("effective_start_date").desc())

Final_records=Final_records.withColumn("rank",rank().over(windowSpec))\
    .filter(~((col("active")=="Y")&(col("rank")>=2))).drop("rank")

In [0]:
Final_records.show()

+---+------+---------+-------+------+--------------------+------------------+
| id|  name|     city|country|active|effective_start_date|effective_end_date|
+---+------+---------+-------+------+--------------------+------------------+
|  1|manish|  gurgaon|  india|     N|          2022-09-25|        2023-03-11|
|  1|manish|    arwal|  india|     N|          2022-09-15|        2022-09-25|
|  1|manish|bangalore|  india|     Y|          2023-03-11|              NULL|
|  2|vikash|    patna|  india|     Y|          2023-08-12|              NULL|
|  3|nikita|    delhi|  india|     Y|          2023-09-10|              NULL|
|  4|rakesh|   jaipur|  india|     Y|          2023-06-10|              NULL|
|  5| ayush|       NY| russia|     N|          2023-06-10|        2023-09-07|
|  5| ayush|    mosco| russia|     Y|          2023-09-07|              NULL|
+---+------+---------+-------+------+--------------------+------------------+

