Spark Structured Streaming: flattening nested JSON order files into a Delta table

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Explicit schema for the nested order JSON files. File-based streaming sources
# need a schema up front, so every nested struct is declared here as its own
# named sub-schema and then assembled into json_schema. All fields are nullable.

# customer.address
address_schema = StructType([
    StructField("city", StringType(), True),
    StructField("postal_code", StringType(), True),
    StructField("country", StringType(), True),
])

# customer
customer_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("address", address_schema, True),
])

# One element of the items array.
item_schema = StructType([
    StructField("item_id", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
])

# payment
payment_schema = StructType([
    StructField("method", StringType(), True),
    StructField("transaction_id", StringType(), True),
])

# One key/value element of the metadata array.
metadata_entry_schema = StructType([
    StructField("key", StringType(), True),
    StructField("value", StringType(), True),
])

json_schema = StructType([
    StructField("order_id", StringType(), True),
    # Kept as a string; convert to TimestampType downstream if needed.
    StructField("timestamp", StringType(), True),
    StructField("customer", customer_schema, True),
    StructField("items", ArrayType(item_schema), True),
    StructField("payment", payment_schema, True),
    StructField("metadata", ArrayType(metadata_entry_schema), True),
])

In [0]:
# Streaming schema inference
# (spark.conf.set("spark.sql.streaming.schemaInference", "true")) did not work on
# the serverless cluster, so the explicit json_schema is supplied instead.
#
# cleanSource="Archive" moves completed input files from the source folder into
# sourceArchiveDir. Spark's docs state that a source path read with cleanSource
# enabled should not be consumed by multiple sources/queries.
json_source_dir = "/Volumes/strom/streamingdata/vol_streamingdata/jsonsource/"
json_archive_dir = "/Volumes/strom/streamingdata/vol_streamingdata/jsonarchive/"

df_source = (
    spark.readStream
    .format("json")
    .schema(json_schema)
    .options(
        multiLine="true",  # parse each file as a whole; records may span lines
        cleanSource="Archive",
        sourceArchiveDir=json_archive_dir,
    )
    .load(json_source_dir)
)

In [0]:
# Flatten the nested order JSON into one row per (item, metadata entry).
#
# select() replaces the original long withColumn() chain: every withColumn adds
# its own projection to the plan, and the PySpark docs recommend a single
# select() when adding many columns. The original no-op
# withColumn("order_id", col("order_id")) / withColumn("timestamp", ...) calls
# are gone. Spark allows only one generator (explode_outer) per projection, so
# the two arrays are exploded in consecutive selects. The final column order
# matches the original chain.
#
# explode_outer keeps orders whose items / metadata arrays are null or empty;
# their item_* / metadata_* columns come out null.
#
# NOTE(review): items and metadata are exploded independently, so an order with
# N items and M metadata entries yields N x M rows and every item's
# quantity/price is repeated M times. Aggregating item_quantity / item_price
# over the sink will over-count; consider keeping metadata as a map
# (map_from_entries) or writing items and metadata to separate tables.
orders_with_item = df_source.select(
    col("order_id"),
    col("timestamp"),
    col("customer.customer_id").alias("customer_id"),
    col("customer.name").alias("customer_name"),
    col("customer.email").alias("customer_email"),
    col("customer.address.city").alias("customer_address_city"),
    col("customer.address.postal_code").alias("customer_address_postal_code"),
    col("customer.address.country").alias("customer_address_country"),
    col("payment.method").alias("payment_method"),
    col("payment.transaction_id").alias("payment_transaction_id"),
    explode_outer(col("items")).alias("item"),
    col("metadata"),
)

df_sink = (
    orders_with_item
    .select("*", explode_outer(col("metadata")).alias("meta"))
    .select(
        "*",
        col("item.item_id").alias("item_id"),
        col("item.product_name").alias("item_name"),
        col("item.quantity").alias("item_quantity"),
        col("item.price").alias("item_price"),
        col("meta.key").alias("metadata_key"),
        col("meta.value").alias("metadata_value"),
    )
    # Drop the intermediate struct/array columns so only flat columns remain.
    .drop("item", "metadata", "meta")
)

In [0]:
# Interactive preview of the flattened stream.
# NOTE(review): df_sink is a streaming DataFrame over df_source, which was opened
# with cleanSource="Archive". Displaying a streaming DataFrame presumably runs its
# own streaming query over the same source folder, and Spark's docs say the source
# path should not be used by multiple queries when cleanSource is enabled. This
# preview may therefore archive input files before the Delta writeStream below
# processes them. Consider stopping the preview before running the write.
df_sink.display()

In [0]:
# Write the flattened orders to Delta as an incremental batch.
# - trigger(availableNow=True) processes everything currently available in the
#   source and then stops; it replaces the deprecated trigger(once=True).
# - start() runs the query in the background. awaitTermination() blocks until
#   that run finishes (and re-raises any query failure), so the read-back cell
#   below sees committed data instead of racing the query.
orders_query = (
    df_sink.writeStream.format("delta")
    .outputMode("append")
    .trigger(availableNow=True)
    .option("path", "/Volumes/strom/streamingdata/vol_streamingdata/jsonsink/data")
    .option("checkpointLocation", "/Volumes/strom/streamingdata/vol_streamingdata/jsonsink/checkpoint")
    .start()
)
orders_query.awaitTermination()

In [0]:
# Batch read of the Delta sink to verify what the streaming write committed.
# Delta stores the table schema in its transaction log, so no schema option is
# needed; "inferSchema" is a CSV-style option, not a Delta reader option, and
# was removed.
df_check = spark.read.format("delta").load("/Volumes/strom/streamingdata/vol_streamingdata/jsonsink/data/")
df_check.display()

In [0]:
# One-time setup: uncomment to create the archive folder used as sourceArchiveDir.
#dbutils.fs.mkdirs("/Volumes/strom/streamingdata/vol_streamingdata/jsonarchive")

# **Windowing with Structured Streaming**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
%sql
-- Source table for the windowing demo.
-- IF NOT EXISTS makes the cell safe to re-run: a plain CREATE TABLE fails once
-- the table already exists, which breaks "Run All".
create table if not exists strom.streamingdata.windowingtbl
(
  color string,
  eventtime timestamp
)

In [0]:
%sql
-- Seed one sample event for the windowed count.
-- Each execution appends another copy of this row (INSERT is not idempotent).
INSERT INTO strom.streamingdata.windowingtbl (color, eventtime)
VALUES ('red', '2022-01-01 00:11:00');

In [0]:
%sql
-- Inspect the current contents of the windowing source table.
SELECT w.*
FROM strom.streamingdata.windowingtbl AS w;

In [0]:
# Open the windowing table as a streaming source.
windowing_source_table = "strom.streamingdata.windowingtbl"
df = spark.readStream.table(windowing_source_table)

In [0]:
# Count events per color in 10-minute tumbling windows over eventtime
# (no slideDuration is given, so windows do not overlap).
# NOTE: this rebinds df, so running the cell twice without re-running the
# readStream cell above fails (the aggregated frame has no eventtime column).
ten_minute_window = window(timeColumn="eventtime", windowDuration="10 minutes")
df = df.groupBy(col("color"), ten_minute_window).count()

In [0]:
# Interactive preview of the windowed counts. df is a streaming DataFrame, so
# this is presumably a live view backed by its own streaming query rather than a
# one-off snapshot. NOTE(review): confirm it is stopped when no longer needed.
df.display()

In [0]:
# Write the windowed counts to Delta.
# - Complete mode rewrites the full aggregate result on every trigger. Append
#   mode would require a watermark on eventtime for this aggregation.
# - trigger(availableNow=True) processes all rows currently in the source table
#   and then stops; it replaces the deprecated trigger(once=True).
# - awaitTermination() blocks until that run finishes (and re-raises any query
#   failure), so the read-back cell below sees its output instead of racing it.
window_query = (
    df.writeStream.format("delta")
    .outputMode("Complete")
    .trigger(availableNow=True)
    .option("path", "/Volumes/strom/streamingdata/vol_streamingdata/windowing1/data")
    .option("checkpointLocation", "/Volumes/strom/streamingdata/vol_streamingdata/windowing1/checkpoint")
    .start()
)
window_query.awaitTermination()

In [0]:
# Batch read of the windowed-count Delta table written above.
# Delta keeps the schema in its transaction log, so no schema option is needed;
# the CSV-style "inferSchema" option is not a Delta reader option and was removed.
df_check = spark.read.format("delta").load("/Volumes/strom/streamingdata/vol_streamingdata/windowing1/data/")
df_check.display()