## Workshop Spark Structured Streaming 

## 1. Create Spark session

In [40]:
# The SparkSession is the entry point to Spark; every later cell uses it.

from pyspark.sql import SparkSession

# Build (or reuse) a session with a local master, 2g of driver memory
# and the console progress bar switched off.
spark = (
    SparkSession.builder
    .appName("my streaming test app")
    .master("local[*]")
    .config("spark.driver.memory", "2g")
    .config("spark.ui.showConsoleProgress", "false")
    .getOrCreate()
)

In [41]:
# Display the SparkSession object to confirm that it was created
spark

## 2. Configure the data path and schema

In [42]:
# Directory that is watched for incoming data files
PATH = "./data/streaming/"

# Schema of the csv files, written as a DDL string (column name followed by its type)
schema = (
    "timestamp int, "
    "name string, "
    "value double"
)

In [43]:
# Input side of the streaming pipeline:
# a streaming csv reader over every .csv file in the PATH directory.
# Spark keeps checking the directory for newly arrived files.

input_path = f"{PATH}*.csv"
input_stream = (
    spark.readStream
    .format("csv")
    .schema(schema)
    .options(header="true", path=input_path)
    .load()
)

In [44]:
# Defines an output stream of the pipeline: the data is written to an in-memory
# table named "data_read".
# Use this for testing; in a real case you would write to files and/or Kafka.
#
# A memory sink in append mode cannot be restarted from an existing checkpoint,
# so this cell first stops a previous "data_read" query (if one is still active
# in this session) and removes the checkpoint directory left by an earlier run.
# This keeps the cell re-runnable, also after a kernel restart.
import shutil

for active_query in spark.streams.active:
    if active_query.name == "data_read":
        active_query.stop()

# ignore_errors=True: nothing to do when the directory does not exist yet
shutil.rmtree("myStreamingCheckPoint1", ignore_errors=True)

raw_stream = (
    input_stream.writeStream
    .queryName("data_read")
    .outputMode("append")
    .format("memory")
    .option("checkpointLocation", "myStreamingCheckPoint1")
    .start()
)

25/09/17 16:25:08 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [45]:
# Print the schema of the in-memory table "data_read" created by the streaming query
spark.table("data_read").printSchema()

root
 |-- timestamp: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- value: double (nullable = true)



In [46]:
# Inspect the in-memory output table.
# Re-run this cell each time you add csv files with data to the input_path directory.

(
    spark
    .sql("select * from data_read")
    .show()
)

+---------+----+-----+
|timestamp|name|value|
+---------+----+-----+
+---------+----+-----+



In [47]:
# Show the current status of the "data_read" streaming query
raw_stream.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

## 3. Add a CSV file to the folder `./data/streaming/`

In [52]:
# Query the output table again, now that a csv file has been added
(
    spark
    .sql("select * from data_read")
    .show()
)

+---------+-------+-----+
|timestamp|   name|value|
+---------+-------+-----+
|     1000| event0|  0.1|
|     1001| event1|  0.2|
|     1002| event2|  0.3|
+---------+-------+-----+



In [73]:
# Show the status of the "data_read" streaming query again
raw_stream.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [72]:
# Register the streaming DataFrame as a temporary view named "input_stream",
# so that the stream can be queried with Spark SQL
input_stream.createOrReplaceTempView(name="input_stream")

In [58]:
# Describe the aggregation and transformation of the streaming data in Spark SQL:
# for each name, count the rows and sum the values.
aggregation_query = """
select name||'_aggregated' as name_aggregated, count(*) as n_points, sum(value) sum_values 
from input_stream 
group by name"""

df = spark.sql(aggregation_query)

In [59]:
# Second output stream of the pipeline: the aggregation result is written
# to the in-memory table "data_aggregated" in complete output mode.
aggregated_writer = (
    df.writeStream
    .format("memory")
    .queryName("data_aggregated")
    .outputMode("complete")
    .option("checkpointLocation", "myStreamingCheckPoint2")
)
aggregated_stream = aggregated_writer.start()

25/09/17 16:26:03 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [62]:
# Inspect the table with the aggregated data; it is updated as new data
# arrives in the input pipeline
(
    spark
    .sql("select * from data_aggregated")
    .show()
)

+---------------+--------+----------+
|name_aggregated|n_points|sum_values|
+---------------+--------+----------+
+---------------+--------+----------+



In [65]:
# Raw rows read so far by the "data_read" query
(
    spark
    .sql("select * from data_read")
    .show()
)

+---------+-------+-----+
|timestamp|   name|value|
+---------+-------+-----+
|     1000| event0|  0.1|
|     1001| event1|  0.2|
|     1002| event2|  0.3|
+---------+-------+-----+



In [67]:
# Aggregated rows produced so far by the "data_aggregated" query
(
    spark
    .sql("select * from data_aggregated")
    .show()
)

+------------------+--------+----------+
|   name_aggregated|n_points|sum_values|
+------------------+--------+----------+
| event1_aggregated|       1|       0.2|
| event2_aggregated|       1|       0.3|
| event0_aggregated|       1|       0.1|
+------------------+--------+----------+



In [39]:
# Shut everything down: first the two streaming queries, then the Spark session
for streaming_query in (raw_stream, aggregated_stream):
    streaming_query.stop()

spark.stop()

25/09/17 16:24:41 WARN DAGScheduler: Failed to cancel job group 37fd2b11-41a6-41c4-90b8-517925545e13. Cannot find active jobs for it.
25/09/17 16:24:41 WARN DAGScheduler: Failed to cancel job group 37fd2b11-41a6-41c4-90b8-517925545e13. Cannot find active jobs for it.
