# Spark Structured Streaming - Demo
## Fire alarm

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
import io
from pyspark.sql.functions import *
import time
import json
import struct
import requests 

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.5,org.apache.kafka:kafka-clients:2.6.0 pyspark-shell'
                                    
spark = (SparkSession.builder 
    .master("local[*]")
    .appName("test")
    .getOrCreate()
        )

spark

set up the environment variables

In [2]:
topic = 'RoboticArm'
servers = "kafka:9092"

## Answers in Spark Structured Streaming 

Please refer to [continuous-analytics-examples/blob/master/epl_robotic-arm/readme.md](https://github.com/quantiaconsulting/continuous-analytics-examples/blob/master/epl_robotic-arm/readme.md) for the EPL version of the following queries.

### Let's create the streaming Data Frames using the data in the kafka topic

In [3]:
from pyspark.sql.types import *

roboticArm_schema = StructType([
    StructField("id", StringType(), True),
    StructField("status", StringType(), True),
    StructField("stressLevel", IntegerType(), True),
    StructField("ts", TimestampType(), True)])

raw_roboticArm_df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("startingOffsets", "earliest")
  .option("subscribe", topic)
  .load())

roboticArm_sdf = (raw_roboticArm_df
                      .select(from_json(col("value").cast("string"), roboticArm_schema).alias("value"))
                      .select("value.*"))

In [4]:
roboticArm_sdf.printSchema()

root
 |-- id: string (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevel: integer (nullable = true)
 |-- ts: timestamp (nullable = true)



### to make sure that it works, let's first inspect the content of the stream 

In [5]:
basic_query = (roboticArm_sdf
    .writeStream
    .format("memory") # this is for debug purpose only! DO NOT USE IN PRODUCTION
    .queryName("sinkTable")
    .start())

run the following cell to see the most recent content of the sinkTable

In [6]:
spark.sql("SELECT * FROM sinkTable ORDER BY TS DESC").show(5)

+---+------+-----------+---+
| id|status|stressLevel| ts|
+---+------+-----------+---+
+---+------+-----------+---+



do not forget to stop queries that you are not using

In [7]:
basic_query.stop()

# E1

> Propose how to model the streaming data generated by the robotic arms.

Let's first try with the model proposed for EPL and see what happens. To get the data run [datagen1.ipynb](datagen1.ipynb)

# E2

> Write a continuous query that emits the max stress for each arm.

### the SQL sytyle

In [8]:
# create a logic table on top of the streaming data frame
roboticArm_sdf.createTempView("RoboticArm") # this time we will not clean it up, because we use it in the next queries

In [9]:
query_string = """
SELECT id, max(stressLevel) 
FROM RoboticArm 
GROUP BY id;
"""

# write your query in SQL, register it and start it
e2 = (spark.sql(query_string)
                     .writeStream
                     .format("memory")
                     .outputMode("complete") # <-- CHANGE HERE
                     .queryName("sinkTable")
                     .start())

In [10]:
# look up the most recent results
spark.sql("SELECT * FROM sinkTable").show(5) # woithout ORDER BY TS DESC because the result in the table is already only the most recent

+---+----------------+
| id|max(stressLevel)|
+---+----------------+
|  1|               7|
|  2|               9|
+---+----------------+



In [11]:
# clean up
e2.stop()

### The DataFrame style

In [12]:
# write your query in SQL, register it and start it
e2bis = (roboticArm_sdf
                     .groupBy("id")
                     .max()
                     .writeStream
                     .format("memory")
                     .outputMode("complete") # 
                     .queryName("sinkTable")
                     .start())

In [13]:
# look up the most recent results
spark.sql("SELECT * FROM sinkTable").show(5) # woithout ORDER BY TS DESC because the result in the table is already only the most recent

+---+----------------+
| id|max(stressLevel)|
+---+----------------+
|  1|               7|
|  2|               9|
+---+----------------+



In [14]:
# clean up
e2bis.stop()

# E3

> A continuous query that emits the average stress level between a pick (status==goodGrasped) and a place (status==placingGood).

Spark Structured Streaming does not support the EPL's operator `->` (that reads as *followed by*. We need to use a stream-to-stream join.

Let's apply the two filters

In [15]:
moving_sdf = (roboticArm_sdf
                .where("status='movingGood'")
                .withColumnRenamed("id","idMoving")
                .withColumnRenamed("ts","tsMoving")
               )

placing_sdf = (roboticArm_sdf
                .where("status='placingGood'")
                .withColumnRenamed("id","idPlacing")
                .withColumnRenamed("ts","tsPlacing")
               )

Join without the event-time constraint

In [16]:
join_sdf = (moving_sdf.join(
  placing_sdf, expr("""
    (idMoving == idPlacing) AND
    (tsPlacing > tsMoving )
    """
    )))

In [17]:
e3 = (join_sdf
                     .writeStream
                     .format("memory")
                     .queryName("sinkTable")
                     .start())

In [20]:
spark.sql("SELECT * FROM sinkTable ORDER BY tsPlacing DESC").show(5,False) # note, I change ts in tsPlacing

+--------+----------+-----------+-------------------+---------+-----------+-----------+-------------------+
|idMoving|status    |stressLevel|tsMoving           |idPlacing|status     |stressLevel|tsPlacing          |
+--------+----------+-----------+-------------------+---------+-----------+-----------+-------------------+
|1       |movingGood|7          |2021-10-18 14:23:46|1        |placingGood|3          |2021-10-18 14:39:27|
|1       |movingGood|7          |2021-10-18 14:22:40|1        |placingGood|3          |2021-10-18 14:39:27|
|1       |movingGood|7          |2021-10-18 14:23:13|1        |placingGood|3          |2021-10-18 14:39:27|
|1       |movingGood|7          |2021-10-18 14:22:07|1        |placingGood|3          |2021-10-18 14:39:27|
|1       |movingGood|7          |2021-10-18 14:24:53|1        |placingGood|3          |2021-10-18 14:39:27|
+--------+----------+-----------+-------------------+---------+-----------+-----------+-------------------+
only showing top 5 rows



#### Discussion

Is this what we want?

Let's try to count how many joins we have here ...


In [21]:
spark.sql("SELECT idPlacing, tsPlacing, count(*) FROM sinkTable group by idPlacing, tsPlacing ORDER BY tsPlacing DESC").show(5,False) 

+---------+-------------------+--------+
|idPlacing|tsPlacing          |count(1)|
+---------+-------------------+--------+
|2        |2021-10-18 14:39:27|32      |
|1        |2021-10-18 14:39:27|32      |
|1        |2021-10-18 14:38:54|31      |
|2        |2021-10-18 14:38:54|31      |
|1        |2021-10-18 14:38:21|30      |
+---------+-------------------+--------+
only showing top 5 rows



**Far too many!** ... and growing :-(

O-: !!! ... and also **the state is growing** !!! :-O

In [22]:
from IPython.display import clear_output
import json
while True:
    print(json.dumps(e3.lastProgress, indent=4))
    print(e3.status)
    time.sleep(1)
    clear_output(wait=True)

{
    "id": "04a82c05-6b97-47f0-950d-b3f327e156c2",
    "runId": "6d45211d-905a-4eb8-a7f3-511a6e2c2c4f",
    "name": "sinkTable",
    "timestamp": "2021-10-18T14:40:21.646Z",
    "batchId": 91,
    "numInputRows": 8,
    "inputRowsPerSecond": 1.5488867376573088,
    "processedRowsPerSecond": 1.8289894833104712,
    "durationMs": {
        "addBatch": 4251,
        "getBatch": 0,
        "latestOffset": 1,
        "queryPlanning": 89,
        "triggerExecution": 4374,
        "walCommit": 13
    },
    "stateOperators": [
        {
            "numRowsTotal": 133,
            "numRowsUpdated": 1,
            "memoryUsedBytes": 362176,
            "customMetrics": {
                "loadedMapCacheHitCount": 36400,
                "loadedMapCacheMissCount": 0,
                "stateOnCurrentVersionSizeBytes": 70200
            }
        }
    ],
    "sources": [
        {
            "description": "KafkaV2[Subscribe[RoboticArm]]",
            "startOffset": {
                "RoboticArm"

KeyboardInterrupt: 

Monitor for a minute the field `"numRowsTotal"` in `"stateOperators"`.

**We need to add watermarks and a time constraint for state cleanup!**

In [23]:
e3.stop()

In [None]:
movingW_sdf = (roboticArm_sdf
                .withWatermark("ts","1 minute") # WATERMARK ADDED HERE
                .where("status='movingGood'")
                .withColumnRenamed("id","idMoving")
                .withColumnRenamed("ts","tsMoving")
               )

placingW_sdf = (roboticArm_sdf
                .withWatermark("ts","1 minute") # WATERMARK ADDED HERE
                .where("status='placingGood'")
                .withColumnRenamed("id","idPlacing")
                .withColumnRenamed("ts","tsPlacing")
               )

In [48]:
joinTC_sdf = (movingW_sdf.join(
  placingW_sdf, expr("""
    (idMoving == idPlacing) AND
    (tsPlacing > tsMoving ) AND
    (tsPlacing < tsMoving + interval 14 seconds )""" # TIME CONSTRAIN ADDED HERE (considering also that the time flows at half of the speed) !!!
    )))

In [49]:
e3TC = (joinTC_sdf
                     .writeStream
                     .format("memory")
                     .queryName("sinkTable")
                     .start())

In [53]:
spark.sql("SELECT idPlacing, tsPlacing, count(*) FROM sinkTable group by idPlacing, tsPlacing ORDER BY tsPlacing DESC").show(5,False) # note, I change ts in tsTemp

+---------+-------------------+--------+
|idPlacing|tsPlacing          |count(1)|
+---------+-------------------+--------+
|1        |2021-10-18 15:30:49|1       |
|2        |2021-10-18 15:30:49|1       |
|2        |2021-10-18 15:30:16|1       |
|1        |2021-10-18 15:30:16|1       |
|2        |2021-10-18 15:29:43|1       |
+---------+-------------------+--------+
only showing top 5 rows



also notice that the state no longer grows

In [52]:
from IPython.display import clear_output
import json
while True:
    print(json.dumps(e3TC.lastProgress, indent=4))
    print(e3TC.status)
    time.sleep(1)
    clear_output(wait=True)

{
    "id": "6ba4e4af-2405-4485-b920-169c89c38116",
    "runId": "0aba5bda-450d-4459-9220-d4ec79a27897",
    "name": "sinkTable",
    "timestamp": "2021-10-18T15:31:00.289Z",
    "batchId": 48,
    "numInputRows": 4,
    "inputRowsPerSecond": 0.6189076280365156,
    "processedRowsPerSecond": 0.537345513164965,
    "durationMs": {
        "addBatch": 7258,
        "getBatch": 0,
        "latestOffset": 2,
        "queryPlanning": 129,
        "triggerExecution": 7444,
        "walCommit": 32
    },
    "eventTime": {
        "watermark": "2021-10-18T15:29:39.000Z"
    },
    "stateOperators": [
        {
            "numRowsTotal": 12,
            "numRowsUpdated": 0,
            "memoryUsedBytes": 331008,
            "customMetrics": {
                "loadedMapCacheHitCount": 19200,
                "loadedMapCacheMissCount": 0,
                "stateOnCurrentVersionSizeBytes": 38400
            }
        }
    ],
    "sources": [
        {
            "description": "KafkaV2[Subscribe

KeyboardInterrupt: 

In [54]:
e3TC.stop()

**How much should the watermark and the time constraint be?**

|watermark | time constraint | number of results per arm |reason                                         |
|----------|-----------------|-------------------|-----------------------------------------------|
|  any     |           <=10  |         0         | for each arm, there is no placing within less than 10 sec to a moving |   
|  any     |    >10 & < 14   |         1 or 0    | for one of the arms, there is 1 placing within 10-14 sec to a moving        |    
|  any     |    >14 & < 44   |         1         | for each arm, there is 1 placing within 10-43 sec to a moving                 
|  any     |    >=44 & <48   |         1 or 2    | for one of the arms, there are 2 placing within 44-47 sec to a moving                |   
|  any     |          >=48   |         2+        | for each of the arm, there are more than 2 placing within 48 or more sec to a moving                     |   

The watermark does not influence the answer because, in this case, the data arrive in order and without any delay, **However it is important to clean the state**.

**Is this acceptable? Is there any other solution?**

In many cases, query answering is hard because the datamodel is over simplified.

We may go back to E1 problem and propose to change the model so to eliminate the need for a temporal join. A sequential number for the cycles of each arm would be enough to make the join deterministic. See [datagen2.ipynb](datagen1.ipynb) for the changes in the data generator.

In [55]:
roboticArmV2_schema = StructType([             ## <-- CHANGE HERE new name for the schema
    StructField("id", StringType(), True),
    StructField("cycle", IntegerType(), True), ## <-- CHANGE HERE new field 
    StructField("status", StringType(), True),
    StructField("stressLevel", IntegerType(), True),
    StructField("ts", TimestampType(), True)])

raw_roboticArmV2_df = (spark                   ## <-- CHANGE HERE new name for the df
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("startingOffsets", "earliest")
  .option("subscribe", "RoboticArmV2") ## <-- CHANGE HERE different topic
  .load())

roboticArmV2_sdf = (raw_roboticArmV2_df      ## <-- CHANGE HERE new name sdf
                      .select(from_json(col("value").cast("string"), roboticArmV2_schema).alias("value")) ## <-- CHANGE HERE new schema
                      .select("value.*"))

roboticArmV2_sdf.printSchema()

root
 |-- id: string (nullable = true)
 |-- cycle: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- stressLevel: integer (nullable = true)
 |-- ts: timestamp (nullable = true)



In [56]:
movingV2_sdf = (roboticArmV2_sdf
                .withWatermark("ts","1 minute")
                .where("status='movingGood'")
                .withColumnRenamed("id","idMoving")
                .withColumnRenamed("cycle","cycleMoving")
               )

placingV2_sdf = (roboticArmV2_sdf
                .withWatermark("ts","1 minute")
                .where("status='placingGood'")
                .withColumnRenamed("id","idPlacing")
                .withColumnRenamed("cycle","cyclePlacing")
               )

In [57]:
joinV2_sdf = (movingV2_sdf.join(
  placingV2_sdf, expr("""
    (idMoving == idPlacing) AND
    (cyclePlacing == cycleMoving )""" ## <- CHANGE HERE 
    )))

In [58]:
e3V2 = (joinV2_sdf
                     .writeStream
                     .format("memory")
                     .queryName("sinkTable")
                     .start())

In [63]:
spark.sql("SELECT idPlacing, cyclePlacing, count(*) FROM sinkTable group by idPlacing, cyclePlacing ORDER BY cyclePlacing DESC").show(5,False) 

+---------+------------+--------+
|idPlacing|cyclePlacing|count(1)|
+---------+------------+--------+
|2        |4           |1       |
|1        |4           |1       |
|1        |3           |1       |
|2        |3           |1       |
|2        |2           |1       |
+---------+------------+--------+
only showing top 5 rows



In [None]:
from IPython.display import clear_output
import json
while True:
    print(json.dumps(e3V2.lastProgress, indent=4))
    print(e3V2.status)
    time.sleep(1)
    clear_output(wait=True)

**Much easier** !!

**REMEMBER**: modeling and querying are *two sides of the same coin*

We are not in a traditional RDBMS where modeling is done once for all by the DB administrator and queries must conform to "the model". We are in a setting where performance matters more than governance and chaning the model is often the only way to keep good performances.

In [68]:
e3V2.stop()

ok, let's now get the work done, because ourt goal was: 
> A continuous query that emits the average stress level between a pick (status==goodGrasped) and a place (status==placingGood).

In [None]:
goodGrasped_sdf = (roboticArmV2_sdf
                .withWatermark("ts","1 minute")
                .where("status='goodGrasped'")
                .withColumnRenamed("id","idGrasped")
                .withColumnRenamed("cycle","cycleGrasped")
                .withColumnRenamed("stressLevel","stressLevelGrasped")
               )

movingV3_sdf = (roboticArmV2_sdf
                .withWatermark("ts","1 minute")
                .where("status='movingGood'")
                .withColumnRenamed("id","idMoving")
                .withColumnRenamed("cycle","cycleMoving")
                .withColumnRenamed("stressLevel","stressLevelMoving")
               )

placingV3_sdf = (roboticArmV2_sdf
                .withWatermark("ts","1 minute")
                .where("status='placingGood'")
                .withColumnRenamed("id","idPlacing")
                .withColumnRenamed("cycle","cyclePlacing")
                .withColumnRenamed("stressLevel","stressLevelPlacing")
               )

In [None]:
join1_sdf = (goodGrasped_sdf.join(
    movingV3_sdf, expr("""
    (idGrasped == idMoving) AND
    (cycleGrasped == cycleMoving)""" 
    )))

join2_sdf = (join1_sdf.join(
  placingV3_sdf, expr("""
    (idMoving == idPlacing) AND
    (cyclePlacing == cycleMoving )""" 
    )))

In [None]:
join2_sdf.printSchema()

In [None]:
avg = (join2_sdf.select(col("idPlacing").alias("ID"),
                         col("cyclePlacing").alias("cycle"),
                         expr("(stressLevelGrasped + stressLevelMoving + stressLevelPlacing)/3 AS AvgStressLevel")))

In [None]:
e3V3 = (avg
                     .writeStream
                     .format("memory")
                     .queryName("sinkTable")
                     .start())

In [None]:
spark.sql("SELECT * FROM sinkTable ORDER BY cycle DESC").show(5,False) 

In [None]:
from IPython.display import clear_output
import json
while True:
    print(json.dumps(e3TC.lastProgress, indent=4))
    print(e3TC.status)
    time.sleep(1)
    clear_output(wait=True)

In [67]:
e3V3.stop()

NameError: name 'e3V3' is not defined

# E4

>A continuous query that returns the robotic arms that,
>
> * in less than 20 second (was 10 in EPL, but here the time passes at half of the speed),
> * picked a good while safely operating,
> * moved it while the controller was raising a warning, and
> * placed it while safely operating again.


In [98]:
goodGraspedSafely_sdf = (roboticArmV2_sdf
                .withWatermark("ts","1 minute")
                .where("status='goodGrasped' AND stressLevel < 7")
                .withColumnRenamed("id","idGrasped")
                .withColumnRenamed("cycle","cycleGrasped")
                .withColumnRenamed("stressLevel","stressLevelGrasped")
                .withColumnRenamed("ts","tsGrasped")
               )

movingWarning_sdf = (roboticArmV2_sdf
                .withWatermark("ts","1 minute")
                .where("status='movingGood' AND stressLevel > 6 AND stressLevel < 9")
                .withColumnRenamed("id","idMoving")
                .withColumnRenamed("cycle","cycleMoving")
                .withColumnRenamed("stressLevel","stressLevelMoving")
                .withColumnRenamed("ts","tsMoving")
               )

placingSafely_sdf = (roboticArmV2_sdf
                .withWatermark("ts","1 minute")
                .where("status='placingGood' AND stressLevel < 7")
                .withColumnRenamed("id","idPlacing")
                .withColumnRenamed("cycle","cyclePlacing")
                .withColumnRenamed("stressLevel","stressLevelPlacing")
                .withColumnRenamed("ts","tsPlacing")
               )

join1_sdf = (goodGraspedSafely_sdf.join(
    movingWarning_sdf, expr("""
    (idGrasped == idMoving) AND
    (cycleGrasped == cycleMoving)""" 
    )))

join2_sdf = (join1_sdf.join(
  placingSafely_sdf, expr("""
    (idMoving == idPlacing) AND
    (cyclePlacing == cycleMoving )""" 
    )))

within20sec = join2_sdf.where("tsPlacing <= tsGrasped + interval 20 seconds")

e4 = (within20sec
                     .writeStream
                     .format("memory")
                     .queryName("sinkTable")
                     .start())

In [99]:
spark.sql("SELECT idGrasped AS ID, cyclePlacing AS cycle FROM sinkTable ORDER BY cyclePlacing DESC").show(5,False) 

+---+-----+
|ID |cycle|
+---+-----+
|1  |75   |
|1  |74   |
|1  |73   |
|1  |72   |
|1  |71   |
+---+-----+
only showing top 5 rows



indeed only the arm whose ID is 1

In [83]:
from IPython.display import clear_output
import json
while True:
    print(json.dumps(e3TC.lastProgress, indent=4))
    print(e3TC.status)
    time.sleep(1)
    clear_output(wait=True)

{
    "id": "6ba4e4af-2405-4485-b920-169c89c38116",
    "runId": "0aba5bda-450d-4459-9220-d4ec79a27897",
    "name": "sinkTable",
    "timestamp": "2021-10-18T15:31:07.734Z",
    "batchId": 49,
    "numInputRows": 8,
    "inputRowsPerSecond": 1.0745466756212223,
    "processedRowsPerSecond": 1.1220196353436185,
    "durationMs": {
        "addBatch": 6955,
        "getBatch": 0,
        "latestOffset": 0,
        "queryPlanning": 122,
        "triggerExecution": 7130,
        "walCommit": 26
    },
    "eventTime": {
        "watermark": "2021-10-18T15:29:39.000Z"
    },
    "stateOperators": [
        {
            "numRowsTotal": 12,
            "numRowsUpdated": 0,
            "memoryUsedBytes": 331072,
            "customMetrics": {
                "loadedMapCacheHitCount": 19600,
                "loadedMapCacheMissCount": 0,
                "stateOnCurrentVersionSizeBytes": 38400
            }
        }
    ],
    "sources": [
        {
            "description": "KafkaV2[Subscrib

KeyboardInterrupt: 

In [100]:
e4.stop()

# E5

> A continuous query that monitors the results of the previous one (i.e., E4) and counts how many times each robotic arm is present in the stream over a window of 20 seconds updating the counting every 4 seconds.

In [101]:
e5 = (within20sec
                     .withWatermark("tsPlacing", "1 minutes")
                     .groupBy(window("tsPlacing", "4 minutes", "30 seconds"),"idGrasped")
                     .count()
                     .writeStream
                     .format("memory")
                     .queryName("sinkTable") 
                     .start())

In [106]:
spark.sql("SELECT * FROM sinkTable ORDER BY window DESC").show(100,False)

+------------------------------------------+---------+-----+
|window                                    |idGrasped|count|
+------------------------------------------+---------+-----+
|[2021-10-18 16:13:00, 2021-10-18 16:17:00]|1        |7    |
|[2021-10-18 16:12:30, 2021-10-18 16:16:30]|1        |7    |
|[2021-10-18 16:12:00, 2021-10-18 16:16:00]|1        |7    |
|[2021-10-18 16:11:30, 2021-10-18 16:15:30]|1        |7    |
|[2021-10-18 16:11:00, 2021-10-18 16:15:00]|1        |7    |
|[2021-10-18 16:10:30, 2021-10-18 16:14:30]|1        |7    |
|[2021-10-18 16:10:00, 2021-10-18 16:14:00]|1        |7    |
|[2021-10-18 16:09:30, 2021-10-18 16:13:30]|1        |7    |
|[2021-10-18 16:09:00, 2021-10-18 16:13:00]|1        |8    |
|[2021-10-18 16:08:30, 2021-10-18 16:12:30]|1        |8    |
|[2021-10-18 16:08:00, 2021-10-18 16:12:00]|1        |7    |
|[2021-10-18 16:07:30, 2021-10-18 16:11:30]|1        |7    |
|[2021-10-18 16:07:00, 2021-10-18 16:11:00]|1        |7    |
|[2021-10-18 16:06:30, 2

In [107]:
e5.stop()