Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE". You can run all the tests with the Validate button. If validation takes too long, you can instead confirm that you pass all the tests by running the whole notebook without getting any assertion errors.

For this problem set, we'll be using the Jupyter notebook:

![](jupyter.png)

## Structured Streaming exercises

In this problem set you will use structured streaming to analyze made-up trail camera data. We will simulate real-time streaming by having multiple data files and loading them one by one.

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
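The exact layout of the files in `data/` is not shown in this notebook. Based on the schema used below, a plausible assumption is JSON Lines (one JSON object per line), which is the format `spark.read.json` expects by default. The hypothetical helper `write_fake_trail_camera_files` sketches how a directory of such files could be generated for experimenting; the animal and weather values are made up for illustration and are not the real dataset.

```python
import json
import random
from datetime import datetime, timedelta
from pathlib import Path

# Hypothetical values for illustration only; not taken from the real data files.
ANIMALS = ["Rabbit", "Squirrel", "Deer", "Bear", "Wolf"]
WEATHER = ["Clear", "Cloudy", "Rainy", "Storm"]


def write_fake_trail_camera_files(out_dir, n_files=3, rows_per_file=5, seed=0):
    """Write n_files JSON Lines files (one record per line) into out_dir."""
    rng = random.Random(seed)
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    start = datetime(2019, 6, 1)
    paths = []
    for i in range(n_files):
        path = out / f"part-{i:03d}.json"
        with path.open("w") as f:
            for _ in range(rows_per_file):
                record = {
                    # ISO-8601 with a 'T' separator, e.g. "2019-06-01T12:00:00"
                    "time": (start + timedelta(minutes=rng.randrange(600_000))).isoformat(),
                    "animal_name": rng.choice(ANIMALS),
                    "weather": rng.choice(WEATHER),
                    # Occasionally emit a JSON null to mimic missing readings
                    "battery": None if rng.random() < 0.1 else float(rng.randrange(101)),
                }
                f.write(json.dumps(record) + "\n")
        paths.append(path)
    return paths
```

Writing several small files, rather than one large one, is what lets the streaming part of this exercise pick them up one at a time.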



```python
from pyspark.sql import SparkSession, Row
import pyspark.sql
import json
from pyspark.sql.types import *
import time
from IPython.display import display, clear_output
import pandas as pd

# Create (or reuse) the Spark session for this notebook
spark = SparkSession \
    .builder \
    .appName("StructuredStreaming") \
    .getOrCreate()

# Directory containing the trail camera JSON files
path = "data/"
```

## Load data

First we'll start with normal (batch) dataframe exercises. Create a method that loads the trail camera data into a dataframe. The data is in JSON format. You might have to specify the schema with the `StructType` methods. The data contains null values (written as `null` in the JSON); you can either remove them here or leave them in. This dataframe simulates the input dataframe that we will use for streaming.

param `path`: path to the JSON dataset.

`return`: dataframe containing trail camera information.

schema:

Name | Type
------| :-----
time  | Timestamp (nullable = true)
animal_name | String (nullable = true)
weather | String (nullable = true)
battery | Double (nullable = true)
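In plain Python, reading with an explicit schema and then calling `dropna()` amounts roughly to the following: every record is cast to the declared type, absent or `null` fields become `None`, and any row containing a `None` is dropped (`dropna()` defaults to `how='any'`). This is only a conceptual sketch; `parse_line` and `drop_na` are hypothetical helpers, not how Spark implements it.

```python
import json
from datetime import datetime

# Field names and target types from the schema table above
SCHEMA = {
    "time": datetime.fromisoformat,
    "animal_name": str,
    "weather": str,
    "battery": float,
}


def parse_line(line):
    """Parse one JSON line, casting each field; missing or null fields become None."""
    raw = json.loads(line)
    return {name: (None if raw.get(name) is None else cast(raw[name]))
            for name, cast in SCHEMA.items()}


def drop_na(rows):
    """Keep only rows with no None field, like DataFrame.dropna() with how='any'."""
    return [r for r in rows if all(v is not None for v in r.values())]
```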

```python
# Create DataFrame representing data in the JSON files
schema = StructType([
    StructField("time", TimestampType(), True),
    StructField("animal_name", StringType(), True),
    StructField("weather", StringType(), True),
    StructField("battery", DoubleType(), True),
])

def loadData(path):
    # Read with the explicit schema, then drop every row that contains a null
    return spark.read.schema(schema).json(path).dropna()
```



```python
# Example print
loadData(path).show(40)
```

```
+-------------------+-----------+-------+-------+
|               time|animal_name|weather|battery|
+-------------------+-----------+-------+-------+
|2020-04-18 21:50:40|       Deer|  Clear|    7.0|
|2020-03-10 11:23:34|   Squirrel|  Clear|   86.0|
|2019-11-09 20:36:04|   Squirrel|  Clear|   55.0|
|2019-10-31 07:22:20|   Squirrel| Cloudy|   12.0|
|2020-05-04 10:59:35|   Squirrel|  Clear|   64.0|
|2020-01-30 14:21:35|   Squirrel|  Clear|   34.0|
|2019-10-29 19:20:05|     Rabbit|  Rainy|    6.0|
|2020-01-30 11:34:51|   Squirrel|  Rainy|   96.0|
|2020-07-17 19:32:23|       Deer| Cloudy|   17.0|
|2020-05-27 09:42:41|   Squirrel|  Rainy|    5.0|
|2020-05-28 09:00:05|       Bear|  Storm|   53.0|
|2020-08-07 11:02:38|       Deer| Cloudy|   14.0|
|2019-06-22 16:02:52|     Rabbit|  Clear|   13.0|
|2019-09-23 11:56:31|   Squirrel|  Clear|   95.0|
|2020-05-31 07:27:03|   Squirrel|  Clear|  100.0|
|2019-11-29 19:11:53|     Rabbit|  Clear|   71.0|
|2020-03-23 20:06:24|     Rabbit|  Clear|   19.0|
```


```python
'''loadData tests'''

cols = StructType([ StructField("time", TimestampType(), True),
                    StructField("animal_name", StringType(), True),
                    StructField("weather", StringType(), True),
                    StructField("battery", DoubleType(), True)])


from datetime import datetime
testTs = datetime(2020, 1, 1)

fakeData = [(testTs, "dog", "cloudy", 100.0)]

fakeDf = spark.createDataFrame(fakeData, cols)

df = loadData(path)

assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)

test = str(loadData(path).sample(False, 0.01, seed=12345).limit(1).first())
correct = "Row(time=datetime.datetime(2020, 3, 10, 11, 23, 34), animal_name='Squirrel', weather='Clear', battery=86.0)"
assert test == correct, "the row was expected to be %s but it was %s" % (correct, test)
```


## Animal count

Next we will simulate the output dataframe that we will use for streaming. Create a method that counts the number of appearances of each animal. The dataframe should be sorted by count in descending order. You should remove the null rows now if you didn't do it in the last method.

param `df`: trail camera dataframe created using `loadData`.

`return`: dataframe containing the number of appearances per animal, with the columns "animal_name" and "count". "count" should be of type Long; Spark's `count()` aggregation produces this automatically. The dataframe must not include a count for null values.

example output:

animal_name | count
----------- | -----
Dog | 1234
Cat | 1111
Mouse | 999
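To pin down the expected semantics, the same aggregation can be written with `collections.Counter`: group by `animal_name`, count, discard the null group, and order by count, highest first. Spark's `groupBy` keeps a separate group for `null` keys, which is why nulls have to be removed before counting. The helper name `animal_count` is hypothetical, and ties may come out in a different order than in Spark, where the order of equal counts is not guaranteed.

```python
from collections import Counter


def animal_count(rows):
    """Count rows per animal_name, skipping nulls, sorted by count descending.

    A plain-Python analogue of df.groupBy("animal_name").count().sort(F.col("count").desc()).
    """
    counts = Counter(r["animal_name"] for r in rows if r.get("animal_name") is not None)
    # most_common() returns (name, count) pairs ordered from highest to lowest count
    return counts.most_common()
```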

```python
from pyspark.sql import functions as F

def animalCount(df):
    # Group by animal, count the rows in each group (LongType), highest count first
    return df.groupBy(df.animal_name).count().sort(F.col("count").desc())
```

```python
# Example print
animalCount(loadData(path)).show()
```

```
+-----------+-----+
|animal_name|count|
+-----------+-----+
|     Rabbit|  801|
|   Squirrel|  766|
|       Deer|  238|
|       Bear|   74|
|       Wolf|   72|
+-----------+-----+
```



```python
'''animalCount tests'''

cols = StructType([ StructField("animal_name", StringType(), True),
                    StructField("count", LongType(), False)])


fakeData = [("dog", 1)]

fakeDf = spark.createDataFrame(fakeData, cols)

df = animalCount(loadData(path))

assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)

assert df.count() == 5, "the number of rows was expected to be 5 but it was %s" % df.count()

df = df.toPandas()

# Label-based access; positional indexing with Series[int] is deprecated in recent pandas
assert df.loc[0, "count"] >= df.loc[1, "count"], "the first item was expected to have higher count than the second"
assert df.loc[3, "count"] >= df.loc[4, "count"], "the fourth item was expected to have higher count than the last"
assert df.loc[0, "animal_name"] == "Rabbit", "the first item was expected to be Rabbit but it was %s" % df.loc[0, "animal_name"]
assert df.loc[4, "animal_name"] == "Wolf", "the last item was expected to be Wolf but it was %s" % df.loc[4, "animal_name"]

test1 = str(animalCount(loadData(path)).sample(False, 0.1, seed=1).limit(1).first())
correct1 = "Row(animal_name='Wolf', count=72)"
assert test1 == correct1, "the row was expected to be %s but it was %s" % (correct1, test1)
```


## inputDf

Now we will finally do the streaming. First you should specify the schema for the input dataframe. The schema is the same as in the Load Data exercise. Then you should create the input dataframe with `spark.readStream` method. Remember to include the schema and the path. You will also have to include `.option("maxFilesPerTrigger", 1)` so that we can simulate real-time streaming by loading one file at a time.
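A toy model of what `maxFilesPerTrigger=1` does: on each trigger the source looks for files it has not processed yet and takes at most one of them. Spark's file source processes new files in modification-time order, oldest first by default (see its `latestFirst` option). The class `OneFileAtATimeSource` below is a hypothetical plain-Python illustration, not Spark's implementation, and it ignores details such as checkpointing.

```python
from pathlib import Path


class OneFileAtATimeSource:
    """Toy model of a file stream source with maxFilesPerTrigger.

    Each call to next_batch() returns at most max_files_per_trigger files
    that have not been returned before, oldest modification time first.
    """

    def __init__(self, directory, max_files_per_trigger=1):
        self.directory = Path(directory)
        self.max_files = max_files_per_trigger
        self.seen = set()

    def next_batch(self):
        new_files = [p for p in self.directory.glob("*.json") if p not in self.seen]
        # Oldest first; ties broken by file name so the order is deterministic
        new_files.sort(key=lambda p: (p.stat().st_mtime, p.name))
        batch = new_files[: self.max_files]
        self.seen.update(batch)
        return batch
```

Once every file has been consumed, `next_batch()` returns an empty list, which corresponds to a trigger with no new data.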

param `path`: path to the JSON dataset.

`return`: input dataframe containing trail camera information.

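```python
def inputDf(path):
    # File-based streaming sources need an explicit schema
    # (unless spark.sql.streaming.schemaInference is enabled).
    # maxFilesPerTrigger=1 makes each micro-batch read a single new file.
    return spark.readStream \
        .schema(schema) \
        .option("maxFilesPerTrigger", 1) \
        .json(path)
```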

## outputDf

Next you should create the output dataframe, similar to the Animal Count exercise. You will have to exclude the null values and sort the dataframe by count in descending order.

param `inputDF`: input dataframe created by `inputDf()`.

`return`: filtered and sorted dataframe containing the number of appearances per animal.
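Conceptually, the streaming version keeps a running count per animal as state and updates it with each micro-batch. Because `dropna()` filters row by row, applying it batch by batch gives the same final counts as applying it to the whole dataset. Spark only allows sorting a streaming dataframe after an aggregation and in complete output mode, which is why `sort` comes after `count()`. The generator `run_micro_batches` below is a hypothetical plain-Python model of that process.

```python
from collections import Counter


def run_micro_batches(batches):
    """Fold micro-batches into a running per-animal count.

    Each batch is a list of row dicts. Rows with any None field are dropped
    (like dropna()), then the remaining rows update the running state.
    Yields the sorted result table after every batch.
    """
    state = Counter()
    for batch in batches:
        clean = [r for r in batch if all(v is not None for v in r.values())]
        state.update(r["animal_name"] for r in clean)
        # Sorting is applied to the aggregated result, not to the raw rows
        yield state.most_common()
```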

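```python
def outputDf(inputDF):
    streaming_count_df = (
        inputDF
            .dropna()                      # drop rows with any null field
            .groupBy(inputDF.animal_name)  # running count per animal
            .count()
            .sort(F.col("count").desc())   # allowed after an aggregation in complete mode
    )
    return streaming_count_df
```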

## createQuery

Finally, you should start streaming the output dataframe with the `writeStream` method. You will have to set `format("memory")`, `queryName("counts")` and `outputMode("complete")`, and then call `start()`.
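The output mode determines what ends up in the `counts` table. In complete mode the entire aggregated result is written after every trigger, and the memory sink keeps it as an in-memory table that `spark.sql("select * from counts")` can query. In update mode, by contrast, only the rows whose value changed during that trigger are written. The two hypothetical generators below model the difference in plain Python, with each batch given as a list of animal names.

```python
from collections import Counter


def complete_mode_outputs(batches):
    """Model outputMode("complete"): emit the whole, sorted result table every trigger."""
    state = Counter()
    for batch in batches:
        state.update(batch)
        yield state.most_common()


def update_mode_outputs(batches):
    """For contrast, model outputMode("update"): emit only rows whose count changed."""
    state = Counter()
    for batch in batches:
        changed = set(batch)
        state.update(batch)
        yield sorted((name, state[name]) for name in changed)
```

Complete mode is also what permits the `sort` in `outputDf`, since a full result table is available on every trigger.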

param `outputDF`: output dataframe created by `outputDf()`.

`return`: the `StreamingQuery` object returned by `start()`.

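```python
def createQuery(outputDF):
    query = (
      outputDF
        .writeStream
        .format("memory")        # keep the results in an in-memory table
        .queryName("counts")     # table name used by spark.sql(...)
        .outputMode("complete")  # rewrite the full result table every trigger
        .start()
    )
    return query
```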

```python
'''streaming tests'''
inputStreamDf = inputDf(path)
outputStreamDf = outputDf(inputStreamDf)
query = createQuery(outputStreamDf)

assert outputStreamDf.isStreaming, "the outputDF was expected to be streaming"

df = spark.sql("select * from counts")

assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)

status = {'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

assert query.status == status, "the status was expected to be %s but it was %s" % (status, query.status)

x = df.count()
assert x == 0, "the number of rows was expected to be 0 when the streaming just started but it was %s" % x
```


```python
# You can print streaming here by adjusting n, but set n to 0 before submitting
n = 0
for i in range(n):
    clear_output(wait=True)
    display(query.status)
    # show() prints the table itself and returns None, so it is not wrapped in display()
    spark.sql('SELECT * FROM counts').show()
    time.sleep(3)
```

```python
query.stop()  # stop the streaming query before shutting down Spark
spark.stop()
```