### Structured Streaming (Based on Example from "Spark the Definitive Guide" Chapter 21)

The files from the book's 'activity' data set are in the course repository. I moved all of them to the S3 folder **ActivityFiles**; from there we will copy one file at a time to a different folder, **ActivityFilesLive**, to simulate streaming.
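
The same staging-to-live idea can be tried without S3. This is a minimal sketch, assuming two local directories stand in for the S3 folders; `release_next_file` is a hypothetical helper and not part of the lab.

```python
import shutil
from pathlib import Path


def release_next_file(staging_dir, live_dir):
    """Copy the first staged file that is not yet in the live directory.

    Returns the name of the copied file, or None when every staged file
    is already live.
    """
    staging, live = Path(staging_dir), Path(live_dir)
    live.mkdir(parents=True, exist_ok=True)
    for src in sorted(staging.iterdir()):
        if src.is_file() and not (live / src.name).exists():
            shutil.copy(src, live / src.name)
            return src.name
    return None
```

Each call releases one more file, which is what the `aws s3 cp` cells later in this lab do by hand.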

#### High-Level Overview

**Batch Spark**
  * Input source -- source of *records* -- usually a file or files; the important concept is that it is a *batch*, processed all at once, exactly once
  * Data frame on top of the input source -- location of input plus schema
  * Transformations on the data frame -- for example, document text to (term, doc, tfidf) records
  * Output Sink -- files, database, DynamoDB, queue -- important concept is the whole transformed data frame is written all at once
  
**Streaming Spark**
  * Input source -- source of *records* -- maybe a file, maybe a queue.  Records arrive asynchronously.
  * Data frame on top of the input source -- location of input plus schema.  Exactly the same as in batch
  * Transformations on the data frame -- for example, document text to (term, doc, tfidf) records
  * Output Sink -- file, queue, (console, in-memory table)
  
**New Concepts**
  * Incremental update of the data frame (!!)
  * Trigger -- when should an incremental update happen?
  * Output Mode -- how to update the derived data stream incrementally.  (Append, update changed records, rewrite the whole table)
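
To make the three output modes concrete, here is a toy model in plain Python (not Spark). `ToyCountQuery` and `trigger_append` are hypothetical names; the sketch only assumes that a streaming count keeps running state between triggers.

```python
from collections import Counter


class ToyCountQuery:
    """Plain-Python stand-in for a streaming groupBy().count() query."""

    def __init__(self):
        self.state = Counter()  # running counts kept between triggers

    def trigger(self, new_records, output_mode):
        """Fold one batch of new records into the state; return what the sink receives."""
        changed = Counter(new_records)
        self.state.update(changed)
        if output_mode == "complete":
            # Rewrite the whole result table
            return dict(self.state)
        if output_mode == "update":
            # Only the keys whose counts changed in this trigger
            return {key: self.state[key] for key in changed}
        raise ValueError("unsupported output mode for this toy: " + output_mode)


def trigger_append(new_records, keep):
    """Append mode for a row-by-row query: emit only the new rows that pass the filter."""
    return [record for record in new_records if keep(record)]
```

For example, after a first trigger with `["walk", "sit", "walk"]`, a second trigger with `["sit"]` in update mode hands the sink only `{"sit": 2}`, while complete mode would hand it the full table.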

### Important setup note for running on AWS

Later in this lab we will get a permissions violation when the Spark processes try to write to S3.

To resolve the problem
* When you set up the cluster, attach a key pair to it.
* Once the cluster master node is running, SSH to that node
* Enter this command <pre>sudo usermod -a -G hdfsadmingroup livy</pre>

In [None]:
%%bash
aws s3 ls s3://5330spark/ActivityFiles/

In [None]:
%%bash 
aws s3api put-object --bucket 5330spark --key ActivityFilesLive/

In [None]:
stagingDirectory = "s3://5330spark/ActivityFiles/"
liveDirectory = "s3://5330spark/ActivityFilesLive/"

####  Review -- Explore the Data Set using Static DataFrames

Look at the input files first

In [None]:
##  First step is set up a typical 'static' data frame for demonstration purposes
##  Notice how Spark takes a directory name to mean "all files in the directory"
##  Also notice how JSON as a file format works.

static = spark.read.json(stagingDirectory)
static.count()

In [None]:
# Notice that Spark inferred a schema from the JSON records: field names become column names,
# and the data values determine the data types (JSON files have no header line)
static.schema

In [None]:
static.show(2)

In [None]:
# The book saves that schema and uses it as the schema for the streaming version of this
# data frame.  We need the explicit schema because for our streaming application we are going to 
# create the data frame before there are any data records for inferring the schema. 
# In any case, it is better practice to declare the schema explicitly.

# But note the issue with imposing an external schema -- if you make an error, like 
# wrong name for a field, you are in trouble!

from pyspark.sql.types import StructField, StructType, StringType, DoubleType, LongType
activitySchema = StructType( [StructField("Arrival_Time",LongType(),True),
                              StructField("Creation_Time",LongType(),True),
                              StructField("Device",StringType(),True),
                              StructField("Index",LongType(),True),
                              StructField("Model",StringType(),True),
                              StructField("User",StringType(),True),     ##  MISSPELLING!
                              StructField("gt",StringType(),True),
                              StructField("x",DoubleType(),True),
                              StructField("y",DoubleType(),True),
                              StructField("z",DoubleType(),True)] )

In [None]:
# Just to reinforce -- read from the same files, but give Spark our manually declared schema
static = spark.read.format('json')\
    .options(header='false')\
    .options(inferSchema=False)\
    .schema(activitySchema)\
    .load(stagingDirectory)

In [None]:
static.schema

In [None]:
static.show(5)

No data dictionary!    Take a look at device, index, model, user, gt

In [None]:
# This just establishes a "query" against the static data set
activityCounts = static.groupBy("gt").count()

In [None]:
# Now we run the query, reading and processing the whole batch
activityCounts.show()

In [None]:
#  For later analysis we will clean up the data frame.
#   Just use the fields gt, Creation_Time, device, and z
#   Make the names nicer
#   Filter out the 'null' values (using the new column names)

static = spark.read.format('json')\
    .options(header='false')\
    .options(inferSchema=False)\
    .schema(activitySchema)\
    .load(stagingDirectory)\
    .select("gt", "Creation_Time", "device", "z")\
    .withColumnRenamed("gt", "activity")\
    .withColumnRenamed("Creation_Time", "creation_time")\
    .filter("activity != 'null'")\
    .filter("device != 'null'")
    

In [None]:
static.show(5)

In [None]:
static.groupBy('activity').count().show()

In [None]:
static.select('activity', 'z').groupby('activity').mean().show()

### Now to the World of Streaming!

In [None]:
#  Create exactly the same data frame, except a streaming version.  Notice that it reads from
#  liveDirectory (ActivityFilesLive), which is empty at the moment.
#  Notice that this is essentially the same as creating the static data frame except 
#    readStream instead of read
#    the 'maxFilesPerTrigger' option, which limits each trigger to one new file

streaming = spark.readStream\
    .schema(activitySchema)\
    .option("maxFilesPerTrigger",1)\
    .json(liveDirectory)\
    .select("gt", "Creation_Time", "device", "z")\
    .withColumnRenamed("gt", "activity")\
    .withColumnRenamed("Creation_Time", "creation_time")\
    .filter("activity != 'null'")\
    .filter("device != 'null'")

In [None]:
# It's a DataFrame.  
type(streaming)

In [None]:
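#  So let's take a look!
#  (Expect an AnalysisException here: queries with streaming sources must be
#   executed with writeStream.start(), so show() does not work.)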
streaming.show(5)

In [None]:
# Notice this sets up a data frame based on a streaming data frame.
# Exact same syntax as the static version.

activityCounts = streaming.groupBy("activity").count()


In [None]:
# But how do we then extract information from it?
activityCounts.show()

In [None]:
type(activityCounts)

In [None]:
# Advice from the book since we're running on a single worker.
# Lots of shuffle partitions are harmful if there are only a few workers!
spark.conf.set("spark.sql.shuffle.partitions", 5)

In [None]:
# Here is our fundamental way of getting information to our streaming "consumers"
#    We already saw the queryName method -- lets us do SQL operations on a query name 
#        .writeStream references a stream that will handle incremental changes to the query
#           -- notice that writeStream is the receiver for all the subsequent calls
#        .format says that the 'consumer' of the stream (the 'sink') is an in-memory table
#        .queryName points to the in-memory location of our query results
#        .outputMode("complete") means rewrite the whole table every time its contents change
#        .start begins a process of monitoring the streaming data frame for changes

activityQuery = activityCounts\
    .writeStream\
    .format("memory")\
    .queryName("activity_counts_memory")\
    .outputMode("complete")\
    .start()

Careful when running streams in a notebook.

The book says to do 
<pre>
activityQuery.awaitTermination()
</pre>
But that call blocks until the query terminates, so in a notebook the cell appears to hang.

We need to be careful to do 
<pre>
anyQuery.stop()
</pre>
instead when we are finished with the stream.
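
One way to get bounded waiting in a notebook is to pass a timeout. This is a sketch that assumes `query` is a PySpark `StreamingQuery`, whose `awaitTermination(timeout)` takes seconds and returns whether the query ended within that time; `run_for` is a hypothetical helper name.

```python
def run_for(query, seconds):
    """Let a streaming query run for at most `seconds`, then stop it.

    Returns True if the query ended on its own within the timeout,
    False if this helper had to stop it.
    """
    finished = query.awaitTermination(seconds)
    if not finished:
        query.stop()
    return finished
```

For example, `run_for(activityQuery, 30)` would return control to the notebook after about 30 seconds instead of blocking indefinitely.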

In [None]:
spark.streams.active

In [None]:
# Easy way to stop all streams
def stop_all_streams():
    for s in spark.streams.active:
        s.stop()

In [None]:
# Reference the query name above
spark.sql("SELECT * FROM activity_counts_memory").show()

In [None]:
%%bash
aws s3 cp s3://5330spark/ActivityFiles/0.json s3://5330spark/ActivityFilesLive/0.json


In [None]:
%%bash
aws s3 ls s3://5330spark/ActivityFilesLive/

In [None]:
spark.sql("SELECT * FROM activity_counts_memory").show()

In [None]:
%%bash
aws s3 cp s3://5330spark/ActivityFiles/1.json s3://5330spark/ActivityFilesLive/1.json


In [None]:
spark.sql("SELECT * FROM activity_counts_memory").show()

Create a different query on the same streaming data frame. This one is a little more complex
(it selects certain rows and drops columns) but does no aggregation.
Notice the output mode of append: it just adds new records to the end of the
result table. This makes sense because, with no aggregation, each new input record
can only add rows to the output.

In [None]:
stop_all_streams()

In [None]:
from pyspark.sql.functions import expr

# Notice these restrictions are in addition to restrictions on 
# the 'streaming' dataframe

simpleTransform = streaming\
    .select("activity", "device")\
    .where("activity not like '%stairs%'")\
    .where("device = 'nexus4_2'")\
    .writeStream\
    .queryName("simple_transform")\
    .format("memory")\
    .outputMode("append")\
    .start()

In [None]:
spark.streams.active

Check what's in the live directory at the moment

In [None]:
spark.sql("SELECT * FROM simple_transform").count()

In [None]:
%%bash
aws s3 cp s3://5330spark/ActivityFiles/0.json s3://5330spark/ActivityFilesLive/0.json


In [None]:
spark.sql("SELECT * FROM simple_transform").count()

In [None]:
spark.sql("SELECT * FROM simple_transform limit 5").show()

In [None]:
%%bash
aws s3 cp s3://5330spark/ActivityFiles/1.json s3://5330spark/ActivityFilesLive/1.json

In [None]:
spark.sql("SELECT count(*) FROM simple_transform").show()

Third example, aggregations
```
The cube function “takes a list of columns and applies aggregate expressions to all possible combinations of the grouping columns”.
```
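
To see what "all possible combinations of the grouping columns" means, here is a toy version in plain Python (not Spark). `toy_cube_avg` is a hypothetical helper; it marks a rolled-up column with `None`, much as Spark shows `null` in the subtotal rows.

```python
from collections import defaultdict
from itertools import combinations


def toy_cube_avg(rows, group_cols, value_col):
    """Average value_col for every subset of group_cols.

    Keys are tuples ordered like group_cols; None marks a rolled-up column.
    """
    sums = defaultdict(lambda: [0.0, 0])  # key -> [running total, row count]
    for size in range(len(group_cols) + 1):
        for kept in combinations(group_cols, size):
            for row in rows:
                key = tuple(row[c] if c in kept else None for c in group_cols)
                sums[key][0] += row[value_col]
                sums[key][1] += 1
    return {key: total / n for key, (total, n) in sums.items()}
```

With two grouping columns, the result holds the grand average, the averages per activity, the averages per device, and the averages per (activity, device) pair.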

In [None]:
# 'static' was rebuilt above with the column 'activity' in place of 'gt'
static.cube("activity", "device").avg().filter("activity != 'null' and device != 'null'").show()

In [None]:
stop_all_streams()

Files 0 and 1 should be in the live directory 

In [None]:
# The streaming data frame has the column 'activity' (renamed from 'gt')
deviceModelStats = streaming\
    .cube("activity", "device")\
    .avg()\
    .filter("device != 'null'")\
    .writeStream\
    .queryName("device_stats")\
    .format("memory")\
    .outputMode("complete")\
    .start()

In [None]:
spark.sql("SELECT * FROM device_stats").show()

In [None]:
%%bash
aws s3 cp s3://5330spark/ActivityFiles/4.json s3://5330spark/ActivityFilesLive/4.json

In [None]:
spark.sql("SELECT * FROM device_stats").show()

Demonstrates joining a streaming DataFrame (deviceModelCleaned, below) with a static DataFrame.
Here the static DataFrame is the historical average.
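
As a plain-Python picture of this join (not Spark), assume both sides are dictionaries keyed by `(activity, device)`: the streaming side changes at every trigger, while the historical side stays fixed. `join_with_history` is a hypothetical helper.

```python
def join_with_history(streaming_avgs, historical_avgs):
    """Inner-join two {(activity, device): avg_z} mappings on their keys."""
    return {
        key: {"average_z": avg, "historical_z": historical_avgs[key]}
        for key, avg in streaming_avgs.items()
        if key in historical_avgs
    }
```

Keys that appear on only one side are dropped, as with an inner join.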

In [None]:
# Static historical average of z per (device, activity), taken from all files.
# 'static' already has the renamed columns 'device' and 'activity'.
historicalAgg = static\
    .select('device', 'activity', 'z')\
    .cube('device', 'activity')\
    .avg()\
    .filter("device != 'null'")\
    .filter("activity != 'null'")\
    .withColumnRenamed('avg(z)', 'historical_z')

historicalAgg.show()

In [None]:
deviceModelCleaned = streaming\
    .select("device", "activity", "z")\
    .cube("activity", "device").avg()\
    .withColumnRenamed("avg(z)", "average_z")

deviceModelJoined = deviceModelCleaned\
  .join(historicalAgg, ["activity", "device"])\
  .writeStream\
  .queryName("device_model_joined")\
  .format("memory")\
  .outputMode("complete")\
  .start()

In [None]:
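# These are the usual operations for a streaming query, just
# demonstrating they don't need to be method chained.
# deviceModelJoined is the running query from the previous cell; stop it first,
# because only one active query can use a given query name.
deviceModelJoined.stop()

ws = deviceModelCleaned.join(historicalAgg, ["activity", "device"]).writeStream
ws.queryName("device_model_joined")
ws.format('memory')
ws.outputMode('complete')
deviceModelJoined = ws.start()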

In [None]:
spark.sql("select * from device_model_joined").show()

In [None]:
%%bash
aws s3 cp s3://5330spark/ActivityFiles/5.json s3://5330spark/ActivityFilesLive/5.json

In [None]:
spark.sql("select * from device_model_joined").show()

In [None]:
stop_all_streams()

Empty the live folder

### Experiment with Update and Append Modes, Both in Memory and With an S3 Sink

In [None]:
##  Append in memory -- suppose we are just "cleaning" the data set:
##  keep just gt, device, and Creation_Time, filter out null gt and device,
##  and rename the columns.
##  Start again from the raw stream, which still has all the original columns.

streaming = spark.readStream.\
  schema(activitySchema).\
  option("maxFilesPerTrigger",1).\
  json(liveDirectory)


In [None]:
simpleAppend = streaming\
    .select("gt", "device", "Creation_Time")\
    .withColumnRenamed("Creation_Time", "creation_time")\
    .withColumnRenamed("gt", "activity")\
    .filter("activity != 'null'")\
    .filter("device != 'null'")

In [None]:
# Same as before, but different output mode
simpleAppend\
    .writeStream\
    .queryName("simple_append")\
    .format('memory')\
    .outputMode('append')\
    .start()

In [None]:
# Nothing to see here :-)
# Something different is happening, new rows are being appended to the table, but we can't see it

spark.sql("select * from simple_append").show()

In [None]:
%%bash
aws s3 cp s3://5330spark/ActivityFiles/0.json s3://5330spark/ActivityFilesLive/0.json

In [None]:
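#  For update, number of events per gt.  This really should update as we get new events.
#  The raw stream still calls the column 'gt', so rename it before filtering.
simpleUpdate = streaming\
    .select("gt")\
    .withColumnRenamed("gt", "activity")\
    .filter("activity != 'null'")\
    .groupBy("activity").count()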


In [None]:
simpleUpdate\
    .writeStream\
    .queryName("simple_update")\
    .format('memory')\
    .outputMode('update')\
    .start()

In [None]:
spark.sql("select * from simple_update").show()

In [None]:
%%bash
aws s3 cp s3://5330spark/ActivityFiles/7.json s3://5330spark/ActivityFilesLive/7.json

In [None]:
## Look carefully -- this is unexpected.  What is going on??
spark.sql("select * from simple_update").show()

####  Using S3 as a Stream Sink

In [None]:
stop_all_streams()

Empty the live directory again

In [None]:
from pyspark.sql.functions import window, col

In [None]:
##  Try output mode of complete -- completely replace the data table.
##  Can streaming completely replace the files it has already written?

countByGt = streaming\
    .select("gt")\
    .filter("gt != 'null'")\
    .groupBy("gt")\
    .count()

countByGt\
    .writeStream\
    .format("text")\
    .outputMode("complete")\
    .option("path", "s3://5330spark/Output/Complete/")\
    .start()

Try append with an S3 sink -- just select columns and filter (no aggregation).

We would expect to see files added to the output folder as we add more files to the live input directory.

In [None]:
stop_all_streams()

In [None]:
appendGtEvents = streaming\
    .select("gt", "Device")\
    .filter("gt != 'null'")\
    .filter("Device != 'null'")

appendGtEvents.writeStream\
    .format("csv")\
    .outputMode("append")\
    .option("path", "s3://5330spark/Output/Complete/")\
    .option("checkpointLocation", "s3://5330spark/Output/Checkpoints/")\
    .start()


In [None]:
%%bash
aws s3 cp s3://5330spark/ActivityFiles/7.json s3://5330spark/ActivityFilesLive/7.json

In [None]:
%%bash
aws s3 ls s3://5330spark/Output/Complete/

What happens if we do an append output mode with a grouping query and an S3 sink?

In [None]:
appendGrouped = streaming\
    .select("gt", "z")\
    .groupBy("gt")\
    .avg()

appendGrouped\
    .writeStream.format("csv")\
    .outputMode("append")\
    .option("checkpointLocation", "s3://5330spark/Output/Checkpoints/")\
    .option("path", "s3://5330spark/Output/Complete/")\
    .start()


So what do you suppose the problem is?

### Summary -- Spark Streaming

* Philosophy
    * Streaming processing code should be as close as possible to static processing code
    * But a streaming process looks at new records incrementally as they "appear"
    
* Streaming Data Frame
    * Input source -- files in a directory, Kafka queue
    * Schema
    * Trigger -- file being added, message received
    
* Query
  * Based on a streaming data frame
  * Supports most data frame operations (select, filter, aggregations, joins)
  
* Write Streams
  * Based on a streaming query
  * Output mode -- complete, append, or update
      * Complete -- replace all records with new records
      * Append -- add new records to the old records
      * Update -- write only the records that have changed
  * Sink -- in memory, folder, Kafka queue
  * Starts and stops
  