In [None]:
import findspark
findspark.init()
import pyspark


from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/tmp/hive").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 5)

In [23]:
from time import sleep
from pyspark.sql.functions import expr, window, col

In [16]:
static=spark.read.json("../../data/SparkTheDefinitiveGuide/activity-data/")
schema=static.schema
static.printSchema()
static.show(2)

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)

+-------------+-------------------+--------+-----+------+----+-----+-------------+-----------+-------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|            x|          y|            z|
+-------------+-------------------+--------+-----+------+----+-----+-------------+-----------+-------------+
|1424686735175|1424686733176178965|nexus4_1|   35|nexus4|   g|stand| 0.0014038086|  5.0354E-4|-0.0124053955|
|1424686735378|1424686733382813486|nexus4_1|   76|nexus4|   g|stand|-0.0039367676|0.026138306|  -0.01133728|
+-------------+-------------------+--------+-----+------+----+-----+-------------+---

In [None]:
streaming = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).json("data/activity-data/")

activityCounts = streaming.groupBy("gt").count()

activityQuery = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()

#activityQuery.awaitTermination()

In [9]:
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x1da6ccae6c8>,
 <pyspark.sql.streaming.StreamingQuery at 0x1da6ccae848>]

In [None]:
for x in range(5):
 spark.sql("SELECT * FROM activity_counts").show()
 sleep(1)

SimpleTransformation ( append)

In [8]:
simpleTransform = streaming.withColumn("stairs", expr("gt like '%stairs%'"))\
.where("stairs")\
.where("gt is not null")\
.select("gt", "model", "arrival_time", "creation_time")\
.writeStream\
.queryName("simple_transform")\
.format("memory")\
.outputMode("append")\
.start()

In [11]:
for x in range(5):
 spark.sql("SELECT * FROM simple_transform").show(2)
 sleep(1)

+--------+------+-------------+-------------------+
|      gt| model| arrival_time|      creation_time|
+--------+------+-------------+-------------------+
|stairsup|nexus4|1424687983719|1424687981726802718|
|stairsup|nexus4|1424687984000|1424687982009853255|
+--------+------+-------------+-------------------+
only showing top 2 rows

+--------+------+-------------+-------------------+
|      gt| model| arrival_time|      creation_time|
+--------+------+-------------+-------------------+
|stairsup|nexus4|1424687983719|1424687981726802718|
|stairsup|nexus4|1424687984000|1424687982009853255|
+--------+------+-------------+-------------------+
only showing top 2 rows

+--------+------+-------------+-------------------+
|      gt| model| arrival_time|      creation_time|
+--------+------+-------------+-------------------+
|stairsup|nexus4|1424687983719|1424687981726802718|
|stairsup|nexus4|1424687984000|1424687982009853255|
+--------+------+-------------+-------------------+
only showing t

Aggregrations (complete)

In [12]:
deviceModelStats = streaming.cube("gt", "model").avg()\
.drop("avg(Arrival_time)")\
.drop("avg(Creation_Time)")\
.drop("avg(Index)")\
.writeStream.queryName("device_counts").format("memory")\
.outputMode("complete")\
.start()

In [13]:
for x in range(5):
 spark.sql("SELECT * FROM device_counts").show(2)
 sleep(1)

+-----+-----+--------------------+--------------------+--------------------+
|   gt|model|              avg(x)|              avg(y)|              avg(z)|
+-----+-----+--------------------+--------------------+--------------------+
|  sit| null|-4.84874225480320...|3.237740430277435E-4|-4.65213713798286...|
|stand| null|-3.19865608911240...|4.070066943871052E-4|1.027885098598976...|
+-----+-----+--------------------+--------------------+--------------------+
only showing top 2 rows

+-----+-----+--------------------+--------------------+--------------------+
|   gt|model|              avg(x)|              avg(y)|              avg(z)|
+-----+-----+--------------------+--------------------+--------------------+
|  sit| null|-4.84874225480320...|3.237740430277435E-4|-4.65213713798286...|
|stand| null|-3.19865608911240...|4.070066943871052E-4|1.027885098598976...|
+-----+-----+--------------------+--------------------+--------------------+
only showing top 2 rows

+-----+-----+-------------

Joins ( static with streaming)

In [18]:
historicalAgg = static.groupBy("gt", "model").avg()
deviceModelStats = streaming.drop("Arrival_Time", "Creation_Time", "Index")\
.cube("gt", "model").avg()\
.join(historicalAgg, ["gt", "model"])\
.writeStream.queryName("device_counts_joined_with_static").format("memory")\
.outputMode("complete")\
.start()

In [19]:
for x in range(5):
 spark.sql("SELECT * FROM device_counts_joined_with_static").show(2)
 sleep(1)

+----+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|  gt| model|              avg(x)|              avg(y)|              avg(z)|   avg(Arrival_Time)|  avg(Creation_Time)|       avg(Index)|              avg(x)|              avg(y)|              avg(z)|
+----+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|bike|nexus4| 0.02096106490343168|-0.00915222693840...|-0.08538726699652668|1.424751134197978...|1.424752122472926...|326483.0788681517|0.020961064903431633|-0.00915222693840...|-0.08538726699652678|
|null|nexus4|-0.00601179783148...|-7.22352747272201...|0.003847526633724176|1.424749016665859...|1.424749940430317...| 219268.945300536|-0.00601179783148...|-7.22352747272203...|0.003847526633724...|


#### Trigger 
( when to write, the ones before was as soon as the current processing is done)

In [20]:
#Processing time trigger
activityCounts.writeStream.trigger(processingTime='5 seconds').format("console").outputMode("complete").start()

<pyspark.sql.streaming.StreamingQuery at 0x1da6cd25748>

In [None]:
#Once trigger
activityCounts.writeStream.trigger(once=True).format("console").outputMode("complete").start()

#### Windows on Event Time ( Sstateful streaming)

In [24]:
streaming = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).json("data/activity-data/")
withEventTime = streaming.selectExpr("*","cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time")

###### Tumbling Windows
operate on the data received since the last trigger

In [29]:
withEventTime.groupBy(window(col("event_time"), "10 minutes")).count()\
.writeStream\
.queryName("events_per_window_tumbling")\
.format("memory")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x1da6ccaeec8>

In [32]:
spark.sql("SELECT * FROM events_per_window_tumbling").show(2)
spark.sql("SELECT * FROM events_per_window_tumbling").printSchema()  # we get a struct in return, not a list

+--------------------+-----+
|              window|count|
+--------------------+-----+
|[2015-02-24 17:20...| 3762|
|[2015-02-24 18:30...| 3383|
+--------------------+-----+
only showing top 2 rows

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



In [33]:
# most of the normal operations are allowed
withEventTime.groupBy(window(col("event_time"), "10 minutes"), "User").count()\
.writeStream\
.queryName("events_per_window_tumbling_agg")\
.format("memory")\
.outputMode("complete")\
.start()

+------+----+-----+
|window|User|count|
+------+----+-----+
+------+----+-----+



In [34]:
spark.sql("SELECT * FROM events_per_window_tumbling_agg").show(2)

+--------------------+----+-----+
|              window|User|count|
+--------------------+----+-----+
|[2015-02-24 17:50...|   f| 3341|
|[2015-02-24 18:30...|   f|  850|
+--------------------+----+-----+
only showing top 2 rows



###### Sliding windows
Check what happened in last 10 mins, keep checking every 5 mins.

In [36]:
withEventTime.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))\
.count()\
.writeStream\
.queryName("events_per_window_sliding")\
.format("memory")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x1da6b3d4948>

In [37]:
spark.sql("SELECT * FROM events_per_window_tumbling_agg").show(2)

+--------------------+----+-----+
|              window|User|count|
+--------------------+----+-----+
|[2015-02-24 17:50...|   f| 3341|
|[2015-02-24 18:30...|   f|  850|
|[2015-02-24 20:20...|   e| 3214|
|[2015-02-23 20:00...|   h| 2337|
|[2015-02-24 19:40...|   e| 1699|
|[2015-02-24 18:30...|   d| 2533|
|[2015-02-24 19:50...|   b| 2559|
|[2015-02-23 18:00...|   c| 2545|
|[2015-02-23 15:50...|   g| 2458|
|[2015-02-24 19:00...|   b| 2052|
|[2015-02-24 20:30...|   e| 2499|
|[2015-02-23 19:10...|   a| 2819|
|[2015-02-22 06:10...|   a|    1|
|[2015-02-24 19:50...|   e| 2815|
|[2015-02-23 19:20...|   h| 2349|
|[2015-02-23 16:40...|   g| 2263|
|[2015-02-24 18:40...|   d| 2646|
|[2015-02-24 17:30...|   i| 2336|
|[2015-02-23 18:30...|   a| 1619|
|[2015-02-24 18:50...|   d| 2727|
+--------------------+----+-----+
only showing top 20 rows



###### Watermark
The above two are event time processing, so new data can appear with an old timestamp. Watermark is the timeframe marking, anything older than this is discarded.

In [38]:
withEventTime\
.withWatermark("event_time", "30 minutes")\
.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))\
.count()\
.writeStream\
.queryName("events_per_window_with_watermark")\
.format("memory")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x1da6ccc6388>

Now, Structured Streaming will wait until 30 minutes after the final timestamp of this 10-minute rolling window before it finalizes the result of that window. We can query our table and see the intermediate results because we’re using complete mode—they’ll be updated over time. In append mode, this information won’t be output until the window closes

In [39]:
spark.sql("SELECT * FROM events_per_window_with_watermark").show(2)

+--------------------+-----+
|              window|count|
+--------------------+-----+
|[2015-02-23 19:45...| 2736|
|[2015-02-24 17:20...| 3762|
+--------------------+-----+
only showing top 2 rows

