<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Aggregating Streams

##### Objectives
1. Add watermarking
1. Aggregate with windows
1. Display streaming query results
1. Monitor streaming queries

##### Classes
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamReader.html#pyspark.sql.streaming.DataStreamReader" target="_blank">DataStreamReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.html#pyspark.sql.streaming.DataStreamWriter" target="_blank">DataStreamWriter</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.StreamingQuery.html#pyspark.sql.streaming.StreamingQuery" target="_blank">StreamingQuery</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.StreamingQueryManager.html#pyspark.sql.streaming.StreamingQueryManager" target="_blank">StreamingQueryManager</a>

## Hourly Activity by Traffic Lab
Process streaming data to display the total active users by traffic source in 1-hour windows.
1. Cast to timestamp and add watermark for 2 hours
2. Aggregate active users by traffic source for 1 hour windows
3. Execute query with `display` and plot results
4. Use the query name to stop the streaming query

### Setup
Run the cells below to generate hourly JSON files of event data for July 3, 2020.

In [0]:
%run ./Includes/Classroom-Setup

In [0]:
schema = "device STRING, ecommerce STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name STRING, event_previous_timestamp BIGINT, event_timestamp BIGINT, geo STRUCT<city: STRING, state: STRING>, items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source STRING, user_first_touch_timestamp BIGINT, user_id STRING"

# Directory of hourly events logged from the BedBricks website on July 3, 2020
hourlyEventsPath = "/mnt/training/ecommerce/events/events-2020-07-03.json"

df = (spark
      .readStream
      .schema(schema)
      .option("maxFilesPerTrigger", 1)
      .json(hourlyEventsPath)
     )

### 1. Cast to timestamp and add watermark for 2 hours
- Add a **`createdAt`** column by dividing **`event_timestamp`** by 1M and casting to timestamp
- Set a watermark of 2 hours on the **`createdAt`** column

Assign the resulting DataFrame to **`eventsDF`**.
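
To make the arithmetic concrete, here is a minimal plain-Python sketch (no Spark required) of the microsecond conversion and of how a 2-hour watermark threshold could be derived from the latest event time. The helper names `micros_to_timestamp` and `watermark_threshold` are hypothetical, and the threshold rule is a simplification: Spark updates its watermark between micro-batches rather than per event.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def micros_to_timestamp(event_timestamp_us):
    """Convert microseconds since the Unix epoch to a UTC datetime."""
    return EPOCH + timedelta(microseconds=event_timestamp_us)

def watermark_threshold(max_event_time, delay=timedelta(hours=2)):
    """Simplified model: events older than (latest event time - delay) count as too late."""
    return max_event_time - delay

# An event_timestamp value taken from the sample output of this lab
created_at = micros_to_timestamp(1593735143475431)
threshold = watermark_threshold(created_at)

print(created_at.isoformat())
print(threshold.isoformat())
```

In the notebook itself the same conversion is a division by 1e6 followed by a cast to timestamp, so this sketch is only a way to sanity-check individual values by hand.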

In [0]:
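# TODO
from pyspark.sql.functions import col

eventsDF = (df
            # event_timestamp holds microseconds since the epoch; divide to get seconds before casting
            .withColumn("createdAt", (col("event_timestamp") / 1e6).cast("timestamp"))
            # Tolerate events that arrive up to 2 hours late
            .withWatermark("createdAt", "2 hours")
           )
display(eventsDF)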

device,ecommerce,event_name,event_previous_timestamp,event_timestamp,geo,items,traffic_source,user_first_touch_timestamp,user_id,hour,createdAt
iOS,"List(1615.5, 1, 1)",finalize,1593733836357636.0,1593735143475431,"List(Eugene, OR)","List(List(NEWBED10, M_PREM_Q, Premium Queen Mattress, 1615.5, 1795.0, 1))",email,1593431170899022,UA000000106003703,0,2020-07-03T00:12:23.475+0000
Android,"List(940.5, 1, 1)",finalize,1593736720091314.0,1593736734032158,"List(Kansas City, MO)","List(List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1))",email,1593425175505998,UA000000105991606,0,2020-07-03T00:38:54.032+0000
Windows,"List(null, null, null)",checkout,1593731861739147.0,1593734681675107,"List(Los Angeles, CA)","List(List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1))",email,1593424443410761,UA000000105990606,0,2020-07-03T00:04:41.675+0000
iOS,"List(null, null, null)",cart,1593733836035173.0,1593734931695792,"List(Rowlett, TX)","List(List(NEWBED10, M_STAN_F, Standard Full Mattress, 850.5, 945.0, 1))",email,1593441730333613,UA000000106044333,0,2020-07-03T00:08:51.695+0000
iOS,"List(null, null, null)",cc_info,1593737536883746.0,1593737795419270,"List(Dallas, TX)","List(List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1))",email,1593436458901889,UA000000106020795,0,2020-07-03T00:56:35.419+0000
Android,"List(null, null, null)",cc_info,1593734624482193.0,1593734655037967,"List(Wabash, IN)","List(List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1))",email,1593443624996319,UA000000106054173,0,2020-07-03T00:04:15.037+0000
Windows,"List(null, null, null)",add_item,1593734314243055.0,1593736762864443,"List(Youngstown, OH)","List(List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1))",email,1593435017636944,UA000000106015482,0,2020-07-03T00:39:22.864+0000
macOS,"List(940.5, 1, 1)",finalize,1593734767233329.0,1593735118619771,"List(Sherwood, AR)","List(List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1))",email,1593408477302567,UA000000105981664,0,2020-07-03T00:11:58.619+0000
iOS,"List(null, null, null)",guest,1593735242455741.0,1593735741968865,"List(Springdale, AR)","List(List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1))",email,1593389940998817,UA000000105970964,0,2020-07-03T00:22:21.968+0000
Chrome OS,"List(null, null, null)",mattresses,1593441828190967.0,1593734586567337,"List(Fort Worth, TX)",List(),email,1593440605255323,UA000000106038926,0,2020-07-03T00:03:06.567+0000


**CHECK YOUR WORK**

In [0]:
assert "StructField(createdAt,TimestampType,true" in str(eventsDF.schema)

### 2. Aggregate active users by traffic source for 1 hour windows
- Set the default shuffle partitions to the number of cores on your cluster (not required, but runs faster)
- Group by **`traffic_source`** with 1-hour tumbling windows based on the **`createdAt`** column
- Aggregate the approximate count of distinct users per **`traffic_source`** and hour, and alias the column to "active_users"
- Select **`traffic_source`**, **`active_users`**, and the **`hour`** extracted from **`window.start`** with an alias of "hour"
- Sort by **`hour`** in ascending order

Assign the resulting DataFrame to **`trafficDF`**.
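
As a rough illustration of what a 1-hour tumbling window does, the plain-Python sketch below assigns each event to the hour in which it occurred and counts distinct users per traffic source and window. It is not the Spark implementation: the function names and the sample events are hypothetical, and it computes an exact distinct count rather than the approximate count the lab asks for.

```python
from collections import defaultdict
from datetime import datetime, timezone

def window_start(created_at):
    """Floor a timestamp to the start of its 1-hour tumbling window."""
    return created_at.replace(minute=0, second=0, microsecond=0)

def active_users_by_window(events):
    """Return (traffic_source, active_users, hour) rows sorted by hour, then source."""
    users = defaultdict(set)
    for traffic_source, user_id, created_at in events:
        users[(traffic_source, window_start(created_at))].add(user_id)
    rows = [(source, len(ids), start.hour) for (source, start), ids in users.items()]
    return sorted(rows, key=lambda row: (row[2], row[0]))

def at(hour, minute):
    """Build a UTC timestamp on July 3, 2020 (made-up sample data)."""
    return datetime(2020, 7, 3, hour, minute, tzinfo=timezone.utc)

sample_events = [
    ("email", "user_a", at(0, 12)),
    ("email", "user_a", at(0, 38)),   # same user, same window: counted once
    ("email", "user_b", at(0, 56)),
    ("google", "user_c", at(1, 5)),
]

rows = active_users_by_window(sample_events)
print(rows)
```

Because tumbling windows do not overlap, every event lands in exactly one window, which is why flooring the timestamp is enough to identify it.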

In [0]:
# TODO
from pyspark.sql.functions import approx_count_distinct, col, hour, window

# Match shuffle partitions to the number of cores (optional, but runs faster)
spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism)

trafficDF = (eventsDF
             # 1-hour tumbling windows keyed on event time
             .groupBy("traffic_source", window(col("createdAt"), "1 hour"))
             .agg(approx_count_distinct("user_id").alias("active_users"))
             .select(col("traffic_source"), col("active_users"), hour(col("window.start")).alias("hour"))
             .sort("hour")
            )
display(trafficDF)

traffic_source,active_users,hour
youtube,479,0
instagram,907,0
google,2523,0
email,787,0
direct,806,0
facebook,1503,0
direct,430,1
google,1852,1
facebook,868,1
email,739,1


**CHECK YOUR WORK**

In [0]:
assert str(trafficDF.schema) == "StructType(List(StructField(traffic_source,StringType,true),StructField(active_users,LongType,false),StructField(hour,IntegerType,true)))"

### 3. Execute query with display() and plot results
- Use `display` to start **`trafficDF`** as a streaming query and display the resulting memory sink
  - Assign "hourly_traffic" as the name of the query by setting the **`streamName`** parameter of `display`
- Plot the streaming query results as a bar graph
- Configure the following plot options:
  - Keys: **`hour`**
  - Series groupings: **`traffic_source`**
  - Values: **`active_users`**

In [0]:
# TODO
display(trafficDF,streamName='hourly_traffic')

traffic_source,active_users,hour
youtube,479,0
instagram,907,0
google,2523,0
email,787,0
direct,806,0
facebook,1503,0
direct,430,1
google,1852,1
facebook,868,1
email,739,1


**CHECK YOUR WORK**

- The bar chart should plot `hour` on the x-axis and `active_users` on the y-axis
- Six bars, one per traffic source, should appear at every hour
- The chart should stop at hour 23

### 4. Manage streaming query
- Iterate over SparkSession's list of active streams to find one with name "hourly_traffic"
- Stop the streaming query

In [0]:
# TODO
# Wait until the named stream is active (helper assumed to come from Classroom-Setup)
untilStreamIsReady("hourly_traffic")

for s in spark.streams.active:
    if s.name == "hourly_traffic":
        s.stop()
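
The lookup-and-stop pattern can also be wrapped in a small helper. The sketch below assumes only that each query object has a `name` attribute and a `stop()` method, as `StreamingQuery` does. `stop_query_by_name` and `FakeQuery` are hypothetical names, and `FakeQuery` is a stand-in so the example runs without a cluster.

```python
def stop_query_by_name(active_queries, name):
    """Stop every query whose name matches and return how many were stopped."""
    stopped = 0
    for query in active_queries:
        if query.name == name:
            query.stop()
            stopped += 1
    return stopped

class FakeQuery:
    """Stand-in for a streaming query; records whether stop() was called."""
    def __init__(self, name):
        self.name = name
        self.is_active = True

    def stop(self):
        self.is_active = False

queries = [FakeQuery("hourly_traffic"), FakeQuery("other_query")]
stopped_count = stop_query_by_name(queries, "hourly_traffic")
print(stopped_count, [q.name for q in queries if q.is_active])
```

In the notebook, the call would be `stop_query_by_name(spark.streams.active, "hourly_traffic")`.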

**CHECK YOUR WORK**  
Print all active streams to check that "hourly_traffic" is no longer there.

In [0]:
for s in spark.streams.active:
    print(s.name)
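
Printing names is the simplest form of monitoring. A `StreamingQuery` also reports progress for its most recent micro-batch through `lastProgress`, which is `None` until a batch has completed. The sketch below formats such a progress record, assuming it is dict-like and carries the `name`, `batchId`, and `numInputRows` fields. `summarize_progress` is a hypothetical helper and the sample record is made up.

```python
def summarize_progress(progress):
    """Format a dict-like progress record into a one-line summary."""
    if progress is None:
        return "no progress reported yet"
    return "{}: batch {}, {} input rows".format(
        progress.get("name"),
        progress.get("batchId"),
        progress.get("numInputRows"),
    )

# Made-up record shaped like a streaming progress report
sample_progress = {"name": "hourly_traffic", "batchId": 3, "numInputRows": 1200}

print(summarize_progress(None))
print(summarize_progress(sample_progress))
```

In the notebook, this could be applied to each active query with `summarize_progress(s.lastProgress)`.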

### Classroom Cleanup
Run the cell below to clean up resources.

In [0]:
%run ./Includes/Classroom-Cleanup

&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>