
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px; height: 163px">
</div>

<img src="https://files.training.databricks.com/images/Apache-Spark-Logo_TM_200px.png" style="float: left; margin: 20px"/>

# Working with Time Windows

## In this lesson you:
* Use sliding windows to aggregate over chunks of data rather than all data
* Apply watermarking to discard stale data that you do not have space to keep
* Plot live graphs using `display`

## Audience
* Primary Audience: Data Engineers
* Secondary Audience: Data Scientists, Software Engineers

## Prerequisites
* Web browser: current versions of Google Chrome, Firefox, Safari, Microsoft Edge and Internet Explorer 11 on Windows 7, 8, or 10 (see <a href="https://docs.databricks.com/user-guide/supported-browsers.html#supported-browsers#" target="_blank">Supported Web Browsers</a>)
* Databricks Runtime 4.2 or greater
* Completed courses DataFrames, ETL-Parts 1, 2 and 3 from <a href="https://academy.databricks.com/" target="_blank">Databricks Academy</a>, or have similar knowledge

<iframe  
src="//fast.wistia.net/embed/iframe/uiwie12hng?videoFoam=true"
style="border:1px solid #1cb1c2;"
allowtransparency="true" scrolling="no" class="wistia_embed"
name="wistia_embed" allowfullscreen mozallowfullscreen webkitallowfullscreen
oallowfullscreen msallowfullscreen width="640" height="360" ></iframe>
<div>
<a target="_blank" href="https://fast.wistia.net/embed/iframe/uiwie12hng?seo=false">
  <img alt="Opens in new tab" src="https://files.training.databricks.com/static/images/external-link-icon-16x16.png"/>&nbsp;Watch full-screen.</a>
</div>

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Getting Started</h2>

Run the following cell to configure our "classroom."

In [5]:
%run "./Includes/Classroom-Setup"

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Streaming Aggregations</h2>

Continuous applications often require near real-time decisions on real-time, aggregated statistics.

Some examples include:
* Aggregating errors in data from IoT devices by type
* Detecting anomalous behavior in a server's log file by aggregating by country
* Doing behavior analysis on instant messages via hashtags

However, in the case of streams, you generally don't want to run aggregations over the entire dataset.

### What problems might you encounter if you aggregate over a stream's entire dataset?


<script type="text/javascript">
  window.onload = function() {
    var allHints = document.getElementsByClassName("hint-1068");
    var answer = document.getElementById("answer-1068");
    var totalHints = allHints.length;
    var nextHint = 0;
    var hasAnswer = (answer != null);
    var items = new Array();
    var answerLabel = "Click here for the answer";
    for (var i = 0; i < totalHints; i++) {
      var elem = allHints[i];
      var label = "";
      if ((i + 1) == totalHints)
        label = answerLabel;
      else
        label = "Click here for the next hint";
      items.push({label: label, elem: elem});
    }
    if (hasAnswer) {
      items.push({label: '', elem: answer});
    }

    var button = document.getElementById("hint-button-1068");
    if (totalHints == 0) {
      button.innerHTML = answerLabel;
    }
    button.onclick = function() {
      items[nextHint].elem.style.display = 'block';
      if ((nextHint + 1) >= items.length) {
        button.style.display = 'none';
      } else {
        button.innerHTML = items[nextHint].label;
      }
      nextHint += 1;
    };
    button.ondblclick = function(e) {
      e.stopPropagation();
    }
    var answerCodeBlocks = document.getElementsByTagName("code");
    for (var i = 0; i < answerCodeBlocks.length; i++) {
      var elem = answerCodeBlocks[i];
      var parent = elem.parentNode;
      if (parent.name != "pre") {
        var newNode = document.createElement("pre");
        newNode.append(elem.cloneNode(true));
        elem.replaceWith(newNode);
        elem = newNode;
      }
      elem.ondblclick = function(e) {
        e.stopPropagation();
      };

      elem.style.marginTop = "1em";
    }
  };
</script>

<div>
  <button type="button" class="btn btn-light"
          style="margin-top: 1em"
          id="hint-button-1068">Click here for a hint</button>
</div>
<div id="answer-1068" style="padding-bottom: 20px; display: none">
  The answer:
  <div class="answer" style="margin-left: 1em">
While streams have a definitive start, there conceptually is no end to the flow of data.

Because there is no &quot;end&quot; to a stream, the size of the dataset grows in perpetuity.

This means that your cluster will eventually run out of resources.

Instead of aggregating over the entire dataset, you can aggregate over data grouped by windows of time (say, every 5 minutes or every hour).

This is referred to as windowing.
  </div>
</div>

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Windowing</h2>

If we were using a static DataFrame to produce an aggregate count, we could use `groupBy()` and `count()`. 

Instead, with a stream, we accumulate counts within a sliding window, answering questions like "How many records are we getting every second?"

The following illustration, from the <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html" target="_blank">Structured Streaming Programming Guide</a>, helps us understand how it works:

<img src="http://spark.apache.org/docs/latest/img/structured-streaming-window.png">
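
To make the illustration concrete, here is a minimal plain-Python sketch of how a timestamp can be assigned to windows. It assumes windows are half-open `[start, end)` intervals aligned to the Unix epoch; the helper name `windows_for` is hypothetical and is not part of Spark's API. With a slide equal to the window length, each timestamp falls into exactly one window; with a shorter slide, it falls into several overlapping windows.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def windows_for(ts, window, slide=None):
    """Return every [start, end) window containing ts, earliest first.

    Hypothetical helper for illustration only (not a Spark function).
    """
    slide = slide or window                      # no slide given: non-overlapping windows
    # Start of the latest window that begins at or before ts.
    last_start = ts - ((ts - EPOCH) % slide)
    result = []
    start = last_start
    while start + window > ts:                   # window still covers ts
        result.append((start, start + window))
        start -= slide                           # step back to the previous window
    return sorted(result)

ts = datetime(2016, 7, 26, 2, 40)
print(windows_for(ts, timedelta(hours=1)))                         # one window
print(windows_for(ts, timedelta(hours=1), timedelta(minutes=30)))  # two overlapping windows
```

The examples in this lesson use the first form: a 1 hour window with no separate slide interval.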

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Event Time vs Receipt Time</h2>

**Event Time** is the time at which the event occurred in the real world.

**Event Time** is **NOT** something maintained by the Structured Streaming framework. 

At best, Structured Streaming knows only about **Receipt Time**: the time a piece of data arrived in Spark.

### What are some examples of **Event Time**? Of **Receipt Time**?


<script type="text/javascript">
  window.onload = function() {
    var allHints = document.getElementsByClassName("hint-2254");
    var answer = document.getElementById("answer-2254");
    var totalHints = allHints.length;
    var nextHint = 0;
    var hasAnswer = (answer != null);
    var items = new Array();
    var answerLabel = "Click here for the answer";
    for (var i = 0; i < totalHints; i++) {
      var elem = allHints[i];
      var label = "";
      if ((i + 1) == totalHints)
        label = answerLabel;
      else
        label = "Click here for the next hint";
      items.push({label: label, elem: elem});
    }
    if (hasAnswer) {
      items.push({label: '', elem: answer});
    }

    var button = document.getElementById("hint-button-2254");
    if (totalHints == 0) {
      button.innerHTML = answerLabel;
    }
    button.onclick = function() {
      items[nextHint].elem.style.display = 'block';
      if ((nextHint + 1) >= items.length) {
        button.style.display = 'none';
      } else {
        button.innerHTML = items[nextHint].label;
      }
      nextHint += 1;
    };
    button.ondblclick = function(e) {
      e.stopPropagation();
    }
    var answerCodeBlocks = document.getElementsByTagName("code");
    for (var i = 0; i < answerCodeBlocks.length; i++) {
      var elem = answerCodeBlocks[i];
      var parent = elem.parentNode;
      if (parent.name != "pre") {
        var newNode = document.createElement("pre");
        newNode.append(elem.cloneNode(true));
        elem.replaceWith(newNode);
        elem = newNode;
      }
      elem.ondblclick = function(e) {
        e.stopPropagation();
      };

      elem.style.marginTop = "1em";
    }
  };
</script>

<div>
  <button type="button" class="btn btn-light"
          style="margin-top: 1em"
          id="hint-button-2254">Click here for a hint</button>
</div>
<div id="answer-2254" style="padding-bottom: 20px; display: none">
  The answer:
  <div class="answer" style="margin-left: 1em">
#### Examples of *Event Time*:
* The timestamp recorded in each record of a log file
* The instant at which an IoT device took a measurement
* The moment a REST API received a request

#### Examples of *Receipt Time*:
* A timestamp added to a DataFrame the moment it was processed by Spark
* The timestamp extracted from an hourly log file&#x27;s file name
* The time at which an IoT hub received a report of a device&#x27;s measurement 
  - Presumably offset by some delay from when the measurement was taken
  </div>
</div>

### What are some of the inherent problems with using **Receipt Time**?


<script type="text/javascript">
  window.onload = function() {
    var allHints = document.getElementsByClassName("hint-5511");
    var answer = document.getElementById("answer-5511");
    var totalHints = allHints.length;
    var nextHint = 0;
    var hasAnswer = (answer != null);
    var items = new Array();
    var answerLabel = "Click here for the answer";
    for (var i = 0; i < totalHints; i++) {
      var elem = allHints[i];
      var label = "";
      if ((i + 1) == totalHints)
        label = answerLabel;
      else
        label = "Click here for the next hint";
      items.push({label: label, elem: elem});
    }
    if (hasAnswer) {
      items.push({label: '', elem: answer});
    }

    var button = document.getElementById("hint-button-5511");
    if (totalHints == 0) {
      button.innerHTML = answerLabel;
    }
    button.onclick = function() {
      items[nextHint].elem.style.display = 'block';
      if ((nextHint + 1) >= items.length) {
        button.style.display = 'none';
      } else {
        button.innerHTML = items[nextHint].label;
      }
      nextHint += 1;
    };
    button.ondblclick = function(e) {
      e.stopPropagation();
    }
    var answerCodeBlocks = document.getElementsByTagName("code");
    for (var i = 0; i < answerCodeBlocks.length; i++) {
      var elem = answerCodeBlocks[i];
      var parent = elem.parentNode;
      if (parent.name != "pre") {
        var newNode = document.createElement("pre");
        newNode.append(elem.cloneNode(true));
        elem.replaceWith(newNode);
        elem = newNode;
      }
      elem.ondblclick = function(e) {
        e.stopPropagation();
      };

      elem.style.marginTop = "1em";
    }
  };
</script>

<div>
  <button type="button" class="btn btn-light"
          style="margin-top: 1em"
          id="hint-button-5511">Click here for a hint</button>
</div>
<div id="answer-5511" style="padding-bottom: 20px; display: none">
  The answer:
  <div class="answer" style="margin-left: 1em">
The main problem with using **Receipt Time** is going to be with accuracy. For example:

* The time at which an IoT device takes a measurement and the time at which it is reported can differ by several minutes.
  - This could have significant ramifications for security and health devices, for example
* The timestamp embedded in an hourly log file can be off by up to one hour, making correlations to other events extremely difficult
* The timestamp added by Spark as part of a DataFrame transformation can be off by hours, weeks, or even months, depending on when the event occurred and when the job ran
  </div>
</div>
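
A small plain-Python sketch shows how even a short delay can change an aggregate. The two timestamps are hypothetical: a measurement taken at 02:58 and received six minutes later. Bucketed by hour, the same record is counted in a different window depending on which time we use.

```python
from datetime import datetime

def hour_bucket(ts):
    """Truncate a timestamp to the start of its hour."""
    return ts.replace(minute=0, second=0, microsecond=0)

# Hypothetical reading: taken at 02:58, received at 03:04.
event_time = datetime(2016, 7, 26, 2, 58)
receipt_time = datetime(2016, 7, 26, 3, 4)

lag = receipt_time - event_time
same_window = hour_bucket(event_time) == hour_bucket(receipt_time)

print("lag:", lag)
print("event-time window:  ", hour_bucket(event_time))
print("receipt-time window:", hour_bucket(receipt_time))
print("same window?", same_window)
```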

### When might it be OK to use **Receipt Time** instead of **Event Time**?


<script type="text/javascript">
  window.onload = function() {
    var allHints = document.getElementsByClassName("hint-1624");
    var answer = document.getElementById("answer-1624");
    var totalHints = allHints.length;
    var nextHint = 0;
    var hasAnswer = (answer != null);
    var items = new Array();
    var answerLabel = "Click here for the answer";
    for (var i = 0; i < totalHints; i++) {
      var elem = allHints[i];
      var label = "";
      if ((i + 1) == totalHints)
        label = answerLabel;
      else
        label = "Click here for the next hint";
      items.push({label: label, elem: elem});
    }
    if (hasAnswer) {
      items.push({label: '', elem: answer});
    }

    var button = document.getElementById("hint-button-1624");
    if (totalHints == 0) {
      button.innerHTML = answerLabel;
    }
    button.onclick = function() {
      items[nextHint].elem.style.display = 'block';
      if ((nextHint + 1) >= items.length) {
        button.style.display = 'none';
      } else {
        button.innerHTML = items[nextHint].label;
      }
      nextHint += 1;
    };
    button.ondblclick = function(e) {
      e.stopPropagation();
    }
    var answerCodeBlocks = document.getElementsByTagName("code");
    for (var i = 0; i < answerCodeBlocks.length; i++) {
      var elem = answerCodeBlocks[i];
      var parent = elem.parentNode;
      if (parent.name != "pre") {
        var newNode = document.createElement("pre");
        newNode.append(elem.cloneNode(true));
        elem.replaceWith(newNode);
        elem = newNode;
      }
      elem.ondblclick = function(e) {
        e.stopPropagation();
      };

      elem.style.marginTop = "1em";
    }
  };
</script>

<div>
  <button type="button" class="btn btn-light"
          style="margin-top: 1em"
          id="hint-button-1624">Click here for a hint</button>
</div>
<div id="answer-1624" style="padding-bottom: 20px; display: none">
  The answer:
  <div class="answer" style="margin-left: 1em">
When accuracy is not a significant concern, that is, when **Receipt Time** is close enough to **Event Time**.

One example is IoT events that can be delayed by minutes when the resolution of your query is days or months (close enough).
  </div>
</div>

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Windowed Streaming Example</h2>

For this example, we will examine the files in `/mnt/training/sensor-data/accelerometer/time-series-stream.json/`.

Each line in the file contains a JSON record with two fields: `time` and `action`

New files are being written to this directory continuously (aka streaming).

Theoretically, there is no end to this process.

Let's start by looking at the head of one such file:

In [19]:
%fs head dbfs:/mnt/training/sensor-data/accelerometer/time-series-stream.json/file-0.json

Let's try to analyze these files interactively. 

First configure a schema.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> The schema must be specified for file-based Structured Streams. 
Because of the simplicity of the schema, we can use the simpler, DDL-formatted, string representation of the schema.

In [21]:
inputPath = "dbfs:/mnt/training/sensor-data/accelerometer/time-series-stream.json/"

jsonSchema = "time timestamp, action string"

With the schema defined, we can create the initial DataFrame `inputDF` and then `countsDF`, which represents our aggregation:

In [23]:
from pyspark.sql.functions import window, col

inputDF = (spark
  .readStream                                 # Returns an instance of DataStreamReader
  .schema(jsonSchema)                         # Set the schema of the JSON data
  .option("maxFilesPerTrigger", 1)            # Treat a sequence of files as a stream, one file at a time
  .json(inputPath)                            # Specifies the format, path and returns a DataFrame
)


countsDF = (inputDF
  .groupBy(col("action"),                     # Aggregate by action...
           window(col("time"), "1 hour"))     # ...then by a 1 hour window
  .count()                                    # For each aggregate, produce a count
  .select(col("window.start").alias("start"), # Elevate field to column
          col("count"),                       # Include count
          col("action"))                      # Include action
  .orderBy(col("start"))                      # Sort by the start time
)

To view the results of our query, pass the DataFrame `countsDF` to the `display()` function.

In [25]:
display(countsDF)

start,count,action
2016-07-26T02:00:00.000+0000,179,Open
2016-07-26T02:00:00.000+0000,11,Close
2016-07-26T03:00:00.000+0000,344,Close
2016-07-26T03:00:00.000+0000,1001,Open
2016-07-26T04:00:00.000+0000,999,Open
2016-07-26T04:00:00.000+0000,815,Close
2016-07-26T05:00:00.000+0000,1000,Open
2016-07-26T05:00:00.000+0000,1003,Close
2016-07-26T06:00:00.000+0000,1011,Close
2016-07-26T06:00:00.000+0000,993,Open
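
To see what the aggregation computes, here is a plain-Python sketch of the same grouping on a handful of made-up records. The sample rows and the helper `hourly_counts` are hypothetical; the sketch only mirrors the logic of grouping by `action` and a 1 hour window, then counting and sorting by window start.

```python
from collections import Counter
from datetime import datetime

# Hypothetical sample records shaped like the (time, action) JSON lines.
records = [
    (datetime(2016, 7, 26, 2, 45), "Open"),
    (datetime(2016, 7, 26, 2, 50), "Close"),
    (datetime(2016, 7, 26, 3, 5), "Open"),
    (datetime(2016, 7, 26, 3, 10), "Open"),
]

def hourly_counts(rows):
    """Count rows per (window start, action), sorted by window start."""
    counts = Counter()
    for ts, action in rows:
        start = ts.replace(minute=0, second=0, microsecond=0)  # 1 hour window start
        counts[(start, action)] += 1
    return sorted(counts.items())

for (start, action), count in hourly_counts(records):
    print(start.isoformat(), count, action)
```

The difference with a stream is that the counts for a window keep changing as new files arrive, so the intermediate counts have to be kept as state between triggers.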


### Performance Considerations

If you run that query, as is, it will take a surprisingly long time to start generating data. What's the cause of the delay? 

If you expand the **Spark Jobs** component, you'll see something like this:

<img src="https://files.training.databricks.com/images/structured-streaming-shuffle-partitions-200.png"/>

It's our `groupBy()`. `groupBy()` causes a _shuffle_, and, by default, Spark SQL shuffles to 200 partitions. In addition, we're doing a _stateful_ aggregation: one that requires Structured Streaming to maintain and aggregate data over time.

When doing a stateful aggregation, Structured Streaming must maintain an in-memory _state map_ for each window within each partition. For fault tolerance reasons, the state map has to be saved after a partition is processed, and it needs to be saved somewhere fault-tolerant. To meet those requirements, the Streaming API saves the maps to a distributed store. On some clusters, that will be HDFS. Databricks uses the DBFS.

That means that every time it finishes processing a window, the Streaming API writes its internal map to disk. The write has some overhead, typically between 1 and 2 seconds.

One way to reduce this overhead is to reduce the number of partitions Spark shuffles to.

In most cases, you want a 1-to-1 mapping of partitions to cores for streaming applications.
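
As a rough back-of-the-envelope illustration (arithmetic only, not a Spark API), the sketch below models why fewer shuffle partitions help. It rests on loud assumptions: tasks run in waves of one task per core, every partition pays the same fixed state-write cost, and that cost is 1.5 seconds (the midpoint of the range quoted above). The 8-core cluster and the function name are hypothetical, and real behavior will differ.

```python
import math

def estimated_state_write_seconds(partitions, cores, seconds_per_write=1.5):
    """Crude estimate: number of task waves times a fixed per-write overhead."""
    waves = math.ceil(partitions / cores)   # tasks run one per core, in waves
    return waves * seconds_per_write

cores = 8  # hypothetical cluster size

print(estimated_state_write_seconds(200, cores))    # default of 200 shuffle partitions
print(estimated_state_write_seconds(cores, cores))  # one partition per core
```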

Rerun the query below and notice the performance improvement.

Once the data is loaded, render a line graph with:
* **Keys** set to `start`
* **Series groupings** set to `action`
* **Values** set to `count`

In [29]:
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

display(countsDF)

start,count,action
2016-07-26T02:00:00.000+0000,11,Close
2016-07-26T02:00:00.000+0000,179,Open
2016-07-26T03:00:00.000+0000,1001,Open
2016-07-26T03:00:00.000+0000,344,Close
2016-07-26T04:00:00.000+0000,999,Open
2016-07-26T04:00:00.000+0000,815,Close
2016-07-26T05:00:00.000+0000,1000,Open
2016-07-26T05:00:00.000+0000,1003,Close
2016-07-26T06:00:00.000+0000,993,Open
2016-07-26T06:00:00.000+0000,1011,Close


Wait until the stream is done initializing...

In [31]:
#TODO
untilStreamIsReady("display_query_4")

When you are done, stop all the streaming jobs.

In [33]:
for s in spark.streams.active: # Iterate over all active streams
  s.stop()                     # Stop the stream

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Problem with Generating Many Windows</h2>

We are generating a window for every 1-hour aggregate.

_Every window_ has to be separately persisted and maintained.

Over time, this aggregated data will build up in the driver.

The end result is a massive slowdown, if not an out-of-memory (OOM) error.

### How do we fix that problem?


<script type="text/javascript">
  window.onload = function() {
    var allHints = document.getElementsByClassName("hint-5927");
    var answer = document.getElementById("answer-5927");
    var totalHints = allHints.length;
    var nextHint = 0;
    var hasAnswer = (answer != null);
    var items = new Array();
    var answerLabel = "Click here for the answer";
    for (var i = 0; i < totalHints; i++) {
      var elem = allHints[i];
      var label = "";
      if ((i + 1) == totalHints)
        label = answerLabel;
      else
        label = "Click here for the next hint";
      items.push({label: label, elem: elem});
    }
    if (hasAnswer) {
      items.push({label: '', elem: answer});
    }

    var button = document.getElementById("hint-button-5927");
    if (totalHints == 0) {
      button.innerHTML = answerLabel;
    }
    button.onclick = function() {
      items[nextHint].elem.style.display = 'block';
      if ((nextHint + 1) >= items.length) {
        button.style.display = 'none';
      } else {
        button.innerHTML = items[nextHint].label;
      }
      nextHint += 1;
    };
    button.ondblclick = function(e) {
      e.stopPropagation();
    }
    var answerCodeBlocks = document.getElementsByTagName("code");
    for (var i = 0; i < answerCodeBlocks.length; i++) {
      var elem = answerCodeBlocks[i];
      var parent = elem.parentNode;
      if (parent.name != "pre") {
        var newNode = document.createElement("pre");
        newNode.append(elem.cloneNode(true));
        elem.replaceWith(newNode);
        elem = newNode;
      }
      elem.ondblclick = function(e) {
        e.stopPropagation();
      };

      elem.style.marginTop = "1em";
    }
  };
</script>

<div>
  <button type="button" class="btn btn-light"
          style="margin-top: 1em"
          id="hint-button-5927">Click here for a hint</button>
</div>
<div id="answer-5927" style="padding-bottom: 20px; display: none">
  The answer:
  <div class="answer" style="margin-left: 1em">
One simple solution is to increase the size of our window (say, to 2 hours).

That way, we&#x27;re generating fewer windows.

But if the job runs for a long time, we&#x27;re still building up an unbounded set of windows.

Eventually, we could hit resource limits.
  </div>
</div>
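
A quick plain-Python calculation illustrates why a bigger window only postpones the problem. It assumes one piece of state per window per distinct action (two actions, `Open` and `Close`, as in the output above); the function name is hypothetical.

```python
import math

def windows_to_maintain(hours_running, window_hours, distinct_actions=2):
    """Number of (window, action) aggregates accumulated with no cut-off."""
    return math.ceil(hours_running / window_hours) * distinct_actions

one_year = 365 * 24  # hours

print(windows_to_maintain(one_year, 1))  # 1 hour windows
print(windows_to_maintain(one_year, 2))  # 2 hour windows: half as many, still growing
```

Doubling the window size halves the count, but the count still grows linearly with how long the job runs.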

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Watermarking</h2>

A better solution to the problem is to define a cut-off: a point after which Structured Streaming is allowed to throw saved windows away.

That's what _watermarking_ allows us to do.

### Refining our previous example

Below is our previous example with watermarking. 

We're telling Structured Streaming to keep no more than 2 hours of aggregated data.

In [38]:
watermarkedDF = (inputDF
  .withWatermark("time", "2 hours")           # Specify a 2-hour watermark
  .groupBy(col("action"),                     # Aggregate by action...
           window(col("time"), "1 hour"))     # ...then by a 1 hour window
  .count()                                    # For each aggregate, produce a count
  .select(col("window.start").alias("start"), # Elevate field to column
          col("count"),                       # Include count
          col("action"))                      # Include action
  .orderBy(col("start"))                      # Sort by the start time
)
display(watermarkedDF)                        # Start the stream and display it

start,count,action
2016-07-26T02:00:00.000+0000,11,Close
2016-07-26T02:00:00.000+0000,179,Open
2016-07-26T03:00:00.000+0000,344,Close
2016-07-26T03:00:00.000+0000,1001,Open
2016-07-26T04:00:00.000+0000,999,Open
2016-07-26T04:00:00.000+0000,815,Close
2016-07-26T05:00:00.000+0000,1000,Open
2016-07-26T05:00:00.000+0000,1003,Close
2016-07-26T06:00:00.000+0000,993,Open
2016-07-26T06:00:00.000+0000,1011,Close


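In the example above:
* Data whose event time is more than 2 hours behind the latest event time seen so far (that is, older than the watermark) may be dropped.
* Data whose event time is within 2 hours of the latest event time seen so far will never be dropped.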

More specifically, any data less than 2 hours behind the latest data processed till then is guaranteed to be aggregated.

However, the guarantee is strict only in one direction. 

Data delayed by more than 2 hours is not guaranteed to be dropped; it may or may not get aggregated. 

The more delayed the data is, the less likely the engine is to process it.
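
The bookkeeping behind a watermark can be sketched in plain Python. This is a toy model under stated assumptions, not Spark's implementation: the class `WatermarkedHourlyCounter` is hypothetical, it updates the watermark after every record (Spark does so between micro-batches), and it always drops late records immediately, whereas the real engine may still aggregate them.

```python
from datetime import datetime, timedelta

HOUR = timedelta(hours=1)

class WatermarkedHourlyCounter:
    """Toy model of hourly counts with an event-time watermark (not Spark code)."""

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None
        self.state = {}        # open windows: window start -> count
        self.finalized = {}    # windows evicted from state
        self.dropped = 0       # records that arrived too late

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def add(self, event_time):
        start = event_time.replace(minute=0, second=0, microsecond=0)
        # Too late: the record's window ended at or before the watermark.
        if self.watermark is not None and start + HOUR <= self.watermark:
            self.dropped += 1
            return
        self.state[start] = self.state.get(start, 0) + 1
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        # Evict windows that can no longer receive data.
        for old in [s for s in self.state if s + HOUR <= self.watermark]:
            self.finalized[old] = self.state.pop(old)

counter = WatermarkedHourlyCounter(delay=timedelta(hours=2))
for hh, mm in [(2, 10), (3, 20), (5, 30), (2, 50), (3, 40)]:
    counter.add(datetime(2016, 7, 26, hh, mm))

print("watermark:", counter.watermark)
print("open windows:", sorted(counter.state))
print("dropped:", counter.dropped)
```

The late `02:50` record is dropped because its window closed once the watermark passed `03:00`, while the late `03:40` record is still counted. Only windows newer than the watermark stay in state, which is what keeps memory bounded.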

Wait until the stream is done initializing...

In [41]:
#TODO
untilStreamIsReady("display_query_5")

Stop all the streams.

In [43]:
for s in spark.streams.active: # Iterate over all active streams
  s.stop()                     # Stop the stream

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Next Steps</h2>

Start the next lab, [Time Windows Lab]($./Labs/SS 03 - Time Windows Lab).

&copy; 2019 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>