First, we clean the input directory, then drop the output table and its warehouse directory, using the commands below.

In [0]:
# Start from an empty input directory; recurse=True removes any JSON files left over from a previous run.
input_dir = "dbfs:/FileStore/streaming_input/input1"
dbutils.fs.rm(input_dir, recurse=True)

Out[2]: True

In [0]:
# Drop the target table so the pipeline starts from a clean slate.
# IF EXISTS makes this cell safe on a fresh workspace: a plain DROP TABLE
# raises an error when the table has not been created yet.
spark.sql("drop table if exists orders_result_final")

Out[3]: DataFrame[]

In [0]:
# Also remove the table's warehouse directory in case files were left behind.
# Returns False when there is nothing to delete.
warehouse_dir = "dbfs:/user/hive/warehouse/orders_result_final"
dbutils.fs.rm(warehouse_dir, recurse=True)

Out[4]: False

In [0]:
# Recreate the (now empty) directory the file stream will watch for new JSON files.
watched_dir = "dbfs:/FileStore/streaming_input/input1"
dbutils.fs.mkdirs(watched_dir)

Out[5]: True

In [0]:
# Target table holding running per-customer totals.
# NOTE(review): the MERGE INTO performed per micro-batch requires this to be a
# Delta table — presumably the workspace's default table format; confirm.
target_ddl = "create table orders_result_final (customer_id long, orders_placed long, products_purchased long, amount_spent float)"
spark.sql(target_ddl)

Out[6]: DataFrame[]

In [0]:
# Spark DDL-style schema string (despite the name, not JSON) for the incoming order files.
# NOTE(review): "line_iems" looks like a typo of "line_items", but it must match the
# key actually present in the input JSON and is referenced by later queries — confirm
# against the data before renaming.
schema_json = (
    "order_id long, customer_id long, customer_fname string, customer_lname string,  "
    "city string, state string, pincode long, "
    "line_iems array<struct<order_item_id:long, order_item_product_id:long, "
    "order_item_quantity:long, order_item_subtotal:float, order_item_product_price:float>>"
)

In [0]:
# Streaming source: JSON files landing in the input directory, parsed with the
# explicit schema defined above.
order_data = (
    spark.readStream
    .format("json")
    .schema(schema_json)
    .load("dbfs:/FileStore/streaming_input/input1")
)


In [0]:
# Expose the raw stream to SQL under the name "orders".
order_data.createOrReplaceTempView(name="orders")

In [0]:
# One row per line item: explode the line_iems array into a struct column named `lines`.
explode_sql = "select order_id, customer_id, city, state, pincode, explode(line_iems) as lines from orders"
exploded_orders = spark.sql(explode_sql)

In [0]:
# Register the exploded stream so the flattening query can select from it.
exploded_orders.createOrReplaceTempView(name="exploded_orders")

In [0]:
# Promote each field of the `lines` struct to a top-level column, keeping the
# order-level columns alongside (one row per order line).
flatten_sql = """select order_id, customer_id, city, state, pincode, 
                            lines.order_item_id as item_id,
                            lines.order_item_product_id as product_id, 
                            lines.order_item_quantity as quantity,
                            lines.order_item_product_price as product_price, 
                            lines.order_item_subtotal as subtotal
                            from exploded_orders"""
flattened_orders = spark.sql(flatten_sql)

In [0]:
# from pyspark.sql.functions import distinct

In [0]:
def _batch_session(batch_df):
    """Return the SparkSession that owns ``batch_df`` (a foreachBatch micro-batch)."""
    # DataFrame.sparkSession is public API since PySpark 3.3; older runtimes only
    # expose the owning session through the deprecated SQLContext (sql_ctx).
    session = getattr(batch_df, "sparkSession", None)
    if session is None:
        session = batch_df.sql_ctx.sparkSession
    return session


def myfunction(flattened_orders, batch_id):
    """Upsert one micro-batch of flattened order lines into ``orders_result_final``.

    Used as the ``foreachBatch`` callback: the batch is aggregated per customer
    and MERGEd into the target table, adding to existing totals for known
    customers and inserting rows for new ones.

    Args:
        flattened_orders: the micro-batch DataFrame (one row per order line with
            order_id, customer_id, product_id, subtotal, ...). Shadows the
            notebook-level streaming DataFrame of the same name.
        batch_id: micro-batch id supplied by Structured Streaming (unused).
    """
    # Temp views registered on the batch DataFrame live in the batch's own session,
    # so all SQL must go through that session rather than the global `spark`.
    session = _batch_session(flattened_orders)
    flattened_orders.createOrReplaceTempView("flattened_orders")

    # count(distinct ...) is exact. approx_count_distinct is a sketch-based
    # estimate, and any error would be added permanently into the running totals.
    aggregated_orders = session.sql("""select customer_id,
                              count(distinct order_id) as orders_placed,
                              count(product_id) as products_purchased,
                              sum(subtotal) as amount_spent
                              from flattened_orders
                              group by customer_id
                              """)
    aggregated_orders.createOrReplaceTempView("orders_result")

    # coalesce: sum() yields null when every subtotal in the group is null, and
    # null + total would otherwise wipe out an existing running amount.
    # NOTE(review): foreachBatch is at-least-once; a replayed batch is merged again
    # and double-counts — consider idempotent writes if exactness matters.
    merge_statement = """merge into orders_result_final t using orders_result s
    on t.customer_id = s.customer_id
    when matched
    then update set t.orders_placed = t.orders_placed + s.orders_placed,
                    t.products_purchased = t.products_purchased + s.products_purchased,
                    t.amount_spent = coalesce(t.amount_spent, 0) + coalesce(s.amount_spent, 0)
    when not matched
    then insert *
    """
    session.sql(merge_statement)


    

In [0]:
# Start the stream: each micro-batch of flattened order lines is handed to
# `myfunction`, which MERGEs per-customer totals into orders_result_final.
# No .format(...) here: foreachBatch replaces the sink, so a format such as
# "delta" would be silently ignored; the table write happens inside the MERGE.
# NOTE(review): the checkpoint path is relative and is not cleared by the reset
# cells at the top of the notebook; if it survives a reset, files it already
# recorded may be skipped. Consider an absolute dbfs:/ path removed during reset.
streaming_query = (
    flattened_orders.writeStream
    .outputMode("update")
    .option("checkpointLocation", "Checkpointlocation111")
    .foreachBatch(myfunction)
    .start()
)

In [0]:
# Inspect the accumulated per-customer totals written by the stream so far.
results_df = spark.sql("select * from orders_result_final")
results_df.show()

+-----------+-------------+------------------+------------+
|customer_id|orders_placed|products_purchased|amount_spent|
+-----------+-------------+------------------+------------+
|      11599|            3|                 6|     1409.93|
|       6272|            3|                 8|   1709.8401|
|        256|            2|                 6|     1159.96|
|       8827|            2|                 8|   1399.7001|
|      11318|            2|                10|     2259.72|
|      10280|            3|                10|     2659.76|
+-----------+-------------+------------------+------------+



In [0]:
# Print the physical plan of the running streaming query (non-extended form).
streaming_query.explain(extended=False)

