# INTRODUCTION

In [1]:

# Import SparkSession
import pyspark
from delta import configure_spark_with_delta_pip

builder = pyspark.sql.SparkSession.builder.appName("STREAMING_DWH") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).enableHiveSupport().getOrCreate()


In [2]:
# To allow automatic schemaInference while reading
spark.conf.set("spark.sql.streaming.schemaInference", True)

# Create the streaming_df to read from input directory
df = spark \
    .readStream \
    .format("json") \
    .load("data/product/")

df.printSchema()

root
 |-- category: string (nullable = true)
 |-- cogs: double (nullable = true)
 |-- contains_caffeine: boolean (nullable = true)
 |-- contains_fruit: boolean (nullable = true)
 |-- contains_nuts: boolean (nullable = true)
 |-- contains_veggies: boolean (nullable = true)
 |-- event_time: string (nullable = true)
 |-- item: string (nullable = true)
 |-- price: double (nullable = true)
 |-- product_id: string (nullable = true)
 |-- size: string (nullable = true)



In [3]:
from pyspark.sql.functions import current_timestamp, input_file_name

def create_bronze_streaming_table(source, target):

    # Generates a source path based on table name, reads all files from that and inserts into bronze schema
    query = (
        spark.readStream
        .format("json")
        .load(source)
        .withColumn("meta_ingestion_ts", current_timestamp())
        .withColumn("meta_filename", input_file_name())
        .writeStream
        .outputMode("append")
        .format("delta")
        .option("checkpointLocation", f"spark-warehouse/_checkpoints/{target}")
        .toTable(target)
    )
    return query

query1 = create_bronze_streaming_table(source="data/inventory", target="bronze_inventory")
query2 = create_bronze_streaming_table(source="data/product", target="bronze_product")
query3 = create_bronze_streaming_table(source="data/purchase", target="bronze_purchase")

# Use the code 
# spark.streams.awaitAnyTermination()


In [4]:
spark.sql("SELECT * FROM bronze_product").show(5)

+--------------------+----+-----------------+--------------+-------------+----------------+--------------------+------------------+-----+----------+------+--------------------+--------------------+
|            category|cogs|contains_caffeine|contains_fruit|contains_nuts|contains_veggies|          event_time|              item|price|product_id|  size|   meta_ingestion_ts|       meta_filename|
+--------------------+----+-----------------+--------------+-------------+----------------+--------------------+------------------+-----+----------+------+--------------------+--------------------+
|Supercharged Smoo...| 2.7|            false|          true|        false|           false|2024-02-21 19:24:...|  Triple Berry Oat| 5.99|      SC01|24 oz.|2024-02-21 19:58:...|file:///home/pete...|
|Supercharged Smoo...| 2.7|            false|         false|        false|           false|2024-02-21 19:24:...|   Peanut Paradise| 5.99|      SC02|24 oz.|2024-02-21 19:58:...|file:///home/pete...|
|   Classi

# SILVER TABLES: SLOWLY CHANGING DIMENSIONS (SCD) - TYPE 1 & 2

In [5]:
from pyspark.sql.functions import md5, concat_ws, lit, row_number, monotonically_increasing_id
from pyspark.sql.types import BooleanType, TimestampType
from pyspark.sql.window import Window
from utils import reorder_columns_in_dataframe

def create_silver_scd1_table(
    source : str, 
    target : str,
    timestamp_key : str,
    merge_key: str,
    surrogate_key : str,
    delta_load_column: str = None
):
    
    # Perform delta load or not?
    if delta_load_column:
        df = spark.sql(f"select * from {source} where {delta_load_column} > (select max({delta_load_column}) from {target})")
        offset = spark.sql(f"select max({surrogate_key}) as offset from {target}").collect()[0].offset + 1
    else:
        spark.sql(f"drop table if exists {target}")
        df = spark.sql(f"select * from {source}")

    # Calculate hashdiff string based on all columns that doesn't contain "meta_" in the name
    df = df.withColumn("meta_hashdiff", md5(concat_ws("||", *[c for c in df.columns if "meta_" not in c])))

    # Set default values for meta_last_updated
    df = df.withColumn("meta_last_updated", current_timestamp())

    # Generate surrogate key
    df = df.withColumn(surrogate_key, monotonically_increasing_id() + offset)

    # Calculate sequence numbers if source data contain multiple rows for each merge_key
    window_spec = Window.partitionBy(merge_key).orderBy(timestamp_key)
    df = df.withColumn("meta_sequence", row_number().over(window_spec))

    # Reorder columns
    df = reorder_columns_in_dataframe(df=df, 
                                      columns_to_front=[surrogate_key],
                                      columns_to_back=[c for c in df.columns if "meta_" in c],
                                      columns_to_delete=["meta_filename"])
    
    # Create an empty Delta table with the same schema
    tmp_view_name = "temporaryView"
    df.createOrReplaceTempView(tmp_view_name)
    spark.sql(f"CREATE TABLE IF NOT EXISTS {target} LIKE {tmp_view_name} USING DELTA")

    # Get list of sequences
    lst_sequence = sorted([p.meta_sequence for p in df.select('meta_sequence').distinct().collect()])

    # Run SCD1 table
    for seq_num in lst_sequence:
        print(f"Inserting into SILVER SCD TYPE 1 TABLE: {target}")
        merge_query = f"""
            MERGE INTO {target} AS target
            USING (
                SELECT * FROM {tmp_view_name}
                WHERE meta_sequence = {seq_num}
            ) AS source ON target.{surrogate_key} = source.{surrogate_key}
            WHEN MATCHED AND target.meta_hashdiff <> source.meta_hashdiff
                THEN UPDATE SET *
            WHEN NOT MATCHED 
                THEN INSERT *
        """
        spark.sql(merge_query).show()


In [6]:

create_silver_scd1_table (
    source="bronze_purchase",
    target="silver_purchase_scd1",
    timestamp_key="transaction_time",
    merge_key="transaction_id",
    surrogate_key="transaction_sid",
)

create_silver_scd1_table (
    source="bronze_inventory",
    target="silver_inventory_scd1",
    timestamp_key="event_time",
    merge_key="event_time",
    surrogate_key="inventory_sid",
)

Inserting into SILVER SCD TYPE 1 TABLE: silver_purchase_scd1
+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|              120|               0|               0|              120|
+-----------------+----------------+----------------+-----------------+

Inserting into SILVER SCD TYPE 1 TABLE: silver_purchase_scd1
+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|               41|               0|               0|               41|
+-----------------+----------------+----------------+-----------------+

Inserting into SILVER SCD TYPE 1 TABLE: silver_purchase_scd1
+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_

In [20]:
def create_silver_scd2_table(
    source: str, 
    target: str, 
    merge_key: str, 
    timestamp_key: str, 
    surrogate_key : str,
    delta_load_column: str = None
):
    
    # Perform delta load or not?
    if delta_load_column:
        df = spark.sql(f"select * from {source} where {delta_load_column} > (select max({delta_load_column}) from {target})")
        offset = spark.sql(f"select max({surrogate_key}) as offset from {target}").collect()[0].offset + 1
    else:
        spark.sql(f"drop table if exists {target}")
        df = spark.sql(f"select * from {source}")

    # Calculate hashdiff string based on all columns that doesn't contain "meta_" in the name
    df = df.withColumn("meta_hashdiff", md5(concat_ws("||", *[c for c in df.columns if "meta_" not in c])))

    # Set default values for meta columns
    df = df.withColumn("meta_is_current", lit(1).cast(BooleanType()))
    df = df.withColumn("meta_valid_from", df[timestamp_key])
    df = df.withColumn("meta_valid_to", lit('9999-12-31').cast(TimestampType()))

    # Calculate surrogate key
    df = df.withColumn(surrogate_key, monotonically_increasing_id() + offset)

    # Calculate sequence numbers if source data contain multiple rows for each merge_key
    window_spec = Window.partitionBy(merge_key).orderBy(timestamp_key)
    df = df.withColumn("meta_sequence", row_number().over(window_spec))

    # Reorder columns in dataframe
    df = reorder_columns_in_dataframe(
        df=df, 
        columns_to_front=[surrogate_key, merge_key],
        columns_to_back=[c for c in df.columns if "meta_" in c],
        columns_to_delete=["meta_filename"]
    )

    # Create an empty Delta table with the same schema
    tmp_view_name = "temporaryView"
    df.createOrReplaceTempView(tmp_view_name)
    spark.sql(f"CREATE TABLE IF NOT EXISTS {target} LIKE {tmp_view_name} USING DELTA")

    # Get list of sequences
    lst_sequence = sorted([p.meta_sequence for p in df.select('meta_sequence').distinct().collect()])

    # Run SCD2 table 
    for seq_num in lst_sequence:
        print(f"Inserting into SILVER SCD TYPE 2 TABLE: {target}")
        merge_query = f"""
            MERGE INTO {target} AS target
            USING (
                SELECT * FROM {tmp_view_name}
                WHERE meta_sequence = {seq_num}
            ) AS source ON target.{merge_key} = source.{merge_key}
            WHEN MATCHED AND target.meta_is_current = true AND target.meta_hashdiff <> source.meta_hashdiff
                THEN UPDATE SET meta_is_current = false, meta_valid_to = source.{timestamp_key}
            WHEN NOT MATCHED 
                THEN INSERT *
        """
        spark.sql(merge_query).show()

        insert_query = f"""
            INSERT INTO {target}
            SELECT * FROM 
            (
                SELECT source.* 
                FROM {tmp_view_name} source
                JOIN {target} target ON target.{merge_key} = source.{merge_key}
                WHERE source.meta_sequence = {seq_num}
                AND target.meta_hashdiff <> source.meta_hashdiff 
            )
        """
        spark.sql(insert_query)

In [22]:
# Create SCD2 tables
create_silver_scd2_table(
    source = "bronze_product",
    target = "silver_product_scd2",
    merge_key = "product_id",
    timestamp_key = "event_time",
    surrogate_key = "product_sid"
)

In [9]:
spark.sql("select * from silver_purchase_scd1").show(5)
spark.sql("select * from silver_inventory_scd1").show(5)
spark.sql("select * from silver_product_scd2 order by product_sid").show(5)

+---------------+---------------+---------+---------------+-----+----------+--------+----------------+--------------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------+
|transaction_sid|add_supplements|is_member|member_discount|price|product_id|quantity|supplement_price|total_purchase|     transaction_id|    transaction_time|   meta_ingestion_ts|       meta_hashdiff|   meta_last_updated|meta_sequence|
+---------------+---------------+---------+---------------+-----+----------+--------+----------------+--------------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------+
|    60129542148|          false|    false|            0.0| 5.99|      SF03|       1|             0.0|          5.99|1536494145524224281|2024-02-17 19:32:...|2024-02-21 19:58:...|6328ec5fc47ea0ba6...|2024-02-21 20:08:...|            3|
|    51539607557|          false|     true|            0

In [10]:

# def create_gold_dimension_table():


In [11]:
from utils import generate_dim_table_references

def create_gold_fact_table(
    source : str, 
    target : str,
    surrogate_key : str,
    timestamp_key : str,
    dim_table_refs : dict,
    delta_load_column: str = None
):

    # Generate and run SQL query
    df = spark.sql(generate_dim_table_references(source=source,
                                                 target=target,
                                                 timestamp_key=timestamp_key, 
                                                 dim_table_refs=dim_table_refs, 
                                                 delta_load_column=delta_load_column))

    # Reorder columns in dataframe
    df = reorder_columns_in_dataframe(
        df=df, 
        columns_to_front=[surrogate_key] + [r["surrogate_key"] for r in dim_table_refs],
        columns_to_back=[c for c in df.columns if "meta_" in c]
    )

    # Create an empty Delta table with the same schema
    tmp_view_name = "temporaryView"
    df.createOrReplaceTempView(tmp_view_name)
    spark.sql(f"CREATE TABLE IF NOT EXISTS {target} LIKE {tmp_view_name} USING DELTA")

    # Merge into target table 
    merge_query = f"""
        MERGE INTO {target} AS target
        USING {tmp_view_name} AS source ON target.{surrogate_key} = source.{surrogate_key}
        WHEN MATCHED AND target.meta_hashdiff <> source.meta_hashdiff THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """
    spark.sql(merge_query).show()
    

In [12]:
create_gold_fact_table (
    source="silver_purchase_scd1",
    target="gold_fact_purchase",
    surrogate_key="transaction_sid",
    timestamp_key="transaction_time",
    dim_table_refs=[{"table_name": "silver_product_scd2", "merge_key": "product_id", "surrogate_key": "product_sid"}]
)

create_gold_fact_table (
    source="silver_inventory_scd1",
    target="gold_fact_inventory",
    surrogate_key="inventory_sid",
    timestamp_key="event_time",
    dim_table_refs=[{"table_name": "silver_product_scd2", "merge_key": "product_id", "surrogate_key": "product_sid"}]
)


SELECT src.*, silver_product_scd2.product_sid 
FROM silver_purchase_scd1 src
LEFT JOIN silver_product_scd2 ON silver_product_scd2.product_id = src.product_id
        AND src.transaction_time BETWEEN silver_product_scd2.meta_valid_from AND silver_product_scd2.meta_valid_to
+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|              292|               0|               0|              292|
+-----------------+----------------+----------------+-----------------+

SELECT src.*, silver_product_scd2.product_sid 
FROM silver_inventory_scd1 src
LEFT JOIN silver_product_scd2 ON silver_product_scd2.product_id = src.product_id
        AND src.event_time BETWEEN silver_product_scd2.meta_valid_from AND silver_product_scd2.meta_valid_to
+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_upd

In [13]:
# spark.sql("select * from gold_fact_purchase").show(5)
spark.sql("select * from gold_fact_inventory").show(5)

+-------------+-----------+--------------------+--------------+---------+----------+--------------+--------------------+--------------------+--------------------+-------------+
|inventory_sid|product_sid|          event_time|existing_level|new_level|product_id|stock_quantity|   meta_ingestion_ts|       meta_hashdiff|   meta_last_updated|meta_sequence|
+-------------+-----------+--------------------+--------------+---------+----------+--------------+--------------------+--------------------+--------------------+-------------+
|  42949672960|42949672965|2024-02-17 19:27:...|            34|       44|      SC01|            10|2024-02-21 19:58:...|b193b27f72a8685a4...|2024-02-21 20:08:...|            1|
|  42949672961| 8589934594|2024-02-17 19:27:...|            49|       59|      SF06|            10|2024-02-21 19:58:...|af9f74edf7247d629...|2024-02-21 20:08:...|            1|
|  42949672962|42949672964|2024-02-17 19:27:...|            48|       58|      SF04|            10|2024-02-21 19:58

# PUTTING IT ALL TOGETHER

In [14]:



while True:

    # BRONZE
    query1 = create_bronze_streaming_table(source="data/inventory", target="bronze_inventory")
    query2 = create_bronze_streaming_table(source="data/product", target="bronze_product")
    query3 = create_bronze_streaming_table(source="data/purchase", target="bronze_purchase")

    # SILVER
    create_silver_scd1_table (
        source="bronze_purchase",
        target="silver_purchase_scd1",
        timestamp_key="transaction_time",
        merge_key="transaction_id",
        surrogate_key="transaction_sid",
        delta_load_column="transaction_time"
    )
    create_silver_scd1_table (
        source="bronze_inventory",
        target="silver_inventory_scd1",
        timestamp_key="event_time",
        merge_key="event_time",
        surrogate_key="inventory_sid",
        delta_load_column="event_time"
    )
    create_silver_scd2_table(
        source = "bronze_product",
        target = "silver_product_scd2",
        merge_key = "product_id",
        timestamp_key = "event_time",
        surrogate_key = "product_sid",
        delta_load_column="event_time"
    )

    # GOLD
    create_gold_fact_table (
        source="silver_purchase_scd1",
        target="gold_fact_purchase",
        surrogate_key="transaction_sid",
        timestamp_key="transaction_time",
        dim_table_refs=[{"table_name": "silver_product_scd2", "merge_key": "product_id", "surrogate_key": "product_sid"}]
    )

    create_gold_fact_table (
        source="silver_inventory_scd1",
        target="gold_fact_inventory",
        surrogate_key="inventory_sid",
        timestamp_key="event_time",
        dim_table_refs=[{"table_name": "silver_product_scd2", "merge_key": "product_id", "surrogate_key": "product_sid"}]
    )

SELECT src.*, silver_product_scd2.product_sid 
FROM silver_purchase_scd1 src
LEFT JOIN silver_product_scd2 ON silver_product_scd2.product_id = src.product_id
        AND src.transaction_time BETWEEN silver_product_scd2.meta_valid_from AND silver_product_scd2.meta_valid_to


UnsupportedOperationException: [DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE] Cannot perform Merge as multiple source rows matched and attempted to modify the same
target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge,
when multiple source rows match on the same target row, the result may be ambiguous
as it is unclear which source row should be used to update or delete the matching
target row. You can preprocess the source table to eliminate the possibility of
multiple matches. Please refer to
https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge