In [0]:
#Libraries
from pyspark.sql.window import Window
from pyspark.sql.functions import col, from_json, row_number
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, IntegerType, DateType
from pyspark.sql import DataFrame
from delta.tables import DeltaTable

In [0]:
# Variables
# Source (bronze) and target (silver) tables, given as three-part names
# (presumably catalog.schema.table in Unity Catalog).
bronze_table: str = "bronze.training_io.rt_marketing"
silver_table: str = "silver.training_io.rt_marketing"

# Streaming checkpoint directory for the bronze -> silver query; it should be
# dedicated to this one query, never shared with another stream.
checkpoint_path: str = "/Volumes/silver/checkpoints/realtime"

# Column(s) identifying a record in silver, and the column whose latest value
# wins when several versions of the same record arrive.
key_columns: list[str] = ["id"]
timestamp_column: str = "kafka_ts"

In [0]:
# Functions
def flat_bronze_df(df: DataFrame) -> DataFrame:
    """
    Flatten the bronze stream into one column per event field.

    The JSON text in ``json_event`` is parsed with a fixed event schema. Each
    schema field is then projected as a top-level column, next to the
    ``timestamp`` column renamed to ``kafka_ts``.

    Args:
        df (DataFrame): Streaming DataFrame read from the bronze table. It must
            provide a ``timestamp`` column and a JSON text column ``json_event``.

    Returns:
        DataFrame: Columns ``kafka_ts``, ``id``, ``datetime``, ``duration``,
        ``title``, ``genres``, ``release_date``, ``movie_id``, ``user_id``
        (in that order).
    """
    # Single source of truth for the event layout: drives both parsing and projection.
    event_fields = [
        ("id", IntegerType()),
        ("datetime", TimestampType()),
        ("duration", DoubleType()),
        ("title", StringType()),
        ("genres", StringType()),
        ("release_date", DateType()),
        ("movie_id", StringType()),
        ("user_id", StringType()),
    ]
    event_schema = StructType([StructField(name, dtype, True) for name, dtype in event_fields])

    with_parsed = df.select(
        col("timestamp").alias("kafka_ts"),
        from_json(col("json_event"), schema=event_schema).alias("parsed_json"),
    )

    flattened_fields = [col(f"parsed_json.{name}") for name, _ in event_fields]
    return with_parsed.select(col("kafka_ts"), *flattened_fields)

def deduplicate_by_latest_timestamp(df: DataFrame, key_columns: list[str], timestamp_column: str) -> DataFrame:
    """
    Keep a single row per key: the one with the greatest timestamp.

    Rows are ranked within each key by ``timestamp_column`` in descending order,
    and only the top-ranked row survives. Rows that tie on the timestamp get an
    arbitrary relative order, so which of them is kept is not deterministic.

    Args:
        df (DataFrame): Input DataFrame.
        key_columns (list[str]): Column names that together identify a record.
        timestamp_column (str): Column whose largest value marks the latest record.

    Returns:
        DataFrame: ``df`` reduced to one row per key. A temporary ``row_num``
        column is used for ranking and dropped again, so an input column with
        that name would not survive.
    """
    newest_first = Window.partitionBy(*key_columns).orderBy(col(timestamp_column).desc())
    ranked = df.withColumn("row_num", row_number().over(newest_first))
    newest_only = ranked.where(col("row_num") == 1)
    return newest_only.drop("row_num")

def upsert(microBatchDF: DataFrame, batchId: int, silver_table: str, key_columns: list[str], timestamp_column: str) -> None:
    """
    Merge one streaming micro-batch into the Silver Delta table (SCD type 1).

    The batch is first reduced to the newest row per key (by ``timestamp_column``).
    The result is then MERGEd into ``silver_table``:
    - an existing target row is overwritten with all source columns only when the
      incoming timestamp is strictly greater than the stored one;
    - keys not present in the target are inserted.

    Args:
        microBatchDF (DataFrame): Micro-batch DataFrame provided by foreachBatch.
        batchId (int): Micro-batch id provided by foreachBatch (not used here).
        silver_table (str): Name of the target Silver Delta table.
        key_columns (list[str]): Column names used as the merge key.
        timestamp_column (str): Column used to decide which version is newer.

    NOTE(review): a row whose key is null never satisfies the ``=`` join condition,
    so it is inserted as a new row on every batch. Likewise, a target row whose
    timestamp is null is never updated. Consider filtering or quarantining such
    rows upstream.
    """
    latest_batch = deduplicate_by_latest_timestamp(microBatchDF, key_columns, timestamp_column)

    # Use the micro-batch's own session instead of the notebook-global `spark`:
    # under Spark Connect (e.g. shared-access clusters) the foreachBatch function
    # must not rely on the driver-global session, and this keeps the function
    # self-contained.
    silver_delta_table = DeltaTable.forName(latest_batch.sparkSession, silver_table)

    # Backtick-quote identifiers so column names with spaces/special characters
    # still parse as SQL. `key`/`name` avoid shadowing the imported `col`.
    join_condition = " AND ".join(f"target.`{key}` = source.`{key}`" for key in key_columns)
    update_set = {name: f"source.`{name}`" for name in latest_batch.columns}
    is_newer = f"source.`{timestamp_column}` > target.`{timestamp_column}`"

    (
        silver_delta_table.alias("target")
        .merge(latest_batch.alias("source"), join_condition)
        .whenMatchedUpdate(condition=is_newer, set=update_set)
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
def main(trigger_interval: str = "1 minute"):
    """
    Start the bronze -> silver streaming job.

    Reads ``bronze_table`` as a stream and flattens its JSON payload. On each
    trigger, the micro-batch is MERGEd into ``silver_table`` via ``upsert``,
    with progress tracked in ``checkpoint_path``.

    Args:
        trigger_interval (str): Processing-time trigger interval for the stream.

    Returns:
        StreamingQuery: Handle to the running query. Callers can use it to
        monitor the query (``status``, ``lastProgress``), block on it
        (``awaitTermination``) or ``stop()`` it.
    """
    bronze_raw = spark.readStream.table(bronze_table)
    bronze_flat = flat_bronze_df(bronze_raw)

    # foreachBatch replaces the sink entirely; the Delta write happens inside
    # `upsert` via MERGE, so no .format(...) is needed on the writer.
    return (
        bronze_flat.writeStream
        .foreachBatch(
            lambda batch_df, batch_id: upsert(batch_df, batch_id, silver_table, key_columns, timestamp_column)
        )
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .trigger(processingTime=trigger_interval)
        .start()
    )


if __name__ == "__main__":
    # Keep the returned handle so later cells can inspect or stop the stream,
    # e.g. `silver_stream_query.stop()`.
    silver_stream_query = main()
