### Structured Streaming (Chapter 21)

The files from the book's 'activity' data set are in the course repository. I moved all of them to the folder **activity-staging**; from there we will copy one file at a time to a different directory, **activity-live**, to simulate streaming.
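
For anyone following along outside Databricks, the same staging-to-live simulation can be sketched on a local file system. This is only an illustration: the helper name `release_file`, the temporary directories, and the one-line JSON files are all made up here and are not part of the course repository.

```python
import shutil
import tempfile
from pathlib import Path

def release_file(staging: Path, live: Path, num: int) -> Path:
    """Copy staging/<num>.json into live/ and return the path of the copy."""
    live.mkdir(parents=True, exist_ok=True)
    return Path(shutil.copy(staging / f"{num}.json", live / f"{num}.json"))

# Throwaway directories with three made-up one-record files.
root = Path(tempfile.mkdtemp())
staging, live = root / "activity-staging", root / "activity-live"
staging.mkdir()
for n in range(3):
    (staging / f"{n}.json").write_text('{"gt": "walk"}\n')

# "Release" a single file; a stream reader watching `live` would now see it.
release_file(staging, live, 0)
live_names = sorted(p.name for p in live.iterdir())
print(live_names)
```

Copying rather than moving leaves the staging folder intact, so the static data frame can still be built from all of the files.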

#### High-Level Overview

**Batch Spark**
  * Input source -- source of *records* -- usually a file or files, but the important concept is that it's a *batch* -- processed all at once, exactly once
  * Data frame on top of the input source -- location of input plus schema
  * Transformations on the data frame -- for example, document text to (term, doc, tfidf) records
  * Output Sink -- files, database, DynamoDB, queue -- the important concept is that the whole transformed data frame is written all at once
  
**Streaming Spark**
  * Input source -- source of *records* -- maybe a file, maybe a queue.  Records arrive asynchronously.
  * Data frame on top of the input source -- location of input plus schema.  Exactly the same as in batch.
  * Transformations on the data frame -- for example, document text to (term, doc, tfidf) records
  * Output Sink -- file, queue, (console, in-memory table)
  
**New Concepts**
  * Incremental update of the data frame (!!)
  * Trigger -- when should an incremental update happen?
  * Output Mode -- how to update the derived data stream incrementally.  (Append, update changed records, rewrite the whole table)
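
To make the output modes concrete, here is a small plain-Python model of a running count that receives records in micro-batches. It is not Spark code: the function `emit` and the sample batches are assumptions made for illustration. It only models "complete" and "update"; append, which only ever adds new rows, is left out because a running count changes rows that were already written.

```python
from collections import Counter

def emit(batches, mode):
    """Yield what the sink would receive after each micro-batch of keys."""
    totals = Counter()
    for batch in batches:
        totals.update(batch)  # incremental update of the running counts
        if mode == "complete":
            # Rewrite the whole result table every time.
            yield sorted(totals.items())
        elif mode == "update":
            # Send only the rows whose count changed in this batch.
            yield sorted((key, totals[key]) for key in set(batch))
        else:
            raise ValueError(f"unsupported mode: {mode}")

batches = [["walk", "sit"], ["walk", "walk"], ["stand"]]
complete = list(emit(batches, "complete"))
update = list(emit(batches, "update"))

for step, (c, u) in enumerate(zip(complete, update)):
    print(step, "complete:", c, "| update:", u)
```

In this model each iteration of the loop plays the role of one trigger firing.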

In [0]:
%fs rm -r /FileStore/tables/activity-live

In [0]:
%fs mkdirs /FileStore/tables/activity-live

In [0]:
%fs ls /FileStore/tables/activity-staging

In [0]:
%fs head /FileStore/tables/activity-staging/0.json

In [0]:
##  First step is set up a typical 'static' data frame for demonstration purposes
##  Notice how Spark takes a directory name to mean "all files in the directory"
static = spark.read.json("/FileStore/tables/activity-staging/")
static.count()

In [0]:
# Notice that Spark inferred a schema from the JSON records: field names become column names, and data types come from the values
static.schema

In [0]:
static.show(2)

In [0]:
static.groupBy('gt').count().show(100)

In [0]:
# The book saves that schema and uses it as the schema for the streaming version of this
# data frame.  We need an explicit schema because we are going to create the data frame before
# there are any data records from which to infer one.
# Here we declare the schema by hand instead, which is better practice anyway.

from pyspark.sql.types import StructField, StructType, StringType, DoubleType, LongType
activitySchema = StructType( [StructField("Arrival_Time",LongType(),True),
                              StructField("Creation_Time",LongType(),True),
                              StructField("Device",StringType(),True),
                              StructField("Index",LongType(),True),
                              StructField("Model",StringType(),True),
                              StructField("User",StringType(),True),
                              StructField("gt",StringType(),True),
                              StructField("x",DoubleType(),True),
                              StructField("y",DoubleType(),True),
                              StructField("z",DoubleType(),True)] )

In [0]:
# Just to reinforce -- read from the same files, but give it our manually declared schema.
# The schema has to be passed with .schema(); passing it as a reader option does not apply it.
static = spark.read.format('json').schema(activitySchema).load("/FileStore/tables/activity-staging/")

In [0]:
static.schema

In [0]:
static.show(5)

In [0]:
static.groupBy('gt').count().show()

In [0]:
# Dummy data?  Good to know.
static.groupBy('User').count().show()

In [0]:
# Text says nothing about Index.   But it has 330K distinct values, so we won't be 
# grouping or selecting on it anyway!  
static.select('Index').distinct().count()

In [0]:
#  Create exactly the same data set, except as a streaming version.  Notice that it reads from activity-live,
#  which is empty at the moment.
#  This is essentially the same as creating the static data frame, except for readStream and the
#  'maxFilesPerTrigger' option, which limits how many new files each trigger picks up.

streaming = spark.readStream\
  .schema(activitySchema)\
  .option("maxFilesPerTrigger",1)\
  .json("/FileStore/tables/activity-live")
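
The effect of `maxFilesPerTrigger` can be pictured with a plain-Python sketch. This is a simplified model, not Spark's implementation: `plan_batches` is a hypothetical helper, and the order in which Spark actually picks up files is not modeled.

```python
def plan_batches(files, max_files_per_trigger):
    """Split a backlog of file names into per-trigger micro-batches."""
    return [files[i:i + max_files_per_trigger]
            for i in range(0, len(files), max_files_per_trigger)]

backlog = ["0.json", "1.json", "2.json"]

# With a limit of 1, every file becomes its own micro-batch.
one_at_a_time = plan_batches(backlog, 1)
# With a limit of 2, the same backlog is processed in two triggers.
two_at_a_time = plan_batches(backlog, 2)

print(one_at_a_time)
print(two_at_a_time)
```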

In [0]:
# Notice this sets up a data frame based on a streaming data frame.
#   but how do we then extract information from it?

activityCounts = streaming.groupBy("gt").count()

In [0]:
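# Expect an AnalysisException here: a streaming data frame cannot be shown directly.
# Its results have to be delivered through writeStream ... start() instead.
activityCounts.show()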

In [0]:
# Advice from the book since we're running on a single node.
# Lots of partitions will be harmful if there are only a few workers!
spark.conf.set("spark.sql.shuffle.partitions", 5)

In [0]:
# Here is our fundamental way of getting information to our streaming "consumers"
#    We already saw the queryName method -- lets us do SQL operations on a query name
#     that corresponds to the DataFrame
#        .format says that the 'consumer' of the stream is an in-memory table
#        .outputMode("complete") means rewrite the whole table every time its contents change
#        .start begins a process of monitoring the streaming data frame for changes

activityQuery = activityCounts\
  .writeStream\
  .queryName("activity_counts")\
  .format("memory")\
  .outputMode("complete")\
  .start()

In [0]:
#  The book indicates that awaitTermination() is necessary when the job is run in a batch context.
#  In a notebook, however, it blocks the cell until the query ends, so it looks like a hang.
#  We leave it commented out and need to be careful to call activityQuery.stop() instead.
# activityQuery.awaitTermination()

activityQuery.stop()

In [0]:
activityQuery = activityCounts\
  .writeStream\
  .queryName("activity_counts")\
  .format("memory")\
  .outputMode("complete")\
  .start()

In [0]:
spark.streams.active

In [0]:
# This should be empty since the underlying directory is empty.
spark.sql("SELECT * FROM activity_counts").show()

In [0]:
%fs ls /FileStore/tables/activity-staging

In [0]:
def cpnum(num):
  # Copy one file from staging to live; absolute paths, matching the %fs cells above
  dbutils.fs.cp(f"/FileStore/tables/activity-staging/{num}.json", 
                f"/FileStore/tables/activity-live/{num}.json")

In [0]:
cpnum(0)

In [0]:
spark.sql("SELECT * FROM activity_counts").show()

In [0]:
cpnum(1)

In [0]:
spark.sql("SELECT * FROM activity_counts").show()

Create a different query on the same streaming data frame, this one
with some more complexity: it adds a new column and selects rows.
Notice the output mode of append.  It just adds new records to the end of the
result table.   This makes sense because the query only filters and projects: as more
records arrive on the stream, the output simply grows by the matching rows.
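
The reason append fits this kind of query can be seen in a plain-Python model. Each micro-batch is filtered on its own, and the surviving rows are added to the result without touching earlier rows. The function `stairs_rows` and the sample records are made up for illustration and only mimic the filter and projection described above.

```python
def stairs_rows(batch):
    """Keep rows whose gt contains 'stairs' and project four columns."""
    cols = ("gt", "model", "arrival_time", "creation_time")
    return [{c: row[c] for c in cols}
            for row in batch
            if row["gt"] is not None and "stairs" in row["gt"]]

# Two made-up micro-batches; 'x' stands in for the columns that get dropped.
batch1 = [
    {"gt": "stairsup", "model": "nexus4", "arrival_time": 1, "creation_time": 1, "x": 0.1},
    {"gt": "walk", "model": "nexus4", "arrival_time": 2, "creation_time": 2, "x": 0.2},
]
batch2 = [
    {"gt": None, "model": "nexus4", "arrival_time": 3, "creation_time": 3, "x": 0.3},
    {"gt": "stairsdown", "model": "nexus4", "arrival_time": 4, "creation_time": 4, "x": 0.4},
]

table = []
sizes = []
for batch in (batch1, batch2):
    table.extend(stairs_rows(batch))  # append: earlier rows are never rewritten
    sizes.append(len(table))

print(sizes)
print([row["gt"] for row in table])
```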

In [0]:
from pyspark.sql.functions import expr

simpleTransform = streaming\
  .withColumn("stairs", expr("gt like '%stairs%'"))\
  .where("stairs")\
  .where("gt is not null")\
  .select("gt", "model", "arrival_time", "creation_time")\
  .writeStream\
  .queryName("simple_transform")\
  .format("memory")\
  .outputMode("append")\
  .start()

In [0]:
spark.streams.active

In [0]:
spark.sql("SELECT * FROM simple_transform").show(5)

In [0]:
spark.sql("SELECT * FROM simple_transform").count()

In [0]:
cpnum(2)

In [0]:
spark.sql("SELECT * FROM simple_transform").show(5)

In [0]:
spark.sql("SELECT * FROM simple_transform").count()

Third example: aggregations with `cube`.
```
The cube function “takes a list of columns and applies aggregate expressions to all possible combinations of the grouping columns”.
```
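
To see what "all possible combinations of the grouping columns" means, here is a minimal plain-Python sketch of a cube that averages one value column. It is an illustration only, not how Spark computes it: `cube_avg` and the sample rows are assumptions, and `None` is used as the "all values" placeholder, so it would be ambiguous for data that really contains nulls in the grouping columns.

```python
from collections import defaultdict
from itertools import combinations

def cube_avg(rows, keys, value):
    """Average `value` for every subset of `keys`; None marks a rolled-up column."""
    sums = defaultdict(lambda: [0.0, 0])  # group -> [running sum, row count]
    for row in rows:
        for size in range(len(keys) + 1):
            for kept in combinations(keys, size):
                group = tuple(row[k] if k in kept else None for k in keys)
                sums[group][0] += row[value]
                sums[group][1] += 1
    return {group: total / n for group, (total, n) in sums.items()}

rows = [
    {"gt": "walk", "model": "nexus4", "x": 1.0},
    {"gt": "walk", "model": "nexus4", "x": 3.0},
    {"gt": "sit", "model": "nexus4", "x": 5.0},
]
result = cube_avg(rows, ["gt", "model"], "x")

for group, avg in sorted(result.items(), key=lambda kv: str(kv[0])):
    print(group, avg)
```

With two grouping columns there are four grouping sets: both columns, each column alone, and the grand total.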

In [0]:
static.cube("gt", "model").avg().show()

In [0]:
deviceModelStats = streaming.cube("gt", "model")\
  .avg()\
  .drop("avg(Arrival_Time)")\
  .drop("avg(Creation_Time)")\
  .drop("avg(Index)")\
  .writeStream\
  .queryName("device_model_stats")\
  .format("memory")\
  .outputMode("complete")\
  .start()

In [0]:
spark.sql("SELECT * FROM device_model_stats").show()

Demonstrates joining a streaming data frame (deviceModelCleaned) with a static data frame.
Here the static data frame is the historical average.
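
Conceptually, the static side acts as a fixed lookup table that is consulted each time the streaming aggregate is refreshed. The plain-Python sketch below models that as an inner join on (gt, model); the function `join_with_history` and every number in it are made up for illustration.

```python
# Static side: made-up historical averages keyed by (gt, model).
historical = {("walk", "nexus4"): 0.5, ("sit", "nexus4"): -0.1}

def join_with_history(current, historical):
    """Inner-join the current streaming averages with the static lookup."""
    return {key: (avg, historical[key])
            for key, avg in current.items()
            if key in historical}

# Streaming side: the averages as of the latest micro-batch (also made up).
current = {("walk", "nexus4"): 0.7, ("stand", "nexus4"): 0.2}
joined = join_with_history(current, historical)
print(joined)
```

Keys present on only one side drop out, just as they would in an inner join.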

In [0]:
# Static historical average for x, y, z taken from all files
historicalAgg = static.drop('Arrival_Time')\
  .drop('Creation_Time')\
  .drop('Index')\
  .drop('Device')\
  .drop('User')\
  .cube(['Model', 'gt'])\
  .avg()\
  .withColumnRenamed('Model', 'model')\
  .withColumnRenamed('avg(x)', 'historicalx')\
  .withColumnRenamed('avg(y)', 'historicaly')\
  .withColumnRenamed('avg(z)', 'historicalz')
historicalAgg.show(5)

In [0]:
deviceModelCleaned = streaming.cube("gt", "model")\
  .avg()\
  .drop("User")\
  .drop("avg(Arrival_Time)")\
  .drop("avg(Creation_Time)")\
  .drop("avg(Index)")

deviceModelJoined = deviceModelCleaned\
  .join(historicalAgg, ["gt", "model"])\
  .writeStream\
  .queryName("device_model_joined")\
  .format("memory")\
  .outputMode("complete")\
  .start()

In [0]:
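# Caution: stopping the query here freezes the device_model_joined table, so the
# cpnum(3) below will not change its contents.  Skip this cell to watch the join update.
deviceModelJoined.stop()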

In [0]:
spark.sql("SELECT * FROM device_model_joined").show()

In [0]:
cpnum(3)

In [0]:
spark.sql("SELECT * FROM device_model_joined").show()

In [0]:
for stream in [deviceModelStats, deviceModelJoined, simpleTransform, activityQuery]:
  stream.stop()