
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img
    src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png"
    alt="Databricks Learning"
  >
</div>


# Windowed Aggregation with Watermark

## Objectives
1. Build streaming DataFrames with time-window aggregates and a watermark
1. Write streaming query results to a Delta table using `update` mode and `foreachBatch()`
1. Monitor the streaming query


### Classes
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.html" target="_blank">DataStreamReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.html" target="_blank">DataStreamWriter</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.html" target="_blank">StreamingQuery</a>
- <a href="https://spark.apache.org/docs/3.5.7/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch" target="_blank">foreachbatch</a>
- <a href="https://docs.databricks.com/en/structured-streaming/delta-lake.html#upsert-from-streaming-queries-using-foreachbatch&language-python" target="_blank">update mode with foreachbatch</a>

## REQUIRED - SELECT CLASSIC COMPUTE

Before executing cells in this notebook, please select your classic compute cluster in the lab. Be aware that **Serverless** is enabled by default.

Follow these steps to select the classic compute cluster:

1. Navigate to the top-right of this notebook and click the drop-down menu to select your cluster. By default, the notebook will use **Serverless**.

1. If your cluster is available, select it and continue to the next cell. If the cluster is not shown:

    - In the drop-down, select **More**.

    - In the **Attach to an existing compute resource** pop-up, select the first drop-down. You will see a unique cluster name in that drop-down. Please select that cluster.

**NOTE:** If your cluster has terminated, you might need to restart it in order to select it. To do this:

1. Right-click on **Compute** in the left navigation pane and select *Open in new tab*.

1. Find the triangle icon to the right of your compute cluster name and click it.

1. Wait a few minutes for the cluster to start.

1. Once the cluster is running, complete the steps above to select your cluster.

In [0]:
%run ./Includes/Classroom-Setup-04


## Build Streaming DataFrames

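Obtain an initial streaming DataFrame from a Delta table path, convert the microsecond epoch timestamps to `timestamp` values, and keep only events with non-zero purchase revenue.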

In [0]:
from pyspark.sql.functions import window, sum, col

parsed_df = (spark.readStream
                    .format("delta")  # Delta is the default on Databricks; stated explicitly for clarity
                    .load('/Volumes/dbacademy_ecommerce/v01/delta/events_hist')
                    # timestamps are stored as microseconds since the epoch: convert to seconds, then cast
                    .withColumn("event_timestamp", (col("event_timestamp") / 1e6).cast("timestamp"))
                    .withColumn("event_previous_timestamp", (col("event_previous_timestamp") / 1e6).cast("timestamp"))

                    # filter out events with null or zero revenue
                    .filter("ecommerce.purchase_revenue_in_usd IS NOT NULL AND ecommerce.purchase_revenue_in_usd != 0")
)
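For intuition, here is a plain-Python sketch of what the two `withColumn` calls and the filter do to a single row. It assumes, based on the `/ 1e6` divisor, that the raw timestamps are microseconds since the Unix epoch; `micros_to_datetime` and `keep_event` are hypothetical helpers, not part of the course code or of PySpark.

```python
from datetime import datetime, timezone
from typing import Optional


def micros_to_datetime(micros: int) -> datetime:
    """Convert microseconds since the Unix epoch to a UTC datetime.

    Mirrors (col("event_timestamp") / 1e6).cast("timestamp"): divide by one
    million to get seconds, then interpret those seconds as an instant.
    """
    return datetime.fromtimestamp(micros / 1e6, tz=timezone.utc)


def keep_event(purchase_revenue_in_usd: Optional[float]) -> bool:
    """Mirror the filter: keep rows whose revenue is non-null and non-zero."""
    return purchase_revenue_in_usd is not None and purchase_revenue_in_usd != 0


print(micros_to_datetime(1_593_878_400_000_000))  # 2020-07-04 16:00:00+00:00
print([keep_event(r) for r in (None, 0.0, 42.5)])  # [False, False, True]
```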

In [0]:
display(parsed_df)

In [0]:
windowed_df = (parsed_df
                    # watermark: accept events up to 90 minutes behind the latest event time seen
                    .withWatermark(eventTime="event_timestamp", delayThreshold="90 minutes")
                    # group by 60-minute tumbling window and city
                    .groupBy(window(timeColumn="event_timestamp", windowDuration="60 minutes"), "geo.city")
                    # add up revenue for each city and hour
                    .agg(sum("ecommerce.purchase_revenue_in_usd").alias("total_revenue"))
)
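The two settings interact as follows: `window()` assigns each event to a 60-minute tumbling window aligned to the Unix epoch (in UTC), and the watermark trails the latest event time seen by 90 minutes. Once the watermark reaches a window's end, that window's aggregate can be finalized and its state dropped. The sketch below models only these rules in plain Python; the helper names are hypothetical, and Spark itself advances the watermark once per micro-batch (from data seen in earlier batches) rather than per row.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW = timedelta(minutes=60)  # windowDuration="60 minutes"
DELAY = timedelta(minutes=90)   # delayThreshold="90 minutes"


def tumbling_window(ts: datetime, size: timedelta = WINDOW) -> Tuple[datetime, datetime]:
    """Return the [start, end) tumbling window containing ts, aligned to the epoch."""
    start = ts - (ts - EPOCH) % size
    return start, start + size


def watermark(max_event_time: datetime, delay: timedelta = DELAY) -> datetime:
    """The watermark trails the latest event time seen by the delay threshold."""
    return max_event_time - delay


def window_is_final(window_end: datetime, current_watermark: datetime) -> bool:
    """A window is final (its state can be dropped) once the watermark reaches its end."""
    return current_watermark >= window_end


event = datetime(2020, 7, 4, 16, 25, tzinfo=timezone.utc)
start, end = tumbling_window(event)
print(start.isoformat(), end.isoformat())  # 2020-07-04T16:00:00+00:00 2020-07-04T17:00:00+00:00

wm = watermark(datetime(2020, 7, 4, 18, 30, tzinfo=timezone.utc))
print(wm.isoformat(), window_is_final(end, wm))  # 2020-07-04T17:00:00+00:00 True
```

Keep in mind that the Structured Streaming guide describes the watermark guarantee as one-directional: data arriving within the threshold is aggregated, but data later than the threshold is not guaranteed to be dropped.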

In [0]:
display(windowed_df)

## Write streaming results

Let's explore a couple of options for writing the results.

### Write streaming results in `append` mode (option 1)

In the example below, new rows from the result table are appended to the sink table on each trigger.
Think about how `append` mode writes data to the sink. What happens to a city's hourly revenue row when late-arriving data changes it? Would that change be written to the sink in `append` mode?

In [0]:
checkpoint_path = f"{DA.paths.working_dir}/query_revenue_by_city_by_hour_append"

# Write the output of the streaming aggregation query to a Delta table in append mode.
# With window aggregation and a watermark, append mode emits each window's aggregate only once,
# after the watermark passes the end of the window, and never updates it afterward. The engine
# can therefore drop that window's state, which keeps the overall aggregation state bounded.
windowed_query = (windowed_df.writeStream
                  .queryName("query_revenue_by_city_by_hour_append")
                  .option("checkpointLocation", checkpoint_path)
                  .trigger(availableNow=True)
                  .outputMode("append")
                  .toTable("revenue_by_city_by_hour_append")  # toTable() starts the query writing to the table
                )
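To reason about the late-data question above, the following plain-Python model replays three hypothetical micro-batches through a windowed sum with a 90-minute watermark and reports what `append` mode would emit. It is a simplified sketch (one city, a watermark computed from earlier batches, rows for already-finalized windows dropped), not Spark's implementation; the helper names and sample events are made up for illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW = timedelta(minutes=60)  # windowDuration="60 minutes"
DELAY = timedelta(minutes=90)   # delayThreshold="90 minutes"

Event = Tuple[datetime, str, float]   # (event_timestamp, city, revenue)
Output = Tuple[datetime, str, float]  # (window start, city, total_revenue)


def window_start(ts: datetime) -> datetime:
    """Start of the epoch-aligned tumbling window containing ts."""
    return ts - (ts - EPOCH) % WINDOW


def run_append(batches: List[List[Event]]) -> List[List[Output]]:
    """Simplified model of append-mode output for a windowed sum with a watermark."""
    state: Dict[Tuple[datetime, str], float] = defaultdict(float)
    max_event_time: Optional[datetime] = None
    emitted_per_batch: List[List[Output]] = []
    for batch in batches:
        # The watermark used in this batch comes from events seen in earlier batches.
        wm = max_event_time - DELAY if max_event_time is not None else None
        for ts, city, revenue in batch:
            max_event_time = ts if max_event_time is None else max(max_event_time, ts)
            start = window_start(ts)
            if wm is not None and start + WINDOW <= wm:
                continue  # window already finalized: the late row is dropped
            state[(start, city)] += revenue
        # Emit (exactly once) and evict every window whose end the watermark has reached.
        done = sorted(k for k in state if wm is not None and k[0] + WINDOW <= wm)
        emitted_per_batch.append([(s, c, state.pop((s, c))) for s, c in done])
    return emitted_per_batch


def t(h: int, m: int) -> datetime:
    """Hypothetical sample time on an arbitrary day (UTC)."""
    return datetime(2020, 7, 4, h, m, tzinfo=timezone.utc)


batches = [
    [(t(16, 10), "Austin", 10.0)],
    [(t(16, 50), "Austin", 5.0), (t(18, 40), "Austin", 1.0)],
    [(t(16, 20), "Austin", 7.0), (t(19, 0), "Austin", 1.0)],
]
for i, rows in enumerate(run_append(batches)):
    print(i, [(s.strftime("%H:%M"), c, total) for s, c, total in rows])
# 0 []
# 1 []
# 2 [('16:00', 'Austin', 15.0)]
```

In this model the 16:00 Austin window is emitted once, in the third batch, with a total of 15.0: the 16:50 event was folded in before emission, while the 16:20 event arrived after the watermark had passed 17:00 and was dropped. The 18:00 and 19:00 windows stay in state because no later event advances the watermark past their ends; in the real query, windows near the end of the input can likewise remain unemitted until newer data arrives.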

In [0]:
%sql
SELECT * FROM revenue_by_city_by_hour_append

In [0]:
%sql
DESCRIBE HISTORY revenue_by_city_by_hour_append


### Write streaming query results in `update` mode (option 2)

Take the final streaming DataFrame (our result table) and write it to a Delta table sink in `update` mode. This approach gives the developer much finer control over how the sink is updated, at the cost of greater complexity.

**NOTE:** The syntax for writing streaming results to a Delta table or dataset in `update` mode is a little different. Because the Delta streaming sink accepts only the `append` and `complete` output modes, you apply `update`-mode results by running a `MERGE` command inside a `foreachBatch()` function. This also requires the target table to be created beforehand.
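Compared with `append`, `update` mode emits, in every micro-batch, each (window, city) row whose aggregate changed, so the same window can arrive more than once with a growing total. The plain-Python model below illustrates this with hypothetical events and helper names; it is a simplified sketch rather than Spark's implementation, but it shows why the sink has to upsert (for example, with `MERGE`) rather than append.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW = timedelta(minutes=60)  # windowDuration="60 minutes"
DELAY = timedelta(minutes=90)   # delayThreshold="90 minutes"

Event = Tuple[datetime, str, float]   # (event_timestamp, city, revenue)
Output = Tuple[datetime, str, float]  # (window start, city, total_revenue)


def window_start(ts: datetime) -> datetime:
    """Start of the epoch-aligned tumbling window containing ts."""
    return ts - (ts - EPOCH) % WINDOW


def run_update(batches: List[List[Event]]) -> List[List[Output]]:
    """Simplified model of update-mode output: every changed aggregate, every batch."""
    state: Dict[Tuple[datetime, str], float] = {}
    max_event_time: Optional[datetime] = None
    emitted_per_batch: List[List[Output]] = []
    for batch in batches:
        # The watermark used in this batch comes from events seen in earlier batches.
        wm = max_event_time - DELAY if max_event_time is not None else None
        changed = set()
        for ts, city, revenue in batch:
            max_event_time = ts if max_event_time is None else max(max_event_time, ts)
            key = (window_start(ts), city)
            if wm is not None and key[0] + WINDOW <= wm:
                continue  # behind the watermark: dropped
            state[key] = state.get(key, 0.0) + revenue
            changed.add(key)
        # Emit the current total of every (window, city) that changed in this batch.
        emitted_per_batch.append(sorted((s, c, state[(s, c)]) for s, c in changed))
        # Evict windows the watermark has passed; their totals are final.
        for key in [k for k in state if wm is not None and k[0] + WINDOW <= wm]:
            del state[key]
    return emitted_per_batch


def t(h: int, m: int) -> datetime:
    """Hypothetical sample time on an arbitrary day (UTC)."""
    return datetime(2020, 7, 4, h, m, tzinfo=timezone.utc)


batches = [
    [(t(16, 10), "Austin", 10.0)],
    [(t(16, 50), "Austin", 5.0), (t(18, 40), "Austin", 1.0)],
    [(t(16, 20), "Austin", 7.0), (t(19, 0), "Austin", 1.0)],
]
for i, rows in enumerate(run_update(batches)):
    print(i, [(s.strftime("%H:%M"), c, total) for s, c, total in rows])
# 0 [('16:00', 'Austin', 10.0)]
# 1 [('16:00', 'Austin', 15.0), ('18:00', 'Austin', 1.0)]
# 2 [('19:00', 'Austin', 1.0)]
```

Here the 16:00 Austin window is emitted as 10.0 in the first batch and as 15.0 in the second. Appending both rows would double-count, whereas a keyed upsert leaves the correct 15.0.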


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Create the target table with the same schema as the incoming streaming DataFrame (windowed_df)
schema = StructType([
    StructField("window", StructType([
        StructField("start", TimestampType(), True),
        StructField("end", TimestampType(), True),
    ]), False),
    StructField("city", StringType(), True),
    StructField("total_revenue", DoubleType(), True),
])

empty_df = spark.createDataFrame([], schema=schema)

# Overwrite any existing table so each run starts from an empty target
empty_df.write.saveAsTable(name="revenue_by_city_by_hour", mode="overwrite")

In [0]:
# Function to upsert each micro-batch (microBatchOutputDF) into the Delta table using MERGE

def upsertToDelta(microBatchOutputDF, batchId):
  # Register the micro-batch DataFrame as a temp view so the SQL below can refer to it as `updates`
  microBatchOutputDF.createOrReplaceTempView("updates")

  # IMPORTANT: run the MERGE with the SparkSession that defined the `updates` view,
  # i.e. the micro-batch DataFrame's own session.
  # In Databricks Runtime 10.5 and below, you must use the following instead:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO revenue_by_city_by_hour t
    USING updates s
    ON t.window.start = s.window.start AND t.window.end = s.window.end AND t.city = s.city
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

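The `MERGE` statement keys each row on `window.start`, `window.end`, and `city`: matched rows are overwritten with the new total and unmatched rows are inserted. A plain-Python sketch of those semantics, using a hypothetical `merge_upsert` helper and made-up rows, also shows a useful property: because each update carries the full aggregate rather than an increment, replaying the same micro-batch leaves the table unchanged. That matters because `foreachBatch` by default provides only at-least-once write guarantees.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, Tuple

Key = Tuple[datetime, datetime, str]         # (window.start, window.end, city)
Row = Tuple[datetime, datetime, str, float]  # key columns + total_revenue


def merge_upsert(target: Dict[Key, float], updates: Iterable[Row]) -> Dict[Key, float]:
    """Model MERGE ... WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *."""
    for start, end, city, total in updates:
        # Same key as the ON clause; a matched key is overwritten, an unmatched key is inserted.
        target[(start, end, city)] = total
    return target


def hour(h: int) -> Tuple[datetime, datetime]:
    """Hypothetical helper: the [h:00, h+1:00) window on an arbitrary sample day (UTC)."""
    start = datetime(2020, 7, 4, h, tzinfo=timezone.utc)
    return start, start + timedelta(hours=1)


table: Dict[Key, float] = {}
batch_0 = [(*hour(16), "Austin", 10.0)]
batch_1 = [(*hour(16), "Austin", 15.0), (*hour(18), "Austin", 1.0)]

merge_upsert(table, batch_0)
merge_upsert(table, batch_1)
merge_upsert(table, batch_1)  # replaying a batch (at-least-once delivery) changes nothing
print(len(table))  # 2
print(table[(*hour(16), "Austin")])  # 15.0
```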
Check the `revenue_by_city_by_hour` table, which should still be empty, before writing the streaming results to it in `update` mode.

In [0]:
%sql
SELECT * FROM revenue_by_city_by_hour

### Write the result table to a Delta table in `update` mode
Dig into the query's raw metrics and pay close attention to the stateful operator (`stateOperators`) section. See whether you can identify the **watermark** at work and the number of rows removed because of its settings.

In [0]:
checkpoint_path = f"{DA.paths.working_dir}/query_revenue_by_city_by_hour"

# Write the output of the streaming aggregation query to the Delta table as updates, applying each micro-batch with upsertToDelta (MERGE)
windowed_query = (windowed_df.writeStream
                  .foreachBatch(upsertToDelta)
                  .outputMode("update")
                  .queryName("query_revenue_by_city_by_hour")
                  .option("checkpointLocation", checkpoint_path)
                  .trigger(availableNow=True)
                  .start()
                )
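To check the watermark metrics mentioned above programmatically, a small helper can pull the relevant fields out of a progress report. In PySpark, `StreamingQuery.lastProgress` returns the most recent progress as a dict (or `None`) and `recentProgress` returns a list of them. The field names below (`eventTime.watermark`, `stateOperators[].numRowsRemoved`, `numRowsDroppedByWatermark`, and so on) follow the Structured Streaming progress format, while `summarize_watermark` itself is a hypothetical helper. It calls `builtins.sum` because this notebook imports `sum` from `pyspark.sql.functions`, which shadows Python's built-in.

```python
import builtins
from typing import Any, Dict, Optional


def summarize_watermark(progress: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Extract watermark-related metrics from one streaming progress dict."""
    if not progress:
        return {}  # lastProgress is None before the first micro-batch completes
    event_time = progress.get("eventTime", {})
    operators = progress.get("stateOperators", [])
    return {
        "batchId": progress.get("batchId"),
        "watermark": event_time.get("watermark"),
        "maxEventTime": event_time.get("max"),
        # Rows currently held in state across all stateful operators
        "stateRowsTotal": builtins.sum(op.get("numRowsTotal", 0) for op in operators),
        # State rows removed, e.g. windows evicted once the watermark passed their end
        "stateRowsRemoved": builtins.sum(op.get("numRowsRemoved", 0) for op in operators),
        # Input rows discarded for arriving behind the watermark
        "rowsDroppedByWatermark": builtins.sum(
            op.get("numRowsDroppedByWatermark", 0) for op in operators
        ),
    }


# Usage in the notebook, once the query has processed some data:
# for p in windowed_query.recentProgress:
#     print(summarize_watermark(p))
```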

In [0]:
%sql
SELECT * FROM revenue_by_city_by_hour

In [0]:
%sql
DESCRIBE HISTORY revenue_by_city_by_hour

In [0]:
# Stop any streaming queries that are still active in this session
for s in spark.streams.active:
  print(s.name)
  s.stop()

&copy; 2026 Databricks, Inc. All rights reserved. Apache, Apache Spark, Spark, the Spark Logo, Apache Iceberg, Iceberg, and the Apache Iceberg logo are trademarks of the <a href="https://www.apache.org/" target="_blank">Apache Software Foundation</a>.<br/><br/><a href="https://databricks.com/privacy-policy" target="_blank">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use" target="_blank">Terms of Use</a> | <a href="https://help.databricks.com/" target="_blank">Support</a>