### Create a Spark session

In [11]:
from pyspark.sql import SparkSession
# Obtain the session for this notebook: getOrCreate() returns the existing
# SparkSession if there is one, otherwise it builds a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

### Creating a static dataframe

In [2]:
# Batch (non-streaming) read of the JSON activity data.
static = (
    spark.read
    .json("./../../../data/activity-data/")
)
# Keep the schema of the static read so the streaming read can reuse it.
dataSchema = static.schema

In [3]:
# Print the schema of the static DataFrame as a tree (column name, type, nullability).
static.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



In [4]:
# Peek at the first five rows of the static DataFrame.
static.show(n=5)

+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|           x|           y|           z|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
|1424686735090|1424686733090638193|nexus4_1|   18|nexus4|   g|stand| 3.356934E-4|-5.645752E-4|-0.018814087|
|1424686735292|1424688581345918092|nexus4_2|   66|nexus4|   g|stand|-0.005722046| 0.029083252| 0.005569458|
|1424686735500|1424686733498505625|nexus4_1|   99|nexus4|   g|stand|   0.0078125|-0.017654419| 0.010025024|
|1424686735691|1424688581745026978|nexus4_2|  145|nexus4|   g|stand|-3.814697E-4|   0.0184021|-0.013656616|
|1424686735890|1424688581945252808|nexus4_2|  185|nexus4|   g|stand|-3.814697E-4|-0.031799316| -0.00831604|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
only showing top 5 rows



### Creating a streaming version of the same dataframe

In [5]:
# The schema captured from the static DataFrame (dataSchema) is passed in
# explicitly: a streaming read usually does not infer the schema by default.
# Inference can be enabled, but that is not recommended.
# maxFilesPerTrigger is set to 1 for this read.

streaming = (
    spark.readStream
    .schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .json("./../../../data/activity-data")
)

In [6]:
# Note that creating a streaming DataFrame and defining transformations on it is lazy as well.
# We do not get any results until a streaming query is started. For this we define a query.

In [6]:
# Streaming aggregation: number of rows per value of the "gt" column.
activityCounts = (
    streaming
    .groupBy("gt")
    .count()
)

In [8]:
# We also lower the number of shuffle partitions, since we are working in local mode.

In [7]:
# Set the session-level "spark.sql.shuffle.partitions" option to 5.
spark.conf.set("spark.sql.shuffle.partitions", 5)

In [10]:
# We also set the query name, the sink format, and the output mode

In [8]:
# Start the streaming query that maintains the per-"gt" counts.
#   queryName  -> "activity_counts"; with the memory sink this is the table name
#                 that is later queried via spark.sql("SELECT * FROM activity_counts")
#   format     -> "memory" sink
#   outputMode -> "complete"
# The chain is wrapped in parentheses instead of backslash continuations; the
# original ended with a dangling "\" after .start(), which continues the statement
# onto whatever follows (and is a syntax error when nothing follows).
activityQuery = (
    activityCounts
    .writeStream
    .queryName("activity_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)


In [None]:
# To list the currently active streaming queries

In [19]:
# Displays the list of StreamingQuery objects that are active in this session.
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x7f793838c150>]

### Running a sample query on streaming data

In [10]:
from time import sleep

# Query the "activity_counts" table five times, pausing ten seconds after each
# query, to observe the counts while the stream is being processed.
polls_done = 0
while polls_done < 5:
    spark.sql("SELECT * FROM activity_counts").show()
    sleep(10)
    polls_done += 1

+----------+------+
|        gt| count|
+----------+------+
|       sit|640030|
|     stand|592006|
|stairsdown|486880|
|      walk|689284|
|  stairsup|543798|
|      null|543227|
|      bike|561431|
+----------+------+

+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|
|     stand| 910783|
|stairsdown| 749059|
|      walk|1060402|
|  stairsup| 836598|
|      null| 835725|
|      bike| 863710|
+----------+-------+

+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|
|     stand| 910783|
|stairsdown| 749059|
|      walk|1060402|
|  stairsup| 836598|
|      null| 835725|
|      bike| 863710|
+----------+-------+

+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|
|     stand| 910783|
|stairsdown| 749059|
|      walk|1060402|
|  stairsup| 836598|
|      null| 835725|
|      bike| 863710|
+----------+-------+

+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|

### Selection and filtering on streams

In [10]:
# How does this relate to normal / static dataframes?
# The withColumn / where / select calls below are applied to the streaming DataFrame.

from pyspark.sql.functions import expr

# Flag rows whose "gt" contains "stairs", keep only those rows with a non-null
# "gt", project four columns, and append the result to the in-memory table
# "simple_transform".
simpleTransform = (
    streaming
    .withColumn("stairs", expr("gt like '%stairs%'"))
    .where("stairs")
    .where("gt is not null")
    .select("gt", "model", "arrival_time", "creation_time")
    .writeStream
    .queryName("simple_transform")
    .format("memory")
    .outputMode("append")
    .start()
)

### Aggregations on streams

In [None]:
# Link for cube type aggregates: https://www.google.com/search?sca_esv=23c20ded85caa685&q=cube+aggregation&uds=ADvngMiIMiMH9LyyITANaU-tP7TxCmsoapubbU4OF6YqN-dZR4m4_WWghjU5yz_1J6-ynKU_M7kL14jWgtIqPtYBvL-zarvc5Enp6a1FEYBykz_OmCmfpiWy5Se2LLLBpw3ODL9hcVnl2HZYIZu2W6IA6vJ_yW2mg69oNiTPhk86Xhj2lIafZuBb-Uo6gSED5R6h8t33BVlYR3sdFM_QCDeACTgyqMzu9V9BqjkoVxty-I_tMphi-9XQ8sPJhh2XpYMRp_p63UshkFzc4gJwYoA60iYkvgW4cafb3ILDYc4w7j79XlXprMY&udm=2&prmd=ivnbz&sa=X&ved=2ahUKEwjN9J75qcWGAxWPg_0HHQReAtgQtKgLegQIDBAB&biw=1536&bih=839&dpr=2#vhid=nIWtuhkG5fLMyM&vssid=mosaic

In [11]:
# Cube over ("gt", "model") and average every numeric column, then discard the
# averages of the time/index columns. The result is written in complete mode to
# the in-memory table "device_counts".
deviceModelStats = (
    streaming
    .cube("gt", "model")
    .avg()
    .drop("avg(Arrival_time)", "avg(Creation_Time)", "avg(Index)")
    .writeStream
    .queryName("device_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

### Joins on streams

In [None]:
# Averages of the numeric columns per ("gt", "model"), computed on the static DataFrame.
historicalAgg = (
    static
    .groupBy("gt", "model")
    .avg()
)

In [None]:
# Join the streaming per-("gt", "model") averages with the historical (static)
# averages and write the result, in complete mode, to the in-memory table
# "device_counts".
#
# The query name "device_counts" is also used by the cube aggregation started
# earlier in this notebook. Spark does not allow two active queries with the
# same name, so any active query called "device_counts" is stopped first.
# This also makes the cell safe to re-run.
for active_query in spark.streams.active:
    if active_query.name == "device_counts":
        active_query.stop()

deviceModelStats = (
    streaming
    .drop("Arrival_Time", "Creation_Time", "Index")
    .cube("gt", "model")
    .avg()
    .join(historicalAgg, ["gt", "model"])
    .writeStream
    .queryName("device_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [None]:
# Three ways of subscribing to Kafka as a streaming source.
# The broker addresses and topic names are placeholders.

# Subscribe to a single topic
df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1")
    .load()
)

# Subscribe to multiple topics (comma-separated list)
df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1,topic2")
    .load()
)

# Subscribe to every topic matching a pattern
df3 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribePattern", "topic.*")
    .load()
)

In [None]:
# Two alternative ways of writing a stream to Kafka (placeholder brokers and paths).
# NOTE(review): both queries use the same checkpointLocation; presumably each real
# query should get its own directory — confirm before running both.

# 1) The rows carry a "topic" column alongside key and value.
(
    df1
    .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("checkpointLocation", "/to/HDFS-compatible/dir")
    .start()
)

# 2) Only key and value are selected; the topic is given through the "topic" option.
(
    df1
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("checkpointLocation", "/to/HDFS-compatible/dir")
    .option("topic", "topic1")
    .start()
)

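Reading data from sockets: first start a local socket server in a terminal with `nc -lk 9999`, then run the cell below.
Link (background reading on cube aggregation, as used in the "Aggregations on streams" section): https://medium.com/@jianglancao/finally-data-cube-aggregation-can-work-directly-in-google-bigquery-50976305b9ce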

In [12]:
# Streaming source backed by a socket on localhost, port 9999
# (the port the `nc -lk 9999` command listens on).
socketDF = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

In [13]:
# Write the activity counts to the console sink in complete mode,
# using a processing-time trigger of '5 seconds'.
(
    activityCounts
    .writeStream
    .trigger(processingTime='5 seconds')
    .format("console")
    .outputMode("complete")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7ff21025d4d0>

In [17]:
# Same console query, but with trigger(once=True) instead of a processing-time interval.
(
    activityCounts
    .writeStream
    .trigger(once=True)
    .format("console")
    .outputMode("complete")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7ff260a6e950>