Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE". You can run all the tests with the validate button. If the validate command takes too long, you can instead confirm that you pass all the tests by running through the whole notebook without getting validation errors.

For this problem set, we'll be using the Jupyter notebook:

![](jupyter.png)

## Structured Streaming exercises

In this problem set you will use structured streaming to analyze made-up trail camera data. We will simulate real-time streaming by having multiple data files and loading them one by one.

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html



In [1]:
from pyspark.sql import SparkSession, Row
import pyspark.sql
import json
from pyspark.sql.types import *
import time
from IPython.display import display, clear_output
import pandas as pd

spark = SparkSession \
    .builder \
    .appName("StructuredStreaming") \
    .getOrCreate()

path = "data/"


## Load data

We start with ordinary (non-streaming) dataframe exercises. Create a function that loads the trail camera data into a dataframe. The data is in JSON format, and you might have to specify the schema with `StructType`. The dataframe will contain null values, shown as 'null'; drop every row that contains one. When reading the JSON, set the multiLine option with `.option("multiLine", "true")`. This dataframe simulates the input dataframe that we will use for streaming.

param `path`: path to the JSON dataset.

`return`: dataframe containing trail camera information.

schema:

Name | Type
------| :-----
time  | Timestamp (nullable = true)
animal_name | String (nullable = true)
weather | String (nullable = true)
battery | Double (nullable = true)

In [2]:
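# Create DataFrame representing data in the JSON files
def loadData(path):
    # Explicit schema as specified above; all fields nullable
    schema = StructType([StructField("time", TimestampType(), True),
                         StructField("animal_name", StringType(), True),
                         StructField("weather", StringType(), True),
                         StructField("battery", DoubleType(), True)])
    return spark.read.schema(schema).option("multiLine", "true").json(path).dropna()  # drop rows with nulls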


In [3]:
#example print
loadData(path).show()

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/workspaces/BigDataPlatforms/StructuredStreaming/data.

In [None]:
'''loadData tests'''

cols = StructType([ StructField("time", TimestampType(), True),
                    StructField("animal_name", StringType(), True),
                    StructField("weather", StringType(), True),
                    StructField("battery", DoubleType(), True)])


from datetime import datetime
testTs = datetime(2020, 1, 1)

fakeData = [(testTs, "dog", "cloudy", 100.0)]

fakeDf = spark.createDataFrame(fakeData, cols)

df = loadData(path)

assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)

test = str(loadData(path).sample(False, 0.01, seed=12345).first())
correct = "Row(time=datetime.datetime(2020, 4, 18, 21, 50, 40), animal_name='Deer', weather='Clear', battery=7.0)"
assert test == correct, "the row was expected to be %s but it was %s" % (correct, test)


## Animal count

Next we will simulate the output dataframe that we will use for streaming. Create a function that counts the number of appearances of each animal. The dataframe should be sorted by count in descending order.

param `df`: trail camera dataframe created using `loadData`.

`return`: dataframe containing the number of appearances per animal. The dataframe should include the columns "animal_name" and "count". "count" should be of type Long, which the Spark aggregation functions produce automatically. The dataframe must not include a count for null values.

example output:

animal_name|count
-------:|-----
Dog|1234
Cat|1111
Mouse|999
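
As a rough illustration of the expected result shape, the sketch below does the same aggregation in plain Python rather than Spark, so it is not the exercise solution. The sample records and the helper name `animal_count` are assumptions made up for this example; `None` stands in for a null `animal_name`.

```python
from collections import Counter

# Hypothetical records; None stands for a null animal_name.
records = [
    {"animal_name": "Dog"},
    {"animal_name": "Cat"},
    {"animal_name": None},
    {"animal_name": "Dog"},
    {"animal_name": "Mouse"},
    {"animal_name": "Dog"},
    {"animal_name": "Cat"},
]

def animal_count(rows):
    """Return (animal_name, count) pairs sorted by count, descending."""
    # Skip null names so that no count is produced for them.
    counts = Counter(r["animal_name"] for r in rows if r["animal_name"] is not None)
    # most_common() orders the pairs from the highest count to the lowest.
    return counts.most_common()

result = animal_count(records)
print(result)
```

The null record contributes nothing to the result, which mirrors the requirement that the dataframe must not include a count for null values.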

In [None]:
def animalCount(df):    
    # YOUR CODE HERE
    raise NotImplementedError()

In [None]:
#example print
animalCount(loadData(path)).show()

In [None]:
'''animalCount tests'''

cols = StructType([ StructField("animal_name", StringType(), True),
                    StructField("count", LongType(), False)])


fakeData = [("dog", 1)]

fakeDf = spark.createDataFrame(fakeData, cols)

df = animalCount(loadData(path))

assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)

assert df.count() == 5, "the number of rows was expected to be 5 but it was %s" % df.count()

df = df.toPandas()

assert df.iloc[0, 1] >= df.iloc[1, 1], "the first item was expected to have higher count than the second"
assert df.iloc[3, 1] >= df.iloc[4, 1], "the fourth item was expected to have higher count than the last"
assert df.iloc[0, 0] == "Rabbit", "the first item was expected to be Rabbit but it was %s" % df.iloc[0, 0]
assert df.iloc[4, 0] == "Wolf", "the last item was expected to be Wolf but it was %s" % df.iloc[4, 0]

test1 = str(animalCount(loadData(path)).sample(False, 0.1, seed=1).first())
correct1 = "Row(animal_name='Bear', count=74)"
assert test1 == correct1, "the row was expected to be %s but it was %s" % (correct1, test1)


## inputDf

Now we will finally do the streaming. First, specify the schema for the input dataframe; it is the same as in the Load data exercise. Then create the input dataframe with `spark.readStream`. Remember to include the schema and the path. You will also have to include `.option("maxFilesPerTrigger", 1)` so that we can simulate real-time streaming by loading one file at a time. Again, set the option multiLine=true. Remove null values either here in `inputDf` or in the next function, `outputDf`.

param `path`: path to the JSON dataset.

`return`: input dataframe containing trail camera information.
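
To picture what loading one file per trigger means, here is a minimal plain-Python sketch (not Spark, and not the exercise solution). It assumes each file holds one JSON array spread over several lines; the file names, the sample records, and the helper `micro_batches` are hypothetical. Reading the files in sorted name order is a simplification of this sketch, not a statement about how Spark orders input files.

```python
import json
import tempfile
from pathlib import Path

def micro_batches(directory):
    """Yield the records of one JSON file at a time, like one file per trigger."""
    for file in sorted(Path(directory).glob("*.json")):
        # Each file is assumed to hold one JSON array spanning several lines.
        rows = json.loads(file.read_text())
        # Drop records that contain a null in any field.
        yield [r for r in rows if all(v is not None for v in r.values())]

# Hypothetical input: two small files written to a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "part-0.json").write_text(json.dumps(
        [{"animal_name": "Deer", "battery": 7.0},
         {"animal_name": None, "battery": 50.0}], indent=2))
    Path(tmp, "part-1.json").write_text(json.dumps(
        [{"animal_name": "Bear", "battery": 80.0}], indent=2))
    batches = list(micro_batches(tmp))

print(batches)
```

Each element of `batches` corresponds to one trigger, and the record with the null name has already been filtered out before any aggregation happens.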

In [None]:
def inputDf(path):    
    # YOUR CODE HERE
    raise NotImplementedError()

## outputDf

Next you should create the output dataframe, similar to the Animal count exercise. You will have to exclude the null values and sort the dataframe by count in descending order.

param `inputDF`: input dataframe created by `inputDf()`.

`return`: filtered and sorted dataframe containing the number of appearances per animal.

In [None]:
def outputDf(inputDF):    
    # YOUR CODE HERE
    raise NotImplementedError()

## createQuery

Finally, you should start streaming the output dataframe with the `writeStream` method. You will have to include the options `format`="memory", `queryName`="counts" and `outputMode`="complete".

param `outputDF`: output dataframe created by `outputDf()`.

`return`: a query on the output dataframe

Note: the test might fail if you run cells one by one manually, because it expects the `counts` table to still be empty right after the query starts. Try to use the "run all" button.
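
In complete output mode, the entire updated result table is written to the sink after every trigger, not only the rows that changed. The plain-Python sketch below mimics that behaviour for running animal counts; the batches and the helper name `complete_mode_snapshots` are assumptions made up for this illustration, and no Spark API is involved.

```python
from collections import Counter

def complete_mode_snapshots(batches):
    """Return the full, re-sorted result table after each micro-batch."""
    totals = Counter()
    snapshots = []
    for batch in batches:
        totals.update(batch)  # add this batch's animals to the running totals
        # Complete mode: the whole table is written out again, not just the changes.
        snapshots.append(totals.most_common())
    return snapshots

# Hypothetical animal names arriving in three micro-batches.
batches = [["Deer", "Bear"], ["Deer"], ["Wolf", "Deer"]]
snapshots = complete_mode_snapshots(batches)
for table in snapshots:
    print(table)
```

Each snapshot plays the role of what a `SELECT * FROM counts` would return after the corresponding trigger: the table grows and its counts keep increasing as more files arrive.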

In [None]:
def createQuery(outputDF):    
    # YOUR CODE HERE
    raise NotImplementedError()

In [None]:
'''streaming tests invoking'''

inputStreamDf = inputDf(path)
outputStreamDf = outputDf(inputStreamDf)
query = createQuery(outputStreamDf)

In [None]:
'''streaming tests'''

assert outputStreamDf.isStreaming, "the outputDF was expected to be streaming"

df = spark.sql("select * from counts")

assert df.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, df.dtypes)

# Count once and reuse the value, so the assertion and its message cannot disagree
x = df.count()
assert x == 0, "the number of rows was expected to be 0 when the streaming just started but it was %s" % x

assert query.isActive, "The streaming query should be active."

In [None]:
# You can print streaming here by adjusting n, but set n to 0 before submitting
n = 0
for i in range(n):
    clear_output(wait=True)
    display(query.status)
    # show() prints the table itself and returns None, so it needs no display() wrapper
    spark.sql('SELECT * FROM counts').show()
    time.sleep(3)

In [None]:
query.stop()