In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Batch-read the order JSON files into `df` and render the result.
# `multiline` is enabled so a single JSON record may span several lines.
# NOTE(review): `inferschema` is documented as a CSV reader option; the JSON
# reader infers the schema on its own when none is supplied, so this option is
# presumably a no-op here - confirm before removing it.
df = (
    spark.read.format("json")
    .options(inferschema="true", multiline="true")
    .load("/Volumes/workspace/stream/streaming/jsonsource")
)

df.display()



customer,items,metadata,order_id,payment,timestamp
"List(List(Toronto, Canada, M5H 2N2), 501, john@example.com, John Doe)","List(List(I100, 25.99, Wireless Mouse, 2), List(I101, 15.49, USB-C Adapter, 1))","List(List(campaign, back_to_school), List(channel, email))",ORD1001,"List(Credit Card, TXN7890)",2025-06-01T10:15:00Z
"List(List(Vancouver, Canada, V5K 0A1), 502, alice@example.com, Alice Smith)","List(List(I102, 45.0, Bluetooth Keyboard, 1))","List(List(campaign, cyber_monday), List(channel, affiliate))",ORD1002,"List(PayPal, TXN7891)",2025-06-01T10:30:00Z


In [0]:

# Explode the `metadata` array into one row per key/value entry and show the
# two struct fields as columns.
#
# The exploded frame gets its own name instead of overwriting `df`:
# rebinding `df` made this cell non-idempotent (a second run would try to
# explode a struct and fail) and silently changed the row count seen by every
# later cell that still expects one row per order.
# `explode_outer` keeps orders whose `metadata` is null or empty (as a null row).
df_metadata = df.withColumn("metadata", explode_outer("metadata"))
df_metadata.select("metadata.key", "metadata.value").display()


key,value
campaign,back_to_school
channel,email
campaign,cyber_monday
channel,affiliate


In [0]:
# Show only the order timestamp column.
display(df.select("timestamp"))


timestamp
2025-06-01T10:15:00Z


In [0]:
# Print the schema of `df` as a tree to inspect the nested struct/array layout.
# NOTE(review): the schema recorded below lists no `timestamp` field even though
# other cells select and display that column - the saved output looks stale;
# re-run to confirm.
df.printSchema()

root
 |-- customer: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- postal_code: string (nullable = true)
 |    |-- customer_id: long (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- product_name: string (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |-- metadata: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- payment: struct (nullable = true)
 |    |-- method: string (nullable = true)
 |    |-- transaction_id: string (nullable = true)


In [0]:
# Drill into the nested struct: customer -> address (returned as one struct column).
display(df.select("customer.address"))

address
"List(Toronto, Canada, M5H 2N2)"


In [0]:
# Pull the two fields of the `payment` struct out as separate columns.
display(
    df.select(
        "payment.method",
        "payment.transaction_id",
    )
)

method,transaction_id
Credit Card,TXN7890


In [0]:
# Show the customer's name from the nested `customer` struct.
display(df.select("customer.name"))

name
John Doe


In [0]:
# Flatten the nested `customer` and `payment` structs into top-level columns;
# `items` and `metadata` are carried along unchanged.
#
# The projection is bound to its own name and displayed in a separate
# statement. `.display()` returns None, so the previous
# `df = df.select(...).display()` replaced `df` with None and broke every
# later cell that calls a method on `df`.
df_flat = df.select(
    "order_id",
    "customer.address.city",
    "customer.email",
    "customer.customer_id",
    "customer.name",
    "items",
    "metadata",
    "payment.method",
    "payment.transaction_id",
    "timestamp",
)
df_flat.display()

order_id,city,email,customer_id,name,items,metadata,method,transaction_id,timestamp
ORD1001,Toronto,john@example.com,501,John Doe,"List(List(I100, 25.99, Wireless Mouse, 2), List(I101, 15.49, USB-C Adapter, 1))","List(List(campaign, back_to_school), List(channel, email))",Credit Card,TXN7890,2025-06-01T10:15:00Z


In [0]:
# Replace the `items` array with one row per item (each row then holds a single
# item struct). `explode_outer` keeps orders whose `items` is null or empty.
# Note: this rebinds `df`; the cells below rely on the exploded frame.
df = df.withColumn(
    "items",
    explode_outer(col("items")),
)



In [0]:
# Render the current contents of `df`.
df.display()

customer,items,metadata,order_id,payment,timestamp
"List(List(Toronto, Canada, M5H 2N2), 501, john@example.com, John Doe)","List(I100, 25.99, Wireless Mouse, 2)","List(List(campaign, back_to_school), List(channel, email))",ORD1001,"List(Credit Card, TXN7890)",2025-06-01T10:15:00Z
"List(List(Toronto, Canada, M5H 2N2), 501, john@example.com, John Doe)","List(I101, 15.49, USB-C Adapter, 1)","List(List(campaign, back_to_school), List(channel, email))",ORD1001,"List(Credit Card, TXN7890)",2025-06-01T10:15:00Z


In [0]:
# Read a single field out of the `items` column.
display(df.select("items.product_name"))

product_name
Wireless Mouse
USB-C Adapter


**READING STREAMING DATA**

In [0]:
# Explicit schema for the order JSON files, written as a DDL-style string and
# passed to `.schema(my_schema)` by the streaming reader.
# Layout:
#   - order_id, timestamp : plain strings (timestamp is NOT parsed to a
#     timestamp type here)
#   - customer            : struct with a nested `address` struct
#   - items               : array of item structs (id, price, name, quantity)
#   - payment             : struct (method, transaction_id)
#   - metadata            : array of key/value structs
my_schema = """
order_id STRING,
timestamp STRING,
customer STRUCT<
    customer_id: BIGINT,
    email: STRING,
    name: STRING,
    address: STRUCT<
        city: STRING,
        country: STRING,
        postal_code: STRING
    >
>,
items ARRAY<STRUCT<
    item_id: STRING,
    price: DOUBLE,
    product_name: STRING,
    quantity: BIGINT
>>,
payment STRUCT<
    method: STRING,
    transaction_id: STRING
>,
metadata ARRAY<STRUCT<
    key: STRING,
    value: STRING
>>
"""

In [0]:
# Build the streaming pipeline: read the order JSON files as a stream, then
# flatten them so every output row is one (order, metadata entry, item)
# combination with only top-level columns.
#
# The stream reader is defined once (it used to be built twice in this cell,
# the first result being discarded), and `explode_outer` comes from the
# notebook's import cell rather than a second import here.

# Streaming read of the multi-line JSON files using the explicit `my_schema`.
orders_stream = (
    spark.readStream.format("json")
    .option("multiline", "true")
    .schema(my_schema)
    .load("/Volumes/workspace/stream/streaming/jsonsource")
)

# Step 1: promote the nested customer/payment fields to top-level columns.
# `items` and `metadata` stay as arrays so they can be exploded below.
orders_selected = orders_stream.select(
    "items",
    "order_id",
    "timestamp",
    "customer.customer_id",
    "customer.name",
    "customer.email",
    "customer.address.city",
    "customer.address.country",
    "customer.address.postal_code",
    "payment.method",
    "payment.transaction_id",
    "metadata",
)

# Step 2: one row per metadata entry, with `key` / `value` as plain columns.
# `explode_outer` keeps orders whose array is null or empty.
orders_by_metadata = (
    orders_selected.withColumn("metadata", explode_outer("metadata"))
    .select("*", "metadata.key", "metadata.value")
    .drop("metadata")
)

# Step 3: one row per item, with the item struct fields as plain columns.
# Combined with step 2 this yields (metadata entries x items) rows per order.
df = (
    orders_by_metadata.withColumn("items", explode_outer("items"))
    .select(
        "*",
        "items.item_id",
        "items.price",
        "items.product_name",
        "items.quantity",
    )
    .drop("items")
)




In [0]:
# Write the flattened stream to the Delta sink in append mode, tracking
# progress in the checkpoint directory.
#
# `availableNow=True` processes everything currently available and then stops;
# it replaces the deprecated `once=True` trigger.
query = (
    df.writeStream.format("delta")
    .outputMode("append")
    .trigger(availableNow=True)
    .option("path", "/Volumes/workspace/stream/streaming/jsonsink/data")
    .option("checkpointLocation", "/Volumes/workspace/stream/streaming/jsonsink/checkpoint")
    .start()
)

# `start()` returns immediately; block until the run has finished so that cells
# reading the sink afterwards see the committed data.
query.awaitTermination()

# Echo the query handle as the cell result.
query

<pyspark.sql.connect.streaming.query.StreamingQuery at 0xff5fc3734e30>

In [0]:
%sql
-- Read back every row of the Delta table written by the streaming query,
-- addressing the table directly by its storage path.
-- NOTE(review): the recorded result below shows a `payment` struct column,
-- whereas the transformation cell selects payment.method and
-- payment.transaction_id; the saved output presumably predates that change -
-- re-run to confirm.
SELECT * FROM delta.`/Volumes/workspace/stream/streaming/jsonsink/data`


order_id,timestamp,customer_id,name,email,city,country,postal_code,payment,key,value,item_id,price,product_name,quantity
ORD1001,2025-06-01T10:15:00Z,501,John Doe,john@example.com,Toronto,Canada,M5H 2N2,"List(Credit Card, TXN7890)",campaign,back_to_school,I100,25.99,Wireless Mouse,2
ORD1001,2025-06-01T10:15:00Z,501,John Doe,john@example.com,Toronto,Canada,M5H 2N2,"List(Credit Card, TXN7890)",campaign,back_to_school,I101,15.49,USB-C Adapter,1
ORD1001,2025-06-01T10:15:00Z,501,John Doe,john@example.com,Toronto,Canada,M5H 2N2,"List(Credit Card, TXN7890)",channel,email,I100,25.99,Wireless Mouse,2
ORD1001,2025-06-01T10:15:00Z,501,John Doe,john@example.com,Toronto,Canada,M5H 2N2,"List(Credit Card, TXN7890)",channel,email,I101,15.49,USB-C Adapter,1
ORD1002,2025-06-01T10:30:00Z,502,Alice Smith,alice@example.com,Vancouver,Canada,V5K 0A1,"List(PayPal, TXN7891)",campaign,cyber_monday,I102,45.0,Bluetooth Keyboard,1
ORD1002,2025-06-01T10:30:00Z,502,Alice Smith,alice@example.com,Vancouver,Canada,V5K 0A1,"List(PayPal, TXN7891)",channel,affiliate,I102,45.0,Bluetooth Keyboard,1
