In [None]:
# %pip install delta-spark==2.1.0  # uncomment to install into the active kernel's environment

In [8]:
import os
import re
from datetime import datetime

from pyspark.sql.types import _parse_datatype_string
from pyspark.sql.functions import input_file_name, monotonically_increasing_id, row_number, regexp_extract, col, concat, sha2, to_timestamp, lit, desc, lag, when, lead,asc,count
from pyspark.sql.window import Window

import pyspark
from delta import *

# Spark session builder with the Delta Lake SQL extension and Delta catalog registered.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Let delta-spark's pip helper finish configuring the builder, then start (or reuse) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [9]:
def create_staging_df_from_raw(raw_path, raw_schema_str="id int,first_name string,last_name string"):
    """Read a raw CSV extract and add lineage and change-detection columns.

    Args:
        raw_path: Path of the CSV extract. Its file name is expected to contain a
            ``yyyy-MM-dd-HH-mm-ss`` timestamp, e.g. ``2023-07-01-00-58-00.csv``.
        raw_schema_str: DDL schema string for the CSV columns. Defaults to the
            ``id`` / ``first_name`` / ``last_name`` layout used by this notebook.

    Returns:
        A DataFrame with the raw columns plus:
          - ``extract_tmst``: timestamp parsed from the source file name
            (NOTE(review): presumably NULL when the name holds no timestamp,
            under Spark's default non-ANSI parsing; confirm),
          - ``source``: the source file URI from ``input_file_name()``,
          - ``hash``: SHA-256 hex digest of the concatenated raw column values.
    """
    # Local import: coalesce is not part of the notebook's import cell.
    from pyspark.sql.functions import coalesce

    raw_schema = _parse_datatype_string(raw_schema_str)
    raw_df = spark.read.csv(raw_path, header=True, schema=raw_schema)

    timestamp_pattern = r"(\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2})"
    source_col = input_file_name()

    # Null-safe hash input. A plain concat() returns NULL if ANY column is NULL,
    # which gave a NULL hash that never matches in the Delta MERGE (the row would
    # be re-inserted on every run). Casting to string and mapping NULL -> "" keeps
    # the digest byte-identical to the old concat() for rows without NULLs.
    # Caveat: NULL and "" now hash the same.
    hash_input = concat(*[coalesce(col(c).cast("string"), lit("")) for c in raw_df.columns])

    return (
        raw_df
        .withColumn(
            "extract_tmst",
            to_timestamp(regexp_extract(source_col, timestamp_pattern, 1), "yyyy-MM-dd-HH-mm-ss"),
        )
        .withColumn("source", source_col)
        .withColumn("hash", sha2(hash_input, 256))
    )

In [10]:
def scd2_run(raw_path, run_number, delta_path=None):
    """Apply one raw extract to the SCD Type 2 Delta table.

    Steps (each intermediate DataFrame is printed with ``show()`` for inspection):
      1. Create the partitioned Delta table at ``delta_path`` if it does not exist.
      2. Build the staging DataFrame from ``raw_path``. Drop rows whose
         next-newer version (same ``id``) has the same ``hash``.
      3. Derive SCD2 columns: ``is_active``, ``eff_start``, ``eff_end``, the audit
         columns and the year/month/day/hour/minute partition columns.
      4. Union the staging rows with the currently active target rows for the same
         ids, and re-stamp the audit columns / ``is_active``.
      5. MERGE the result into the Delta table on (``id``, ``hash``).

    Args:
        raw_path: Path of the raw CSV extract; forwarded to ``create_staging_df_from_raw``.
        run_number: Identifier of this run. It is written to ``created_by_exec_id``
            and ``updated_by_exec_id``.
        delta_path: Location of the Delta table. Defaults to ``<cwd>/delta``.

    Returns:
        None. The Delta table at ``delta_path`` is modified.
    """
    exec_time = datetime.now()
    exec_time_str = exec_time.strftime("%Y-%m-%d %H:%M:%S")
    exec_tmst = to_timestamp(lit(exec_time_str), "yyyy-MM-dd HH:mm:ss")

    # eff_end assigned to the newest version of a record (open-ended validity).
    high_date = "2999-12-31 23:59:59"

    # Partition values come from the run time (datetime.now()), not the extract time.
    year, month, day, hour, minute = exec_time.strftime("%Y %m %d %H %M").split()

    delta_schema_str = (
        "id int, first_name string,last_name string, source string, hash string, is_active string, eff_start timestamp, "
        "eff_end timestamp, create_tmst timestamp, update_tmst timestamp, created_by_exec_id int, updated_by_exec_id int, "
        "year string, month string, day string, hour string, minute string"
    )
    delta_schema = _parse_datatype_string(delta_schema_str)
    if delta_path is None:
        delta_path = f"{os.getcwd()}/delta"

    partition_columns = ["year", "month", "day", "hour", "minute"]

    # --- 1. Ensure the target Delta table exists -----------------------------
    if not DeltaTable.isDeltaTable(spark, delta_path):
        print("Not a delta table. Creating delta table...")
        (
            spark.createDataFrame([], delta_schema)
            .write.format("delta")
            .mode("append")
            .partitionBy(*partition_columns)
            .save(delta_path)
        )

    delta_table = DeltaTable.forPath(spark, delta_path)
    delta_df = delta_table.toDF()
    print("Current delta table")
    delta_df.show()

    # --- 2. Staging: link each row to its next-newer version per id ----------
    # The file-order tie-breaker is materialized ONCE as a column. Previously
    # monotonically_increasing_id() sat inside the window ordering. Then lag() and
    # lead() could each see a different row order: the notebook output shows the
    # newest row getting NULL from both. Ordering newest-first, lag() returns the
    # next-newer row, which is what both next_hash and eff_end_tmst need.
    staging_window = Window.partitionBy("id").orderBy(desc("extract_tmst"), desc("_row_id"))
    staging_df = (
        create_staging_df_from_raw(raw_path)
        .withColumn("_row_id", monotonically_increasing_id())
        .withColumn("next_hash", lag("hash").over(staging_window))
        .withColumn("eff_end_tmst", lag("extract_tmst").over(staging_window))
        .drop("_row_id")
    )
    staging_df.show()

    # Keep a row only if its next-newer version differs (or it is the newest).
    staging_df = staging_df.filter((col("hash") != col("next_hash")) | col("next_hash").isNull())
    staging_df.show()

    # --- 3. SCD2 / audit / partition columns -----------------------------------
    staging_df = (
        staging_df
        .withColumn(
            "is_active",
            when(col("eff_end_tmst").isNull() & col("next_hash").isNull(), lit("Y")).otherwise(lit("N")),
        )
        .withColumn("eff_start", col("extract_tmst"))
        .withColumn(
            "eff_end",
            when(col("eff_end_tmst").isNull(), to_timestamp(lit(high_date), "yyyy-MM-dd HH:mm:ss"))
            .otherwise(col("eff_end_tmst")),
        )
        .withColumn("create_tmst", exec_tmst)
        .withColumn("update_tmst", exec_tmst)
        .withColumn("created_by_exec_id", lit(run_number))
        .withColumn("updated_by_exec_id", lit(run_number))
        .withColumn("year", lit(year))
        .withColumn("month", lit(month))
        .withColumn("day", lit(day))
        .withColumn("hour", lit(hour))
        .withColumn("minute", lit(minute))
        .drop("extract_tmst", "next_hash", "eff_end_tmst")
    )

    print("Current staging table")
    staging_df.show()

    # --- 4. Combine staging rows with the active target rows for the same ids --
    staging_df = staging_df.alias("stage")
    delta_df = delta_df.alias("target")

    cond = [col("stage.id") == col("target.id"), col("target.is_active") == "Y"]
    filtered_target_df = delta_df.join(staging_df, cond, how="leftsemi")
    print("Filtered target records")
    filtered_target_df.show()

    union_df = staging_df.unionByName(filtered_target_df).select(*filtered_target_df.columns)
    print("Unioned staging & target records")
    union_df.show()

    # Within each hash, row 1 is the active-first, earliest-created row.
    hash_window = Window.partitionBy("hash").orderBy(desc("is_active"), asc("create_tmst"))
    ordered_union_df = union_df.withColumn("row_number", row_number().over(hash_window))
    ordered_union_df.show()

    # Row 1 is stamped with this run's update time/id. Any row whose create_tmst
    # then differs from update_tmst is marked inactive. For pre-existing target
    # rows, create_tmst is presumably from an earlier run, so this deactivates them.
    ordered_union_df = (
        ordered_union_df
        .withColumn("update_tmst", when(col("row_number") == 1, exec_tmst).otherwise(col("update_tmst")))
        .withColumn(
            "updated_by_exec_id",
            when(col("row_number") == 1, lit(run_number)).otherwise(col("updated_by_exec_id")),
        )
        .withColumn("is_active", when(col("create_tmst") != col("update_tmst"), lit("N")).otherwise(col("is_active")))
        .drop("row_number")
        # NOTE(review): dropDuplicates keeps an unspecified row per (id, hash).
        # When a staging row and a target row share a hash, which one survives
        # is not pinned down here. TODO confirm the intended winner.
        .dropDuplicates(["id", "hash"])
    )

    print("Ordered unioned records")
    ordered_union_df.show()

    # --- 5. MERGE into Delta ------------------------------------------------------
    # Matched on (id, hash): overwrite only an active target version whose
    # eff_start equals the incoming row's. Unmatched rows are inserted.
    delta_table.alias("target").merge(
        ordered_union_df.alias("updates"),
        "target.id==updates.id AND target.hash==updates.hash",
    ).whenMatchedUpdateAll(
        condition="target.eff_start==updates.eff_start AND target.is_active=='Y'"
    ).whenNotMatchedInsertAll().execute()

    delta_table_df = delta_table.toDF()
    print("Updated delta table")
    delta_table_df.show()

In [11]:
scd2_run(raw_path="./data/delta_load/2023-07-01-00-58-00.csv", run_number=1)

Not a delta table. Creating delta table...
Current delta table
+---+----------+---------+------+----+---------+---------+-------+-----------+-----------+------------------+------------------+----+-----+---+----+------+
| id|first_name|last_name|source|hash|is_active|eff_start|eff_end|create_tmst|update_tmst|created_by_exec_id|updated_by_exec_id|year|month|day|hour|minute|
+---+----------+---------+------+----+---------+---------+-------+-----------+-----------+------------------+------------------+----+-----+---+----+------+
+---+----------+---------+------+----+---------+---------+-------+-----------+-----------+------------------+------------------+----+-----+---+----+------+

+---+----------+----------+-------------------+--------------------+--------------------+---------+------------+
| id|first_name| last_name|       extract_tmst|              source|                hash|next_hash|eff_end_tmst|
+---+----------+----------+-------------------+--------------------+------------------

In [12]:
scd2_run(raw_path="./data/delta_load/2023-07-02-00-58-00.csv", run_number=2)

Current delta table
+---+----------+----------+--------------------+--------------------+---------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+----+-----+---+----+------+
| id|first_name| last_name|              source|                hash|is_active|          eff_start|            eff_end|        create_tmst|        update_tmst|created_by_exec_id|updated_by_exec_id|year|month|day|hour|minute|
+---+----------+----------+--------------------+--------------------+---------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+----+-----+---+----+------+
|  1|   Belinda|  Sullivan|file:///Users/sai...|1863cefdfa2cde755...|        Y|2023-07-01 00:58:00|2999-12-31 23:59:59|2023-07-19 14:02:06|2023-07-19 14:02:06|                 1|                 1|2023|   07| 19|  14|    02|
|  2|      Lexi|     Walls|file:///Users/sai...|83b9e894466f70135...|        Y|2

In [13]:
scd2_run(raw_path="./data/delta_load/2023-07-03-00-58-00.csv", run_number=3)

Current delta table
+---+----------+----------+--------------------+--------------------+---------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+----+-----+---+----+------+
| id|first_name| last_name|              source|                hash|is_active|          eff_start|            eff_end|        create_tmst|        update_tmst|created_by_exec_id|updated_by_exec_id|year|month|day|hour|minute|
+---+----------+----------+--------------------+--------------------+---------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+----+-----+---+----+------+
|  1|   Belinda|  Sullivan|file:///Users/sai...|1863cefdfa2cde755...|        Y|2023-07-01 00:58:00|2999-12-31 23:59:59|2023-07-19 14:02:06|2023-07-19 14:02:06|                 1|                 1|2023|   07| 19|  14|    02|
|  2|      Lexi|     Walls|file:///Users/sai...|83b9e894466f70135...|        Y|2

In [7]:
scd2_run(raw_path="./data/delta_load/2023-07-04-00-58-00.csv", run_number=4)

Current delta table
+---+----------+----------+--------------------+--------------------+---------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+----+-----+---+----+------+
| id|first_name| last_name|              source|                hash|is_active|          eff_start|            eff_end|        create_tmst|        update_tmst|created_by_exec_id|updated_by_exec_id|year|month|day|hour|minute|
+---+----------+----------+--------------------+--------------------+---------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+----+-----+---+----+------+
|  1|   Belinda|  Sullivan|file:///Users/sai...|1863cefdfa2cde755...|        N|2023-07-01 00:58:00|2999-12-31 23:59:59|2023-07-19 13:49:23|2023-07-19 13:49:43|                 1|                 3|2023|   07| 19|  13|    49|
|  1|   Belinda|    Waters|file:///Users/sai...|5877452deb0c15143...|        Y|2

In [8]:
scd2_run(raw_path="./data/delta_load/2023-07-05-00-58-00.csv", run_number=5)

+---+----------+---------+-------------------+--------------------+--------------------+--------------------+-------------------+
| id|first_name|last_name|       extract_tmst|              source|                hash|           next_hash|       eff_end_tmst|
+---+----------+---------+-------------------+--------------------+--------------------+--------------------+-------------------+
|  7|   Rodrigo|    Cline|2023-07-05 00:58:00|file:///Users/sai...|9dc9e42700e242c08...|a607afbdf3766b3ea...|2023-07-05 00:58:00|
|  7|   Rodrigo| McKinney|2023-07-05 00:58:00|file:///Users/sai...|a607afbdf3766b3ea...|e2b52da27f5fb6e75...|2023-07-05 00:58:00|
|  7|   Rodrigo|  Solomon|2023-07-05 00:58:00|file:///Users/sai...|e2b52da27f5fb6e75...|                null|               null|
+---+----------+---------+-------------------+--------------------+--------------------+--------------------+-------------------+

+---+----------+---------+-------------------+--------------------+--------------------+-

In [9]:
scd2_run(raw_path="./data/delta_load/2023-07-06-00-58-00.csv", run_number=6)

+---+----------+---------+-------------------+--------------------+--------------------+--------------------+-------------------+
| id|first_name|last_name|       extract_tmst|              source|                hash|           next_hash|       eff_end_tmst|
+---+----------+---------+-------------------+--------------------+--------------------+--------------------+-------------------+
|  7|   Rodrigo|  Carillo|2023-07-06 00:58:00|file:///Users/sai...|238173817f6ca8671...|238173817f6ca8671...|2023-07-06 00:58:00|
|  7|   Rodrigo|  Carillo|2023-07-06 00:58:00|file:///Users/sai...|238173817f6ca8671...|                null|               null|
+---+----------+---------+-------------------+--------------------+--------------------+--------------------+-------------------+

+---+----------+---------+-------------------+--------------------+--------------------+---------+------------+
| id|first_name|last_name|       extract_tmst|              source|                hash|next_hash|eff_end_t

In [10]:
scd2_run(raw_path="./data/delta_load/2023-07-07-00-58-00.csv", run_number=7)

+---+----------+---------+-------------------+--------------------+--------------------+--------------------+-------------------+
| id|first_name|last_name|       extract_tmst|              source|                hash|           next_hash|       eff_end_tmst|
+---+----------+---------+-------------------+--------------------+--------------------+--------------------+-------------------+
|  7|   Rodrigo|  Carillo|2023-07-07 00:58:00|file:///Users/sai...|238173817f6ca8671...|238173817f6ca8671...|2023-07-07 00:58:00|
|  7|   Rodrigo|  Carillo|2023-07-07 00:58:00|file:///Users/sai...|238173817f6ca8671...|238173817f6ca8671...|2023-07-07 00:58:00|
|  7|   Rodrigo|  Carillo|2023-07-07 00:58:00|file:///Users/sai...|238173817f6ca8671...|                null|               null|
+---+----------+---------+-------------------+--------------------+--------------------+--------------------+-------------------+

+---+----------+---------+-------------------+--------------------+--------------------+-

In [53]:
scd2_run(raw_path="./data/delta_load/2023-07-08-00-58-00.csv", run_number=8)

+---+----------+---------+-------------------+--------------------+--------------------+--------------------+-------------------+
| id|first_name|last_name|       extract_tmst|              source|                hash|           next_hash|       eff_end_tmst|
+---+----------+---------+-------------------+--------------------+--------------------+--------------------+-------------------+
|  7|   Rodrigo|  Carillo|2023-07-08 00:58:00|file:///Users/sai...|238173817f6ca8671...|238173817f6ca8671...|2023-07-08 00:58:00|
|  7|   Rodrigo|  Carillo|2023-07-08 00:58:00|file:///Users/sai...|238173817f6ca8671...|238173817f6ca8671...|2023-07-08 00:58:00|
|  7|   Rodrigo|  Carillo|2023-07-08 00:58:00|file:///Users/sai...|238173817f6ca8671...|aa442b27a0ebe3ce5...|2023-07-08 00:58:00|
|  7|   Rodrigo| November|2023-07-08 00:58:00|file:///Users/sai...|aa442b27a0ebe3ce5...|c747318a07f20ce1d...|2023-07-08 00:58:00|
|  7|   Rodrigo| December|2023-07-08 00:58:00|file:///Users/sai...|c747318a07f20ce1d...|23

In [71]:
"""
Get the required columns from the updated df. Union them with the staging df

merge union df into delta
condition source and target id and hash match
when match update condition source.is_active != target.is_active
when does not match insert
"""

'\nGet the required columns from the updated df. Union them with the staging df\n\nmerge union df into delta\ncondition source and target id and hash match\nwhen match update condition source.is_active != target.is_active\nwhen does not match insert\n'

In [None]:
""""
1. What if the current active record in target is same as the most recent staging record but there are other older records with the same id
""""