<img src="https://files.training.databricks.com/images/Apache-Spark-Logo_TM_200px.png" style="float: left; margin: 20px"/>

# Working with Time Windows

## In this lesson you:
* Use sliding windows to aggregate over chunks of data rather than all data
* Apply watermarking to discard stale data that you do not have space to keep
* Plot live graphs using `display`

## Audience
* Primary Audience: Data Engineers
* Secondary Audience: Data Scientists, Software Engineers

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Getting Started</h2>

Run the following cell to configure our "classroom."

In [0]:
%run "./Includes/Classroom-Setup"

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Streaming Aggregations</h2>

Continuous applications often require near real-time decisions on real-time, aggregated statistics.

Some examples include:
* Aggregating errors in data from IoT devices by type
* Detecting anomalous behavior in a server's log file by aggregating by country
* Doing behavior analysis on instant messages via hashtags

However, in the case of streams, you generally don't want to run aggregations over the entire dataset.

### What problems might you encounter if you aggregate over a stream's entire dataset?

While streams have a definite start, there is conceptually no end to the flow of data.

Because there is no "end" to a stream, the size of the dataset grows in perpetuity.

This means that your cluster will eventually run out of resources.

Instead of aggregating over the entire dataset, you can aggregate over data grouped by windows of time (say, every 5 minutes or every hour).

This is referred to as windowing.

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Windowing</h2>

If we were using a static DataFrame to produce an aggregate count, we could use `groupBy()` and `count()`.

Instead we accumulate counts within a sliding window, answering questions like "How many records are we getting every second?"

**Sliding Windows**

The windows overlap and a single event may be aggregated into multiple windows. 

**Tumbling Windows**

The windows do not overlap and a single event will be aggregated into only one window. 

The following illustration of sliding windows, from the <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html" target="_blank">Structured Streaming Programming Guide</a>, helps us understand how they work:

<img src="http://spark.apache.org/docs/latest/img/structured-streaming-window.png">
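
To make the difference between the two window types concrete, here is a minimal pure-Python sketch (not Spark code) that lists the windows a single event falls into. The helper `assign_windows` is hypothetical, and it assumes windows are aligned to the Unix epoch.

```python
from datetime import datetime, timedelta

def assign_windows(event_time, window, slide=None):
    """Return the (start, end) windows that contain event_time.

    With slide omitted the windows tumble; with slide < window they overlap.
    Assumption: windows are aligned to the Unix epoch.
    """
    slide = slide or window
    epoch = datetime(1970, 1, 1)
    # Start of the latest window that begins at or before the event
    start = event_time - (event_time - epoch) % slide
    windows = []
    # Walk backwards while the window [start, start + window) still covers the event
    while start + window > event_time:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)

event = datetime(2016, 7, 26, 2, 40)

# Tumbling: one 1-hour window, 02:00 to 03:00
tumbling = assign_windows(event, timedelta(hours=1))

# Sliding: 1-hour windows every 30 minutes, so the event lands in two windows
sliding = assign_windows(event, timedelta(hours=1), timedelta(minutes=30))
```

With a 30-minute slide, the 02:40 event is counted in both the 02:00 and the 02:30 window, which is the overlap described above.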

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Event Time vs Receipt Time</h2>

**Event Time** is the time at which the event occurred in the real world.

**Event Time** is **NOT** something maintained by the Structured Streaming framework.

At best, Structured Streaming only knows about **Receipt Time** - the time a piece of data arrived in Spark.

### What are some examples of **Event Time**? Of **Receipt Time**?

#### Examples of *Event Time*:
* The timestamp recorded in each record of a log file
* The instant at which an IoT device took a measurement
* The moment a REST API received a request

#### Examples of *Receipt Time*:
* A timestamp added to a DataFrame the moment it was processed by Spark
* The timestamp extracted from an hourly log file's file name
* The time at which an IoT hub received a report of a device's measurement
  - Presumably offset by some delay from when the measurement was taken

### What are some of the inherent problems with using **Receipt Time**?

The main problem with using **Receipt Time** is accuracy. For example:

* The time at which an IoT device takes a measurement and the time at which it is reported can differ by several minutes.
  - This could have significant ramifications for security and health devices, for example
* The timestamp embedded in an hourly log file's name can be off by up to one hour, making correlations to other events extremely difficult
* The timestamp added by Spark as part of a DataFrame transformation can be off by hours to weeks to months depending on when the event occurred and when the job ran

### When might it be OK to use **Receipt Time** instead of **Event Time**?

When accuracy is not a significant concern, that is, when **Receipt Time** is close enough to **Event Time**.

One example would be IoT events that can be delayed by minutes while the resolution of your query is days or months (close enough).
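
A small pure-Python sketch of that trade-off follows. The reading, its 5-minute delay, and the `bucket` helper are all hypothetical; the point is only that the same delay changes an hourly bucket but not a daily one.

```python
from datetime import datetime, timedelta

def bucket(ts, resolution):
    """Truncate a timestamp to the start of its hour or day."""
    if resolution == "hour":
        return ts.replace(minute=0, second=0, microsecond=0)
    if resolution == "day":
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported resolution: {resolution}")

# Hypothetical IoT reading: measured at 02:58, received 5 minutes later
event_time = datetime(2016, 7, 26, 2, 58)
receipt_time = event_time + timedelta(minutes=5)

# Hourly resolution: the delay moves the reading into the next bucket
same_hour = bucket(event_time, "hour") == bucket(receipt_time, "hour")

# Daily resolution: both timestamps land in the same bucket
same_day = bucket(event_time, "day") == bucket(receipt_time, "day")
```

A reading taken just before midnight would still cross a daily boundary, so "close enough" holds for most events rather than all of them.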

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Windowed Streaming Example</h2>

For this example, we will examine the files in `/mnt/training/sensor-data/accelerometer/time-series-stream.json/`.

Each line in the file contains a JSON record with two fields: `time` and `action`.

New files are being written to this directory continuously (aka streaming).

Theoretically, there is no end to this process.

Let's start by looking at the head of one such file:

In [0]:
%fs head dbfs:/mnt/training/sensor-data/accelerometer/time-series-stream.json/file-0.json

### Define the Schema for the streaming content

Let's try to analyze these files interactively.

First configure a schema.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> The schema must be specified for file-based Structured Streams.
Because of the simplicity of the schema, we can use the simpler, DDL-formatted, string representation of the schema.

In [0]:
inputPath = "dbfs:/mnt/training/sensor-data/accelerometer/time-series-stream.json/"

jsonSchema = "time timestamp, action string"

### Define a streaming DataFrame

With the schema defined, we can create the initial DataFrame `inputDF` and then `countsDF`, which represents our aggregation:

In [0]:
from pyspark.sql.functions import window, col

inputDF = (spark
  .readStream                                 # Returns an instance of DataStreamReader
  .schema(jsonSchema)                         # Set the schema of the JSON data
  .option("maxFilesPerTrigger", 1)            # Treat a sequence of files as a stream, one file at a time
  .json(inputPath)                            # Specifies the format, path and returns a DataFrame
)

countsDF = (inputDF
  .groupBy(col("action"),                     # Aggregate by action...
           window(col("time"), "1 hour"))     # ...then by a 1 hour window
  .count()                                    # For the aggregate, produce a count
  .select(col("window.start").alias("start"), # Elevate field to column
          col("action"),                      # Include action
          col("count"))                       # Include count
  .orderBy(col("start"), col("action"))       # Sort by the start time and action
)
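
The query above uses a 1-hour tumbling window, because `window()` is called without a slide duration. As a hedged sketch, a sliding variant could pass a third argument to `window()`. The helper name `sliding_counts` and the 15-minute slide are illustrative assumptions, not part of the lesson's code.

```python
def sliding_counts(input_df, window_duration="1 hour", slide_duration="15 minutes"):
    """Count actions per overlapping window; the default durations are illustrative."""
    # Imported here so the function can be defined without pyspark installed
    from pyspark.sql.functions import col, window

    return (input_df
      .groupBy(col("action"),                   # Aggregate by action...
               window(col("time"),              # ...then by overlapping windows
                      window_duration,
                      slide_duration))
      .count()                                  # For each group, produce a count
      .select(col("window.start").alias("start"),
              col("window.end").alias("end"),
              col("action"),
              col("count"))
    )
```

Calling `display(sliding_counts(inputDF))` in a notebook would start the stream. Each event would then contribute to four overlapping windows, so expect more state to be maintained than in the tumbling case.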

### View Results

To view the results of our query, pass the DataFrame `countsDF` to the `display()` function.

In [0]:
display(countsDF)

start,action,count
2016-07-26T02:00:00.000+0000,Close,11
2016-07-26T02:00:00.000+0000,Open,179
2016-07-26T03:00:00.000+0000,Close,344
2016-07-26T03:00:00.000+0000,Open,1001
2016-07-26T04:00:00.000+0000,Close,815
2016-07-26T04:00:00.000+0000,Open,999
2016-07-26T05:00:00.000+0000,Close,323
2016-07-26T05:00:00.000+0000,Open,328
2016-07-26T13:00:00.000+0000,Close,699
2016-07-26T13:00:00.000+0000,Open,656


### Performance Considerations

If you run that query, as is, it will take a surprisingly long time to start generating data. What's the cause of the delay?

If you expand the **Spark Jobs** component, you'll see something like this:

<img src="https://files.training.databricks.com/images/structured-streaming-shuffle-partitions-200.png"/>

It's our `groupBy()`. `groupBy()` causes a _shuffle_, and, by default, Spark SQL shuffles to 200 partitions. In addition, we're doing a _stateful_ aggregation: one that requires Structured Streaming to maintain and aggregate data over time.

When doing a stateful aggregation, Structured Streaming must maintain an in-memory _state map_ for each window within each partition. For fault tolerance reasons, the state map has to be saved after a partition is processed, and it needs to be saved somewhere fault-tolerant. To meet those requirements, the Streaming API saves the maps to a distributed store. On some clusters, that will be HDFS. Databricks uses the DBFS.

That means that every time it finishes processing a partition, the Streaming API writes that partition's state map to the distributed store. The write has some overhead, typically between 1 and 2 seconds.

### What's the cause of the delay?
* `groupBy()` causes a **shuffle**
* By default, this produces **200 partitions**
* Plus a **stateful aggregation** to be maintained **over time**

This results in:
* Maintenance of an **in-memory state map** for **each window** within **each partition**
* Writing of the state map to a fault-tolerant store
  * On some clusters, that will be HDFS
  * Databricks uses the DBFS
* Around 1 to 2 seconds overhead

### Shuffle Partition Best Practices

One way to reduce this overhead is to reduce the number of partitions Spark shuffles to.

In most cases, you want a 1-to-1 mapping of partitions to cores for streaming applications.
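
A back-of-the-envelope sketch of why this matters, assuming each core runs one task at a time so that partitions are processed in "waves". The 8-core cluster is hypothetical.

```python
import math

def task_waves(shuffle_partitions, cores):
    """Scheduling rounds needed if each core runs one task at a time."""
    return math.ceil(shuffle_partitions / cores)

cores = 8  # hypothetical cluster size

default_waves = task_waves(200, cores)    # default of 200 shuffle partitions
tuned_waves = task_waves(cores, cores)    # one partition per core
```

Under these assumptions, the default setting needs 25 rounds of tasks per trigger, each carrying its own state-saving overhead, while the 1-to-1 setting needs only one.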

### Run query with proper setting for shuffle partitions

Rerun the query below and notice the performance improvement.

Once the data is loaded, render a line graph with:
* **Keys** set to `start`
* **Series groupings** set to `action`
* **Values** set to `count`

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

display(countsDF)

start,action,count
2016-07-26T02:00:00.000+0000,Close,11
2016-07-26T02:00:00.000+0000,Open,179
2016-07-26T03:00:00.000+0000,Close,344
2016-07-26T03:00:00.000+0000,Open,1001
2016-07-26T04:00:00.000+0000,Close,815
2016-07-26T04:00:00.000+0000,Open,999
2016-07-26T05:00:00.000+0000,Close,1003
2016-07-26T05:00:00.000+0000,Open,1000
2016-07-26T06:00:00.000+0000,Close,1011
2016-07-26T06:00:00.000+0000,Open,993


### Stop all Streams

When you are done, stop all the streaming jobs.

In [0]:
for s in spark.streams.active: # Iterate over all active streams
  s.stop()                     # Stop the stream

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Problem with Generating Many Windows</h2>

We are generating a window for every 1 hour aggregate.

_Every window_ has to be separately persisted and maintained.

Over time, this aggregated data will build up in the driver.

The end result is a massive slowdown, if not an out-of-memory (OOM) error.

### How do we fix that problem?

One simple solution is to increase the size of our window (say, to 2 hours).

That way, we're generating fewer windows.

But if the job runs for a long time, we're still building up an unbounded set of windows.

Eventually, we could hit resource limits.
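
A rough sketch of that growth, assuming one group per window per distinct action (the two actions, `Open` and `Close`, are taken from the output shown earlier). The helper `window_count` is hypothetical.

```python
def window_count(hours_of_data, window_hours, distinct_actions=2):
    """Upper bound on (window, action) groups for a given span of event time."""
    windows = -(-hours_of_data // window_hours)  # ceiling division
    return windows * distinct_actions

spans = (24, 24 * 7, 24 * 30)  # one day, one week, thirty days

one_hour = [window_count(h, 1) for h in spans]
two_hour = [window_count(h, 2) for h in spans]
```

Doubling the window size halves the number of groups at every point, but the count still grows linearly with the span of data, so the set remains unbounded.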

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Watermarking</h2>

A better solution to the problem is to define a cut-off: a point after which Structured Streaming commits windowed data to the sink, or discards it if the sink is the console or memory sink, which is what `display()` mimics.

That's what _watermarking_ allows us to do.

### Refining our previous example

Below is our previous example with watermarking.

We're telling Structured Streaming to keep no more than 2 hours of aggregated data.

In [0]:
watermarkedDF = (inputDF
  .withWatermark("time", "2 hours")           # Specify a 2-hour watermark
  .groupBy(col("action"),                     # Aggregate by action...
           window(col("time"), "1 hour"))     # ...then by a 1 hour window
  .count()                                    # For each aggregate, produce a count
  .select(col("window.start").alias("start"), # Elevate field to column
          col("action"),                      # Include action
          col("count"))                       # Include count
  .orderBy(col("start"), col("action"))       # Sort by the start time and action
)
display(watermarkedDF)                        # Start the stream and display it

start,action,count
2016-07-26T02:00:00.000+0000,Close,11
2016-07-26T02:00:00.000+0000,Open,179
2016-07-26T03:00:00.000+0000,Close,344
2016-07-26T03:00:00.000+0000,Open,1001
2016-07-26T04:00:00.000+0000,Close,815
2016-07-26T04:00:00.000+0000,Open,999
2016-07-26T05:00:00.000+0000,Close,323
2016-07-26T05:00:00.000+0000,Open,328
2016-07-26T13:00:00.000+0000,Close,699
2016-07-26T13:00:00.000+0000,Open,656


### Example Details

In the example above,
* Data that arrives more than 2 hours behind the latest event time seen so far may be dropped.
* Data that arrives within 2 hours of the latest event time seen so far will never be dropped.

More specifically, any data less than 2 hours behind the latest data processed up to that point is guaranteed to be aggregated.

However, the guarantee is strict only in one direction.

Data delayed by more than 2 hours is not guaranteed to be dropped; it may or may not get aggregated.

The more delayed the data is, the less likely the engine is to process it.
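
The one-directional guarantee can be illustrated with a simplified pure-Python simulation. This is a sketch under stated assumptions, not Spark's implementation: the watermark is assumed to advance only between micro-batches, and an event is dropped once its window ended at or before the watermark. The names `run_with_watermark` and `at` are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta

def run_with_watermark(batches, delay, window=timedelta(hours=1)):
    """Count events per tumbling window, dropping events behind the watermark."""
    epoch = datetime(1970, 1, 1)
    counts, dropped = Counter(), []
    max_event_time, watermark = None, None
    for batch in batches:
        for event_time in batch:
            start = event_time - (event_time - epoch) % window
            if watermark is not None and start + window <= watermark:
                dropped.append(event_time)    # its window is already finalized
            else:
                counts[start] += 1
            if max_event_time is None or event_time > max_event_time:
                max_event_time = event_time
        # Assumption: the watermark only moves once the batch has completed
        if max_event_time is not None:
            watermark = max_event_time - delay
    return counts, dropped

def at(hour, minute=0):
    return datetime(2016, 7, 26, hour, minute)

batches = [
    [at(2, 10), at(5, 30), at(2, 20)],  # 02:20 is over 2 hours behind 05:30, same batch
    [at(2, 40), at(4, 15)],             # the watermark is now 03:30
]
counts, dropped = run_with_watermark(batches, delay=timedelta(hours=2))
```

In this run the 02:20 event is aggregated even though it is more than 2 hours behind the latest event, because the watermark had not yet advanced. The 02:40 event in the next batch is dropped, and the 04:15 event, which is within 2 hours of the latest event, is kept.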

### Stop all the streams

In [0]:
for s in spark.streams.active: # Iterate over all active streams
  s.stop()                     # Stop the stream


## Next steps

Start the next lesson, [Structured Streaming with Azure Event Hubs]($./3.Streaming-With-Event-Hubs-Demo).