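# **SPARK SQL STREAMING QUERY: Auto Loader schema evolution modes**

Ingests CSV files from `/Volumes/workspace/sparkstreaming/raw` and `raw_1` into a Delta table with Auto Loader, comparing the `cloudFiles.schemaEvolutionMode` settings `rescue`, `addNewColumns` and `none`.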

In [0]:
import shutil
import os

# Reset every location this demo writes to so each run starts from a clean slate:
# the stream checkpoint, the Delta target and the Auto Loader schema location.
# The raw input folders are deliberately not touched.
folders = [
    "/Volumes/workspace/sparkstreaming/checkpoint/checkpoint-1/",
    "/Volumes/workspace/sparkstreaming/target/",
    "/Volumes/workspace/sparkstreaming/checkpoint/schema/"
]

for folder in folders:
    try:
        shutil.rmtree(folder)
    except FileNotFoundError:
        # Nothing to clean yet (e.g. the very first run). Any other failure, such as a
        # permission error, is raised instead of silently leaving old state in place.
        pass
    os.makedirs(folder, exist_ok=True)

In [0]:
# Auto Loader stream over the `raw` CSV folder in "rescue" mode. The schema is stored under
# schemaLocation, and data that does not fit it ends up in the `_rescued_data` column
# (parsed further below).
# `header` defaults to false for CSV; these files carry a header row (later cells select
# customer_id, order_id, ... by name), so declare it explicitly instead of risking the
# header line being read as a data row.
df = spark.readStream.format("cloudFiles")\
          .option("cloudFiles.format", "csv")\
          .option("header", "true")\
          .option("cloudFiles.schemaEvolutionMode","rescue")\
          .option("cloudFiles.schemaLocation", "/Volumes/workspace/sparkstreaming/checkpoint/schema/")\
          .load("/Volumes/workspace/sparkstreaming/raw")

In [0]:
# Incrementally append everything the stream has not processed yet into the Delta target,
# then stop. trigger(availableNow=True) is the supported replacement for the deprecated
# trigger(once=True).
# start() only launches the query asynchronously, so block until it finishes. Otherwise the
# next cell's batch read can run before any data is committed (or before the table exists).
query = df.writeStream.format("delta")\
    .outputMode("append")\
    .option("checkpointLocation", "/Volumes/workspace/sparkstreaming/checkpoint/checkpoint-1/")\
    .trigger(availableNow=True)\
    .option("mergeSchema", True)\
    .start("/Volumes/workspace/sparkstreaming/target")
query.awaitTermination()

In [0]:
# Batch-read the Delta target written by the rescue-mode stream and show it.
target_path = "/Volumes/workspace/sparkstreaming/target"
df = spark.read.load(target_path, format="delta")
display(df)

In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *

In [0]:
# Fields expected inside the rescued JSON, both typed as nullable strings.
rescued_schema = StructType([
    StructField("discount", StringType(), True),
    StructField("payment_method", StringType(), True),
])

# `_rescued_data` holds a JSON string; turn it into a struct column.
parsed_rescue = from_json(col("_rescued_data"), rescued_schema)
df = df.withColumn("rescued_struct", parsed_rescue)
display(df)

In [0]:
# Extract individual fields
# Lift the parsed rescued fields out of the struct into top-level columns.
rescued = col("rescued_struct")
df = df.withColumn("rescued_discount", rescued.getField("discount"))
df = df.withColumn("rescued_payment_method", rescued.getField("payment_method"))

In [0]:
# Show the target rows after the previous cells added `rescued_struct`,
# `rescued_discount` and `rescued_payment_method`.
display(df)

In [0]:
# Order columns followed by the values recovered from `_rescued_data`.
report_columns = [
    "customer_id", "order_id", "order_date", "order_amount",
    "order_status", "rescued_discount", "rescued_payment_method",
]
display(df.select(*report_columns))

In [0]:
# Auto Loader stream over the `raw_1` CSV folder with schemaEvolutionMode "addNewColumns",
# reusing the schema location populated by the rescue-mode stream above.
# NOTE(review): the matching write cell also reuses checkpoint-1, which previously tracked
# `raw`; confirm that switching the source path on an existing Auto Loader checkpoint is intended.
# `header` is declared explicitly because the CSV reader defaults it to false and these files
# carry a header row (columns are selected by name elsewhere in the notebook).
df = spark.readStream.format("cloudFiles")\
          .option("cloudFiles.format", "csv")\
          .option("header", "true")\
          .option("cloudFiles.schemaEvolutionMode","addNewColumns")\
          .option("cloudFiles.schemaLocation", "/Volumes/workspace/sparkstreaming/checkpoint/schema/")\
          .load("/Volumes/workspace/sparkstreaming/raw_1")

In [0]:
from pyspark.errors import StreamingQueryException

# Append the addNewColumns stream into the Delta target and wait for it to finish, so later
# cells never race a still-running query on the same checkpoint.
# trigger(availableNow=True) replaces the deprecated trigger(once=True).
query = df.writeStream.format("delta")\
    .outputMode("append")\
    .option("checkpointLocation", "/Volumes/workspace/sparkstreaming/checkpoint/checkpoint-1/")\
    .trigger(availableNow=True)\
    .option("mergeSchema", True)\
    .start("/Volumes/workspace/sparkstreaming/target")

try:
    query.awaitTermination()
except StreamingQueryException as err:
    # With addNewColumns, Auto Loader stops the stream when it meets columns missing from the
    # stored schema, after recording the widened schema; that stop is what this cell
    # demonstrates. Anything else is a real failure and is re-raised.
    message = str(err)
    if "UNKNOWN_FIELD_EXCEPTION" not in message and "UnknownFieldException" not in message:
        raise
    print("Stream stopped after detecting new columns (expected with addNewColumns); "
          "the next start uses the updated schema.")

In [0]:
# Batch-read the Delta target again to see what the addNewColumns run left in it.
df = (
    spark.read
    .format("delta")
    .load("/Volumes/workspace/sparkstreaming/target")
)
display(df)

In [0]:
# Same `raw_1` source with schemaEvolutionMode "none": this stream uses the schema stored
# under schemaLocation as-is and does not evolve it.
# `header` is declared explicitly because the CSV reader defaults it to false and these files
# carry a header row (columns are selected by name elsewhere in the notebook).
df = spark.readStream.format("cloudFiles")\
          .option("cloudFiles.format", "csv")\
          .option("header", "true")\
          .option("cloudFiles.schemaEvolutionMode","none")\
          .option("cloudFiles.schemaLocation", "/Volumes/workspace/sparkstreaming/checkpoint/schema/")\
          .load("/Volumes/workspace/sparkstreaming/raw_1")

In [0]:
# Append the "none"-mode stream into the Delta target and stop once the available data is
# processed. trigger(availableNow=True) replaces the deprecated trigger(once=True).
# Block until the query finishes so the following batch read sees the committed result.
query = df.writeStream.format("delta")\
    .outputMode("append")\
    .option("checkpointLocation", "/Volumes/workspace/sparkstreaming/checkpoint/checkpoint-1/")\
    .trigger(availableNow=True)\
    .option("mergeSchema", True)\
    .start("/Volumes/workspace/sparkstreaming/target")
query.awaitTermination()

In [0]:
# Final batch read of the Delta target after the "none"-mode run.
df = spark.read.load(
    "/Volumes/workspace/sparkstreaming/target",
    format="delta",
)
display(df)