# Spark Structured Streaming: Aggregations




In [0]:
spark

# Data: Heterogeneity Human Activity Recognition Dataset
* In this notebook, we will work with the [Heterogeneity Human Activity Recognition Dataset](https://archive.ics.uci.edu/dataset/344/heterogeneity+activity+recognition).



In [0]:
%fs ls /databricks-datasets/definitive-guide/data/

## Create a Static DataFrame

In [0]:
sensorDataPath = "/databricks-datasets/definitive-guide/data/activity-data"

In [0]:
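# Use 5 shuffle partitions instead of the default 200, which is far more than this small dataset needs
spark.conf.set("spark.sql.shuffle.partitions", 5)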

sensorStatic = spark.read.json(sensorDataPath)

In [0]:
sensorStatic.printSchema()

In [0]:
sensorStatic.show()

## Create a Streaming DataFrame

In [0]:
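# Streaming file sources need an explicit schema, so reuse the one inferred by the static read;
# maxFilesPerTrigger limits each micro-batch to at most 10 new files
sensorStreaming = spark\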
  .readStream\
  .schema(sensorStatic.schema)\
  .option("maxFilesPerTrigger", 10)\
  .json(sensorDataPath)
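* The `maxFilesPerTrigger` option caps how many new files each micro-batch reads, so the existing directory is replayed as a sequence of small batches instead of one large one. The plain-Python sketch below models only that batching rule. It assumes the pending files are taken in a fixed order and uses hypothetical file names; it is not Spark's file-source implementation.

```python
# Plain-Python sketch (not Spark's implementation) of how a file source with
# maxFilesPerTrigger splits a backlog of files into micro-batches.
from typing import List


def plan_micro_batches(files: List[str], max_files_per_trigger: int) -> List[List[str]]:
    """Split pending files into consecutive batches of at most max_files_per_trigger."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    return [
        files[i:i + max_files_per_trigger]
        for i in range(0, len(files), max_files_per_trigger)
    ]


# Hypothetical file names; the real directory listing will differ.
pending = [f"part-{i:05d}.json" for i in range(25)]
batches = plan_micro_batches(pending, 10)
print([len(b) for b in batches])  # [10, 10, 5]
```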

# Streaming Aggregations

## Aggregations not based on time
* Aggregations not involving time can be broadly classified into two categories: global aggregations and grouped aggregations.

### Global aggregations
* Aggregations across all the data in the stream.
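* Conceptually, a streaming global count keeps a single running total as state. In complete output mode it re-emits the whole (one-row) result after every micro-batch. The sketch below is a plain-Python model of that behavior with hypothetical input rows, not Spark's state-store code.

```python
# Plain-Python sketch of a streaming global count in complete output mode:
# one running total is kept as state and the full result is emitted after
# every micro-batch. Not Spark's actual implementation.
from typing import Iterable, List


def run_global_count(micro_batches: Iterable[List[dict]]) -> List[int]:
    running_count = 0  # aggregation state carried across micro-batches
    emitted = []       # what the sink receives after each trigger
    for batch in micro_batches:
        running_count += len(batch)
        emitted.append(running_count)  # complete mode: the whole result table
    return emitted


# Hypothetical micro-batches of sensor rows.
batches = [[{"gt": "sit"}] * 3, [{"gt": "walk"}] * 2, []]
print(run_global_count(batches))  # [3, 5, 5]
```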

In [0]:
globalCountQuery = sensorStreaming.groupBy().count()\
 .writeStream\
 .queryName("globalCount")\
 .format("memory")\
 .outputMode("complete")\
 .start()

In [0]:
spark.table("globalCount").show()

### Grouped aggregations
* Aggregations within each group or key present in the data stream.
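* A grouped aggregation keeps one piece of state per key instead of a single total. In complete mode every group seen so far is emitted on each trigger. The following is a plain-Python model of that idea. The `gt` values are hypothetical, and the code is a sketch rather than Spark's implementation.

```python
# Plain-Python sketch of a streaming grouped count in complete output mode.
from collections import Counter
from typing import Dict, Iterable, List


def run_grouped_count(micro_batches: Iterable[List[dict]], key: str) -> List[Dict[str, int]]:
    state: Counter = Counter()  # one running count per group key
    emitted = []
    for batch in micro_batches:
        state.update(row[key] for row in batch)
        emitted.append(dict(state))  # complete mode: every group, every trigger
    return emitted


batches = [
    [{"gt": "sit"}, {"gt": "walk"}, {"gt": "sit"}],
    [{"gt": "bike"}, {"gt": "walk"}],
]
result = run_grouped_count(batches, "gt")
print(result[-1])  # {'sit': 2, 'walk': 2, 'bike': 1}
```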

In [0]:
groupedCountQuery = sensorStreaming.groupBy("gt").count()\
 .writeStream\
 .queryName("gtCount")\
 .format("memory")\
 .outputMode("complete")\
 .start()

In [0]:
spark.table("gtCount").show()

### Multiple aggregations computed together

In [0]:
from pyspark.sql.functions import *

multipleAggQuery = sensorStreaming.groupBy("gt")\
  .agg(
    count("*").alias("count"),
    approx_count_distinct("Device").alias("deviceCount"),
    approx_count_distinct("Model").alias("modelCount"))\
  .writeStream\
  .queryName("multipleAggs")\
  .format("memory")\
  .outputMode("complete")\
  .start()

In [0]:
spark.table("multipleAggs").show()
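* `approx_count_distinct` returns an *approximate* distinct count. Its default maximum relative standard deviation is 0.05, which keeps the per-group state small. The plain-Python sketch below computes the exact values that those columns approximate. The device and model names are hypothetical.

```python
# Plain-Python sketch: exact per-gt row count and distinct Device/Model counts,
# i.e. the quantities that approx_count_distinct estimates in the Spark query.
from collections import defaultdict
from typing import Dict, List


def aggregate_by_gt(rows: List[dict]) -> Dict[str, Dict[str, int]]:
    counts: Dict[str, int] = defaultdict(int)
    devices = defaultdict(set)  # exact distinct devices per gt
    models = defaultdict(set)   # exact distinct models per gt
    for row in rows:
        g = row["gt"]
        counts[g] += 1
        devices[g].add(row["Device"])
        models[g].add(row["Model"])
    return {
        g: {"count": counts[g], "deviceCount": len(devices[g]), "modelCount": len(models[g])}
        for g in counts
    }


# Hypothetical rows; real Device/Model values come from the dataset.
rows = [
    {"gt": "sit", "Device": "nexus4_1", "Model": "nexus4"},
    {"gt": "sit", "Device": "nexus4_2", "Model": "nexus4"},
    {"gt": "walk", "Device": "nexus4_1", "Model": "nexus4"},
]
summary = aggregate_by_gt(rows)
print(summary["sit"])  # {'count': 2, 'deviceCount': 2, 'modelCount': 1}
```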

In [0]:
# Stop the previous queries before starting the next ones; each active query keeps consuming cluster resources
globalCountQuery.stop()
groupedCountQuery.stop()
multipleAggQuery.stop()

## Aggregations with Event-Time Windows
* Spark supports [three types of time windows](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#types-of-time-windows): tumbling, sliding and session.




### Event Time Column
* This dataset has two time-based columns. *Creation_Time* records when an event was created at the source sensor, whereas *Arrival_Time* records when the event reached the servers.
* We will use *Creation_Time* as the event time in this example.
* The first step in event-time processing is to convert the event time column into the proper Spark SQL timestamp type.
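* The conversion below divides by 1,000,000,000, which implies that *Creation_Time* holds nanoseconds since the Unix epoch. The plain-Python sketch applies the same arithmetic to a hypothetical value. It prints the result in UTC; Spark itself displays timestamps in the session time zone (`spark.sql.session.timeZone`).

```python
# Plain-Python sketch of the Creation_Time -> event_time conversion.
from datetime import datetime, timezone


def creation_time_to_timestamp(creation_time_ns: int) -> datetime:
    # Same arithmetic as cast(cast(Creation_Time as double)/1000000000 as timestamp):
    # nanoseconds since the Unix epoch -> seconds -> timestamp (shown here in UTC).
    return datetime.fromtimestamp(creation_time_ns / 1_000_000_000, tz=timezone.utc)


# Hypothetical Creation_Time value, not taken from the dataset.
ts = creation_time_to_timestamp(1_424_696_400_000_000_000)
print(ts.isoformat())  # 2015-02-23T13:00:00+00:00
```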


In [0]:
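# Creation_Time is in nanoseconds since the epoch: convert it to seconds (as a double)
# and cast the result to a Spark SQL timestamp
sensorStreamingEvent = sensorStreaming.selectExpr(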
  "*",
  "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time")

In [0]:
sensorStreamingEvent.printSchema()

### Tumbling Windows
* Tumbling windows are a series of fixed-size, non-overlapping, contiguous time intervals.
* Each input row belongs to exactly one window.
* Tumbling windows use the window function: [pyspark.sql.functions.window](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.window.html)
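* With the default start time, tumbling windows are aligned to the Unix epoch. An event at time `t` therefore falls into the half-open window `[t - t % size, t - t % size + size)`. The plain-Python sketch below illustrates that assignment rule on epoch seconds. It models the behavior and is not Spark's code.

```python
# Plain-Python sketch of tumbling-window assignment (epoch-aligned, start time 0).
from typing import Tuple


def tumbling_window(ts_seconds: int, window_seconds: int) -> Tuple[int, int]:
    """Return the half-open [start, end) window containing ts_seconds."""
    start = ts_seconds - (ts_seconds % window_seconds)
    return (start, start + window_seconds)


# 10-minute windows (600 s); 1_424_696_700 is 2015-02-23 13:05:00 UTC.
print(tumbling_window(1_424_696_700, 600))  # (1424696400, 1424697000)
# An event exactly on a boundary starts the next window, because end is exclusive.
print(tumbling_window(1_424_697_000, 600))  # (1424697000, 1424697600)
```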

In [0]:
from pyspark.sql.functions import window, col

# Count events per 10-minute tumbling window
tumblingWindowQuery1 = sensorStreamingEvent.groupBy(window(col("event_time"), windowDuration="10 minutes")).count()\
  .writeStream\
  .queryName("events_per_window1")\
  .format("memory")\
  .outputMode("complete")\
  .start()

In [0]:
spark.table("events_per_window1").printSchema()

In [0]:
spark.table("events_per_window1").show(truncate=False)

In [0]:
# Aggregate on multiple columns: the 10-minute event-time window and User
tumblingWindowQuery2 = sensorStreamingEvent.groupBy(window(col("event_time"), "10 minutes"), "User").count()\
  .writeStream\
  .queryName("events_per_window2")\
  .format("memory")\
  .outputMode("complete")\
  .start()

In [0]:
spark.table("events_per_window2").show(truncate=False)
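* Grouping by a window and another column simply makes the pair `(window, User)` the grouping key. The plain-Python sketch below counts events per `(window start, user)` on epoch seconds. The user IDs are hypothetical, and the code illustrates the idea rather than reproducing Spark's implementation.

```python
# Plain-Python sketch of grouping by (tumbling window, User) and counting.
# Event times are epoch seconds; user IDs are hypothetical.
from collections import Counter
from typing import Dict, List, Tuple


def count_per_window_and_user(
    events: List[Tuple[int, str]], window_seconds: int
) -> Dict[Tuple[int, str], int]:
    counts: Counter = Counter()
    for ts, user in events:
        window_start = ts - (ts % window_seconds)  # tumbling window this event falls in
        counts[(window_start, user)] += 1          # composite key: (window, User)
    return dict(counts)


events = [
    (1_424_696_430, "a"),
    (1_424_696_950, "a"),
    (1_424_696_500, "b"),
    (1_424_697_010, "a"),
]
per_window_user = count_per_window_and_user(events, 600)
print(per_window_user)
```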

### Sliding Windows
* Sliding windows also have a fixed size, but a new window starts every slide duration. When the slide duration is smaller than the window duration, windows overlap and a single input row can belong to multiple windows.
* Sliding windows use the window function with the `slideDuration` parameter: [pyspark.sql.functions.window](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.window.html)
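* With a 10-minute window and a 5-minute slide, window starts are aligned to multiples of 5 minutes, and every event lands in two windows. The plain-Python sketch below enumerates the windows that contain a given timestamp. It is an illustration of the assignment rule under the default (zero) start time; Spark computes the same windows with a different formula internally.

```python
# Plain-Python sketch of sliding-window assignment (start time 0).
from typing import List, Tuple


def sliding_windows(ts: int, window_seconds: int, slide_seconds: int) -> List[Tuple[int, int]]:
    """All [start, end) windows whose starts are multiples of slide_seconds
    and that contain ts."""
    windows = []
    start = ts - (ts % slide_seconds)   # latest aligned start <= ts
    while start > ts - window_seconds:  # equivalent to start + window_seconds > ts
        windows.append((start, start + window_seconds))
        start -= slide_seconds
    return sorted(windows)


# 10-minute windows sliding every 5 minutes; 1_424_696_700 is 13:05:00 UTC.
print(sliding_windows(1_424_696_700, 600, 300))
# When slide == window, this reduces to a single tumbling window.
print(sliding_windows(1_424_696_700, 600, 600))
```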

In [0]:
slidingWindowQuery = sensorStreamingEvent.groupBy(window(col("event_time"), windowDuration="10 minutes", slideDuration="5 minutes"))\
  .count()\
  .writeStream\
  .queryName("events_per_window3")\
  .format("memory")\
  .outputMode("complete")\
  .start()

In [0]:
spark.table("events_per_window3").show(truncate=False)

In [0]:
# Sort by window so the overlapping windows appear in chronological order
spark.table("events_per_window3").sort("window").show(truncate=False)

### Session Windows
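* Session windows have a dynamic size. A session starts with an input row and keeps expanding while subsequent rows arrive within the gap duration; it closes once no input arrives within the gap after the latest row.
* In a streaming query, the grouping key must include at least one column in addition to `session_window`, and the update output mode is not supported.
* Session windows use the session_window function: [pyspark.sql.functions.session_window](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.session_window.html)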
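* Sessionization can be modeled as follows. Each event opens the interval `[t, t + gap)`, and intervals for the same key that overlap are merged. A session therefore runs from its first event to its last event plus the gap. The plain-Python sketch below applies that rule per key, using epoch seconds and hypothetical user IDs. It is a simplified model: it does not try to reproduce how Spark treats an event arriving exactly one gap after the previous one.

```python
# Plain-Python sketch of per-key session windows with a static gap.
from collections import defaultdict
from typing import Dict, List, Tuple


def session_windows(
    events: List[Tuple[int, str]], gap_seconds: int
) -> Dict[str, List[Tuple[int, int, int]]]:
    """Return {key: [(session_start, session_end, event_count), ...]}."""
    by_key = defaultdict(list)
    for ts, key in events:
        by_key[key].append(ts)
    result = {}
    for key, times in by_key.items():
        sessions: List[List[int]] = []
        for ts in sorted(times):
            if sessions and ts < sessions[-1][1]:
                # Event arrives before the current session expires: extend it.
                sessions[-1][1] = ts + gap_seconds
                sessions[-1][2] += 1
            else:
                # Gap exceeded (or first event): open a new session.
                sessions.append([ts, ts + gap_seconds, 1])
        result[key] = [tuple(s) for s in sessions]
    return result


# 10-minute gap (600 s); times are seconds, users are hypothetical.
events = [(0, "a"), (300, "a"), (500, "a"), (2000, "a"), (2100, "a"), (100, "b")]
print(session_windows(events, 600))
```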



In [0]:
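from pyspark.sql.functions import session_window, col

# A streaming session window needs at least one grouping column besides session_window
sessionWindowQuery = sensorStreamingEvent.groupBy(session_window(col("event_time"), gapDuration="10 minutes"), "User").count()\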
  .writeStream\
  .queryName("events_per_window4")\
  .format("memory")\
  .outputMode("complete")\
  .start()

In [0]:
spark.table("events_per_window4").show(truncate=False)

### Handling Late Data and Watermarking
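* A watermark tells the engine how late data is expected to arrive. With `withWatermark("event_time", "30 minutes")`, the watermark is the maximum event time seen so far minus 30 minutes. Late rows whose event times are after the watermark are still aggregated. Data older than the watermark may be dropped, and the state for windows that end before the watermark can be cleaned up.
* State cleanup only happens in append or update output mode. Complete mode must keep all aggregates, so the watermark does not drop any state in the query below.
* API reference: [pyspark.sql.DataFrame.withWatermark](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withWatermark.html)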
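* The watermark bookkeeping can be modeled in plain Python. In this model the watermark is computed at the end of each micro-batch as `max event time seen - delay` and used for the next batch. The sketch is a simplified conceptual model of append/update mode, not Spark's implementation. In particular, Spark only guarantees that data newer than the watermark is kept, whereas this sketch always drops older rows. The event times (epoch seconds) are hypothetical.

```python
# Plain-Python sketch of watermark tracking across micro-batches.
from typing import List, Tuple


def simulate_watermark(
    batches: List[List[int]], delay_seconds: int
) -> List[Tuple[float, List[int], List[int]]]:
    """For each batch return (watermark_used, kept_event_times, dropped_event_times)."""
    watermark = float("-inf")  # no watermark until the first batch completes
    max_seen = float("-inf")
    log = []
    for batch in batches:
        kept = [t for t in batch if t >= watermark]
        dropped = [t for t in batch if t < watermark]  # too late in this model
        log.append((watermark, kept, dropped))
        if batch:
            max_seen = max(max_seen, max(batch))
        watermark = max_seen - delay_seconds  # takes effect from the next batch
    return log


# 30-minute delay (1800 s), as in the query below.
log = simulate_watermark([[3600, 7200], [5000, 9000], [5300]], 1800)
for watermark, kept, dropped in log:
    print(watermark, kept, dropped)
```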

In [0]:
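# Assume the sensor data never arrives more than 30 minutes late.
# Note: complete output mode retains all aggregation state, so this watermark
# does not drop old windows here; append or update output mode is needed for that.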
windowWithWatermarkQuery = sensorStreamingEvent\
  .withWatermark("event_time", "30 minutes")\
  .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))\
  .count()\
  .writeStream\
  .queryName("events_per_window5")\
  .format("memory")\
  .outputMode("complete")\
  .start()

In [0]:
spark.table("events_per_window5").show(truncate=False)