<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://cdn2.hubspot.net/hubfs/438089/docs/training/dblearning-banner.png" alt="Databricks Learning" width="555" height="64">
</div>

&copy; 2018 Databricks, Inc. All rights reserved.<br/>

### Challenges
* Larger data volumes
* Faster data and decisions: data must be usable seconds, minutes, or hours after it is created, not days or weeks
* Streaming pipelines can be hard to build
* Real-time dashboards and alerts for the holiday season, promotional campaigns, and falling or rising trends

### Azure Databricks Solutions
* Deploy Event Hubs with the click of a button
* Connect Azure Databricks with the click of a button
* Easy streaming pipelines, written almost the same way as batch, in SQL, Python, Scala, Java & R
* Make this data available on Storage or ADL to end users in minutes, not days or weeks

### Why Initech Needs Streaming
* Sales up or down (rolling 24 hours, 1 hour), to identify good or bad trends
* Holidays and promotions: how are they performing in real time?

## What is Structured Streaming?

<div style="width: 100%">
  <div style="margin: auto; width: 800px">
    <img src="http://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png"/>
  </div>
</div>

Data is appended to the Input Table every _trigger interval_. For instance, if the trigger interval is 1 second, then new data is appended to the Input Table every second. (The trigger interval is analogous to the _batch interval_ in the legacy RDD-based Streaming API.)
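
To make the Input Table model concrete, here is a minimal plain-Python sketch. It is not Spark code and not how Spark is implemented; the `micro_batches` data is hypothetical, and each inner list stands for the rows that arrive during one trigger interval.

```python
from typing import Dict, List

# Hypothetical micro-batches: the rows that arrive during each trigger interval.
micro_batches: List[List[Dict]] = [
    [{"productId": 1, "quantity": 2}],
    [{"productId": 2, "quantity": 1}, {"productId": 1, "quantity": 3}],
    [],  # a trigger interval in which no new data arrived
]

input_table: List[Dict] = []  # stands in for the unbounded Input Table
row_counts: List[int] = []    # table size observed after each trigger

for batch in micro_batches:
    input_table.extend(batch)            # new rows are only ever appended
    row_counts.append(len(input_table))

print(row_counts)  # [1, 3, 3]
```

The table only grows: a trigger with no new data leaves it unchanged, and earlier rows are never rewritten.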

#### Azure Databricks for Streaming Analytics, Alerts, ETL & Data Engineers

![arch](https://kpistoropen.blob.core.windows.net/collateral/roadshow/azure_roadshow_sde.png)

## ![Spark Logo Tiny](https://kpistoropen.blob.core.windows.net/collateral/roadshow/logo_spark_tiny.png) *Part-1:* Create Streaming DataFrame

In [7]:
# Schema for our streaming DataFrame

from pyspark.sql.types import *

# Line continuations are not needed inside brackets
schema = StructType([
  StructField("orderUUID", StringType(), True),
  StructField("productId", IntegerType(), True),
  StructField("userId", IntegerType(), True),
  StructField("quantity", IntegerType(), True),
  StructField("discount", DoubleType(), True),
  StructField("orderTimestamp", TimestampType(), True)])

In [8]:
%fs ls /mnt/training-sources/initech/streaming/orders/data/

In [9]:
#streaming DataFrame reader for data on Azure Storage

streaming_df = spark.readStream \
    .schema(schema) \
    .option("maxFilesPerTrigger", 1) \
    .csv("dbfs:/mnt/training-sources/initech/streaming/orders/data/part-*")
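
The `maxFilesPerTrigger` option caps how many new files each trigger picks up, so with a value of 1 a directory of files is replayed one file per micro-batch. The plain-Python sketch below only models that batching. The helper `plan_micro_batches` and the file names are hypothetical, and the order in which Spark actually selects files is not modelled.

```python
from typing import Iterator, List


def plan_micro_batches(files: List[str], max_files_per_trigger: int) -> Iterator[List[str]]:
    """Yield the files each trigger would read, at most max_files_per_trigger at a time."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    for start in range(0, len(files), max_files_per_trigger):
        yield files[start:start + max_files_per_trigger]


# Hypothetical file names, for illustration only.
files = ["part-00000.csv", "part-00001.csv", "part-00002.csv"]

one_per_trigger = list(plan_micro_batches(files, max_files_per_trigger=1))
two_per_trigger = list(plan_micro_batches(files, max_files_per_trigger=2))

print(one_per_trigger)  # three micro-batches of one file each
print(two_per_trigger)  # two micro-batches: two files, then one
```

A small value is convenient for a demo because it turns a static directory into a slow, visible stream.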

## What can we do with this Streaming DataFrame?

If you run the following cell, you'll get a continuously updating display of the records read from the stream so far. Note that we're just calling `display()` on our DataFrame, _exactly_ as if it were a DataFrame reading from a static data source.

To stop the continuous update, just cancel the query.

In [11]:
display(streaming_df)

## ![Spark Logo Tiny](https://kpistoropen.blob.core.windows.net/collateral/roadshow/logo_spark_tiny.png) *Part-2:* Transform Streaming DataFrame

### It's just a DataFrame

We can use normal DataFrame transformations on our streaming DataFrame. For example, let's total the units ordered for each `productId` and sort the products by that total.

<img src="https://spark.apache.org/docs/latest/img/structured-streaming-example-model.png"/>

In [14]:
from pyspark.sql.functions import *
top_products = streaming_df.groupBy("productId") \
    .agg(sum(col("quantity")).alias("total_units_by_product")) \
    .orderBy(desc("total_units_by_product"))

* Call `display` on `top_products`
* Turn the streaming table into a streaming bar chart

In [16]:
display(top_products)
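
Conceptually, a streaming aggregation keeps running totals and folds each new micro-batch into them, rather than rescanning everything read so far. The plain-Python sketch below illustrates that idea only. It is not Spark's state store; `update_totals` and the sample batches are hypothetical.

```python
from collections import Counter
from typing import Dict, List, Tuple


def update_totals(state: Counter, batch: List[Dict]) -> List[Tuple[int, int]]:
    """Fold one micro-batch into the running totals and return them, largest first."""
    for row in batch:
        state[row["productId"]] += row["quantity"]
    return sorted(state.items(), key=lambda item: item[1], reverse=True)


state: Counter = Counter()  # running units per productId, kept across triggers

# Hypothetical micro-batches of orders.
batch_1 = [{"productId": 1, "quantity": 2}, {"productId": 2, "quantity": 5}]
batch_2 = [{"productId": 1, "quantity": 4}]

result_1 = update_totals(state, batch_1)
result_2 = update_totals(state, batch_2)

print(result_1)  # [(2, 5), (1, 2)]
print(result_2)  # [(1, 6), (2, 5)]
```

The ranking changes between triggers, which is why the bar chart reorders itself as new files arrive.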

## ![Spark Logo Tiny](https://kpistoropen.blob.core.windows.net/collateral/roadshow/logo_spark_tiny.png) *Part-3:* Streaming Joins

### Streaming Joins

Grouping by unknown product IDs is not that exciting. Let's join the stream with the product lookup data set.
* Use the join key `productId`
* Hint: both DataFrames have a `productId` column, so use the duplicated-columns trick documented here: https://docs.azuredatabricks.net/spark/latest/faq/join-two-dataframes-duplicated-column.html

Load the product lookup data from Azure Storage

In [20]:
product_lookup = spark.read.parquet("/mnt/training-sources/initech/productsFull/")

Join the `streaming_df` with `product_lookup` on `productId`
* Hint: https://docs.azuredatabricks.net/spark/latest/faq/join-two-dataframes-duplicated-column.html

In [22]:
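#TO-DO
# Passing the column name (not a join expression) keeps a single productId column in the result.
# Spark resolves column names case-insensitively by default, so "ProductID" matches productId.
joined_df = streaming_df.join(product_lookup, "ProductID")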

In [23]:
display(joined_df)
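
A stream-static join can be pictured as enriching every incoming order with a lookup into a fixed table. The plain-Python sketch below models the default inner join, in which orders without a matching product are dropped. The lookup contents, the `join_batch` helper, and the sample orders are hypothetical, and this is a model of the semantics rather than of Spark's execution.

```python
from typing import Dict, List

# Hypothetical static lookup table keyed by product ID.
product_lookup: Dict[int, Dict] = {
    1: {"Name": "Widget", "StandardCost": 2.5},
    2: {"Name": "Gadget", "StandardCost": 10.0},
}


def join_batch(batch: List[Dict], lookup: Dict[int, Dict]) -> List[Dict]:
    """Inner-join one micro-batch of orders against the static lookup table."""
    joined = []
    for order in batch:
        product = lookup.get(order["productId"])
        if product is not None:  # inner join: unmatched orders are dropped
            joined.append({**order, **product})
    return joined


# Hypothetical micro-batch; productId 99 has no entry in the lookup table.
batch = [{"productId": 1, "quantity": 2}, {"productId": 99, "quantity": 1}]
joined = join_batch(batch, product_lookup)

print(joined)
# [{'productId': 1, 'quantity': 2, 'Name': 'Widget', 'StandardCost': 2.5}]
```

Each joined row carries the key once, which mirrors the single join column produced by joining on a column name.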

## ![Spark Logo Tiny](https://kpistoropen.blob.core.windows.net/collateral/roadshow/logo_spark_tiny.png) *Part-4:* Calculate a Streaming Dashboard - Revenue by Product Name

### Calculate the Total Revenue by Product Name

* Now that we have the product `Name`, let's use that instead of the `productId` to `groupBy`
* Also, let's calculate the total revenue instead of just units sold
  * Use the `quantity` column and the `StandardCost` column

In [26]:
#TO-DO
top_products = joined_df.groupBy("Name") \
    .agg(sum(col("quantity") * col("StandardCost")).alias("total_revenue_by_product")) \
    .orderBy(desc("total_revenue_by_product"))

In [27]:
display(top_products)
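
As a quick check of the arithmetic, the same calculation can be worked by hand on a few rows. This plain-Python sketch uses hypothetical joined rows and, like the query above, multiplies `quantity` by `StandardCost` without applying the `discount` column.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical rows, shaped like the output of the join.
joined_rows: List[Dict] = [
    {"Name": "Widget", "quantity": 2, "StandardCost": 2.5},
    {"Name": "Gadget", "quantity": 1, "StandardCost": 10.0},
    {"Name": "Widget", "quantity": 4, "StandardCost": 2.5},
]

revenue: Dict[str, float] = defaultdict(float)
for row in joined_rows:
    # Per-row revenue is quantity * StandardCost, summed per product name.
    revenue[row["Name"]] += row["quantity"] * row["StandardCost"]

# Largest total first, matching orderBy(desc(...)).
ranked: List[Tuple[str, float]] = sorted(
    revenue.items(), key=lambda item: item[1], reverse=True
)

print(ranked)  # [('Widget', 15.0), ('Gadget', 10.0)]
```

Widget totals 2 × 2.5 + 4 × 2.5 = 15.0, so it ranks ahead of Gadget at 10.0.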