In [1]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
from datetime import datetime

import ConnectionConfig as cc

# Project-specific environment bootstrap from the ConnectionConfig module.
# NOTE(review): presumably prepares what the local Spark cluster needs before a
# session is created in the next cell — TODO confirm what it configures.
cc.setupEnvironment()

In [2]:
# Obtain this job's Spark session from the ConnectionConfig helper and display it.
app_name = "dimKlantIncrementalLoad"
spark = cc.startLocalCluster(app_name)
active_session = spark.getActiveSession()
active_session

# Setting the parameters
The job's run timestamp is used as the `scd_end` of the superseded record and as the `scd_start` of the new record.


In [3]:
# Capture the job's run time once, so the superseded record's scd_end and the
# new record's scd_start written by this run share exactly the same instant.
run_timestamp = datetime.now()


# Load the current customer dimension from the Delta table

In [4]:
# Load the customer dimension and register it as the `dim_klant` SQL view that
# the change-detection and MERGE cells query.
df = (
    spark.read
    .format("delta")
    .load("spark-warehouse/dim_klant")
)
df.createOrReplaceTempView("dim_klant")

# Peek at a couple of rows as a sanity check.
spark.table("dim_klant").show(2)

+------+--------------------+------------------+--------+-------------------+-------------------+--------------------+-------+
|userid|             address|subscriptiontypeid|klant_SK|          scd_start|            scd_end|            md5_hash|current|
+------+--------------------+------------------+--------+-------------------+-------------------+--------------------+-------+
|     2|Europalaan 43 , 2...|                 2|       0|1999-01-01 00:00:00|2100-12-12 00:00:00|3e10060458ebd8bb2...|   true|
|     4|Graaf Joseph de P...|                 1|       2|1999-01-01 00:00:00|2100-12-12 00:00:00|760be02221cf1045d...|   true|
+------+--------------------+------------------+--------+-------------------+-------------------+--------------------+-------+


# Read the source tables (velo_users and subscriptions)

In [5]:

# Read the two source tables from the `velodb` connection profile over JDBC.
# Both reads share the same connection settings and partitioning scheme, so
# they go through one helper instead of two copy-pasted reader chains.
cc.set_connectionProfile("velodb")


def read_jdbc_table(table, partition_column="userid", num_partitions=4,
                    lower_bound=0, upper_bound=20):
    """Read `table` from the active connection profile as a partitioned JDBC DataFrame.

    Spark splits the read into `num_partitions` range queries on
    `partition_column`. The bounds only determine the partition stride; they do
    not filter rows (values outside them end up in the first/last partition).
    NOTE(review): 0..20 looks tuned to the current userid range — widen it, or
    derive it from min/max(userid), as the tables grow.
    """
    return (
        spark.read
        .format("jdbc")
        .option("driver", cc.get_Property("driver"))
        .option("url", cc.create_jdbc())
        .option("dbtable", table)
        .option("user", cc.get_Property("username"))
        .option("password", cc.get_Property("password"))
        .option("partitionColumn", partition_column)
        .option("numPartitions", num_partitions)
        .option("lowerBound", lower_bound)
        .option("upperBound", upper_bound)
        .load()
    )


df_velo_users = read_jdbc_table("velo_users")
df_subscription = read_jdbc_table("subscriptions")

df_velo_users.createOrReplaceTempView("df_velo_users")
df_subscription.createOrReplaceTempView("df_subscription")

In [6]:
# b. Transform the source tables into the dim_klant layout.

# Customers: build the address string and hash it for change detection.
# The hash is computed in an outer query instead of through a lateral column
# alias, so it does not depend on Spark's lateral-alias support.
# NOTE(review): the hash covers the address only, so a change of
# subscriptiontypeid alone is not picked up as an SCD2 change. It is kept this
# way because unchanged rows must keep matching the md5_hash values already
# stored in dim_klant.
df_velo_users = spark.sql("""
    SELECT duplicate_userid,
           duplicate_address,
           md5(duplicate_address) AS duplicate_md5_hash
    FROM (
        SELECT userid AS duplicate_userid,
               street || ' ' || number || ', ' || zipcode || ' ' || city || ' ' || country_code AS duplicate_address
        FROM df_velo_users
    ) AS u
""")

# Subscriptions: keep exactly one (the most recent) subscription per user.
# Joining back on MAX(validfrom) returned one row per tied subscription, which
# duplicated customers downstream.
# NOTE(review): ties on validfrom are broken by subscriptiontypeid only to make
# the choice deterministic — use a real subscription id/sequence if one exists.
df_subscription = spark.sql("""
    SELECT userid AS duplicate_userid,
           subscriptiontypeid AS duplicate_subscriptiontypeid,
           validfrom AS duplicate_validfrom
    FROM (
        SELECT *,
               row_number() OVER (PARTITION BY userid
                                  ORDER BY validfrom DESC, subscriptiontypeid DESC) AS rn
        FROM df_subscription
    ) AS ranked
    WHERE rn = 1
""")

joined_df = df_velo_users.join(df_subscription, on='duplicate_userid')
joined_df.createOrReplaceTempView("klant_source")

# Surrogate keys must be stable across runs, because change detection and the
# MERGE match rows on klant_SK. monotonically_increasing_id() cannot provide
# that: its values depend on how the JDBC read happens to be partitioned.
# Instead, a known customer reuses the klant_SK of its current dim_klant row.
# Only new customers get fresh keys, numbered above the current maximum
# (new rows sort first in the row_number window).
dim_klant_duplicate = spark.sql("""
    WITH max_key AS (
        SELECT coalesce(max(klant_SK), -1) AS max_sk
        FROM dim_klant
    ),
    keyed AS (
        SELECT src.*, dk.klant_SK AS existing_sk
        FROM klant_source src
        LEFT JOIN dim_klant dk
          ON dk.userid = src.duplicate_userid AND dk.current = true
    )
    SELECT duplicate_userid,
           duplicate_address,
           duplicate_subscriptiontypeid,
           coalesce(
               existing_sk,
               max_sk + row_number() OVER (ORDER BY existing_sk IS NOT NULL, duplicate_userid)
           ) AS duplicate_klant_SK,
           duplicate_md5_hash
    FROM keyed
    CROSS JOIN max_key
""")

dim_klant_duplicate.createOrReplaceTempView("dim_klant_duplicate")

spark.sql("select * from dim_klant_duplicate").show()


+----------------+--------------------+----------------------------+------------------+--------------------+
|duplicate_userid|   duplicate_address|duplicate_subscriptiontypeid|duplicate_klant_SK|  duplicate_md5_hash|
+----------------+--------------------+----------------------------+------------------+--------------------+
|               1|Somméstraat 156, ...|                           1|                 3|b56eb6300428b2c2f...|
|               2|Europalaan 43 , 2...|                           2|                 0|3e10060458ebd8bb2...|
|               3|Maria Clarastraat...|                           3|                 1|422db75483b2abfa2...|
|               4|Graaf Joseph de P...|                           1|                 2|760be02221cf1045d...|
|               6|Jan Ockegemstraat...|                           3|        8589934593|3ecc98d1e19c45f83...|
|               9|Bikschotelaan 60 ...|                           2|        8589934596|42ead08d46540b6d9...|
|               7|K

In [7]:
# Detect customers whose current dimension row differs from the source, or who
# have no current dimension row at all (new customers).
#  * Join on the business key (userid): the source-side surrogate key is produced
#    by the transform step and is not a reliable way to find a customer's
#    dimension row.
#  * The "dk.userid is null" branch keeps new customers. With only the hash
#    comparison, NULL <> x evaluates to NULL and the row was filtered out, which
#    silently turned the LEFT OUTER JOIN into an inner join.
#  * <=> is Spark's null-safe equality, so a hash that is NULL on just one side
#    counts as a change instead of being dropped.
detectedChanges = spark.sql("""
select * from dim_klant_duplicate source
left outer join dim_klant dk on source.duplicate_userid = dk.userid and dk.current = true
where dk.userid is null or not (source.duplicate_md5_hash <=> dk.md5_hash)
""")
detectedChanges.createOrReplaceTempView("detectedChanges")

detectedChanges.show()


+----------------+--------------------+----------------------------+------------------+--------------------+------+--------------------+------------------+--------+-------------------+-------------------+--------------------+-------+
|duplicate_userid|   duplicate_address|duplicate_subscriptiontypeid|duplicate_klant_SK|  duplicate_md5_hash|userid|             address|subscriptiontypeid|klant_SK|          scd_start|            scd_end|            md5_hash|current|
+----------------+--------------------+----------------------------+------------------+--------------------+------+--------------------+------------------+--------+-------------------+-------------------+--------------------+-------+
|               1|Somméstraat 156, ...|                           1|                 3|b56eb6300428b2c2f...|     1|Somméstraat 157, ...|                 1|       3|1999-01-01 00:00:00|2100-12-12 00:00:00|a1a642e3220fed8a5...|   true|
+----------------+--------------------+-------------------------

In [8]:
# Build the SCD2 upsert set from the detected changes:
#   * a new current version per detected customer
#     (scd_start = run time, scd_end = 2100-12-12, current = true);
#   * a closing row per superseded dimension record
#     (scd_end = run time, current = false), which the MERGE uses to close it.
# The new version keeps the customer's existing klant_SK when a dimension row
# was matched, so all versions of a customer share one key regardless of how
# the source key was generated. Rows without a matching dimension record (if
# change detection passes them through) fall back to the source key.
df_upserts = spark.sql(f"""
select source.duplicate_userid,
       source.duplicate_address,
       source.duplicate_subscriptiontypeid,
       coalesce(source.klant_SK, source.duplicate_klant_SK) as duplicate_klant_SK,
       to_timestamp('{run_timestamp}') as scd_start,
       to_timestamp('2100-12-12','yyyy-MM-dd') as scd_end,
       source.duplicate_md5_hash,
       true as current
from detectedChanges source
union
select dk.userid, dk.address, dk.subscriptiontypeid, dk.klant_SK, dk.scd_start,
       to_timestamp('{run_timestamp}') as scd_end,
       dk.md5_hash,
       false as current
from detectedChanges dk
where dk.current is not null
""")

df_upserts.createOrReplaceTempView("upserts")

In [9]:
# Inspect the upsert set (new versions and closing rows) before merging.
spark.table("upserts").show()

+----------------+--------------------+----------------------------+------------------+--------------------+--------------------+--------------------+-------+
|duplicate_userid|   duplicate_address|duplicate_subscriptiontypeid|duplicate_klant_SK|           scd_start|             scd_end|  duplicate_md5_hash|current|
+----------------+--------------------+----------------------------+------------------+--------------------+--------------------+--------------------+-------+
|               1|Somméstraat 156, ...|                           1|                 3|2024-10-28 13:52:...| 2100-12-12 00:00:00|b56eb6300428b2c2f...|   true|
|               1|Somméstraat 157, ...|                           1|                 3| 1999-01-01 00:00:00|2024-10-28 13:52:...|a1a642e3220fed8a5...|  false|
+----------------+--------------------+----------------------------+------------------+--------------------+--------------------+--------------------+-------+


In [10]:
# Apply the upserts to the dimension:
#   * closing rows (source.current = false) match the customer's current
#     dimension row on klant_SK and close it (scd_end, current = false);
#   * new versions never satisfy the ON clause and are inserted.
# The NOT MATCHED branch is restricted to source.current = true, so that a
# closing row which finds no open dimension row is not inserted as an orphan
# history record.
spark.sql("""
MERGE INTO dim_klant AS target
USING upserts AS source
ON target.klant_SK = source.duplicate_klant_SK AND source.current = false AND target.current = true
WHEN MATCHED THEN UPDATE SET scd_end = source.scd_end, current = source.current
WHEN NOT MATCHED AND source.current = true THEN INSERT (klant_SK, userid, address, subscriptiontypeid, scd_start, scd_end, md5_hash, current) VALUES (source.duplicate_klant_SK, source.duplicate_userid, source.duplicate_address, source.duplicate_subscriptiontypeid, source.scd_start, source.scd_end, source.duplicate_md5_hash, source.current)
""")

# Re-read the table so the check shows the committed MERGE result explicitly,
# rather than reusing the DataFrame that was loaded before the merge.
spark.read.format("delta").load("spark-warehouse/dim_klant").sort("klant_SK", "scd_start").show(10)


+------+--------------------+------------------+----------+--------------------+--------------------+--------------------+-------+
|userid|             address|subscriptiontypeid|  klant_SK|           scd_start|             scd_end|            md5_hash|current|
+------+--------------------+------------------+----------+--------------------+--------------------+--------------------+-------+
|     2|Europalaan 43 , 2...|                 2|         0| 1999-01-01 00:00:00| 2100-12-12 00:00:00|3e10060458ebd8bb2...|   true|
|     3|Maria Clarastraat...|                 3|         1| 1999-01-01 00:00:00| 2100-12-12 00:00:00|422db75483b2abfa2...|   true|
|     4|Graaf Joseph de P...|                 1|         2| 1999-01-01 00:00:00| 2100-12-12 00:00:00|760be02221cf1045d...|   true|
|     1|Somméstraat 157, ...|                 1|         3| 1999-01-01 00:00:00|2024-10-28 13:52:...|a1a642e3220fed8a5...|  false|
|     1|Somméstraat 156, ...|                 1|         3|2024-10-28 13:52:...| 21

In [11]:
# df.sort("klant_SK", "scd_start").show(10)
