# Spark (Structured) Streaming

This is an illustrative example of using Spark for streaming. We do something similar to the assignment, i.e., frequency counting, but now make use of Spark's highly scalable and robust streaming library.

In [None]:
from pyspark.sql import SparkSession

# Pick the connection variant that matches where this notebook runs:
# - on the host machine, with the spark-connect service started on the spark-master container:
# spark = SparkSession.builder.remote('sc://localhost:15002').appName('streaming-exercise').getOrCreate()
# - on the spark-master container itself (active variant):
spark = (
    SparkSession.builder
    .master('spark://spark-master:7077')
    .appName('streaming-exercise')
    .getOrCreate()
)

### Stream Query Setup
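Spark Structured Streaming uses "watermarking" to deal with late-arriving data.
This is out of scope for us to treat in depth but, essentially, the watermark sets a maximal delay up to which late data is still counted; data arriving even later may be dropped. (This is relevant if events have an _event creation_ time that can differ from their _processing time_, e.g., due to network delays. In this example the timestamp is simply the processing time obtained with `current_timestamp()`, so no data is ever late.)
The second purpose is to be able to delete state information that is older than this delay. Note that Spark only cleans up state this way in the `append` and `update` output modes; with the `complete` output mode used below, all state is kept.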

After this "watermarking" step, you can see the application of stateful and stateless operators, such as _filter_, to the stream.
In the end, two different _sinks_ are defined.

As an exercise, try to draw the computation graph (like on slide 90 in the Big Data lecture).

In [None]:
import pyspark.sql.functions as F

# Source: one row per line of text received on the socket, in the column `value`.
# Requires a listener on localhost:9999 (see the `nc` instructions below).
lines = (
    spark.readStream
    .format('socket')
    .option('host', 'localhost')
    .option('port', 9999)
    .load()
)

# Stateless: split every line on runs of whitespace into one row per word.
# Splitting on a single ' ' would yield empty "words" for repeated/leading/trailing spaces,
# so empty tokens are filtered out.
words = (
    lines
    .select(F.explode(F.split(lines.value, r'\s+')).alias('word'))
    .filter(F.col('word') != '')
)

# Attach a timestamp to each word and declare a 5-second watermark on it.
# current_timestamp() is the processing time, so no word can actually arrive late here.
# NOTE: with outputMode('complete') below, Spark keeps all aggregation state; the watermark
# only lets Spark drop old windows in the 'append'/'update' output modes.
words_with_time = (
    words
    .withColumn('timestamp', F.current_timestamp())
    .withWatermark('timestamp', '5 seconds')
)

# F.window(column, window length, slide interval) creates sliding/moving windows.
# Cats per minute: stateless filter for the word 'cat', then a stateful count per
# 60-second window sliding every 5 seconds. After the filter every group has
# word == 'cat', so the word column carries no information and is dropped.
catpm = (
    words_with_time
    .filter(F.col('word') == 'cat')
    .groupBy(F.window('timestamp', '60 seconds', '5 seconds'), 'word')
    .agg(F.count('word').alias('cats per minute'))
    .drop('word')
)

# Stateful count of every word per 60-second window, sliding every 30 seconds.
word_counts = words_with_time.groupBy(F.window('timestamp', '60 seconds', '30 seconds'), 'word').count()

# Sinks: in-memory tables, queryable with spark.sql(...) under the given query names.
# Nothing is executed until .start() is called on one of these writers.
catpm_output = catpm.writeStream.outputMode('complete').format('memory').queryName('catpm')
word_counts_output = (
    word_counts.writeStream
    .outputMode('complete')
    .format('memory')
    .queryName('wordcounts')
)

Before you start the queries, you have to open the socket via the `nc` command,

e.g. `docker exec -it spark-master nc -lk 9999`

In [None]:
# Choose ONE query to start and comment out the other.
# NOTE(review): presumably only one at a time because each query opens its own connection
# to the socket source, and `nc -lk` serves one client at a time — confirm.
#catq = catpm_output.start()
wcq = word_counts_output.start()

Useful members of a `StreamingQuery` in PySpark (those without parentheses are properties, not methods):

- `query.id`: the unique identifier of the running query, which persists across restarts from checkpoint data
- `query.runId`: the unique id of this run of the query, which is generated at every start/restart
- `query.name`: the auto-generated or user-specified name of the query
- `query.explain()`: print a detailed explanation of the query plan
- `query.stop()`: stop the query
- `query.awaitTermination()`: block until the query is terminated, either with `stop()` or with an error
- `query.exception()`: the exception if the query has been terminated with an error
- `query.recentProgress`: a list of the most recent progress updates for this query
- `query.lastProgress`: the most recent progress update of this streaming query

In [None]:
# Names of the streaming queries that are currently running
[query.name for query in spark.streams.active]

### Results

In [None]:
# Snapshot of the in-memory `wordcounts` table; re-execute this cell manually to refresh.
# Newest windows first (show() prints only 20 rows by default), and truncate=False so the
# window column is not cut off at 20 characters.
spark.sql('SELECT * FROM wordcounts ORDER BY window.end DESC, `count` DESC').show(truncate=False)

In [None]:
# Snapshot of the in-memory `catpm` table; re-execute this cell manually to refresh.
# Newest windows first (show() prints only 20 rows by default), and truncate=False so the
# window column is not cut off at 20 characters.
spark.sql('SELECT * FROM catpm ORDER BY window.end DESC').show(truncate=False)

### Automatic Updates

In [None]:
import plotly.express as px
import pandas as pd
from plotly.graph_objects import FigureWidget

# Placeholder data so the bar chart can be created before any stream results exist.
initialization_dummy = pd.DataFrame([['a', 0], ['b', 5]], columns=['word', 'count'])
f = px.bar(initialization_dummy, x='word', y='count', title='The running word counts')
fw = FigureWidget(f)

def upd_fw_1(df):
    """Redraw the word-count bar chart `fw` from the given rows.

    `df` is expected to have the columns `word`, `count` and `window` (as selected from
    the `wordcounts` table by the update loop). Bars are sorted by descending count; the
    title shows the window of the first (most frequent) row, or `None` if `df` is empty.
    """
    df = df.sort_values('count', ascending=False)
    w = df['window'].iloc[0] if len(df) > 0 else None
    # batch_update sends x, y and the title to the widget as one update instead of three
    # separate redraws (in between which x and y could have different lengths).
    with fw.batch_update():
        fw.data[0].x = df['word'].values
        fw.data[0].y = df['count'].values
        fw.update_layout(title_text=f'The running word counts for window {w}')

fw

In [None]:
# Placeholder data so the bar chart can be created before any stream results exist.
initialization_dummy = pd.DataFrame([['cat', 0]], columns=['word', 'cats per minute'])
CAT_AXIS_MAX = 200  # initial upper bound of the x-axis; widened when the count exceeds it
f_cat = px.bar(initialization_dummy, y='word', x='cats per minute', range_x=[0, CAT_AXIS_MAX], orientation='h', title='Occurrences of the word cat per minute')
fw_cat = FigureWidget(f_cat)

def upd_fw_2(df):
    """Redraw the horizontal "cats per minute" bar of `fw_cat` from the given rows.

    `df` is expected to have a `cats per minute` column (as selected from the `catpm`
    table by the update loop). The value of the first row is shown, or 0 if `df` is empty.
    """
    v = df['cats per minute'].iloc[0] if len(df) > 0 else 0
    with fw_cat.batch_update():
        fw_cat.data[0].x = [v]
        # The x-axis range is fixed at creation; widen it so larger counts are not cut off.
        fw_cat.layout.xaxis.range = [0, max(CAT_AXIS_MAX, v)]

fw_cat

In [None]:
import asyncio
from asyncio import sleep
from IPython.core.display_functions import clear_output

def only_latest(df):
    """Return only the rows of `df` whose `end` value equals the column maximum.

    Used to keep just the most recent window (`end` is the window end selected from the
    sink tables). An empty frame stays empty.
    """
    return df[df['end'] == df['end'].max()]

update_interval = 15 # in seconds

async def update_loop(wc_updater, cat_updater):
    """Refresh the plots from the in-memory sink tables every `update_interval` seconds.

    Runs until the task is cancelled. `wc_updater` receives the latest-window rows of the
    `wordcounts` table, `cat_updater` those of the `catpm` table.
    """
    while True:
        # NOTE(review): clears the output area of the currently active cell — confirm intent.
        clear_output()
        # Usually only one of the two queries is started. Querying the memory table of a
        # query that was never started raises an error, which would silently end this task,
        # so only read the tables of queries that are currently active.
        active = {q.name for q in spark.streams.active}
        if 'wordcounts' in active:
            df = only_latest(spark.sql('SELECT window.end, * FROM wordcounts').toPandas())
            wc_updater(df)
        if 'catpm' in active:
            catdf = only_latest(spark.sql('SELECT window.end, * FROM catpm').toPandas())
            cat_updater(catdf)
        await sleep(update_interval)


In [None]:
# If this cell is re-run, cancel the previously started update loop first. Otherwise its
# handle would be overwritten, and the old loop could no longer be stopped via task.cancel().
if 'task' in globals() and not task.done():
    task.cancel()
loop = asyncio.get_event_loop()
task = loop.create_task(update_loop(upd_fw_1, upd_fw_2))

In [None]:
# Show the state of the background update task (pending / cancelled / finished).
# If update_loop raised an exception, it appears in this repr; otherwise it is easy to miss.
task

### Stopping Updates

In [None]:
# Stop the periodic plot updates; the streaming queries themselves keep running.
task.cancel()

In [None]:
# Stop every running streaming query. A plain loop is used because stop() is called only
# for its side effect (a list comprehension would build and display a list of Nones).
for query in spark.streams.active:
    query.stop()

In [None]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()