In [1]:
from IPython.display import display, clear_output
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
import pandas as pd

pd.options.display.max_columns = None
pd.options.display.max_rows = 30
pd.options.display.max_colwidth = 150

In [2]:
!pwd

/home/jovyan/work/Notebooks


In [11]:
spark = SparkSession.builder.appName("StructuredStreamingApplication").getOrCreate()
schema = spark.read.json("./activity-data/activity-data/").limit(100).schema

In [14]:
schema

StructType(List(StructField(Arrival_Time,LongType,true),StructField(Creation_Time,LongType,true),StructField(Device,StringType,true),StructField(Index,LongType,true),StructField(Model,StringType,true),StructField(User,StringType,true),StructField(gt,StringType,true),StructField(x,DoubleType,true),StructField(y,DoubleType,true),StructField(z,DoubleType,true)))

In [13]:
streaming = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1)\
.json("./activity-data/")

In [16]:
simpleTransform = streaming.withColumn("stairs", f.expr("gt like '%stairs%'"))\
.where("stairs")\
.where("gt is not null")\
.select("gt", "model", "arrival_time", "creation_time")\
.writeStream\
.queryName("simple_transform")\
.format("memory")\
.outputMode("append")\
.start()

In [20]:
spark.sql("SELECT * FROM simple_transform").show(5)

+--------+------+-------------+-------------------+
|      gt| model| arrival_time|      creation_time|
+--------+------+-------------+-------------------+
|stairsup|nexus4|1424687983801|1424689829851420571|
|stairsup|nexus4|1424687984163|1424687982169917952|
|stairsup|nexus4|1424687984571|1424687982572835163|
|stairsup|nexus4|1424687984972|1424687982975667195|
|stairsup|nexus4|1424687985370|1424687983379305060|
+--------+------+-------------+-------------------+
only showing top 5 rows



In [17]:
deviceModelStats = streaming.cube("gt", "model").avg()\
.drop("avg(Arrival_time)")\
.drop("avg(Creation_Time)")\
.drop("avg(Index)")\
.writeStream.queryName("device_counts").format("memory")\
.outputMode("complete")\
.start()

In [19]:
spark.sql("SELECT * FROM device_counts").show(5)

+----------+------+--------------------+--------------------+--------------------+
|        gt| model|              avg(x)|              avg(y)|              avg(z)|
+----------+------+--------------------+--------------------+--------------------+
|      null|nexus4|0.001243822064214...|-0.00508318886798...|-0.01029860468302...|
|      null|  null|-0.00534777761728...|-0.00471625131602...|0.001053548924143...|
|stairsdown|  null|0.022206868836056656| -0.0326125139584748| 0.11849359875695885|
|       sit|  null|-5.48643225000002E-4|-1.75231850243742...|-2.21252465063372...|
|stairsdown|nexus4|0.022206868836056656| -0.0326125139584748| 0.11849359875695885|
+----------+------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [31]:
# spark.sql can be used to request how the query is performing

from time import sleep
for x in range(0, 5):
    display(spark.sql(f"SELECT * from device_counts").toPandas().dropna().head(3))
    sleep(1)
    
    #clear_output(wait=True)
else:
    print("Live view ended...")

Unnamed: 0,gt,model,avg(x),avg(y),avg(z)
4,stairsdown,nexus4,0.021614,-0.03249,0.120359
5,bike,nexus4,0.022689,-0.008779,-0.08251
6,,nexus4,-0.008477,-0.00073,0.003091


Unnamed: 0,gt,model,avg(x),avg(y),avg(z)
4,stairsdown,nexus4,0.021614,-0.03249,0.120359
5,bike,nexus4,0.022689,-0.008779,-0.08251
6,,nexus4,-0.008477,-0.00073,0.003091


Unnamed: 0,gt,model,avg(x),avg(y),avg(z)
4,stairsdown,nexus4,0.021614,-0.03249,0.120359
5,bike,nexus4,0.022689,-0.008779,-0.08251
6,,nexus4,-0.008477,-0.00073,0.003091


Unnamed: 0,gt,model,avg(x),avg(y),avg(z)
4,stairsdown,nexus4,0.021614,-0.03249,0.120359
5,bike,nexus4,0.022689,-0.008779,-0.08251
6,,nexus4,-0.008477,-0.00073,0.003091


Unnamed: 0,gt,model,avg(x),avg(y),avg(z)
4,stairsdown,nexus4,0.021614,-0.03249,0.120359
5,bike,nexus4,0.022689,-0.008779,-0.08251
6,,nexus4,-0.008477,-0.00073,0.003091


Live view ended...


In [22]:
static= spark.read.json("./activity-data/")

In [23]:
static.show(4)

+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|           x|           y|           z|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
|1424686735090|1424686733090638193|nexus4_1|   18|nexus4|   g|stand| 3.356934E-4|-5.645752E-4|-0.018814087|
|1424686735292|1424688581345918092|nexus4_2|   66|nexus4|   g|stand|-0.005722046| 0.029083252| 0.005569458|
|1424686735500|1424686733498505625|nexus4_1|   99|nexus4|   g|stand|   0.0078125|-0.017654419| 0.010025024|
|1424686735691|1424688581745026978|nexus4_2|  145|nexus4|   g|stand|-3.814697E-4|   0.0184021|-0.013656616|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
only showing top 4 rows



In [69]:
historicalAgg = static.groupBy("gt", "model").avg()
deviceModelStats = streaming.drop("Arrival_Time", "Creation_Time", "Index")\
.cube("gt", "model").avg()\
.join(historicalAgg, ["gt", "model"])\
.writeStream.queryName("device_counts_join").format("memory")\
.outputMode("complete")\
.start()

In [76]:
spark.sql("SELECT * FROM device_counts_join").show(5)

+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|        gt| model|              avg(x)|              avg(y)|              avg(z)|   avg(Arrival_Time)|  avg(Creation_Time)|        avg(Index)|              avg(x)|              avg(y)|              avg(z)|
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|      bike|nexus4|0.022964785314061254|-0.00883802096885...|-0.08280852157940341|1.424751134339985...|1.424752127369587...| 326459.6867328154|0.022688759550866796|-0.00877912156368...| -0.0825100166341234|
|      null|nexus4|-0.00861985194739502|-0.00118695209268...|0.002615053985997819|1.424749002876339...|1.424749919482128...| 219276.9663669269|-0.00847688860109...|-7.30455

### Triggers

In [33]:
activityCounts = streaming.groupBy("gt").count()

In [36]:
activityCounts.writeStream.trigger(processingTime='5 seconds')\
.format("console").outputMode("complete").start()

<pyspark.sql.streaming.StreamingQuery at 0x13190a710>

In [37]:
activityCounts.writeStream.trigger(once=True)\
.format("console").outputMode("complete").start()

<pyspark.sql.streaming.StreamingQuery at 0x13190afd0>

### Event Time and Stateful Processing

In [45]:
spark.conf.set("spark.sql.shuffle.partitions", 5)
static = spark.read.json("./activity-data")
streaming = spark\
.readStream\
.schema(static.schema)\
.option("maxFilesPerTrigger", 10)\
.json("./activity-data")

streaming.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



In [50]:
withEventTime = streaming\
.selectExpr("*", "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time")

In [51]:
# Tumbling Windows
from pyspark.sql.functions import window, col
withEventTime.groupBy(window(col("event_time"), "10 minutes")).count()\
.writeStream\
.queryName("pyevents_per_window")\
.format("memory")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x131ea1fd0>

In [53]:
spark.sql("SELECT * FROM pyevents_per_window").printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



In [55]:
spark.sql("SELECT * FROM pyevents_per_window").show(5)

+--------------------+------+
|              window| count|
+--------------------+------+
|{2015-02-24 17:20...|150773|
|{2015-02-24 18:30...|133323|
|{2015-02-23 18:00...|100853|
|{2015-02-23 15:50...| 99178|
|{2015-02-24 18:00...|125679|
+--------------------+------+
only showing top 5 rows



In [57]:
# Aggregations across multiple columns
from pyspark.sql.functions import window, col
withEventTime.groupBy(window(col("event_time"), "10 minutes"), "User").count()\
.writeStream\
.queryName("events_per_window")\
.format("memory")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x1319ead50>

In [60]:
spark.sql("SELECT * FROM events_per_window").show(5)

+--------------------+----+------+
|              window|User| count|
+--------------------+----+------+
|{2015-02-24 17:50...|   f|133623|
|{2015-02-24 18:30...|   f| 33366|
|{2015-02-24 20:20...|   e|126282|
|{2015-02-23 20:00...|   h| 94669|
|{2015-02-24 19:40...|   e| 67577|
+--------------------+----+------+
only showing top 5 rows



In [59]:
# Sliding Windows
from pyspark.sql.functions import window, col
withEventTime.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))\
.count()\
.writeStream\
.queryName("pyevents_per_sliding_window")\
.format("memory")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x1325d7cd0>

In [61]:
spark.sql("SELECT * FROM pyevents_per_sliding_window").show(5)

+--------------------+------+
|              window| count|
+--------------------+------+
|{2015-02-23 19:45...|107668|
|{2015-02-24 17:20...|150773|
|{2015-02-24 18:30...|133323|
|{2015-02-22 06:05...|    35|
|{2015-02-23 18:00...|100853|
+--------------------+------+
only showing top 5 rows



In [63]:
# Watermarks for handling delays
from pyspark.sql.functions import window, col
withEventTime\
.withWatermark("event_time", "30 minutes")\
.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))\
.count()\
.writeStream\
.queryName("watermarking_events")\
.format("memory")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x1319c96d0>

In [64]:
spark.sql("SELECT * FROM watermarking_events").show(5)

+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2015-02-23 19:45...|26984|
|{2015-02-24 17:20...|37578|
|{2015-02-24 18:30...|33292|
|{2015-02-22 06:05...|    6|
|{2015-02-23 18:00...|25170|
+--------------------+-----+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import expr
withEventTime\
.withWatermark("event_time", "5 seconds")\
.dropDuplicates(["User", "event_time"])\
.groupBy("User")\
.count()\
.writeStream\
.queryName("pydeduplicated")\
.format("memory")\
.outputMode("complete")\
.start()

In [67]:
spark.sql("SELECT * FROM pydeduplicated").show(5)

+----+-----+
|User|count|
+----+-----+
|   a|80854|
|   b|91239|
|   c|77155|
|   g|91673|
|   h|77326|
+----+-----+
only showing top 5 rows



### Checkpointing and Fault Tolerance

In [40]:
# Checkpointing
static = spark.read.json("./activity-data")
streaming = spark\
.readStream\
.schema(static.schema)\
.option("maxFilesPerTrigger", 10)\
.json("./activity-data")\
.groupBy("gt")\
.count()

query = streaming\
.writeStream\
.outputMode("complete")\
.option("checkpointLocation", "./checkpointing/")\
.queryName("test_python_stream")\
.format("memory")\
.start()

In [44]:
query.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [43]:
query.recentProgress[1]

{'id': '6556e144-3df9-4473-9e0a-26c18e5da84b',
 'runId': '5dfc0d17-2eaf-4011-8c9e-33b804ff5e99',
 'name': 'test_python_stream',
 'timestamp': '2021-05-31T02:25:17.391Z',
 'batchId': 8,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 12, 'triggerExecution': 12},
 'stateOperators': [{'numRowsTotal': 7,
   'numRowsUpdated': 0,
   'memoryUsedBytes': 82024,
   'numRowsDroppedByWatermark': 0,
   'customMetrics': {'loadedMapCacheHitCount': 2800,
    'loadedMapCacheMissCount': 0,
    'stateOnCurrentVersionSizeBytes': 18896}}],
 'sources': [{'description': 'FileStreamSource[file:/Users/ankitbit/Documents/Lectures/Real-Time-Data-Analysis/Tutorials/activity-data]',
   'startOffset': {'logOffset': 7},
   'endOffset': {'logOffset': 7},
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0}],
 'sink': {'description': 'MemorySink', 'numOutputRows': 0}}