In [7]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from delta import *
from delta.tables import *
from dotenv import load_dotenv
import os
import uuid
import time

builder = SparkSession.builder.appName("Streaming2_practice") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

load_dotenv()

KAFKA_USER = os.getenv('KAFKA_USER')
KAFKA_PW = os.environ['KAFKA_TOPIC'] = 'your_kafka_topic' 
KAFKA_TOPIC = os.getenv('KAFKA_TOPIC')
KAFKA_GROUP_PREFIX = os.getenv('KAFKA_GROUP_PREFIX')
KAFKA_JAAS_CONFIG = f"org.apache.kafka.common.security.scram.ScramLoginModule required username='{KAFKA_USER}' password='{KAFKA_PW}';"

# Generate a random UUID
KAFKA_GROUP_ID = f"{KAFKA_GROUP_PREFIX}{uuid.uuid4()}"

# spark.conf.set("spark.sql.repl.eagerEval.enabled",True) # OK for exploration, not great for performance
# spark.conf.set("spark.sql.repl.eagerEval.truncate", 500)

#### Spark configuration note

In [None]:
# Let's start with some performance configurations 

#spark.conf.get("spark.sql.shuffle.partitions") # this is the number of partitions when shuffling data. By default it is 200, which may be overkill for smaller clusters

#spark._sc.defaultParallelism # this is the default level of parallelism based on the system settings. E.g., amount of cores, cluster size, etc

# For most, and especially smaller datasets, it makes sense to reduce the shuffle partitions. 
# However, if you get OOM for huge datasets, you may need to increase the partitions. There is no golden rule, you need to test and benchmark
#spark.conf.set("spark.sql.shuffle.partitions", spark._sc.defaultParallelism)

### Structured Streaming
  
  
* Kafka 
* Aggregations
* Time windows
* Watermarking
* Joins

In [8]:
# for getting data from Kafka (or other distributed log systems), we need minimum 2 things:
# the server
# the topic

kafka_server = "localhost:9093"  # Correct Kafka bootstrap server

orders_df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_server)
  .option("kafka.security.protocol", "SASL_SSL")  # Or SASL_PLAINTEXT if you’re using SCRAM-SHA-512
  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
  .option("kafka.sasl.jaas.config", KAFKA_JAAS_CONFIG)
  .option("kafka.group.id", KAFKA_GROUP_ID)
  .option("subscribe", KAFKA_TOPIC)  # Ensure this matches your producer's topic
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 100)
  .load()
)

orders_df.printSchema()  # To see the schema of the data being received


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [9]:
checkpoint_path = "streaming/orders/_checkpoint" 
table_name = "orders_s"
output_path = f"spark-warehouse/{table_name}"

orders_delta_query = (orders_df.writeStream
  .outputMode("append")
  .format("delta")
  .queryName("orders_delta_query")
  .trigger(processingTime="5 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

# if you create the table metastore before any data exists then the stream will result in an error as the table is generated with empty schema
def create_table_if_exists(output_path,table_name):
    data_exists = False
    for _i in range(60): # you can replace this with while, currently timeouts after about 60 seconds
        try:
            time.sleep(1)
            files = os.listdir(output_path)
            for _f in files:
                if ".parquet" in _f:
                    if len(os.listdir(f"{output_path}/_delta_log"))>0:
                        print("data exists")
                        data_exists = True
                        break
            if data_exists:
                spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{table_name}'") # table metastore is created once there is some data (.parquet) in the directory
                break
        except Exception as e:
            print(e) # if you want to see the exceptions, uncomment this
            pass

create_table_if_exists(output_path,table_name)

In [10]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from delta import *
from delta.tables import *
from dotenv import load_dotenv
import os
import uuid
import time

# Create SparkSession with Delta Lake support
builder = SparkSession.builder.appName("KafkaToDeltaStream") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Load environment variables for Kafka connection
load_dotenv()

KAFKA_USER = os.getenv('KAFKA_USER')   # Kafka user credentials
KAFKA_PW  = os.environ['KAFKA_TOPIC'] = 'your_kafka_topic' 
KAFKA_TOPIC = os.getenv('KAFKA_TOPIC')  # Kafka topic
KAFKA_GROUP_PREFIX = os.getenv('KAFKA_GROUP_PREFIX')  # Kafka group prefix
KAFKA_JAAS_CONFIG = f"org.apache.kafka.common.security.scram.ScramLoginModule required username='{KAFKA_USER}' password='{KAFKA_PW}';"

# Generate a random UUID for group ID
KAFKA_GROUP_ID = f"{KAFKA_GROUP_PREFIX}{uuid.uuid4()}"

# Kafka server configuration
kafka_server = "localhost:9093"  # Kafka server address

# Reading data from Kafka stream
orders_df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_server)
  .option("kafka.security.protocol", "SASL_SSL")  # Adjust according to your security protocol
  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")  # SASL mechanism for authentication
  .option("kafka.sasl.jaas.config", KAFKA_JAAS_CONFIG)
  .option("kafka.group.id", KAFKA_GROUP_ID)
  .option("subscribe", KAFKA_TOPIC)  # Subscribe to the Kafka topic
  .option("startingOffsets", "earliest")  # Start from the earliest offset
  .option("maxOffsetsPerTrigger", 100)  # Rate limit for offsets per trigger
  .load()  # Read the stream
)

# The data in Kafka is in binary format, we need to decode the 'value' column
orders_df = orders_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")  # Decode 'key' and 'value' to strings

# Optionally: you can parse the 'value' as JSON if the data is in JSON format
orders_df = orders_df.withColumn("value", F.from_json(orders_df["value"], "order_id INT, order_value INT"))



# Output path and checkpoint location
checkpoint_path = "streaming/orders/_checkpoint"
output_path = "delta/orders_output"

# Ensure output path exists (optional, you can handle it dynamically)
os.makedirs(output_path, exist_ok=True)

# Write the stream to Delta format
orders_delta_query = (orders_df.writeStream
  .outputMode("append")  # Append mode since we're continuously adding data
  .format("delta")
  .queryName("orders_delta_query")
  .trigger(processingTime="5 second")  # Trigger interval, adjust as necessary
  .option("checkpointLocation", checkpoint_path)  # Checkpoint location to allow recovery
  .start(output_path)  # Delta table output path
)


IllegalArgumentException: Cannot start query with name orders_delta_query as a query with that name is already active in this SparkSession

In [2]:
# let's have a look at the data 


# Optional: Function to create table if not exists
def create_table_if_exists(output_path, table_name):
    data_exists = False
    for _i in range(60):  # Check for data presence
        try:
            time.sleep(1)
            files = os.listdir(output_path)
            for _f in files:
                if ".parquet" in _f:
                    if len(os.listdir(f"{output_path}/_delta_log")) > 0:
                        print("Data exists in Delta path.")
                        data_exists = True
                        break
            if data_exists:
                # Create the table only when data exists
                spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{output_path}'")
                print(f"Table {table_name} created.")
                break
        except Exception as e:
            print(e)

# Create the table once data is available
create_table_if_exists(output_path, "orders_s")

# Display the first 5 rows of the table
display(spark.table("orders_s").limit(5))



display(spark.table(table_name).limit(5))

#key - the data key. Used in state machines, not useful in this case
#value - the data, in binary format. This is our JSON payload. We'll need to cast it to STRING.
#topic - the topic we are subscribing to
#partition - partition.
#offset - the offset value. This is per topic, partition, and consumer group
#timestamp - the timestamp (commonly, the processing (or ingestion) timestamp)
#timestampType - whether timestamp is create time or log append time 

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `orders_s` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.;
'UnresolvedRelation [orders_s], [], false


In [11]:
# let's have a look at the JSON payload

from pyspark.sql.types import StringType

orders_payload_df = (spark.table(table_name)
                  .select(F.col("value").cast(StringType()))
                  )

display(orders_payload_df)

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `orders_s` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.;
'UnresolvedRelation [orders_s], [], false


In [12]:
# let's create a schema for navigating the JSON payload
from pyspark.sql.types import StructType, StructField, ArrayType, TimestampType

schema = StructType([
  StructField("order_id", StringType(), True),
  StructField("user_id", StringType(), True),
  StructField("product_ids", ArrayType(StringType(), True), True),
  StructField("order_timestamp", TimestampType(), True)
])

In [13]:
# Now we can use "from_json" to parse out the message and provide schema

orders_json_df = (orders_payload_df
                .select(F.from_json("value", schema).alias("json"))
                .select(F.col("json.*"))
               )

display(orders_json_df)

NameError: name 'orders_payload_df' is not defined

In [None]:
orders_json_df.printSchema()

In [None]:
# let's cast the ids into integer and create a new df that has the clean payload data 
from pyspark.sql.types import IntegerType

orders_cleaned_df = (orders_df
  .select(F.col("value").cast(StringType()))
  .select(F.from_json("value", schema).alias("json"))
  .select(F.col("json.*"))
  .withColumn("order_id", F.col("order_id").cast(IntegerType()))
  .withColumn("user_id", F.col("user_id").cast(IntegerType()))
  .withColumn("product_ids", F.transform(F.col("product_ids"), lambda x: x.cast(IntegerType())))
)

orders_cleaned_df.printSchema()

In [None]:
# let's cast the ids into integer and create a new query that outputs the users by most purchased products 
checkpoint_path = "streaming/orders_most_products/_checkpoint" 
table_name = "orders_most_products"
output_path = f"spark-warehouse/{table_name}"

orders_most_products_query = (orders_cleaned_df
  .select("user_id",F.size("product_ids").alias("count_of_products"))
  .groupBy("user_id")
  .sum("count_of_products")
  .withColumnRenamed("sum(count_of_products)", "product_count")
  .writeStream
  .outputMode("complete") # we overwrite the complete table with every trigger
  .format("delta")
  .queryName("orders_most_products_query")
  .trigger(processingTime="5 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from delta import *
import os

# Create SparkSession with Delta Lake support
builder = SparkSession.builder.appName("CSVStreamToDelta") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Define the schema for the CSV file to match the data structure
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("id", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("ip_address", StringType(), True)
])

# Path to the directory where CSV files are located (for streaming)
csv_path = "input/sensor-data/"  # Path to the folder containing CSV files

# Reading the data as a stream from CSV files in the directory
df = (spark.readStream
      .schema(schema)  # Define the schema explicitly
      .csv(csv_path)   # Read CSV files as a stream from the directory
)

# Output path and checkpoint location
checkpoint_path = "streaming/csv_checkpoint"
output_path = "delta/csv_output"

# Ensure output path exists (optional, you can handle it dynamically)
os.makedirs(output_path, exist_ok=True)

# Write the stream to Delta format
csv_delta_query = (df.writeStream
                   .outputMode("append")  # Append mode since we're continuously adding data
                   .format("delta")
                   .queryName("csv_to_delta_query")
                   .trigger(processingTime="5 seconds")  # Trigger interval, adjust as necessary
                   .option("checkpointLocation", checkpoint_path)  # Checkpoint location for fault tolerance
                   .start(output_path)  # Delta table output path
)

# Wait for the query to terminate
csv_delta_query.awaitTermination()


StreamingQueryException: [STREAM_FAILED] Query [id = 77c385a9-470d-4cd4-b424-aca160083b69, runId = 9d640a41-25b8-43b7-a30c-030fcff267e1] terminated with exception: Wrong basePath input/sensor-data for the root path: file:/home/jovyan/input/mock_data.csv

### Time windows

In [None]:
orders_cleaned_df.printSchema()

In [None]:
# commonly, you might not want an aggregation of a stream's whole history.
# for this purpose, let's use window from functions - NB this is "time windows" not "SQL-like (row) window function"

checkpoint_path = "streaming/orders_most_products_tumb/_checkpoint" 
table_name = "orders_most_products_tumb"
output_path = f"spark-warehouse/{table_name}"

orders_most_products_tumb_query = (orders_cleaned_df
  .select("user_id",F.size("product_ids").alias("count_of_products"), "order_timestamp")
  .groupBy("user_id", F.window("order_timestamp", "5 minute")) # Aggregate by user, every 5 minute block. This is a "tumbling window"
  .sum("count_of_products")
  .withColumnRenamed("sum(count_of_products)", "product_count")
  .writeStream
  .outputMode("complete") # we overwrite the complete table with every trigger
  .format("delta")
  .queryName("orders_most_products_tumb_query")
  .trigger(processingTime="5 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

In [None]:
create_table_if_exists(output_path,table_name)

In [None]:
spark.table(table_name).orderBy(F.desc("window.end"),F.desc("product_count"))

In [None]:
# if we want to keep always the latest time window then we can use sliding windows

checkpoint_path = "streaming/orders_most_products_slide/_checkpoint" 
table_name = "orders_most_products_slide"
output_path = f"spark-warehouse/{table_name}"

orders_most_products_slide_query = (orders_cleaned_df
  .select("user_id",F.size("product_ids").alias("count_of_products"), "order_timestamp")
  .groupBy("user_id", F.window("order_timestamp", "5 minute", "1 minute")) # Aggregate by user, every 5 minute block sliding by 1 minute.
  .sum("count_of_products")
  .withColumnRenamed("sum(count_of_products)", "product_count")
  .writeStream
  .outputMode("complete") # we overwrite the complete table with every trigger
  .format("delta")
  .queryName("orders_most_products_slide_query")
  .trigger(processingTime="5 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

In [None]:
spark.table(table_name).orderBy(F.desc("window.end"),F.desc("product_count"))

##### Non-Kafka part starts here

In [None]:
# let's try with a different, simpler dataset

input_path = "sensor-data"

json_schema = "time timestamp, action string"

In [None]:
# Let's create a dataframe and apply some transformations and aggregation

input_df = (spark
  .readStream                                 
  .schema(json_schema)                       
  .option("maxFilesPerTrigger", 1)            
  .json(input_path)                           
)

counts_df = (input_df
  .groupBy(F.col("action"),                     # Aggregate by action
           F.window(F.col("time"), "1 hour"))     # and by a 1 hour window
  .count()                                    # Count the actions
  .select(F.col("window.start").alias("start"), 
          F.col("count"),                       
          F.col("action"))                      
)

In [None]:
checkpoint_path = "streaming/counts/_checkpoint" 
table_name = "counts"
output_path = f"spark-warehouse/{table_name}"

counts_query = (counts_df
  .writeStream
  .outputMode("complete")
  .format("delta")
  .queryName("counts_query")
  .trigger(processingTime="10 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

In [None]:
spark.table(table_name).orderBy(F.col("start"))

### Watermarking

In [None]:
# in actual use cases, the queries above would keep running for a very long time and the amount of windows would grow indefinitely
# keeping track of all the states puts pressure on memory
# also it may often be irrelevant if delayed data updates our figures

watermarked_stream = "watermarked_stream"

watermarked_df = (input_df
  .withWatermark("time", "2 hours")             # Specify a 2-hour watermark
  .groupBy(F.col("action"),                       # Aggregate by action...
           F.window(F.col("time"), "1 hour"))       # ...then by a 1 hour window
  .count()                                      # For each aggregate, produce a count
  .select(F.col("window.start").alias("start"),   # Elevate field to column
          F.col("count"),                         # Include count
          F.col("action"))                        # Include action
)
display(watermarked_df, streamName = watermarked_stream) # Start the stream and display it

# important note: watermarking guarantees that any event within the window gets in. It does not guarantee leaving anything out.

In [None]:
# in actual use cases, the queries above would keep running for a very long time and the amount of windows would grow indefinitely
# keeping track of all the states puts pressure on memory
# also it may often be irrelevant if delayed data updates our figures

checkpoint_path = "streaming/orders_most_products_slide_wm/_checkpoint" 
table_name = "orders_most_products_slide_wm"
output_path = f"spark-warehouse/{table_name}"

orders_most_products_slide_wm_query = (orders_cleaned_df
  .withWatermark("order_timestamp", "20 minute")             # Specify a 20-minute watermark
  .select("user_id",F.size("product_ids").alias("count_of_products"), "order_timestamp")
  .groupBy("user_id", F.window("order_timestamp", "5 minute", "1 minute")) # Aggregate by user, every 5 minute block sliding by 1 minute.
  .sum("count_of_products")
  .withColumnRenamed("sum(count_of_products)", "product_count")
  .writeStream
  .outputMode("complete") # we overwrite the complete table with every trigger
  .format("delta")
  .queryName("orders_most_products_slide_wm_query")
  .trigger(processingTime="5 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

# important note: watermarking guarantees that any event within the window gets in. It does not guarantee leaving anything out.

In [None]:
spark.table(table_name).orderBy(F.desc("window.end"),F.desc("product_count"))

In [None]:
# Let's import another dataset. Let's say we are interested in hourly monitoring of incoming traffic to our website

schema = "device STRING, ecommerce STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name STRING, event_previous_timestamp BIGINT, event_timestamp BIGINT, geo STRUCT<city: STRING, state: STRING>, items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source STRING, user_first_touch_timestamp BIGINT, user_id STRING"

hourlyEventsPath = "events20200703"

website_df = (spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", 1)
  .json(hourlyEventsPath)
)

In [None]:
# this dataframe does not have a proper timestamp column. So we need to create one and use it for watermarking

events_df = (website_df
             .withColumn("createdAt", (F.col("event_timestamp") / 1e6).cast("timestamp"))
             .withWatermark("createdAt", "2 hours")
)             

In [None]:
# now we can do an aggregation

traffic_df = (events_df
             .groupBy("traffic_source"
                      , F.window(F.col("createdAt"), "1 hour"))
             .agg(F.approx_count_distinct("user_id").alias("active_users"))
             .select(F.col("traffic_source")
                     , F.col("active_users")
                     , F.hour(F.col("window.start")).alias("hour"))
             .sort("hour")
)


checkpoint_path = "streaming/traffic/_checkpoint" 
table_name = "traffic"
output_path = f"spark-warehouse/{table_name}"

traffic_query = (traffic_df
  .writeStream
  .outputMode("complete") # we overwrite the complete table with every trigger
  .format("delta")
  .queryName("traffic_query")
  .trigger(processingTime="10 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

In [None]:
spark.table(table_name)

### Joining streams

In [None]:
# let's load in users dataset

users_df = spark.read.parquet("users.parquet")

In [None]:
# join works same way as with regular dataframes.
# note: this is streaming<->static join

joined_df = (events_df
            .join(users_df.drop("user_first_touch_timestamp"), "user_id")
            )

checkpoint_path = "streaming/join_static/_checkpoint" 
table_name = "join_static"
output_path = f"spark-warehouse/{table_name}"

join_static_query = (joined_df
  .writeStream
  .outputMode("append")
  .format("delta")
  .queryName("join_static_query")
  .trigger(processingTime="10 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

In [None]:
spark.table(table_name).limit(5)

In [None]:
# let's read in users dataframe as a stream

# since we have created the dataframe from this data, we can cheat on getting the schema. Possible in development/debugging, not possible or recommended in production
users_schema = users_df.schema

users_stream_df = (spark
                   .readStream
                   .format("parquet")
                   .schema(users_schema)
                   .option("maxFilesPerTrigger", 1)
                   .parquet("users.parquet")
                  )

In [None]:
# let's do a stream to stream join

joined_streams_df = (events_df
            .join(users_stream_df.drop("user_first_touch_timestamp"), "user_id")
            )

checkpoint_path = "streaming/join_stream/_checkpoint" 
table_name = "join_stream"
output_path = f"spark-warehouse/{table_name}"

join_stream_query = (joined_df
  .writeStream
  .outputMode("append")
  .format("delta")
  .queryName("join_stream_query")
  .trigger(processingTime="10 second")
  .option("checkpointLocation", checkpoint_path)
  .start(output_path) 
)

create_table_if_exists(output_path,table_name)

In [None]:
spark.table(table_name).limit(5)

In [None]:
for stream in spark.streams.active:
  stream.stop()

### Further reading

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.window.html  
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withWatermark.html  
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html  
https://docs.databricks.com/spark/latest/structured-streaming/index.html

### Task 1

Create a streaming dataframe from the data in the following path:  
`flights200701stream`

The schema should contain
* DepartureAt (timestamp)
* UniqueCarrier (string)

Process only 1 file per trigger.  

Aggregate the data by count, using non-overlapping 30 minute windows.  
Ignore any data that is older than 6 hours.

The output should have 3 columns: startTime (window start time), UniqueCarrier, count.  

Save to a delta table, firing the trigger every 5 seconds.

Display the table, the output should be sorted ascending by startTime.

Once the stream has produced some output, call the stream shutdown function.

In [None]:
# Your answer


### Task 2

Join the Kafka streaming orders dataframe to the `product.csv` dataset.  
Note that Spark assumes that any streaming dataframes refer to a directory, not a specific file.

Aggregate the data by sum(price) (`total_price`) and a 2-minute tumbling window.  
Add a 10 minute watermark, and store the data in a delta table.

Create a view on top of the delta table with the following columns:
* product_id
* product_name
* n_minus_2_window_total_price
* n_minus_1_window_total_price
* current_window_total_price

The total_price columns need to be pivoted based on only the 3 most recent windows. 
The actual 
Order the dataset by descending `current_total_price`.

In [None]:
# Your answer
