### Working with Time Windows
In this notebook, we'll explore the problems that arise when aggregating streaming data, then show how to solve them by grouping data into windows and by expiring old data with watermarking.

#### Objectives:
* Use sliding windows to aggregate over chunks of data rather than all data
* Apply watermarking to discard stale data that you do not have space to keep
* Plot live graphs using `display`

First, run the following cell to import the data and make various utilities available for our experimentation.

In [0]:
%run "./Includes/Classroom-Setup"

### Streaming Aggregations
Continuous applications often require near real-time decisions on real-time, aggregated statistics.

Some examples include: 
* Aggregating errors by type in data from IoT devices. 
* Detecting anomalous behavior by aggregating data by country from a server's log file(s). 
* Performing behavioral analysis on instant messages via hash tags.

However, in the case of streams, you generally don't want to run aggregations over the entire dataset. 
<br>

**But what happens if an attempt is made to aggregate over a stream's entire dataset?** <br>
- While streams have a definite start, conceptually there's no end to a stream; that is, it's an unbounded dataset.
- Because there is no "end" to a stream, the size of the dataset grows in perpetuity.
- As a result, your cluster would eventually run out of resources.

So, instead of aggregating over an entire dataset, streaming data must be aggregated by grouping it into *windows* of time (e.g., every 5 minutes, or every hour).  This approach is referred to as **windowing**.
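The idea of grouping records into fixed windows of time can be sketched in plain Python, without Spark. This is a minimal illustration only: the `window_start` helper and the sample timestamps are hypothetical, and it shows only the bucketing arithmetic, not how Structured Streaming implements it.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def window_start(event_time, size):
    """Return the start of the fixed-size window that contains event_time."""
    return event_time - (event_time - EPOCH) % size

# Hypothetical event timestamps, for illustration only
events = [
    datetime(2024, 1, 1, 12, 1),
    datetime(2024, 1, 1, 12, 3),
    datetime(2024, 1, 1, 12, 4, 59),
    datetime(2024, 1, 1, 12, 5),
    datetime(2024, 1, 1, 12, 11),
]

# Count events per 5-minute window instead of over the whole dataset
five_minutes = timedelta(minutes=5)
counts = Counter(window_start(t, five_minutes) for t in events)

for start in sorted(counts):
    print(start, "->", counts[start])
```

Each count depends only on the records that fall inside its window, so no aggregate ever has to cover the whole stream.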

<br>

### Windowing
If we were using a static DataFrame to produce an aggregate count, we could use `groupBy()` and `count()`.  With a stream, however, we must instead accumulate counts within a **sliding window** in order to answer questions such as *"How many records are we getting every second?"*

The following illustration, from the <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html" target="_blank">Structured Streaming Programming Guide</a>, helps us understand how it works:

<img src="http://spark.apache.org/docs/latest/img/structured-streaming-window.png" style="width: 900px;">
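With a sliding window, consecutive windows overlap, so a single event can be counted in more than one window. The sketch below illustrates this in plain Python, assuming 10-minute windows that slide every 5 minutes and half-open intervals `[start, end)`. The `sliding_windows` helper is hypothetical and is not part of Spark; in PySpark, the comparable grouping would be expressed by passing a slide duration as the third argument to `window()`.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def sliding_windows(event_time, length, slide):
    """Return every [start, end) window of the given length and slide that contains event_time."""
    # Latest window that starts at or before the event
    start = event_time - (event_time - EPOCH) % slide
    windows = []
    # Walk backwards one slide at a time while the window still covers the event
    while start + length > event_time:
        windows.append((start, start + length))
        start -= slide
    return sorted(windows)

# Hypothetical event at 12:07, for illustration only
event = datetime(2024, 1, 1, 12, 7)
for start, end in sliding_windows(event, timedelta(minutes=10), timedelta(minutes=5)):
    print(start.time(), "-", end.time())
```

Under these assumptions, the 12:07 event falls into both the 12:00 to 12:10 window and the 12:05 to 12:15 window.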

<br>

#### Event Time *versus* Receipt Time
- **Event Time** is the time at which the event occurred in the real world.
- **Event Time** is **NOT** maintained by the Structured Streaming framework. 

At best, Structured Streaming only knows about **Receipt Time**: the time a piece of data arrived in Spark.

##### Examples of *Event Time*:
* The timestamp recorded in each record of a log file
* The instant at which an IoT device took a measurement
* The moment a REST API received a request

##### Examples of Receipt Time:
- A timestamp added to a DataFrame the moment it was processed by Spark
- The timestamp extracted from an hourly log file's file name
- The time at which an IoT hub received a report of a device's measurement (presumably offset by some delay from when the measurement was taken)

However, it should be borne in mind that using **Receipt Time** has some inherent problems, the main one being accuracy.

For example, the gap between when an IoT device takes a measurement and when that measurement is reported can be several minutes.  This may be of significant concern with regard to security and health devices. Other examples:
- The timestamp embedded in an hourly log file's name can be off by up to one hour, making correlations to other events extremely difficult
- The timestamp added by Spark as part of a DataFrame transformation can be off by hours, weeks, or even months, depending on when the event occurred and when the job ran

So then, are there situations where using **Receipt Time** rather than **Event Time** may be more appropriate?  Well, it depends... Receipt Time could be appropriate in circumstances where accuracy is not a significant concern (i.e., when **Receipt Time** is close enough to **Event Time**).
For example, IoT events might be delayed by minutes while the query aggregates at a much coarser resolution (e.g., days or months).
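The effect of this choice can be sketched with two hypothetical records that each carry both an event time and a receipt time. The field names and values below are assumptions made for illustration and are not taken from the lesson's dataset.

```python
from collections import Counter
from datetime import datetime

# Hypothetical records: each was received a few minutes after it occurred
records = [
    {"action": "Open",
     "event_time": datetime(2024, 1, 1, 9, 58),
     "receipt_time": datetime(2024, 1, 1, 10, 3)},
    {"action": "Close",
     "event_time": datetime(2024, 1, 1, 10, 15),
     "receipt_time": datetime(2024, 1, 1, 10, 16)},
]

def hour_of(t):
    """Truncate a timestamp to the start of its hour."""
    return t.replace(minute=0, second=0, microsecond=0)

# Hourly resolution: the two notions of time disagree
by_event_hour = Counter(hour_of(r["event_time"]) for r in records)
by_receipt_hour = Counter(hour_of(r["receipt_time"]) for r in records)

# Daily resolution: the delay is too small to matter
by_event_day = Counter(r["event_time"].date() for r in records)
by_receipt_day = Counter(r["receipt_time"].date() for r in records)

print("hourly counts agree:", by_event_hour == by_receipt_hour)
print("daily counts agree: ", by_event_day == by_receipt_day)
```

In this sketch the first record crosses an hour boundary on its way in, so hourly counts differ between the two notions of time while daily counts match.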

#### 1.0. Windowed Streaming
This exercise examines the files in `/mnt/training/sensor-data/accelerometer/time-series-stream.json/`.  Each line in these files contains a JSON record with two fields: `time` and `action`. Assume that new files are written to this directory continuously (i.e., as a stream); therefore, conceptually there is no end to this process.

###### 1.1. First, inspect the `head` of one of these files:

In [0]:
%fs head dbfs:/mnt/training/sensor-data/accelerometer/time-series-stream.json/file-0.json

###### 1.2. Next, analyze these files interactively. 
A schema must be specified for file-based Structured Streams; therefore, the first task is to define a schema for these files.
Due to its simplicity, the schema can be defined using a simple DDL-formatted string representation of the schema.

In [0]:
inputPath = "dbfs:/mnt/training/sensor-data/accelerometer/time-series-stream.json/"
jsonSchema = "time timestamp, action string"

###### 1.3. With the schema defined, create the initial DataFrame `inputDF`, and then `countsDF` to represent the aggregation:

In [0]:
from pyspark.sql.functions import window, col

inputDF = (spark
  .readStream                                 # Returns an instance of DataStreamReader
  .schema(jsonSchema)                         # Set the schema of the JSON data
  .option("maxFilesPerTrigger", 1)            # Treat a sequence of files as a stream, one file at a time
  .json(inputPath)                            # Specifies the format, path and returns a DataFrame
)

countsDF = (inputDF
  .groupBy(col("action"),                     # Aggregate by action...
           window(col("time"), "1 hour"))     # ...then by a 1 hour window
  .count()                                    # For the aggregate, produce a count
  .select(col("window.start").alias("start"), # Elevate field to column
          col("count"),                       # Include count
          col("action"))                      # Include action
  .orderBy(col("start"))                      # Sort by the start time
)

###### 1.4. To view the results of the query, pass the DataFrame `countsDF` to the `display()` function.
As in the previous lesson, specify the stream's `streamName` to gain better control of it.

In [0]:
streamName = "lesson03_ps"
display(countsDF,  streamName = streamName)

#### 2.0. Performance Considerations
If you run that query, as is, it will take a surprisingly long time to start generating data. What's the cause of the delay? If you expand the **Spark Jobs** component, you'll see something like this:

<img src="https://files.training.databricks.com/images/structured-streaming-shuffle-partitions-200.png"/>

<br>

It's our `groupBy()`. `groupBy()` causes a _shuffle_, and, by default, Spark SQL shuffles to 200 partitions. In addition, we're doing a _stateful_ aggregation: one that requires Structured Streaming to maintain and aggregate data over time.

When doing a stateful aggregation, Structured Streaming must maintain an in-memory _state map_ for each window within each partition. For fault tolerance reasons, the state map has to be saved after a partition is processed, and it needs to be saved somewhere fault-tolerant. To meet those requirements, the Streaming API saves the maps to a distributed store. On some clusters, that will be HDFS. Databricks uses the DBFS.

That means that every time it finishes processing a window, the Streaming API writes its internal map to disk. The write has some overhead, typically between 1 and 2 seconds.
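To get a feel for why the number of shuffle partitions matters, here is a rough back-of-the-envelope model. It assumes (this is a simplification, not a measured behavior) that partitions are processed in "waves" of one task per core and that each wave pays the state-write overhead once. The function name and the 8-core cluster are hypothetical.

```python
import math

def estimated_state_overhead(partitions, cores, seconds_per_write):
    """Rough overhead estimate: one state write per wave of tasks (simplifying assumption)."""
    waves = math.ceil(partitions / cores)
    return waves * seconds_per_write

cores = 8  # hypothetical cluster size
for partitions in (200, cores):
    low = estimated_state_overhead(partitions, cores, 1.0)   # 1 second per write
    high = estimated_state_overhead(partitions, cores, 2.0)  # 2 seconds per write
    print(f"{partitions:>3} partitions on {cores} cores: roughly {low:.0f} to {high:.0f} seconds of overhead")
```

Under this model, 200 partitions on 8 cores need 25 waves, while 8 partitions need only one, which is the intuition behind matching partitions to cores.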

In [0]:
untilStreamIsReady(streamName)

Before proceeding, we need to stop any active streams.

In [0]:
# for s in spark.streams.active: # Iterate over all active streams
#   s.stop()                     # Stop the stream

# As mentioned in lesson #2, we have provided additional methods for working with streams, and in  
# this case, for dealing with the rare exceptions that may arise as a result of terminating a stream.
# Listed above is the logical equivalent to this operation.
stopAllStreams()

One way to reduce this overhead is to reduce the number of partitions Spark shuffles to. In most cases, you want a 1-to-1 mapping of partitions to cores for streaming applications.  Rerun the query below and notice the performance improvement.

Once the data is loaded, render a line graph with the following settings:
* **Keys**: `start`
* **Series groupings**: `action`
* **Values**: `count`

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

display(countsDF,  streamName = streamName)

Wait until the stream is done initializing...

In [0]:
untilStreamIsReady(streamName)

When you are done, stop all the streaming jobs.

In [0]:
stopAllStreams()

##### 2.1. Problem with Generating Many Windows
We are generating a window for every 1-hour aggregate.  _Every window_ has to be separately persisted and maintained. Over time, this aggregated data will build up in the driver. The end result is a massive slowdown, if not an out-of-memory (OOM) error.

###### How do we fix that problem?
One simple solution is to increase the size of our window (say, to 2 hours). That way, we're generating fewer windows.
But if the job runs for a long time, we're still building up an unbounded set of windows. Eventually, we could hit resource limits.
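The arithmetic behind this is simple: doubling the window size halves the number of windows, but the total still grows linearly with how long the job runs. A minimal sketch, using a hypothetical `window_count` helper and counting windows per distinct `action` value:

```python
import math

def window_count(run_hours, window_hours):
    """Number of fixed-size windows needed to cover a job that has run for run_hours."""
    return math.ceil(run_hours / window_hours)

for days in (1, 30, 365):
    run_hours = days * 24
    one_hour = window_count(run_hours, 1)
    two_hour = window_count(run_hours, 2)
    print(f"after {days:>3} days: {one_hour:>5} one-hour windows, {two_hour:>5} two-hour windows")
```

Either way, the number of windows that must be kept has no upper bound as long as the stream keeps running.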

<br>

#### 3.0. Watermarking
A better solution to the problem is to define a cut-off (i.e., a point after which Structured Streaming is allowed to throw saved windows away). That's what _watermarking_ allows us to do.  Below is our previous example with watermarking. Structured Streaming is instructed to keep no more than 2 hours of aggregated data.

In [0]:
watermarkedDF = (inputDF
  .withWatermark("time", "2 hours")             # Specify a 2-hour watermark
  .groupBy(col("action"),                       # Aggregate by action...
           window(col("time"), "1 hour"))       # ...then by a 1 hour window
  .count()                                      # For each aggregate, produce a count
  .select(col("window.start").alias("start"),   # Elevate field to column
          col("count"),                         # Include count
          col("action"))                        # Include action
  .orderBy(col("start"))                        # Sort by the start time
)
display(watermarkedDF, streamName = streamName) # Start the stream and display it

In the example above,   
* Data that arrives more than 2 hours behind the latest event time seen so far falls _past_ the watermark and may be dropped.
* Data that arrives within 2 hours of the latest event time seen so far will never be dropped.

More specifically, any data less than 2 hours behind the latest data processed so far is guaranteed to be aggregated. However, the guarantee is strict only in one direction. Data delayed by more than 2 hours is not guaranteed to be dropped; it may or may not get aggregated. The more delayed the data is, the less likely the engine is to process it.
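The one-directional guarantee is easier to see in a toy model. The sketch below is plain Python and is not Spark's implementation: it assumes that the watermark is the latest event time seen minus the delay, that it only advances between batches, and that a window is closed once its end falls at or before the watermark. The `WatermarkedCounter` class and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

class WatermarkedCounter:
    """Toy model of a windowed count with a watermark (not Spark's implementation)."""

    def __init__(self, window_size, delay):
        self.window_size = window_size
        self.delay = delay
        self.max_event_time = None   # latest event time seen so far
        self.state = {}              # open windows: window start -> count
        self.finalized = {}          # windows closed by the watermark
        self.dropped = 0             # late records that were discarded

    def _watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def process_batch(self, event_times):
        # The watermark applied to a batch comes from data seen in earlier batches
        watermark = self._watermark()
        for t in event_times:
            start = t - (t - EPOCH) % self.window_size
            if watermark is not None and start + self.window_size <= watermark:
                self.dropped += 1    # the record's window has already been closed
                continue
            self.state[start] = self.state.get(start, 0) + 1

        # Advance the watermark, then close every window that ends at or before it
        batch_max = max(event_times)
        if self.max_event_time is None or batch_max > self.max_event_time:
            self.max_event_time = batch_max
        watermark = self._watermark()
        for start in [s for s in self.state if s + self.window_size <= watermark]:
            self.finalized[start] = self.state.pop(start)

def at(hour, minute):
    return datetime(2024, 1, 1, hour, minute)

counter = WatermarkedCounter(window_size=timedelta(hours=1), delay=timedelta(hours=2))
counter.process_batch([at(10, 5), at(10, 40)])
counter.process_batch([at(13, 10), at(10, 50)])  # 10:50 is over 2 hours behind 13:10, yet still counted
counter.process_batch([at(10, 55), at(13, 20)])  # 10:55 arrives after its window was closed

print("finalized:", counter.finalized)
print("open:     ", counter.state)
print("dropped:  ", counter.dropped)
```

In this model the 10:50 record is aggregated even though it trails the newest record in its own batch by more than 2 hours, because the watermark had not yet advanced. The 10:55 record arrives one batch later and is dropped. Only the still-open 13:00 window remains in state, which is how a watermark keeps state bounded.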

**Wait until the stream is done initializing...**

In [0]:
untilStreamIsReady(streamName)

Stop all the streams.

In [0]:
stopAllStreams()

Run the **`Classroom-Cleanup`** cell below to remove any artifacts created by this lesson.

In [0]:
%run "./Includes/Classroom-Cleanup"