## Structured Streaming JSON data over Serverless Cluster

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

order_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("timestamp", StringType(), True),   # read first as string, then parse to timestamp
    StructField("customer", StructType([
        StructField("customer_id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        # address had "3 items" in your sample — adapt names/types as needed
        StructField("address", StructType([
            StructField("city", StringType(), True),
            StructField("country", StringType(), True),
            StructField("postal_code", StringType(), True)
        ]), True)
    ]), True),
    StructField("items", ArrayType(
        StructType([
            StructField("item_id", StringType(), True),
            StructField("product_name", StringType(), True),
            StructField("quantity", IntegerType(), True),
            StructField("price", DoubleType(), True)
        ])
    ), True),
    StructField("payment", StructType([
        StructField("method", StringType(), True),
        StructField("transaction_id", StringType(), True)
    ]), True),
    StructField("metadata", ArrayType(
        StructType([
            StructField("key", StringType(), True),
            StructField("value", StringType(), True)
        ])
    ), True)
])

In [0]:
df= spark.readStream.format("json")\
.schema(order_schema)\
.option("multiline", "true")\
.option("cleanSource", "archive")\
.option("sourceArchiveDir","/Volumes/sparkcatalog/store/raw_volume/json_archive")\
.load("/Volumes/sparkcatalog/store/raw_volume/json_source")

In [0]:
df=df.withColumn("items_exp",explode("items"))

df=df.select("items_exp.item_id","items_exp.price","items_exp.product_name",\
    col("order_id"),\
    col("payment.transaction_id"),\
    col("timestamp"),\
    col("customer.name"),\
    col("customer.email"),\
    col("customer.address.city"),\
    col("customer.address.country"),\
    col("payment.method"))
    # selecting Item_exploded only

In [0]:
df.writeStream.format("delta")\
    .outputMode("append")\
    .trigger(once=True)\
    .option('path','/Volumes/sparkcatalog/store/raw_volume/json_new_sink/data')\
    .option('checkPointLocation','/Volumes/sparkcatalog/store/raw_volume/json_new_sink/checkpoint')\
    .start()

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f5031f22870>

In [0]:
%sql
/* delete from delta.`/Volumes/sparkcatalog/store/raw_volume/json_sink` where order_id='ORD1002';*/
select * from delta.`/Volumes/sparkcatalog/store/raw_volume/json_new_sink/data`

item_id,price,product_name,order_id,transaction_id,timestamp,name,email,city,country,method
I103,199.99,Noise Cancelling Headphones,ORD1003,TXN7892,2025-06-01T11:00:00Z,David Lee,david@example.com,Calgary,Canada,Debit Card
I104,29.99,Laptop Stand,ORD1003,TXN7892,2025-06-01T11:00:00Z,David Lee,david@example.com,Calgary,Canada,Debit Card
I105,10.0,HDMI Cable,ORD1003,TXN7892,2025-06-01T11:00:00Z,David Lee,david@example.com,Calgary,Canada,Debit Card
I100,25.99,Wireless Mouse,ORD1001,TXN7890,2025-06-01T10:15:00Z,John Doe,john@example.com,Toronto,Canada,Credit Card
I101,15.49,USB-C Adapter,ORD1001,TXN7890,2025-06-01T10:15:00Z,John Doe,john@example.com,Toronto,Canada,Credit Card
I102,45.0,Bluetooth Keyboard,ORD1002,TXN7891,2025-06-01T10:30:00Z,Alice Smith,alice@example.com,Vancouver,Canada,PayPal
