In [0]:
#from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, to_timestamp, lit, current_timestamp, window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, LongType
import time
from datetime import datetime, timedelta

# Stream Generation for Order-Payment Processing

## Data Stream Creation
This code creates two simulated data streams in Spark:

1. **Orders Stream** - Generates purchase order events at a rate of 2 per second
2. **Payments Stream** - Generates payment events that correspond to orders

## Stream Characteristics


### Payments Stream
- Creates payments that mostly match with orders for testing stream joins
- Includes deliberate test cases:
  - **Normal case**: Standard payments with matching order IDs (default)
  - **Missing order**: Payments referencing non-existent orders.
  - **Early payments**: Payments that arrive before their corresponding orders.
  - **Late orders**: Payments for orders that are too old.
  - **Timing variations**:
    - Payments with timestamps 5 minutes in the past (testing watermark limits)
    - Payments with timestamps 8 minutes in the future (testing join window boundaries)

This setup allows testing of stream join behavior, late data handling, and watermark functionality in a controlled environment.

In [0]:
# Simulated orders: a single-partition "rate" source emitting 2 rows per second.
# The source's `value` column drives the order id and the customer id
# (value modulo 5); the amount is random and the source `timestamp` is reused
# as the order's event time.
orders_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 2)
    .option("numPartitions", 1)
    .load()
    .withColumn("order_id", expr("CONCAT('ord-', CAST(value AS STRING))"))
    .withColumn("customer_id", expr("CONCAT('cust-', CAST(value % 5 AS STRING))"))
    .withColumn("amount", expr("RAND() * 100"))
    .withColumn("order_time", col("timestamp"))
)


# SQL CASE choosing which order a payment refers to, keyed on `value % 10`.
_payment_order_id_case = (
    "CASE "
    # remainder 0: id that no order ever carries
    "WHEN value % 10 = 0 THEN CONCAT('missing-', CAST(value AS STRING)) "
    # remainder 1: id of an order generated 5 rows later (payment arrives first)
    "WHEN value % 10 = 1 THEN CONCAT('ord-', CAST(value + 5 AS STRING)) "
    # remainder 2: id of an order generated 100 rows earlier (order too old)
    "WHEN value % 10 = 2 THEN CONCAT('ord-', CAST(value - 100 AS STRING)) "
    # everything else: same id as the order with the same `value` - should join
    "ELSE CONCAT('ord-', CAST(value AS STRING)) "
    "END"
)

# SQL CASE choosing the payment's event time, keyed on `value % 10`.
_payment_time_case = (
    "CASE "
    # remainder 3: shifted 5 minutes into the past (payment too old)
    "WHEN value % 10 = 3 THEN timestamp - INTERVAL 5 MINUTES "
    # remainder 4: shifted 8 minutes into the future (outside the join window)
    "WHEN value % 10 = 4 THEN timestamp + INTERVAL 8 MINUTES "
    # everything else: the source timestamp unchanged
    "ELSE timestamp "
    "END"
)

# Simulated payments: same "rate" source settings as the orders stream, with
# the deliberate id / timestamp test cases defined above mixed in.
payments_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 2)
    .option("numPartitions", 1)
    .load()
    .withColumn("payment_id", expr("CONCAT('pmt-', CAST(value AS STRING))"))
    .withColumn("order_id", expr(_payment_order_id_case))
    .withColumn("payment_amount", expr("RAND() * 100"))
    .withColumn("payment_time", expr(_payment_time_case))
)

# Adding Watermarks to Streams

## Watermark Configuration
Watermarks are essential for handling late data in streaming applications. They define how long to wait for late-arriving data before proceeding with operations like joins and aggregations.

Apply a 2-minute watermark to both the orders stream and the payments stream.
This means data arriving more than 2 minutes late (compared to the max event time seen)
will be dropped from stateful operations.

In [0]:
# Declare a 2-minute event-time watermark on each stream, using the stream's
# own event-time column (order_time for orders, payment_time for payments).
orders_with_watermark = orders_stream.withWatermark("order_time", "2 minutes")
payments_with_watermark = payments_stream.withWatermark("payment_time", "2 minutes")

# Stream-to-Stream Join with Time Constraints

## Joining Order and Payment Streams
This code performs an inner time-range join between the orders and payments streams: a payment matches an order when the order IDs are equal and the order time is within 5 minutes (before or after) of the payment time.


In [0]:
# Inner stream-stream join: the order ids must be equal and the order's event
# time must lie within 5 minutes either side of the payment's event time.
# Output keeps every order column plus the payment id, amount and time.
joined = (
    orders_with_watermark.alias('order')
    .join(
        payments_with_watermark.alias('payment'),
        on=expr("""
            order.order_id = payment.order_id AND
            order_time >= payment_time - INTERVAL 5 MINUTES AND
            order_time <= payment_time + INTERVAL 5 MINUTES
        """),
        how="inner",
    )
    .select('order.*', 'payment.payment_id', 'payment.payment_amount', 'payment.payment_time')
)

In [0]:
# Checkpoint directory for the streaming join query. The same path is passed
# to the state-metadata / statestore readers later in this notebook.
checkpoint_location = "/tmp/delta/order_payment_join_2"

# Writing the Joined Stream to Delta Table

## Output Configuration
This code defines how the joined stream of orders and payments should be persisted to a Delta table.


In [0]:
# Write the joined rows in append mode to a Delta table, checkpointing to
# `checkpoint_location`; the query is registered as "OrderPaymentJoin".
query = (
    joined.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_location)
    .queryName("OrderPaymentJoin")
    .toTable("gp_uc.test.order_payment_join_2")
)

In [0]:
%sql
-- Inspect the rows the streaming join has written to the sink table
SELECT * FROM gp_uc.test.order_payment_join_2

timestamp,value,order_id,customer_id,amount,order_time,payment_id,payment_amount,payment_time
2025-03-30T19:31:29.751Z,13,ord-13,cust-3,72.2412291047328,2025-03-30T19:31:29.751Z,pmt-13,25.92107611670553,2025-03-30T19:26:29.857Z
2025-03-30T19:31:44.751Z,43,ord-43,cust-3,49.69037984449498,2025-03-30T19:31:44.751Z,pmt-43,61.35093437737906,2025-03-30T19:26:44.857Z
2025-03-30T19:31:40.751Z,35,ord-35,cust-0,58.27488715776664,2025-03-30T19:31:40.751Z,pmt-35,98.6745614695326,2025-03-30T19:31:40.857Z
2025-03-30T19:31:39.751Z,33,ord-33,cust-3,80.04691069263505,2025-03-30T19:31:39.751Z,pmt-33,52.04854864392876,2025-03-30T19:26:39.857Z
2025-03-30T19:31:27.751Z,9,ord-9,cust-4,27.06371403674469,2025-03-30T19:31:27.751Z,pmt-9,96.64248903914266,2025-03-30T19:31:27.857Z
2025-03-30T19:31:56.751Z,67,ord-67,cust-2,14.56859029482558,2025-03-30T19:31:56.751Z,pmt-67,12.967977214636829,2025-03-30T19:31:56.857Z
2025-03-30T19:31:25.751Z,5,ord-5,cust-0,36.19474392294052,2025-03-30T19:31:25.751Z,pmt-5,39.41450915234833,2025-03-30T19:31:25.857Z
2025-03-30T19:31:34.751Z,23,ord-23,cust-3,88.93010673172357,2025-03-30T19:31:34.751Z,pmt-23,78.79975294625486,2025-03-30T19:26:34.857Z
2025-03-30T19:31:46.751Z,47,ord-47,cust-2,31.61849556210388,2025-03-30T19:31:46.751Z,pmt-47,33.66132854839938,2025-03-30T19:31:46.857Z
2025-03-30T19:31:26.251Z,6,ord-6,cust-1,40.32235840068999,2025-03-30T19:31:26.251Z,pmt-1,69.32067292373202,2025-03-30T19:31:23.857Z


# Exploring using stateStore

## Monitoring Stream State Metadata

In [0]:
# Show the state-store metadata recorded in the streaming query's checkpoint
display(
    spark.read.format("state-metadata").load(checkpoint_location)
)

operatorId,operatorName,stateStoreName,numPartitions,minBatchId,maxBatchId
0,symmetricHashJoin,left-keyToNumValues,200,1353,1453
0,symmetricHashJoin,left-keyWithIndexToValue,200,1353,1453
0,symmetricHashJoin,right-keyToNumValues,200,1353,1453
0,symmetricHashJoin,right-keyWithIndexToValue,200,1353,1453


# Inspecting State Store Contents

https://docs.databricks.com/aws/en/structured-streaming/read-state

In [0]:
# Reading the state of a stream-stream join requires either the joinSide
# option ("left" or "right") or the storename option; store names are listed
# in the state metadata table.
display(
    spark.read
    .format("statestore")
    .option("storename", "left-keyWithIndexToValue")
    .option("batchId", 1453)
    .load(checkpoint_location)
)

key,value,partition_id
"List(ord-270187, 0)","List(2025-04-01T09:02:56.751Z, 270187, ord-270187, cust-2, 37.02038446954339, 2025-04-01T09:02:56.751Z, true)",0
"List(ord-265875, 0)","List(2025-04-01T08:27:00.751Z, 265875, ord-265875, cust-0, 19.22864385846963, 2025-04-01T08:27:00.751Z, true)",0
"List(ord-290779, 0)","List(2025-04-01T11:54:32.751Z, 290779, ord-290779, cust-4, 0.9733775345604267, 2025-04-01T11:54:32.751Z, true)",0
"List(ord-256113, 0)","List(2025-04-01T07:05:39.751Z, 256113, ord-256113, cust-3, 86.36945494397014, 2025-04-01T07:05:39.751Z, false)",0
"List(ord-293940, 0)","List(2025-04-01T12:20:53.251Z, 293940, ord-293940, cust-0, 54.919112454586305, 2025-04-01T12:20:53.251Z, false)",0
"List(ord-294105, 0)","List(2025-04-01T12:22:15.751Z, 294105, ord-294105, cust-0, 83.72634418471678, 2025-04-01T12:22:15.751Z, true)",0
"List(ord-286715, 0)","List(2025-04-01T11:20:40.751Z, 286715, ord-286715, cust-0, 17.408156577104883, 2025-04-01T11:20:40.751Z, true)",0
"List(ord-267679, 0)","List(2025-04-01T08:42:02.751Z, 267679, ord-267679, cust-4, 18.395062914743587, 2025-04-01T08:42:02.751Z, true)",0
"List(ord-288734, 0)","List(2025-04-01T11:37:30.251Z, 288734, ord-288734, cust-4, 98.23984358519809, 2025-04-01T11:37:30.251Z, false)",0
"List(ord-285537, 0)","List(2025-04-01T11:10:51.751Z, 285537, ord-285537, cust-2, 8.215907285328928, 2025-04-01T11:10:51.751Z, true)",0


In [0]:
# Right-side (payments) join state as of batch 70.
# NOTE(review): the state-metadata output shown earlier in this notebook lists
# batches 1353-1453 - confirm batch 70 is still available in the checkpoint.
display(
    spark.read
    .format("statestore")
    .option("storename", "right-keyWithIndexToValue")
    .option("batchId", 70)
    .load(checkpoint_location)
)

# Analyzing Left-Side Join State

## Retrieving Order Stream State
This code extracts and loads the state information for the left side (orders) of our stream-to-stream join operation.


In [0]:
# State rows for the orders (left) side of the stream join, as of batch 70.
# NOTE(review): the state-metadata output shown earlier in this notebook lists
# batches 1353-1453 - confirm batch 70 is still available in the checkpoint.
df_left = (
    spark.read
    .format("statestore")
    .option("storename", "left-keyWithIndexToValue")
    .option("batchId", 70)
    .load(checkpoint_location)
)

In [0]:
# Left-side state rows still flagged as unmatched: no payment has joined them,
# whether because of the watermark, the join window, or a key mismatch.
display(
    df_left.filter("value.matched = false")
)

# Analyzing Right-Side Join State

## Retrieving Payment Stream State
This code extracts and loads the state information for the right side (payments) of our stream-to-stream join operation.

In [0]:
# State rows for the payments (right) side of the stream join, as of batch 70.
# NOTE(review): the state-metadata output shown earlier in this notebook lists
# batches 1353-1453 - confirm batch 70 is still available in the checkpoint.
df_right = (
    spark.read
    .format("statestore")
    .option("storename", "right-keyWithIndexToValue")
    .option("batchId", 70)
    .load(checkpoint_location)
)

In [0]:
# Number of distinct join keys (key.field0) held in the left-side state
display(
    df_left
    .select('key.field0')
    .distinct()
    .count()
)

# Detecting Key Distribution Skew

## Analyzing Key Distribution
This code analyzes the distribution of keys in the left state store (orders) to identify potential skew that could affect join performance.

In [0]:
# Rows per join key (key.field0) in the left-side state; a key with an
# unusually high count points to skew.
display(
    df_left
    .groupBy('key.field0')
    .count()
)

In [0]:
# Number of distinct join keys (key.field0) held in the right-side state
display(
    df_right
    .select('key.field0')
    .distinct()
    .count()
)

In [0]:
# Rows per join key (key.field0) in the right-side state
display(
    df_right
    .groupBy('key.field0')
    .count()
)

# Comparing Left and Right Join States

## Cross-Analyzing State Stores
This code performs an outer join between the left (orders) and right (payments) state stores to provide a comprehensive view of all records waiting to be matched from both sides.


In [0]:
# Joining left and right state stores to find any mismatches
df_joined = df_left.join(df_right, df_left['key.field0'] == df_right['key.field0'], 'outer') \
    .select(df_left['key.field0'].alias('left_key'), df_left['value'].alias('left_value'), 
            df_right['key.field0'].alias('right_key'), df_right['value'].alias('right_value'))

## Validating Join Conditions and Time Windows
This code performs sophisticated analysis on the joined state stores to verify expected matching behavior and identify potential issues in the stream-to-stream join.


In [0]:
# `col` and `expr` come from the notebook's import cell. The time check below
# is written in SQL, so this cell no longer imports pyspark's `abs`, which
# would shadow Python's builtin `abs` for the rest of the session.

# Rows whose key is present in BOTH state stores. "Left keys matched with
# right" and "right keys matched with left" are the same set of rows of the
# outer join, so one Spark action computes the number; both original variable
# names are kept so any later cell that refers to them still works.
matched_key_count = df_joined.filter(
    col('left_key').isNotNull() & col('right_key').isNotNull()
).count()
left_matched_count = matched_key_count
right_matched_count = matched_key_count
print(f"State rows whose key exists on both sides: {matched_key_count}")

# value_matched:
#   * TRUE  when the left (order) state row is already flagged as matched;
#   * when it is flagged unmatched, whether order_time lies within the same
#     inclusive +/- 5 minute window around payment_time that the streaming
#     join uses, compared on full timestamps (not second-truncated values).
#     This is NULL when there is no right-side row to compare against;
#   * FALSE when there is no left-side row at all.
df_matched = df_joined.withColumn(
    'value_matched',
    expr("""
        CASE
            WHEN left_value.matched THEN TRUE
            WHEN NOT left_value.matched THEN (
                left_value.order_time >= right_value.payment_time - INTERVAL 5 MINUTES AND
                left_value.order_time <= right_value.payment_time + INTERVAL 5 MINUTES
            )
            ELSE FALSE
        END
    """)
)

# Display the results
display(df_matched)

# Identifying Failed Join Conditions

## Finding Mismatched Records
This code identifies records that have matching keys but fail to join due to time window constraints, providing critical insights into join condition issues.


In [0]:
# Keys present on both sides whose rows did not satisfy the join condition:
# the order row is unmatched and the time-window check came out false.
display(
    df_matched
    .filter("value_matched == False")
    .filter("left_value.matched == False")
    .select('left_key', 'left_value', 'right_key', 'right_value')
)

## Detecting Payments Without Matching Orders
This code identifies payment records in the right-side state store that don't have corresponding order records in the left-side state store.


In [0]:
# Keys that exist only in the right (payments) state store
display(
    df_matched
    .filter("left_key is null")
    .select('left_key', 'left_value', 'right_key', 'right_value')
)

## Finding Orders Without Corresponding Payments
This code reveals order records that have no matching payment record in the current state stores, which could indicate delayed payments or unpaid orders.


In [0]:
# Keys that exist only in the left (orders) state store: the matching payment
# either never arrived or its state row was dropped because of the watermark.
display(
    df_matched
    .filter("right_key is null")
    .select('left_key', 'left_value', 'right_key', 'right_value')
)