In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

spark = SparkSession\
    .builder\
    .appName("chapter-21-streaming")\
    .getOrCreate()

import os
SPARK_BOOK_DATA_PATH = os.environ['SPARK_BOOK_DATA_PATH']

In [2]:
from IPython.display import display

dataset - Heterogeneity Human Activity Recognition

The data consists of smartphone and smartwatch sensor readings from a variety of devices (such as accelerometer, gyroscope), sampled at the highest possible frequency supported by the devices. Readings from these sensors were recorded while users perfomed activities like biking, sitting, standing, walking, and so on

In [3]:
file_path = SPARK_BOOK_DATA_PATH + "/data/activity-data/"
static = spark.read.json(file_path)
dataSchema = static.schema

static.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



In [4]:
static.show(5)

+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|           x|           y|           z|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
|1424686735090|1424686733090638193|nexus4_1|   18|nexus4|   g|stand| 3.356934E-4|-5.645752E-4|-0.018814087|
|1424686735292|1424688581345918092|nexus4_2|   66|nexus4|   g|stand|-0.005722046| 0.029083252| 0.005569458|
|1424686735500|1424686733498505625|nexus4_1|   99|nexus4|   g|stand|   0.0078125|-0.017654419| 0.010025024|
|1424686735691|1424688581745026978|nexus4_2|  145|nexus4|   g|stand|-3.814697E-4|   0.0184021|-0.013656616|
|1424686735890|1424688581945252808|nexus4_2|  185|nexus4|   g|stand|-3.814697E-4|-0.031799316| -0.00831604|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
only showing top 5 rows



In [5]:
static.selectExpr("from_unixtime(Arrival_Time) as Arrival_Time").show(2,truncate=False)

+---------------------+
|Arrival_Time         |
+---------------------+
|+47116-07-11 15:18:10|
|+47116-07-11 15:21:32|
+---------------------+
only showing top 2 rows



In [6]:
# COMMAND ----------

historicalAgg = (static
    .groupBy("gt", "model").avg()
    .drop("avg(Arrival_time)")
    .drop("avg(Creation_Time)")
)

historicalAgg.show()

+----------+------+------------------+--------------------+--------------------+--------------------+
|        gt| model|        avg(Index)|              avg(x)|              avg(y)|              avg(z)|
+----------+------+------------------+--------------------+--------------------+--------------------+
|      bike|nexus4| 326459.6867328154|  0.0226887595508668|-0.00877912156368...|-0.08251001663412343|
|      walk|nexus4|149760.09974990616|-0.00390116006094...|0.001052508689953...|-6.95435553042997...|
|stairsdown|nexus4|230452.44623187225|0.021613908669165474|-0.03249018824752616| 0.12035922691504075|
|       sit|nexus4| 74577.84690275553|-5.49433244039557...|2.791446281700046E-4|-2.33994461689904...|
|     stand|nexus4|31317.877585550017|-3.11082189691711...|3.218461665975360...|2.141300040636498...|
|      null|nexus4| 219276.9663669269|-0.00847688860109...|-7.30455258739191...|0.003090601491419928|
|  stairsup|nexus4|227912.96550673083|-0.02479965287771642|-0.00800392344379...|-0

In [7]:
## Extract

streaming = spark.readStream.schema(dataSchema)\
  .option("maxFilesPerTrigger", 1)\
  .json(file_path)

In [8]:
type(static), type(streaming), streaming.isStreaming

(pyspark.sql.dataframe.DataFrame, pyspark.sql.dataframe.DataFrame, True)

In [9]:
# watch stream

from time import sleep

def show_streaming(SQL_stmt, ntimes=3, nrecords=5, sleep_sec=1):
    for x in range(ntimes):
        print(f"#{x} ...")
        # spark.sql(SQL_stmt).show(nrecords, truncate=False)
        display(spark.sql(SQL_stmt).limit(nrecords).toPandas())
        sleep(sleep_sec)

use format = `memory`, other formats are: `console, socket, kafka`

In [10]:
## Load (action)

activityQuery = (
    streaming.groupBy("gt")   # Transform
    .count()
    .writeStream
    .queryName("activity_counts")          # create a table called activity_counts
    .format("memory")
    .outputMode("complete")
    .start()                               # trigger start
)

# activityQuery.awaitTermination()

In [11]:
show_streaming(SQL_stmt="SELECT * FROM activity_counts", ntimes=10, sleep_sec=2)

#0 ...


Unnamed: 0,gt,count


#1 ...


Unnamed: 0,gt,count


#2 ...


Unnamed: 0,gt,count
0,stairsup,10451
1,sit,12309
2,stand,11385
3,walk,13256
4,bike,10797


#3 ...


Unnamed: 0,gt,count
0,stairsup,10451
1,sit,12309
2,stand,11385
3,walk,13256
4,bike,10797


#4 ...


Unnamed: 0,gt,count
0,stairsup,20904
1,sit,24619
2,stand,22770
3,walk,26512
4,bike,21594


#5 ...


Unnamed: 0,gt,count
0,stairsup,31360
1,sit,36926
2,stand,34156
3,walk,39768
4,bike,32392


#6 ...


Unnamed: 0,gt,count
0,stairsup,31360
1,sit,36926
2,stand,34156
3,walk,39768
4,bike,32392


#7 ...


Unnamed: 0,gt,count
0,stairsup,41810
1,sit,49234
2,stand,45541
3,walk,53024
4,bike,43189


#8 ...


Unnamed: 0,gt,count
0,stairsup,52262
1,sit,61542
2,stand,56926
3,walk,66280
4,bike,53987


#9 ...


Unnamed: 0,gt,count
0,stairsup,62716
1,sit,73849
2,stand,68311
3,walk,79536
4,bike,64786


In [11]:
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x7f5a2c588400>]

In [12]:
simpleTransform = (
  streaming
    .withColumn("stairs", F.expr("gt like '%stairs%'"))
    .where("stairs")
    .where("gt is not null")
    .select("gt", "model", "arrival_time", "creation_time")
    .writeStream.queryName("simple_transform2")
    .format("memory")
    .outputMode("append")
    .start()
)

In [13]:
show_streaming(SQL_stmt="select * from simple_transform2")

#0 ...


Unnamed: 0,gt,model,arrival_time,creation_time
0,stairsup,nexus4,1424687983730,1424687981736873519
1,stairsup,nexus4,1424687984021,1424687982023708236
2,stairsup,nexus4,1424687984422,1424687982431691365
3,stairsup,nexus4,1424687984826,1424687982835622029
4,stairsup,nexus4,1424687985228,1424687983237459357


#1 ...


Unnamed: 0,gt,model,arrival_time,creation_time
0,stairsup,nexus4,1424687983730,1424687981736873519
1,stairsup,nexus4,1424687984021,1424687982023708236
2,stairsup,nexus4,1424687984422,1424687982431691365
3,stairsup,nexus4,1424687984826,1424687982835622029
4,stairsup,nexus4,1424687985228,1424687983237459357


#2 ...


Unnamed: 0,gt,model,arrival_time,creation_time
0,stairsup,nexus4,1424687983730,1424687981736873519
1,stairsup,nexus4,1424687984021,1424687982023708236
2,stairsup,nexus4,1424687984422,1424687982431691365
3,stairsup,nexus4,1424687984826,1424687982835622029
4,stairsup,nexus4,1424687985228,1424687983237459357


In [14]:
deviceModelStats = (
    streaming.cube("gt", "model").avg()
    .drop("avg(Arrival_time)")
    .drop("avg(Creation_Time)")
    .drop("avg(Index)")
    .writeStream.queryName("device_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [15]:
show_streaming(SQL_stmt="select * from device_counts")

#0 ...


Unnamed: 0,gt,model,avg(x),avg(y),avg(z)


#1 ...


Unnamed: 0,gt,model,avg(x),avg(y),avg(z)


#2 ...


Unnamed: 0,gt,model,avg(x),avg(y),avg(z)


### join streaming with historical data

In [10]:
deviceModelStats = (
    streaming.drop("Arrival_Time", "Creation_Time", "Index")
  .cube("gt", "model").avg()
  .join(historicalAgg, ["gt", "model"])
  .writeStream.queryName("join_hist")
  .format("memory")
  .outputMode("complete")
  .start()
)

In [11]:
show_streaming(SQL_stmt="select * from join_hist", ntimes=5)

#0 ...


Unnamed: 0,gt,model,avg(x),avg(y),avg(z),avg(Index),avg(x).1,avg(y).1,avg(z).1


#1 ...


Unnamed: 0,gt,model,avg(x),avg(y),avg(z),avg(Index),avg(x).1,avg(y).1,avg(z).1


#2 ...


Unnamed: 0,gt,model,avg(x),avg(y),avg(z),avg(Index),avg(x).1,avg(y).1,avg(z).1


#3 ...


Unnamed: 0,gt,model,avg(x),avg(y),avg(z),avg(Index),avg(x).1,avg(y).1,avg(z).1


#4 ...


Unnamed: 0,gt,model,avg(x),avg(y),avg(z),avg(Index),avg(x).1,avg(y).1,avg(z).1


In [14]:
spark.stop()

see `chapter-21-stream-kafka.ipynb` for example using streaming with Kafka

In [None]:
# Subscribe to 1 topic
df1 = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()

In [None]:
# Subscribe to multiple topics
df2 = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1,topic2")\
  .load()
# Subscribe to a pattern
df3 = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribePattern", "topic.*")\
  .load()


# COMMAND ----------

df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("checkpointLocation", "/to/HDFS-compatible/dir")\
  .start()
df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("checkpointLocation", "/to/HDFS-compatible/dir")\
  .option("topic", "topic1")\
  .start()


# COMMAND ----------

socketDF = spark.readStream.format("socket")\
  .option("host", "localhost").option("port", 9999).load()


# COMMAND ----------

activityCounts.writeStream.trigger(processingTime='5 seconds')\
  .format("console").outputMode("complete").start()


# COMMAND ----------

activityCounts.writeStream.trigger(once=True)\
  .format("console").outputMode("complete").start()


# COMMAND ----------