In [0]:
from pyspark.sql.functions import col

# Column layout of the incoming transaction CSV files, given as a DDL string.
csv_schema = "TransactionID STRING, TransactionDate STRING, ProductID STRING, Quantity INT, Price INT"

# Streaming reader over the shared CSV directory; each file starts with a header row.
product_stream = (
    spark.readStream
    .schema(csv_schema)
    .option("header", "true")
    .format("csv")
    .load("file:/Workspace/Shared/streaming_csv")
)

# Task 1
# Start an append-mode query that writes the raw stream to the console sink.
query = (
    product_stream.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

# Task 2 Stream Transformations
# Total Amount: append Quantity * Price as a new column.
product_stream_transformed = product_stream.withColumn(
    "TotalAmount",
    col("Quantity") * col("Price"),
)

# Filter: keep only rows where more than one unit was bought.
product_stream_filtered = product_stream_transformed.where(col("Quantity") > 1)

# Expose the filtered rows as the in-memory table "filtered_product".
query2 = (
    product_stream_filtered.writeStream
    .queryName("filtered_product")
    .format("memory")
    .outputMode("append")
    .start()
)


In [0]:
# Drain everything currently available in the source before stopping.
# Stopping right after start() (e.g. under "Run All", where this cell runs
# immediately after the queries are launched) can end the streams before the
# first micro-batch is committed, leaving the memory table empty.
# NOTE: processAllAvailable() blocks until the data present in the source has
# been processed; it is meant for finite/test inputs like this directory.
query.processAllAvailable()
query2.processAllAvailable()

query.stop()
query2.stop()

print("Filtered Products")
spark.sql("SELECT * FROM filtered_product").show()


Filtered Products
+-------------+---------------+---------+--------+-----+-----------+
|TransactionID|TransactionDate|ProductID|Quantity|Price|TotalAmount|
+-------------+---------------+---------+--------+-----+-----------+
|         T101|     2024-01-01|   Laptop|       2| 1200|       2400|
|         T103|     2024-01-03|   Tablet|       3|  600|       1800|
|         T105|     2024-01-05|    Mouse|       5|   25|        125|
+-------------+---------------+---------+--------+-----+-----------+



In [0]:
from pyspark.sql.functions import sum
# Task 3 Aggregated
# Running total of TotalAmount per ProductID.
# (`sum` here is pyspark.sql.functions.sum, imported at the top of this cell.)
aggregated_df = product_stream_transformed.groupBy("ProductID").agg(sum("TotalAmount").alias("TotalSales"))

# Use "complete" rather than "update" for the memory sink: the Structured
# Streaming guide lists only Append and Complete as supported modes for it.
# With "update", changed rows from later micro-batches would be added to the
# table alongside the older ones, so a product could appear several times with
# stale totals. "complete" rewrites the table with the current totals each batch.
query3 = (
    aggregated_df.writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("aggregated_sales")
    .start()
)

In [0]:
# Let the query finish processing the files already in the source before
# stopping it; otherwise a quick stop (e.g. under "Run All") can leave the
# "aggregated_sales" table empty.
query3.processAllAvailable()
query3.stop()

print("Aggregated Sales")
spark.sql("SELECT * FROM aggregated_sales").show()

Aggregated Sales
+---------+----------+
|ProductID|TotalSales|
+---------+----------+
|    Phone|       800|
|   Laptop|      2400|
|    Mouse|       125|
|   Tablet|      1800|
|  Monitor|       300|
+---------+----------+



In [0]:
# Task 4 Stream results to parquet
# Destination directory for the parquet files and the query's checkpoint directory.
parquet_output_path = "/Workspace/Shared/parquet_output"
checkpoint = "/Workspace/Shared/checkpoint"

# Append the transformed rows (including TotalAmount) to the parquet sink.
query4 = (
    product_stream_transformed.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", parquet_output_path)
    .option("checkpointLocation", checkpoint)
    .start()
)

In [0]:
# Wait until the files currently in the source have been written to the
# parquet sink, then stop; stopping immediately after start() can leave the
# output directory without any data files.
query4.processAllAvailable()
query4.stop()

In [0]:
# Task 5 Watermarks
# The CSV schema reads TransactionDate as STRING, but a watermark must be
# defined on a timestamp (or window) column, so cast it first. Rows arriving
# more than 1 day behind the latest event time seen may then be treated as late.
watermarked_df = (
    product_stream_transformed
    .withColumn("TransactionDate", col("TransactionDate").cast("timestamp"))
    .withWatermark("TransactionDate", "1 day")
)

In [0]:
# Task 6
# Join the transaction stream with a second stream of product metadata.

# Column layout of the product-info CSV files.
product_schema = "ProductID STRING, ProductName STRING, Category STRING"

# Streaming reader over the product-info directory (files have a header row).
productinfo_stream = (
    spark.readStream
    .schema(product_schema)
    .option("header", "true")
    .format("csv")
    .load("file:/Workspace/Shared/join_stream/")
)

# Inner join on ProductID.
# NOTE(review): no watermark is set on either side, so join state is presumably
# retained indefinitely — confirm this is acceptable for the data volume.
joined_stream = product_stream.join(productinfo_stream, on="ProductID", how="inner")

# Publish the joined rows as the in-memory table "joined_stream".
join_query = (
    joined_stream.writeStream
    .queryName("joined_stream")
    .outputMode("append")
    .format("memory")
    .start()
)

In [0]:
# Process the data already available in both source directories before
# stopping, so the "joined_stream" memory table is populated even when this
# cell runs immediately after the query is started.
join_query.processAllAvailable()
join_query.stop()