In [0]:
"""
We are going to identify Type 2 rows on the basis of change in address
If we have to identify changes in any field then better to create md5 hash and store that in fact sales_df and use that to filter new or update records
"""

customer_dim_data = [
    (1,'manish','arwal','india','N','2022-09-15','2022-09-25'),
    (2,'vikash','patna','india','Y','2023-08-12',None),
    (3,'nikita','delhi','india','Y','2023-09-10',None),
    (4,'rakesh','jaipur','india','Y','2023-06-10',None),
    (5,'ayush','NY','USA','Y','2023-06-10',None),
    (1,'manish','gurgaon','india','Y','2022-09-25',None),
]
customer_schema= ['id','name','city','country','active','effective_start_date','effective_end_date']
customer_dim_df = spark.createDataFrame(data= customer_dim_data,schema=customer_schema)

sales_data = [
    (1,1,'manish','2023-01-16','gurgaon','india',380),
    (77,1,'manish','2023-03-11','bangalore','india',300),
    (12,3,'nikita','2023-09-20','delhi','india',127),
    (54,4,'rakesh','2023-08-10','jaipur','india',321),
    (65,5,'ayush','2023-09-07','mosco','russia',765),
    (89,6,'rajat','2023-08-10','jaipur','india',321)
]
sales_schema = ['sales_id', 'customer_id','customer_name', 'sales_date', 'food_delivery_address','food_delivery_country', 'food_cost']
sales_df = spark.createDataFrame(data=sales_data,schema=sales_schema)

customer_dim_df.show() # dim target/trgt data
sales_df.show()        # incoming source/src data

+---+------+-------+-------+------+--------------------+------------------+
| id|  name|   city|country|active|effective_start_date|effective_end_date|
+---+------+-------+-------+------+--------------------+------------------+
|  1|manish|  arwal|  india|     N|          2022-09-15|        2022-09-25|
|  2|vikash|  patna|  india|     Y|          2023-08-12|              null|
|  3|nikita|  delhi|  india|     Y|          2023-09-10|              null|
|  4|rakesh| jaipur|  india|     Y|          2023-06-10|              null|
|  5| ayush|     NY|    USA|     Y|          2023-06-10|              null|
|  1|manish|gurgaon|  india|     Y|          2022-09-25|              null|
+---+------+-------+-------+------+--------------------+------------------+

+--------+-----------+-------------+----------+---------------------+---------------------+---------+
|sales_id|customer_id|customer_name|sales_date|food_delivery_address|food_delivery_country|food_cost|
+--------+-----------+-------------

In [0]:
joined_data = customer_dim_df.join(sales_df, customer_dim_df["id"]==sales_df["customer_id"], "left")
display(joined_data)

id,name,city,country,active,effective_start_date,effective_end_date,sales_id,customer_id,customer_name,sales_date,food_delivery_address,food_delivery_country,food_cost
1,manish,arwal,india,N,2022-09-15,2022-09-25,77.0,1.0,manish,2023-03-11,bangalore,india,300.0
1,manish,arwal,india,N,2022-09-15,2022-09-25,1.0,1.0,manish,2023-01-16,gurgaon,india,380.0
2,vikash,patna,india,Y,2023-08-12,,,,,,,,
3,nikita,delhi,india,Y,2023-09-10,,12.0,3.0,nikita,2023-09-20,delhi,india,127.0
4,rakesh,jaipur,india,Y,2023-06-10,,54.0,4.0,rakesh,2023-08-10,jaipur,india,321.0
5,ayush,NY,USA,Y,2023-06-10,,65.0,5.0,ayush,2023-09-07,mosco,russia,765.0
1,manish,gurgaon,india,Y,2022-09-25,,77.0,1.0,manish,2023-03-11,bangalore,india,300.0
1,manish,gurgaon,india,Y,2022-09-25,,1.0,1.0,manish,2023-01-16,gurgaon,india,380.0


In [0]:
from pyspark.sql.functions import *

# Handle update records Step 1 is to mark active
new_record_df = joined_data.filter((col("food_delivery_address") != col("city")) & (col("active") == "Y"))\
    .withColumn("active", lit("Y"))\
    .withColumn("effective_start_date", col("sales_date"))\
    .withColumn("effective_end_date", lit(None))\
    .select("customer_id", "customer_name", col("food_delivery_address").alias("city"), "food_delivery_country", "active", "effective_start_date", "effective_end_date")

new_record_df.show()

+-----------+-------------+---------+---------------------+------+--------------------+------------------+
|customer_id|customer_name|     city|food_delivery_country|active|effective_start_date|effective_end_date|
+-----------+-------------+---------+---------------------+------+--------------------+------------------+
|          1|       manish|bangalore|                india|     Y|          2023-03-11|              null|
|          5|        ayush|    mosco|               russia|     Y|          2023-09-07|              null|
+-----------+-------------+---------+---------------------+------+--------------------+------------------+



In [0]:
# Handle update records Step 2 is to mark existing records as inactive (active=N, and update effective_end_date)

old_record_df = joined_data.filter((col("food_delivery_address") != col("city")) & (col("active") == "Y"))\
    .withColumn("active", lit("N"))\
    .withColumn("effective_end_date", col("sales_date"))\
    .select("customer_id", "customer_name", "city", "food_delivery_country", "active", "effective_start_date", "effective_end_date")

old_record_df.show()

+-----------+-------------+-------+---------------------+------+--------------------+------------------+
|customer_id|customer_name|   city|food_delivery_country|active|effective_start_date|effective_end_date|
+-----------+-------------+-------+---------------------+------+--------------------+------------------+
|          1|       manish|gurgaon|                india|     N|          2022-09-25|        2023-03-11|
|          5|        ayush|     NY|               russia|     N|          2023-06-10|        2023-09-07|
+-----------+-------------+-------+---------------------+------+--------------------+------------------+



In [0]:
# Handle new records 

new_customer_df = sales_df.join(customer_dim_df, sales_df["customer_id"]==customer_dim_df["id"], "left_anti")\
    .withColumn("active", lit("Y"))\
    .withColumn("effective_start_date", col("sales_date"))\
    .withColumn("effective_end_date", lit(None))\
    .select("customer_id", "customer_name", "food_delivery_address", "food_delivery_country", "active", "effective_start_date", "effective_end_date")

new_customer_df.show()

+-----------+-------------+---------------------+---------------------+------+--------------------+------------------+
|customer_id|customer_name|food_delivery_address|food_delivery_country|active|effective_start_date|effective_end_date|
+-----------+-------------+---------------------+---------------------+------+--------------------+------------------+
|          6|        rajat|               jaipur|                india|     Y|          2023-08-10|              null|
+-----------+-------------+---------------------+---------------------+------+--------------------+------------------+



In [0]:
final_record_df = customer_dim_df.union(new_record_df).union(old_record_df).union(new_customer_df)

final_record_df.show()
"""
|  1|manish|  gurgaon|  india|     Y|          2022-09-25|              null|
|  1|manish|bangalore|  india|     Y|          2023-03-11|              null|
How can Manish exists in 2 locations ?!
Here drop unwanted records based on effective_start_date desc, keep latest row drop old rows
"""

+---+------+---------+-------+------+--------------------+------------------+
| id|  name|     city|country|active|effective_start_date|effective_end_date|
+---+------+---------+-------+------+--------------------+------------------+
|  1|manish|    arwal|  india|     N|          2022-09-15|        2022-09-25|
|  2|vikash|    patna|  india|     Y|          2023-08-12|              null|
|  3|nikita|    delhi|  india|     Y|          2023-09-10|              null|
|  4|rakesh|   jaipur|  india|     Y|          2023-06-10|              null|
|  5| ayush|       NY|    USA|     Y|          2023-06-10|              null|
|  1|manish|  gurgaon|  india|     Y|          2022-09-25|              null|
|  1|manish|bangalore|  india|     Y|          2023-03-11|              null|
|  5| ayush|    mosco| russia|     Y|          2023-09-07|              null|
|  1|manish|  gurgaon|  india|     N|          2022-09-25|        2023-03-11|
|  5| ayush|       NY| russia|     N|          2023-06-10|      

In [0]:
from pyspark.sql.window import *

# sorting wont happen correctly if type is string, cast to date first and then sort
final_record_df.withColumn("effective_start_date", col("effective_start_date").cast("date")).withColumn("rn", row_number().over(Window.partitionBy("id", "active").orderBy(desc("effective_start_date")))).printSchema()


final_record_df.withColumn("effective_start_date", col("effective_start_date").cast("date")).withColumn("rn", row_number().over(Window.partitionBy("id", "active").orderBy(col("effective_start_date").desc())))\
    .filter(~((col("rn")>=2) & (col("active")=="Y")))\
    .show()


root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- active: string (nullable = true)
 |-- effective_start_date: date (nullable = true)
 |-- effective_end_date: string (nullable = true)
 |-- rn: integer (nullable = false)

+---+------+---------+-------+------+--------------------+------------------+---+
| id|  name|     city|country|active|effective_start_date|effective_end_date| rn|
+---+------+---------+-------+------+--------------------+------------------+---+
|  1|manish|  gurgaon|  india|     N|          2022-09-25|        2023-03-11|  1|
|  1|manish|    arwal|  india|     N|          2022-09-15|        2022-09-25|  2|
|  1|manish|bangalore|  india|     Y|          2023-03-11|              null|  1|
|  2|vikash|    patna|  india|     Y|          2023-08-12|              null|  1|
|  3|nikita|    delhi|  india|     Y|          2023-09-10|              null|  1|
|  4|rakesh|   jaipur

In [0]:
cityDF = spark.createDataFrame([ 
        ("Delhi","India"),
        ("Kolkata","India"),
        ("Mumbai","India"),
        ("Nairobi","Kenya"),
        ("Colombo","Srilanka")],
        ["City","Country"]
    )
codeDF = spark.createDataFrame([
        ("011","Delhi"),
        ("022","Mumbai"),
        ("033","Kolkata"),
        ("044","Chennai")], 
        ["Code","City"]
    )

cityDF.show()
codeDF.show()

+-------+--------+
|   City| Country|
+-------+--------+
|  Delhi|   India|
|Kolkata|   India|
| Mumbai|   India|
|Nairobi|   Kenya|
|Colombo|Srilanka|
+-------+--------+

+----+-------+
|Code|   City|
+----+-------+
| 011|  Delhi|
| 022| Mumbai|
| 033|Kolkata|
| 044|Chennai|
+----+-------+



In [0]:
cityDF.join(codeDF, cityDF["City"] == codeDF["City"], "left_semi").show()

"""
    left_semi join acts as exists in sql
    so when left table is joined with right using left_semi output will return only matching rows+ left table columns
"""

+-------+-------+
|   City|Country|
+-------+-------+
|  Delhi|  India|
|Kolkata|  India|
| Mumbai|  India|
+-------+-------+



In [0]:
cityDF.join(codeDF, cityDF["City"] == codeDF["City"], "left_anti").show()

+-------+--------+
|   City| Country|
+-------+--------+
|Nairobi|   Kenya|
|Colombo|Srilanka|
+-------+--------+



In [0]:
"""
#cols will vary for each dim table
#generic scd2 should be applied for all dim tables

1. target_hash based on col values and have it in dim table(cols should match with source)
2. source_hash for incoming source data, have it in source table (cols should match with dim)

TODO above
==================================================================================================
==================================================================================================

Update scenario
    :mark existing rec active=N, end_date=todays date
    :mark new rec active=Y, end_date=null

Consider id as pk
==================================================================================================
DAY 1 

Full Load will be treated as all new rows 
active=Y
eff_start_dt=row's date
eff_end_dt=9999-12-31

===================================================================================================================
1 John Y 01-Jan 9999-12-31   <---- trgt/dim table
1 Josh   02-Jan              <---- src/incoming table

Left join when left table=trgt and right table=src + active=Y (mark inactive)
1 John Y 01-Jan null  1 Josh   02-Jan      
1 John 01-Jan.withColumn("active", lit("N")).withColumn("eff_end_dt", src.date which is 02-Jan) => 1 John N 01-Jan 02-Jan

dim_t
1 John N 01-Jan 02-Jan 

**** LEFT_SEMI JOIN WILL RETURN MATCHING ROWS BUT COLUMNS ONLY FROM LEFT TABLE ****
Left semi join when left table=src and right table=trgt + active!=Y
1 Josh   02-Jan 1 John N 01-Jan 02-Jan  
1 Josh   02-Jan.withColumn("active", lit("Y")).withColumn("eff_end_dt", null or 9999-12-31) =>  1 Josh Y 02-Jan 9999-12-31

dim_t
1 Josh 02-Jan 9999-12-31 Y
1 John 01-Jan 02-Jan     N
===================================================================================================================
src
1 Joshua   03-Jan

Left join when left table=trgt and right table=src + active=Y
1 Josh 02-Jan Y 9999-12-31      1 Joshua   03-Jan
1 Josh 02-Jan.withColumn("active", lit("N")).withColumn("eff_end_dt", src.date which is 03-Jan) => 1 Josh 02-Jan N 03-Jan 

dim_t
1 Josh 02-Jan 03-Jan  N 
1 John 01-Jan 02-Jan  N 

Left semi join when left table=src and right table=trgt + active!=Y
1 Joshua   03-Jan   1 Josh 02-Jan 03-Jan  N |
1 Joshua   03-Jan   1 John 01-Jan 02-Jan  N | src_df
src_df.select("left table cols").distinct().withColumn("active", lit("Y")).withColumn("eff_end_dt", null or 9999-12-31) =>  1 Joshua   03-Jan Y 9999-12-31

dim_t
1 Joshua  Y  03-Jan  9999-12-31
1 Josh    N  02-Jan  03-Jan 
1 John    N  01-Jan  02-Jan 
===================================================================================================================
src
2 Scott     04-Jan
1 Jake      04-Jan

Left join when left table=trgt and right table=src + active=Y
1 Joshua  Y  03-Jan  9999-12-31 1 Jake      04-Jan
1 Joshua  Y  03-Jan.withColumn("active", lit("N")).withColumn("eff_end_dt", src.date which is 04-Jan) => 1 Joshua 03-Jan 04-Jan N

dim_t
1 Joshua  N  03-Jan  9999-12-31
1 Josh    N  02-Jan  03-Jan 
1 John    N  01-Jan  02-Jan 

Left semi join when left table=src and right table=trgt + active!=Y
2 Scott     04-Jan  null     null   null     null   |
1 Jake      04-Jan  Joshua  N     03-Jan   04-Jan   |src_df
1 Jake      04-Jan  Josh    N     02-Jan   03-Jan   |
1 Jake      04-Jan  John    N     01-Jan   02-Jan   |

src_df.select("left table cols").distinct().withColumn("active", lit("Y")).withColumn("eff_end_dt", null or 9999-12-31) => 
2 Scott     04-Jan Y  9999-12-31
1 Jake      04-Jan Y  9999-12-31 

dim_t
2 Scott   Y  04-Jan  9999-12-31
1 Jake    Y  04-Jan  9999-12-31
1 Joshua  N  03-Jan  04-Jan
1 Josh    N  02-Jan  03-Jan 
1 John    N  01-Jan  02-Jan

"""