# Initialization

In [36]:
import pyspark
sparkcontext = pyspark.SparkContext.getOrCreate(conf=(
        pyspark
        .SparkConf()
        .setAppName('Dibimbing')
        .setMaster('local')
    ))
sparkcontext.setLogLevel("WARN")

spark = pyspark.sql.SparkSession(sparkcontext.getOrCreate())

In [37]:
spark

# Data Load

In [32]:
!ls /resources/data/activity-data/ | head -10

_committed_730451297822678341
part-00000-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00001-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00002-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00003-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00004-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00005-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00006-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00007-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00008-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json


In [4]:
static = spark.read.json('/resources/data/activity-data/')
dataSchema = static.schema
dataSchema

StructType([StructField('Arrival_Time', LongType(), True), StructField('Creation_Time', LongType(), True), StructField('Device', StringType(), True), StructField('Index', LongType(), True), StructField('Model', StringType(), True), StructField('User', StringType(), True), StructField('gt', StringType(), True), StructField('x', DoubleType(), True), StructField('y', DoubleType(), True), StructField('z', DoubleType(), True)])

In [5]:
static.show(5)

+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|           x|           y|           z|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
|1424686735090|1424686733090638193|nexus4_1|   18|nexus4|   g|stand| 3.356934E-4|-5.645752E-4|-0.018814087|
|1424686735292|1424688581345918092|nexus4_2|   66|nexus4|   g|stand|-0.005722046| 0.029083252| 0.005569458|
|1424686735500|1424686733498505625|nexus4_1|   99|nexus4|   g|stand|   0.0078125|-0.017654419| 0.010025024|
|1424686735691|1424688581745026978|nexus4_2|  145|nexus4|   g|stand|-3.814697E-4|   0.0184021|-0.013656616|
|1424686735890|1424688581945252808|nexus4_2|  185|nexus4|   g|stand|-3.814697E-4|-0.031799316| -0.00831604|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
only showing top 5 rows



Metadata for the dataset

| Column | Description |
| --- | ----------- |
| Index         |  The row number.
| Arrival_Time  |  The time the measurement arrived to the sensing application
| Creation_Time |  The timestamp the OS attaches to the sample
| X,Y,Z | The values provided by the sensor for the three axes, X,y,z
| User          |  The user this sample originates from, the users are named a to i.
| Model         |  The phone/watch model this sample originates from
| Device        |  The specific device this sample is from. They are prefixed with the model name and then the number, e.g., nexus4_1 or nexus4_2.
| Gt            |  The activity the user was performing: bike sit, stand, walk, stairsup, stairsdown and null

# Structured Streaming

### Mock File Streaming (Throttle)

In [7]:
spark.conf.set('spark.sql.shuffle.partitions', 5)
streaming = (
    spark
    .readStream
    .schema(dataSchema)
    .option('maxFilesPerTrigger', 1)
    .json('/resources/data/activity-data/')
)
streaming.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



# Windowing

### Window on Event Time

In [8]:
withEventTime = streaming.selectExpr(
    "*",
    "cast(cast(Creation_time as double)/1000000000 as timestamp) as event_time"
)


### Tumbling Window

In [9]:
from pyspark.sql.functions import window, col

(
    withEventTime
    .groupBy(window(col('event_time'), '10 minutes'))
    .count()
    .writeStream
    .queryName('pyevents_per_window')
    .format('memory')
    .outputMode('complete')
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f5189c0f7c0>

In [10]:
spark.sql("SELECT * FROM pyevents_per_window").show(5)

+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2015-02-24 11:50...| 9356|
|{2015-02-24 13:00...| 8322|
|{2015-02-23 12:30...| 6266|
|{2015-02-23 10:20...| 6167|
|{2015-02-24 12:30...| 7812|
+--------------------+-----+
only showing top 5 rows



In [11]:
spark.sql("SELECT * FROM pyevents_per_window").printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



### Sliding Windows

In [12]:
from pyspark.sql.functions import window, col

(
    withEventTime
    .groupBy(window(col('event_time'), '10 minutes', '5 minutes'))
    .count()
    .writeStream
    .queryName('pyevents_per_slidingWindow')
    .format('memory')
    .outputMode('complete')
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f51c63d9240>

In [13]:
spark.sql('SELECT * FROM pyevents_per_slidingWindow').show(5)

+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2015-02-23 14:15...| 2736|
|{2015-02-24 11:50...| 3762|
|{2015-02-24 13:00...| 3383|
|{2015-02-22 00:35...|    1|
|{2015-02-23 12:30...| 2545|
+--------------------+-----+
only showing top 5 rows



In [14]:
spark.sql("SELECT * FROM pyevents_per_slidingWindow").printSchema()

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



# Watermarks

In [15]:
(
    withEventTime
    .withWatermark('event_time', '30 minutes')
    .groupBy(window(col('event_time'), '10 minutes', '5 minutes'))
    .count()
    .writeStream
    .queryName('pyevents_per_window2')
    .format('memory')
    .outputMode('complete')
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f518a0ec3d0>

In [17]:
spark.sql('SELECT * FROM pyevents_per_window2').show(5)

+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2015-02-23 14:15...| 2736|
|{2015-02-24 11:50...| 3762|
|{2015-02-24 13:00...| 3383|
|{2015-02-22 00:35...|    1|
|{2015-02-23 12:30...| 2545|
+--------------------+-----+
only showing top 5 rows



# Drop Duplicates

In [18]:
(
    withEventTime
    .withWatermark('event_time', '5 seconds')
    .dropDuplicates(['User', 'event_time'])
    .groupBy('User')
    .count()
    .writeStream
    .queryName('pydeduplicated')
    .format('memory')
    .outputMode('complete')
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f518a00ed70>

In [26]:
spark.sql('SELECT * FROM pydeduplicated').show(5)

+----+-----+
|User|count|
+----+-----+
|   a| 8085|
|   b| 9123|
|   c| 7715|
|   g| 9167|
|   h| 7733|
+----+-----+
only showing top 5 rows



# Checkpoints

In [39]:
static = spark.read.json('/resources/data/activity-data/')

streaming = (
    spark
    .readStream
    .schema(static.schema)
    .option('maxFilesPerTrigger', 10)
    .json('/resources/data/activity-data/')
    .groupBy('gt')
    .count()
)


query = (
    streaming
    .writeStream
    .outputMode('complete')
    .option('checkpointlocation', '/resources/logs')
    .queryName('test_python_stream')
    .format('memory')
    .start()
)

for x in range(10):
    spark.sql("SELECT * FROM test_python_stream").show()

IllegalArgumentException: Cannot start query with name test_python_stream as a query with that name is already active in this SparkSession

In [41]:
spark.stop()