In [0]:
target_schema='customer_id long ,customer_fname string ,customer_lname string , username string , password string ,address string ,city string ,state string ,pincode long ,key long ,effective_date date,future_date date,active_flag boolean'
slowly_changing_dimensions=['username', 'password','address','city','state','pincode']

target_columns = ['customer_id'  ,'customer_fname'  ,'customer_lname'  , 'username'  , 'password'  ,'address'  ,'city'  ,'state'  ,'pincode'  ,'key'  ,'effective_date','future_date' ,'active_flag']
source_columns = ['customer_id'  ,'customer_fname'  ,'customer_lname'  , 'username'  , 'password'  ,'address'  ,'city'  ,'state'  ,'pincode']

In [0]:
source_df = spark.read\
    .format('csv')\
        .option('header',True)\
            .option('inferSchema',True)\
                .load('dbfs:/FileStore/data/scd_demo/customer_scd_demo_type2-1.csv')

In [0]:
source_df.show()

+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+
|customer_id|customer_fname|customer_lname| username| password|             address|       city|state|pincode|
+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+
|          1|       Richard|     Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|OrangeVilla|   TX|  78521|
|          2|          Mary|       Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|  Littleton|   CO|  80126|
|          3|           Ann|         Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|     Caguas|   PR|    725|
|          4|          Mary|         Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little Common| San Marcos|   CA|  92069|
|          5|        Robert|        Hudson|XXXXXXXXX|XXXXXXXXX|10 Crystal River ...|     Caguas|   PR|    725|
|          6|          Mary|         Smith|XXXXXXXXX|XXXXXXXXX|3151 Sleepy Quail...|    Passaic|   NJ|   7055|
|

In [0]:
from pyspark.sql.functions import *
dt = date_format(current_date(),'dd-MM-yyyy')
print(dt)
spark.sql('select date_format(current_date(),"dd-MM-yyyy") as today_date').show()


Column<'date_format(current_date(), dd-MM-yyyy)'>
+----------+
|today_date|
+----------+
|20-02-2024|
+----------+



In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *
mywin=Window.orderBy('customer_id')
target_df = source_df.withColumn('rownum',row_number().over(mywin))
target_df = target_df.withColumn('key', target_df['rownum'])\
        .withColumn('effective_date', date_format(current_date(),'dd-MM-yyyy'))\
        .withColumn('future_date', lit('31-12-9999'))\
        .withColumn('active_flag',lit('true'))\
                .drop('rownum')




In [0]:
target_df.show()

+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+---+--------------+-----------+-----------+
|customer_id|customer_fname|customer_lname| username| password|             address|       city|state|pincode|key|effective_date|future_date|active_flag|
+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+---+--------------+-----------+-----------+
|          1|       Richard|     Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|Brownsville|   TX|  78521|  1|    20-02-2024| 31-12-9999|       true|
|          2|          Mary|       Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|  Littleton|   CO|  80126|  2|    20-02-2024| 31-12-9999|       true|
|          3|           Ann|         Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|     Caguas|   PR|    725|  3|    20-02-2024| 31-12-9999|       true|
|          4|          Mary|         Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little

##storing the target df which is the enchanced version Data WareHouse back to the dbfs

In [0]:
target_df.write\
    .mode('overwrite').option('header','true')\
    .format('csv').option('path','dbfs:/FileStore/data/scd_demo/target')\
        .save()

        


In [0]:
target_active_df = target_df.where(target_df['active_flag']==True)
target_inactive_df=target_df.where(target_df['active_flag']==False)


print(target_active_df.count())
print(target_inactive_df.count())

12
0


###adding md5 columns in both target active and source df 

In [0]:
target_active_df=target_active_df.withColumn('hash_md5',md5(concat_ws('',*slowly_changing_dimensions)))
source_df= source_df.withColumn('hash_md5',md5(concat_ws('',*slowly_changing_dimensions )  ))
#source_df.show(truncate=False)
#target_active_df.show(truncate=False)


In [0]:
##joinning between target active and source df 
source_suffix='_source'
target_suffix='_target'


In [0]:
def column_renamer(df, suffix, append):
    #list=[]
    if append:
        for col in  df.columns:
            newcol=col+suffix
            #list.append(newcol)
            df =df.withColumnRenamed(col , newcol)
            
    else:
        for col in df.columns:
            new=col.replace(suffix,'')
            df=df.withColumnRenamed(col,new)


    return df         


In [0]:
target_active_df= column_renamer(target_active_df , target_suffix, True)
target_active_df.show()
source_df = column_renamer(source_df, source_suffix,True)
#source_df.show()

#


+------------------+---------------------+---------------------+---------------+---------------+--------------------+-----------+------------+--------------+----------+---------------------+------------------+------------------+--------------------+
|customer_id_target|customer_fname_target|customer_lname_target|username_target|password_target|      address_target|city_target|state_target|pincode_target|key_target|effective_date_target|future_date_target|active_flag_target|     hash_md5_target|
+------------------+---------------------+---------------------+---------------+---------------+--------------------+-----------+------------+--------------+----------+---------------------+------------------+------------------+--------------------+
|                 1|              Richard|            Hernandez|      XXXXXXXXX|      XXXXXXXXX|  6303 Heather Plaza|Brownsville|          TX|         78521|         1|           20-02-2024|        31-12-9999|              true|f2c1345c132ad8174...|


In [0]:
df_merge = target_active_df.join(source_df,  col('customer_id_target')==col('customer_id_source'), how='fullouter')
df_merge.show()

+------------------+---------------------+---------------------+---------------+---------------+--------------------+-----------+------------+--------------+----------+---------------------+------------------+------------------+--------------------+------------------+---------------------+---------------------+---------------+---------------+--------------------+-----------+------------+--------------+--------------------+
|customer_id_target|customer_fname_target|customer_lname_target|username_target|password_target|      address_target|city_target|state_target|pincode_target|key_target|effective_date_target|future_date_target|active_flag_target|     hash_md5_target|customer_id_source|customer_fname_source|customer_lname_source|username_source|password_source|      address_source|city_source|state_source|pincode_source|     hash_md5_source|
+------------------+---------------------+---------------------+---------------+---------------+--------------------+-----------+------------+----

In [0]:
df_merge  =df_merge.withColumn('Action', when(col('hash_md5_source')==col('hash_md5_target'),'NOCHANGE')\
                    .when(col('customer_id_target').isNull(), 'INSERT')\
                    .when(col('customer_id_source').isNull(), 'DELETE')\
                        .otherwise('UPDATE') )


In [0]:
df_merge.show()

+------------------+---------------------+---------------------+---------------+---------------+--------------------+-----------+------------+--------------+----------+---------------------+------------------+------------------+--------------------+------------------+---------------------+---------------------+---------------+---------------+--------------------+-----------+------------+--------------+--------------------+--------+
|customer_id_target|customer_fname_target|customer_lname_target|username_target|password_target|      address_target|city_target|state_target|pincode_target|key_target|effective_date_target|future_date_target|active_flag_target|     hash_md5_target|customer_id_source|customer_fname_source|customer_lname_source|username_source|password_source|      address_source|city_source|state_source|pincode_source|     hash_md5_source|  Action|
+------------------+---------------------+---------------------+---------------+---------------+--------------------+-----------

In [0]:
df_merge.select('Action').show()

+--------+
|  Action|
+--------+
|  UPDATE|
|NOCHANGE|
|NOCHANGE|
|NOCHANGE|
|NOCHANGE|
|NOCHANGE|
|NOCHANGE|
|NOCHANGE|
|NOCHANGE|
|NOCHANGE|
|  DELETE|
|  DELETE|
|  INSERT|
+--------+



In [0]:
max_key = df_merge.agg({'key_target':'max'}).collect()[0][0]
print(max_key)

12


In [0]:
#from pyspark.sql.functions import col
unchange_records = column_renamer(df_merge.where(col('Action')=='NOCHANGE'), target_suffix,False).select(target_df.columns)


In [0]:
unchange_records.show() 

+-----------+--------------+--------------+---------+---------+--------------------+----------+-----+-------+---+--------------+-----------+-----------+
|customer_id|customer_fname|customer_lname| username| password|             address|      city|state|pincode|key|effective_date|future_date|active_flag|
+-----------+--------------+--------------+---------+---------+--------------------+----------+-----+-------+---+--------------+-----------+-----------+
|          2|          Mary|       Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...| Littleton|   CO|  80126|  2|    20-02-2024| 31-12-9999|       true|
|          3|           Ann|         Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|    Caguas|   PR|    725|  3|    20-02-2024| 31-12-9999|       true|
|          4|          Mary|         Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little Common|San Marcos|   CA|  92069|  4|    20-02-2024| 31-12-9999|       true|
|          5|        Robert|        Hudson|XXXXXXXXX|XXXXXXXXX|10 Crystal River ..

###Logic forr inserting new records

In [0]:
from pyspark.sql.functions import col 
insert_records= column_renamer(df_merge.where(col('Action')=='INSERT'),source_suffix,False).select(source_columns)
#insert_records.show()


insert_records= insert_records.withColumn('effective_date', date_format(current_date(),'dd-MM-yyyy'))\
                .withColumn('future_date',date_format(lit('31-12-9999'),'dd-MM-yyyy'))\
                    .withColumn('rownum',row_number().over(mywin))\
                        .withColumn('key',max_key + col('rownum'))\
                            .withColumn('active_flag',lit('true'))\
                                .drop('rownum')



In [0]:
insert_records.show()

+-----------+--------------+--------------+---------+---------+--------------------+--------+-----+-------+--------------+-----------+---+-----------+
|customer_id|customer_fname|customer_lname| username| password|             address|    city|state|pincode|effective_date|future_date|key|active_flag|
+-----------+--------------+--------------+---------+---------+--------------------+--------+-----+-------+--------------+-----------+---+-----------+
|         13|        Velina|         Smith|XXXXXXXXX|XXXXXXXXX|8598 Mari Gold  B...|Stafford|   OH|  12345|    20-02-2024|       null| 13|       true|
+-----------+--------------+--------------+---------+---------+--------------------+--------+-----+-------+--------------+-----------+---+-----------+



In [0]:
max_key = insert_records.agg({'key':'max'}).collect()[0][0]
print(max_key)

13


##updating the records

In [0]:
update_records = column_renamer(df_merge.where(col('Action')=='UPDATE'), target_suffix,False).select(target_columns)\
                .withColumn('future_date',date_format(current_date(),'dd-MM-yyyy'))\
                .withColumn('active_flag',lit('false'))\
                .unionByName(
                    column_renamer(df_merge.where(col('Action')=='UPDATE'),source_suffix,False).select(source_columns)\
                        .withColumn('effective_date',date_format(current_date(),'dd-MM-yyyy'))\
                        .withColumn('rownumber',row_number().over(mywin))\
                        .withColumn('key', max_key+col('rownumber'))\
                            .withColumn('future_date',date_format(lit('31-12-9999'),'dd-MM-yyyy'))\
                                .withColumn('active_flag',lit('true'))\
                                .drop('rownumber')
                )




In [0]:
update_records.show()

+-----------+--------------+--------------+---------+---------+------------------+-----------+-----+-------+---+--------------+-----------+-----------+
|customer_id|customer_fname|customer_lname| username| password|           address|       city|state|pincode|key|effective_date|future_date|active_flag|
+-----------+--------------+--------------+---------+---------+------------------+-----------+-----+-------+---+--------------+-----------+-----------+
|          1|       Richard|     Hernandez|XXXXXXXXX|XXXXXXXXX|6303 Heather Plaza|Brownsville|   TX|  78521|  1|    20-02-2024| 20-02-2024|      false|
|          1|       Richard|     Hernandez|XXXXXXXXX|XXXXXXXXX|6303 Heather Plaza|OrangeVilla|   TX|  78521| 14|    20-02-2024|       null|       true|
+-----------+--------------+--------------+---------+---------+------------------+-----------+-----+-------+---+--------------+-----------+-----------+



##delete records

In [0]:
deleted_records = column_renamer(df_merge.where(col('Action')=='DELETE'),target_suffix,False).select(target_columns)
deleted_records = deleted_records.withColumn('future_date',date_format(current_date(),'dd-MM-yyyy'))\
    .withColumn('active_flag',lit('false'))

deleted_records.show()

+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+---+--------------+-----------+-----------+
|customer_id|customer_fname|customer_lname| username| password|             address|       city|state|pincode|key|effective_date|future_date|active_flag|
+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+---+--------------+-----------+-----------+
|         11|          Mary|       Huffman|XXXXXXXXX|XXXXXXXXX|    3169 Stony Woods|     Caguas|   PR|    725| 11|    20-02-2024| 20-02-2024|      false|
|         12|   Christopher|         Smith|XXXXXXXXX|XXXXXXXXX|5594 Jagged Ember...|San Antonio|   TX|  78227| 12|    20-02-2024| 20-02-2024|      false|
+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+---+--------------+-----------+-----------+



In [0]:
resultant_records = target_inactive_df.unionByName(update_records)\
    .unionByName(unchange_records)\
    .unionByName(insert_records)\
        .unionByName(deleted_records).orderBy('key')



In [0]:
resultant_records.show()

+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+---+--------------+-----------+-----------+
|customer_id|customer_fname|customer_lname| username| password|             address|       city|state|pincode|key|effective_date|future_date|active_flag|
+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+---+--------------+-----------+-----------+
|          1|       Richard|     Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|Brownsville|   TX|  78521|  1|    20-02-2024| 20-02-2024|      false|
|          2|          Mary|       Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|  Littleton|   CO|  80126|  2|    20-02-2024| 31-12-9999|       true|
|          3|           Ann|         Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|     Caguas|   PR|    725|  3|    20-02-2024| 31-12-9999|       true|
|          4|          Mary|         Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little

## Below another approach of changing columns name since we cant do it directly like we do in pandas
## hence we use zip for unpacking have a look

In [0]:
data=[(20,'A'),(20,'A'),(20,'A'),(20,'A'),(20,'A')]
myschema= 'age int, name string'
df = spark.createDataFrame(data,schema=myschema)
#df.show()
newcols=[]
for col in df.columns:
    newcol = col+'_target'
    #print(newcol,end=' ')
    newcols.append(newcol)

for old,new in zip(df.columns , newcols):
    df= df.withColumnRenamed(old, new)
df.show()    

## explaining hash calculation with small exmaple

In [0]:
def get_hash(df,  list_of_cols):
	if len(list_of_cols)>0:
		return df.withColumn('hash_md5', md5(concat_ws('',*list_of_cols))) 
	else:
		return df.withColumn('hash_md5', md5(lit(1)) )

newdf= get_hash(df ,df.columns)
newdf.show(truncate=False)

	
		

