<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 400px">
</div>

# Aggregating Streams
1. Add watermarking
1. Aggregate with windows
1. Display streaming query results
1. Monitor streaming queries

##### Classes
- DataStreamReader (<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=datastreamreader#pyspark.sql.streaming.DataStreamReader" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/DataStreamReader.html" target="_blank">Scala</a>)
- DataStreamWriter (<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=datastreamwriter#pyspark.sql.streaming.DataStreamWriter" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/DataStreamWriter.html" target="_blank">Scala</a>)
- StreamingQuery (<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=streamingquery#pyspark.sql.streaming.StreamingQuery" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/StreamingQuery.html" target="_blank">Scala</a>)
- StreamingQueryManager (<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=streamingquerymanager#pyspark.sql.streaming.StreamingQueryManager" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/StreamingQueryManager.html" target="_blank">Scala</a>)

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Hourly Activity by Traffic Lab
Process streaming data to display the total active users by traffic source with a 1 hour window.
1. Cast to timestamp and add watermark for 2 hours
2. Aggregate active users by traffic source for 1 hour windows
3. Execute query with display() and plot results
4. Use query name to stop streaming query

### Setup
Run the cells below to generate hourly JSON files of event data for July 3, 2020.

In [0]:
%run ./Includes/Classroom-Setup

In [0]:
schema = "device STRING, ecommerce STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name STRING, event_previous_timestamp BIGINT, event_timestamp BIGINT, geo STRUCT<city: STRING, state: STRING>, items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source STRING, user_first_touch_timestamp BIGINT, user_id STRING"

# hourly events logged from the BedBricks website on July 3, 2020
hourlyEventsPath = "/mnt/training/ecommerce/events/events-2020-07-03.json"

df = (spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", 1)
  .json(hourlyEventsPath)
)

### 1. Cast to timestamp and add watermark for 2 hours
- Add column **`createdAt`** by dividing **`event_timestamp`** by 1M and casting to timestamp
- Add watermark for 2 hours

Assign resulting DataFrame to **`eventsDF`**.
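
For intuition, the conversion can be reproduced outside Spark. The sketch below is plain Python and is not part of the lab solution; `micros_to_utc` is a hypothetical helper name. It splits a microsecond epoch value into whole seconds and leftover microseconds, which avoids floating-point rounding, and uses an `event_timestamp` value taken from the sample output in this notebook.

```python
from datetime import datetime, timedelta, timezone

def micros_to_utc(micros: int) -> datetime:
    """Convert microseconds since the Unix epoch to a timezone-aware UTC datetime."""
    seconds, remainder = divmod(micros, 1_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(microseconds=remainder)

# event_timestamp of the first row in the sample output
created_at = micros_to_utc(1593803182518649)
print(created_at.isoformat())  # 2020-07-03T19:06:22.518649+00:00
```

The notebook output shows the same instant truncated to milliseconds (`2020-07-03T19:06:22.518+0000`).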

In [0]:
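from pyspark.sql.functions import col

# event_timestamp holds microseconds since the epoch, so divide by 1e6 to get seconds before casting
eventsDF = (df
  .withColumn("createdAt", (col("event_timestamp") / 1e6).cast("timestamp"))
  .withWatermark("createdAt", "2 hours")  # tolerate events arriving up to 2 hours late
)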
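
To make the effect of a 2-hour watermark concrete, here is a deliberately simplified plain-Python model. It is not how Spark implements watermarking: it assumes the watermark equals the largest event time seen so far minus the delay, and that any event older than the watermark is rejected as too late. In Spark the watermark advances between micro-batches and is used to decide which window state can be dropped, so real behavior is coarser than this. `WatermarkTracker` and its methods are hypothetical names.

```python
from datetime import datetime, timedelta, timezone

class WatermarkTracker:
    """Toy model: watermark = max event time seen so far - allowed delay."""

    def __init__(self, delay: timedelta):
        self.delay = delay
        self.max_event_time = None

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def accept(self, event_time: datetime) -> bool:
        """Return True if the event is on time, False if it is older than the watermark."""
        wm = self.watermark
        if wm is not None and event_time < wm:
            return False
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True

tracker = WatermarkTracker(timedelta(hours=2))
base = datetime(2020, 7, 3, 19, 0, tzinfo=timezone.utc)
print(tracker.accept(base))                       # True: first event is always accepted
print(tracker.accept(base - timedelta(hours=1)))  # True: 1 hour late, within the 2-hour delay
print(tracker.accept(base - timedelta(hours=3)))  # False: older than the watermark
```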

In [0]:
display(eventsDF)

device,ecommerce,event_name,event_previous_timestamp,event_timestamp,geo,items,traffic_source,user_first_touch_timestamp,user_id,hour,createdAt
iOS,"List(1075.5, 1, 1)",finalize,1593801817162695.0,1593803182518649,"List(McKinney, TX)","List(List(NEWBED10, M_STAN_K, Standard King Mattress, 1075.5, 1195.0, 1))",email,1593439231519133,UA000000106032467,19,2020-07-03T19:06:22.518+0000
Linux,"List(null, null, null)",cart,1593804318237854.0,1593804691399513,"List(Winter Garden, FL)","List(List(NEWBED10, M_PREM_K, Premium King Mattress, 1795.5, 1995.0, 1))",email,1593443919308326,UA000000106055757,19,2020-07-03T19:31:31.399+0000
Linux,"List(null, null, null)",add_item,1593803473586594.0,1593804318237854,"List(Winter Garden, FL)","List(List(NEWBED10, M_PREM_K, Premium King Mattress, 1795.5, 1995.0, 1))",email,1593443919308326,UA000000106055757,19,2020-07-03T19:25:18.237+0000
macOS,"List(850.5, 1, 1)",finalize,1593806248979773.0,1593806352498548,"List(Jacksonville, FL)","List(List(NEWBED10, M_STAN_F, Standard Full Mattress, 850.5, 945.0, 1))",email,1593425063951856,UA000000105991459,19,2020-07-03T19:59:12.498+0000
iOS,"List(null, null, null)",cart,1593806229866506.0,1593806233001836,"List(Laurel, MD)","List(List(NEWBED10, M_STAN_K, Standard King Mattress, 1075.5, 1195.0, 1))",email,1593437732970217,UA000000106025899,19,2020-07-03T19:57:13.001+0000
macOS,"List(null, null, null)",cc_info,1593805919578229.0,1593806248979773,"List(Jacksonville, FL)","List(List(NEWBED10, M_STAN_F, Standard Full Mattress, 850.5, 945.0, 1))",email,1593425063951856,UA000000105991459,19,2020-07-03T19:57:28.979+0000
Linux,"List(null, null, null)",shipping_info,1593805075989147.0,1593805141364750,"List(Winter Garden, FL)","List(List(NEWBED10, M_PREM_K, Premium King Mattress, 1795.5, 1995.0, 1))",email,1593443919308326,UA000000106055757,19,2020-07-03T19:39:01.364+0000
macOS,"List(null, null, null)",cart,1593804276972372.0,1593804279833526,"List(Jacksonville, FL)","List(List(NEWBED10, M_STAN_F, Standard Full Mattress, 850.5, 945.0, 1))",email,1593425063951856,UA000000105991459,19,2020-07-03T19:24:39.833+0000
macOS,"List(null, null, null)",shipping_info,1593802932642269.0,1593803241760836,"List(Ellisville, MO)","List(List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1))",email,1593351371498734,UA000000105772187,19,2020-07-03T19:07:21.760+0000
macOS,"List(null, null, null)",shipping_info,1593802884816480.0,1593803225097531,"List(Portland, OR)","List(List(null, M_PREM_Q, Premium Queen Mattress, 1795.0, 1795.0, 1))",google,1593801016487011,UA000000107143337,19,2020-07-03T19:07:05.097+0000


##### <img alt="Best Practice" title="Best Practice" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-blue-ribbon.svg"/> Check your work

In [0]:
assert "StructField(createdAt,TimestampType,true" in str(eventsDF.schema)

### 2. Aggregate active users by traffic source for 1 hour windows
- Set default shuffle partitions to number of cores on your cluster (not required, but runs faster)
- Group by **`traffic_source`** with a 1 hour window based on the **`createdAt`** column
- Aggregate the approximate count of distinct users and alias with "active_users"
- Select **`traffic_source`**, **`active_users`**, and the **`hour`** extracted from **`window.start`** with alias "hour"
- Sort by **`hour`**

Assign resulting DataFrame to **`trafficDF`**.

In [0]:
# Match shuffle partitions to the number of cores on the cluster (optional, but runs faster)
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

from pyspark.sql.functions import approx_count_distinct, col, hour, window

trafficDF = (eventsDF
  .groupBy("traffic_source", window(col("createdAt"), "1 hour"))  # 1-hour tumbling windows
  .agg(approx_count_distinct("user_id").alias("active_users"))
  .select(col("traffic_source"), col("active_users"), hour(col("window.start")).alias("hour"))
  .sort("hour")
)
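
The grouping logic can be checked by hand on a few rows. The plain-Python sketch below is an illustration, not the Spark implementation: it floors each timestamp to the start of its 1-hour tumbling window and counts distinct users exactly with sets, whereas `approx_count_distinct` returns an estimate. `active_users_by_window` is a hypothetical helper, hours are assumed to be read in UTC, and the input rows are copied from the sample output of `eventsDF` shown earlier.

```python
from collections import defaultdict
from datetime import datetime, timezone

def active_users_by_window(events):
    """Count distinct users per traffic source and 1-hour window.

    `events` is an iterable of (traffic_source, user_id, event_timestamp_micros).
    Returns (traffic_source, active_users, hour) tuples sorted by hour, then source.
    """
    users = defaultdict(set)
    for source, user_id, micros in events:
        seconds = micros // 1_000_000
        window_start = seconds - seconds % 3600  # floor to the start of the 1-hour window
        users[(source, window_start)].add(user_id)
    rows = [
        (source, len(ids), datetime.fromtimestamp(start, tz=timezone.utc).hour)
        for (source, start), ids in users.items()
    ]
    return sorted(rows, key=lambda row: (row[2], row[0]))

sample = [
    ("email", "UA000000106032467", 1593803182518649),
    ("email", "UA000000106055757", 1593804691399513),
    ("email", "UA000000106055757", 1593804318237854),
    ("email", "UA000000105991459", 1593806352498548),
    ("google", "UA000000107143337", 1593803225097531),
]
for row in active_users_by_window(sample):
    print(row)
# ('email', 3, 19)
# ('google', 1, 19)
```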

##### <img alt="Best Practice" title="Best Practice" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-blue-ribbon.svg"/> Check your work

In [0]:
assert str(trafficDF.schema) == "StructType(List(StructField(traffic_source,StringType,true),StructField(active_users,LongType,false),StructField(hour,IntegerType,true)))"

### 3. Execute query with display() and plot results
- Execute results for **`trafficDF`** using display()
  - Set the **`streamName`** parameter to set a name for the query
- Plot the streaming query results as a bar graph
- Configure the following plot options:
  - Keys: **`hour`**
  - Series groupings: **`traffic_source`**
  - Values: **`active_users`**

In [0]:
display(trafficDF, streamName="hourly_traffic_p")

traffic_source,active_users,hour
facebook,586,2
email,583,2
google,1171,2
youtube,205,2
instagram,382,2
direct,298,2
direct,106,5
facebook,226,5
google,414,5
instagram,142,5
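
`display()` with `streamName` is specific to Databricks notebooks. Outside a notebook, a comparable named query can be started with the standard `DataStreamWriter` API. The sketch below is an assumption about how one might do that, not part of the lab: `start_named_query` is a hypothetical helper, the in-memory sink is chosen only because it is convenient for inspection, and `complete` output mode is used because sorting a streaming aggregation is only supported in that mode.

```python
def start_named_query(streaming_df, query_name):
    """Start a streaming query that writes to an in-memory table named `query_name`.

    `streaming_df` is expected to be a streaming DataFrame such as trafficDF.
    Returns whatever start() returns (a StreamingQuery handle in Spark).
    """
    return (streaming_df.writeStream
        .format("memory")          # results become queryable as a temporary table
        .queryName(query_name)     # the name reported by spark.streams.active
        .outputMode("complete")    # emit the full aggregate result on every trigger
        .start()
    )

# Usage inside a Spark session (not executed here):
# query = start_named_query(trafficDF, "hourly_traffic_p")
# spark.sql("SELECT * FROM hourly_traffic_p").show()
```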


##### <img alt="Best Practice" title="Best Practice" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-blue-ribbon.svg"/> Check your work
- The bar chart should plot `hour` on the x-axis and `active_users` on the y-axis
- Six bars, one per traffic source, should appear at every hour
- The chart should stop at hour 23

### 4. Manage streaming query
- Iterate over the SparkSession's list of active streams to find the one named "hourly_traffic_p"
- Stop the streaming query

In [0]:
untilStreamIsReady("hourly_traffic_p")

for s in spark.streams.active:
  if s.name == "hourly_traffic_p":
    s.stop()

##### <img alt="Best Practice" title="Best Practice" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-blue-ribbon.svg"/> Check your work
- Print all active streams to check that "hourly_traffic_p" is no longer there
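
One way to perform this check is sketched below. `stop_stream` is a hypothetical helper that accepts any iterable of objects exposing `name` and `stop()`, which is the shape of the entries in `spark.streams.active`; the `FakeQuery` class is a stand-in so the example runs without a cluster.

```python
def stop_stream(streams, name):
    """Stop every stream called `name` and return the names of those left running."""
    remaining = []
    for s in streams:
        if s.name == name:
            s.stop()
        else:
            remaining.append(s.name)
    return remaining

class FakeQuery:
    """Stand-in for a StreamingQuery, used only to exercise the helper."""
    def __init__(self, name):
        self.name = name
        self.stopped = False

    def stop(self):
        self.stopped = True

queries = [FakeQuery("hourly_traffic_p"), FakeQuery("other_query")]
print(stop_stream(queries, "hourly_traffic_p"))  # ['other_query']
```

In the notebook, the equivalent call would be `print(stop_stream(spark.streams.active, "hourly_traffic_p"))`.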

### Classroom Cleanup
Run the cell below to clean up resources.

In [0]:
%run ./Includes/Classroom-Cleanup
