In [0]:
# Show the active SparkSession. `spark` is supplied by the notebook runtime
# (it is never created in this notebook).
spark

In [0]:
# DDL schema string for the incoming JSON order files; it is handed to
# readStream.schema(...) below because file-based streaming sources need a schema up front.
# Fields of each element in the nested `line_items` array:
_line_item_fields = ",".join([
    "order_item_id: long",
    "order_item_product_id: long",
    "order_item_quantity: long",
    "order_item_product_price: float",
    "order_item_subtotal: float",
])

# Top-level order fields, ending with the array of line-item structs.
orders_schema = ",".join([
    "order_id long",
    "customer_id long",
    "customer_fname string",
    "customer_lname string",
    "city string",
    "state string",
    "pincode long",
    f"line_items array<struct<{_line_item_fields}>>",
])

In [0]:
# Echo the schema DDL string so it can be eyeballed before use.
orders_schema

'order_id long,customer_id long,customer_fname string,customer_lname string,city string,state string,pincode long,line_items array<struct<order_item_id: long,order_item_product_id: long,order_item_quantity: long,order_item_product_price: float,order_item_subtotal: float>>'

In [0]:
from pyspark.sql.functions import *

In [0]:
# Create the landing directory that the JSON file stream below watches.
# Re-running is harmless: the call also returns True when the directory already exists.
landing_dir = "dbfs:/FileStore/Datasets/completemode"
dbutils.fs.mkdirs(landing_dir)

True

In [0]:
# Incrementally read JSON order files that land in the directory, using the
# explicit schema defined above. Passing the path to load() is equivalent to
# setting the "path" option and then calling load().
order_df = (
    spark.readStream
    .format("json")
    .schema(orders_schema)
    .load("dbfs:/FileStore/Datasets/completemode/")
)

In [0]:
# Expose the streaming orders DataFrame to Spark SQL as the temp view `orders`.
order_df.createOrReplaceTempView(name="orders")

In [0]:
# One row per line item: explode() turns each element of the `line_items`
# array into its own row, exposed as the struct column `lines`.
# NOTE(review): explode() emits no rows for orders whose line_items is empty or
# null, so such orders never reach the aggregation; use explode_outer if they matter.
explode_sql = "select order_id,customer_id,city,state,pincode,explode(line_items) lines from orders"
explode_df = spark.sql(explode_sql)

In [0]:
# Register the exploded rows so the flattening query can select from them.
explode_df.createOrReplaceTempView(name="explode_orders")

In [0]:
# Promote the fields of each `lines` struct to top-level columns with short names
# (item_id, product_id, quantity, price, subtotal).
flatten_sql = """select order_id, customer_id, city, state, pincode, 
    lines.order_item_id as item_id, lines.order_item_product_id as product_id,
    lines.order_item_quantity as quantity,lines.order_item_product_price as price,
    lines.order_item_subtotal as subtotal from explode_orders"""
flatten_df = spark.sql(flatten_sql)

In [0]:
# Register the flattened line items for the per-customer aggregation.
flatten_df.createOrReplaceTempView(name="flatten_orders")

In [0]:
# Per-customer totals over all line items seen so far.
# approx_count_distinct is used because exact distinct aggregations are not
# supported on streaming DataFrames, so orders_placed is an estimate rather than an exact count.
# count(item_id) counts line-item rows with a non-null item_id; it does not sum `quantity`.
agg_sql = """select customer_id, approx_count_distinct(order_id) as orders_placed, 
count(item_id) as products_purchased,sum(subtotal) as amount_spent 
from flatten_orders group by customer_id"""
agg_df = spark.sql(agg_sql)

In [0]:
# Target database for the streaming sink table; IF NOT EXISTS makes re-runs safe.
create_db_sql = "create database if not exists retaildb"
spark.sql(create_db_sql)

DataFrame[]

In [0]:
# Continuously write the per-customer aggregates into a Delta table.
# "complete" mode rewrites the entire result table on every trigger; append mode
# is not allowed for an aggregation like agg_df that has no watermark.
# NOTE(review): "checkpointdir5" is a relative path -- confirm where it resolves on
# this cluster. Reusing the checkpoint resumes the query; a fresh one starts over.
streaming_query = (
    agg_df.writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", "checkpointdir5")
    .toTable("retaildb.orders_result104")
)

In [0]:
# The query was just started. Without waiting, this read races the first
# micro-batch and can show an empty or partial table. processAllAvailable()
# blocks until the data currently in the source has been processed and committed
# to the sink. It also re-raises the error if the query terminated with an
# exception, so a failed stream is not mistaken for "no data".
streaming_query.processAllAvailable()
spark.sql("select * from retaildb.orders_result104").show()

+-----------+-------------+------------------+------------+
|customer_id|orders_placed|products_purchased|amount_spent|
+-----------+-------------+------------------+------------+
+-----------+-------------+------------------+------------+



In [0]:
# Snapshot of the query's current state: a dict with 'message',
# 'isDataAvailable' and 'isTriggerActive'.
streaming_query.status

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

In [0]:
# NOTE(review): this repeats the earlier mkdirs for the same landing directory
# (here spelled with a trailing slash). It just returns True again and can likely be removed.
landing_dir_again = "dbfs:/FileStore/Datasets/completemode/"
dbutils.fs.mkdirs(landing_dir_again)

True

In [0]:
# Summarise the state of the streaming query started above. `streaming_query` is
# the handle returned by toTable(...); there is no variable named `query` in this
# notebook, so referencing it would raise NameError on a fresh kernel.
query_status = streaming_query.status

# Print the status message
print(query_status['message'])
print('Is Data Available:', query_status['isDataAvailable'])
print('Is Trigger Active:', query_status['isTriggerActive'])
# A 'Stopped' query may have died with an error; exception() returns it, or None
# if the query has not terminated with an exception.
print('Exception:', streaming_query.exception())

Stopped
Is Data Available: False
Is Trigger Active: False
