-sandbox

<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Aggregating Streams

##### Objectives
1. Add watermarking
1. Aggregate with windows
1. Display streaming query results
1. Monitor streaming queries

##### Classes
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamReader.html#pyspark.sql.streaming.DataStreamReader" target="_blank">DataStreamReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.html#pyspark.sql.streaming.DataStreamWriter" target="_blank">DataStreamWriter</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.StreamingQuery.html#pyspark.sql.streaming.StreamingQuery" target="_blank">StreamingQuery</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.StreamingQueryManager.html#pyspark.sql.streaming.StreamingQueryManager" target="_blank">StreamingQueryManager</a>

## Hourly Activity by Traffic Lab
Process streaming data to display the total active users by traffic source with a 1 hour window.
1. Cast to timestamp and add watermark for 2 hours
2. Aggregate active users by traffic source for 1 hour windows
3. Execute query with `display` and plot results
5. Use query name to stop streaming query

### Setup
Run the cells below to generate hourly JSON files of event data for July 3, 2020.

In [0]:
%run ./Includes/Classroom-Setup

In [0]:
schema = "device STRING, ecommerce STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name STRING, event_previous_timestamp BIGINT, event_timestamp BIGINT, geo STRUCT<city: STRING, state: STRING>, items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source STRING, user_first_touch_timestamp BIGINT, user_id STRING"

# Directory of hourly events logged from the BedBricks website on July 3, 2020
hourlyEventsPath = "/mnt/training/ecommerce/events/events-2020-07-03.json"

df = (spark
      .readStream
      .schema(schema)
      .option("maxFilesPerTrigger", 1)
      .json(hourlyEventsPath)
     )

### 1. Cast to timestamp and add watermark for 2 hours
- Add a **`createdAt`** column by dividing **`event_timestamp`** by 1M and casting to timestamp
- Set a watermark of 2 hours on the **`createdAt`** column

Assign the resulting DataFrame to **`eventsDF`**.

In [0]:
# TODO
eventsDF = (df.FILL_IN
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-3252360928643669>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m# TODO[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m eventsDF = (df.FILL_IN
[0m[1;32m      3[0m )

[0;32m/databricks/spark/python/pyspark/sql/dataframe.py[0m in [0;36m__getattr__[0;34m(self, name)[0m
[1;32m   1664[0m         """
[1;32m   1665[0m         [0;32mif[0m [0mname[0m [0;32mnot[0m [0;32min[0m [0mself[0m[0;34m.[0m[0mcolumns[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m-> 1666[0;31m             raise AttributeError(
[0m[1;32m   1667[0m                 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
[1;32m   1668[0m         [0mjc[0m [0;34m=[0m [0mself[0m[0;34m.[0m[0m_jdf[0m[0;34m.[0m[0mapply[0m[0;34m([0m[0mname[0m[0;34m)[0m

**CHECK YOUR WORK**

In [0]:
assert "StructField(createdAt,TimestampType,true" in str(eventsDF.schema)



### 2. Aggregate active users by traffic source for 1 hour windows
- Set the default shuffle partitions to the number of cores on your cluster (not required, but runs faster)
- Group by **`traffic_source`** with 1-hour tumbling windows based on the **`createdAt`** column
- Aggregate the approximate count of distinct users per **`traffic_source`** and hour, and alias the column to "active_users"
- Select **`traffic_source`**, **`active_users`**, and the **`hour`** extracted from **`window.start`** with an alias of "hour"
- Sort by **`hour`** in ascending order

Assign the resulting DataFrame to **`trafficDF`**.

In [0]:
# TODO
spark.FILL_IN

trafficDF = (eventsDF.FILL_IN
)



**CHECK YOUR WORK**

In [0]:
assert str(trafficDF.schema) == "StructType(List(StructField(traffic_source,StringType,true),StructField(active_users,LongType,false),StructField(hour,IntegerType,true)))"



### 3. Execute query with display() and plot results
- Use `display` to start **`trafficDF`** as a streaming query and display the resulting memory sink
  - Assign "hourly_traffic" as the name of the query by seting the **`streamName`** parameter of `display`
- Plot the streaming query results as a bar graph
- Configure the following plot options:
  - Keys: **`hour`**
  - Series groupings: **`traffic_source`**
  - Values: **`active_users`**

In [0]:
# TODO



**CHECK YOUR WORK**

- The bar chart should plot `hour` on the x-axis and `active_users` on the y-axis
- Six bars should appear at every hour for all traffic sources
- The chart should stop at hour 23

### 4. Manage streaming query
- Iterate over SparkSession's list of active streams to find one with name "hourly_traffic"
- Stop the streaming query

In [0]:
# TODO
untilStreamIsReady("hourly_traffic")

for s in FILL_IN:



%md **CHECK YOUR WORK**  
Print all active streams to check that "hourly_traffic" is no longer there

In [0]:
for s in spark.streams.active:
    print(s.name)



### Classroom Cleanup
Run the cell below to clean up resources.

In [0]:
%run ./Includes/Classroom-Cleanup



-sandbox
&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>