## Task 0
### Exercise 6 completed by Natalie Benoy
For this assignment, we will perform stream processing using data from an IoT device.
We will use The Heterogeneity Human Activity Recognition (HHAR) dataset from Smartphones and Smartwatches. This is a dataset devised to benchmark human activity recognition algorithms (classification, automatic data segmentation, sensor fusion, feature extraction, etc.) in real-world contexts; specifically, the dataset is gathered with a variety of different device models and usescenarios, in order to reflect sensing heterogeneities to be expected in real deployments.

A subset of this data is already loaded on to DBFS in JSON format at
dbfs:/databricks-datasets/definitive-guide/data/activity-data/

In [0]:
# import statements
import pyspark.sql.functions as F
from pyspark.sql.types import *
from time import sleep

In [0]:
# cleanup code (if needed)
dbutils.fs.ls("dbfs:/local_disk0/tmp")
dbutils.fs.rm("dbfs:/local_disk0/tmp", True)

Out[71]: True

## Task 1

In [0]:
# 1a – View the contents of the folder that contain the data.
a = dbutils.fs.ls("dbfs:/databricks-datasets/definitive-guide/data/activity-data/")
display(a)

path,name,size,modificationTime
dbfs:/databricks-datasets/definitive-guide/data/activity-data/_SUCCESS,_SUCCESS,0,1522191985000
dbfs:/databricks-datasets/definitive-guide/data/activity-data/_committed_730451297822678341,_committed_730451297822678341,6824,1522191985000
dbfs:/databricks-datasets/definitive-guide/data/activity-data/_started_730451297822678341,_started_730451297822678341,0,1522191985000
dbfs:/databricks-datasets/definitive-guide/data/activity-data/part-00000-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,part-00000-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,14831354,1522191985000
dbfs:/databricks-datasets/definitive-guide/data/activity-data/part-00001-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,part-00001-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,14831559,1522191985000
dbfs:/databricks-datasets/definitive-guide/data/activity-data/part-00002-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,part-00002-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,14831751,1522191985000
dbfs:/databricks-datasets/definitive-guide/data/activity-data/part-00003-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,part-00003-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,14831458,1522191985000
dbfs:/databricks-datasets/definitive-guide/data/activity-data/part-00004-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,part-00004-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,14831646,1522191985000
dbfs:/databricks-datasets/definitive-guide/data/activity-data/part-00005-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,part-00005-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,14831601,1522191989000
dbfs:/databricks-datasets/definitive-guide/data/activity-data/part-00006-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,part-00006-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json,14831857,1522191989000


In [0]:
# 1b – How many total files are in this directory?
len(a)

Out[5]: 83

In [0]:
# 1c – How many .json files are in this directory?
b = spark.createDataFrame(a)
print(b[b.name.endswith(".json")].count())

80


In [0]:
# 1d – What is the total size of the .json data files?
b.filter(b.name.endswith(".json")).select('size').groupby().sum().show()

+----------+
| sum(size)|
+----------+
|1186523176|
+----------+



## Task 2

In [0]:
# 2a – First read the json files using spark.read.format(‘json’).load(‘path’) or spark.read.json(‘path’) and save the static DataFrame
staticDF = spark.read.format('json').load("dbfs:/databricks-datasets/definitive-guide/data/activity-data/")

In [0]:
# 2b – Obtain the schema of static DF using the schema attribute of DataFrame and save it as dataSchema.
dataSchema = staticDF.schema

In [0]:
# 2c – Print the DataFrame’s schema using the printSchema() method to see a pretty format.
staticDF.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



## Task 3

In [0]:
# 3a – View only the first 15 lines of staticDF DataFrame using display() function
display(staticDF.limit(15))

Arrival_Time,Creation_Time,Device,Index,Model,User,gt,x,y,z
1424686735090,1424686733090638193,nexus4_1,18,nexus4,g,stand,0.0003356934,-0.0005645752,-0.018814087
1424686735292,1424688581345918092,nexus4_2,66,nexus4,g,stand,-0.005722046,0.029083252,0.005569458
1424686735500,1424686733498505625,nexus4_1,99,nexus4,g,stand,0.0078125,-0.017654419,0.010025024
1424686735691,1424688581745026978,nexus4_2,145,nexus4,g,stand,-0.0003814697,0.0184021,-0.013656616
1424686735890,1424688581945252808,nexus4_2,185,nexus4,g,stand,-0.0003814697,-0.031799316,-0.00831604
1424686736094,1424686734097840342,nexus4_1,218,nexus4,g,stand,-0.0007324219,-0.013381958,0.01109314
1424686736294,1424688582347932252,nexus4_2,265,nexus4,g,stand,-0.005722046,0.015197754,0.022659302
1424686736495,1424688582549592408,nexus4_2,305,nexus4,g,stand,-0.0003814697,0.0087890625,0.0034332275
1424686736697,1424688582750703248,nexus4_2,345,nexus4,g,stand,0.002822876,-0.008300781,-0.015792847
1424686736898,1424688582952241334,nexus4_2,385,nexus4,g,stand,0.0006866455,-0.008300781,0.004501343


In [0]:
# 3b – Count the number of observations/rows of the DataFrame using the count() method
staticDF.count()

Out[22]: 6240991

In [0]:
# 3c – Determine if there are any duplicate observations 
staticDF.distinct().count()

Out[23]: 6240991

In [0]:
# 3d – View some summary descriptive stats (mean, stddev, min, max) of all the columns in the staticDF by calling the describe() method
staticDF.describe().show()

+-------+--------------------+--------------------+--------+------------------+-------+-------+-------+--------------------+--------------------+--------------------+
|summary|        Arrival_Time|       Creation_Time|  Device|             Index|  Model|   User|     gt|                   x|                   y|                   z|
+-------+--------------------+--------------------+--------+------------------+-------+-------+-------+--------------------+--------------------+--------------------+
|  count|             6240991|             6240991| 6240991|           6240991|6240991|6240991|6240991|             6240991|             6240991|             6240991|
|   mean|1.424745913733571...|1.424746844881817...|    null|174536.91224919248|   null|   null|   null|4.796918779024442E-4|-0.00601540958963...|-0.01013356489164...|
| stddev|4.3717662371714726E7|4.372975621925285E13|    null|102085.10343046644|   null|   null|   null|   0.452435842800112| 0.45913492830935265|  0.5232216780091502

## Task 4

In [0]:
# 4a – How many different Model-Device combination values are there?
staticDF.groupby('Model', 'Device').count().show()

+------+--------+-------+
| Model|  Device|  count|
+------+--------+-------+
|nexus4|nexus4_1|3091811|
|nexus4|nexus4_2|3149180|
+------+--------+-------+



In [0]:
# 4b – Produce the number of counts for each combination of values in columns ‘Device’ and ‘gt’.
# Print the result in descending sorted order of the column 'count'
staticDF.groupby('Device', 'gt').count().sort('count', ascending = False).show()

+--------+----------+------+
|  Device|        gt| count|
+--------+----------+------+
|nexus4_2|      walk|534222|
|nexus4_1|      walk|526180|
|nexus4_1|       sit|502477|
|nexus4_2|       sit|482237|
|nexus4_2|     stand|466886|
|nexus4_2|      bike|464416|
|nexus4_1|     stand|443897|
|nexus4_1|      null|420991|
|nexus4_1|  stairsup|419926|
|nexus4_2|  stairsup|416672|
|nexus4_2|      null|414734|
|nexus4_1|      bike|399294|
|nexus4_1|stairsdown|379046|
|nexus4_2|stairsdown|370013|
+--------+----------+------+



In [0]:
# 4c – Which combination of ‘Device’ and ‘gt’ columns has the highest counts?
staticDF.groupby('Device', 'gt').count().sort('count', ascending = False).first()

Out[29]: Row(Device='nexus4_2', gt='walk', count=534222)

## Task 5

In [0]:
# 5a – Determine the number of data partitions being used for the DataFrame. 
staticDF.rdd.getNumPartitions()

Out[30]: 12

In [0]:
# 5b – Change the number of partitions used by calling the repartition() method of DataFrames and give it a new value, say 100.
# Then call the getNumPartition() method again to verify the change
staticDF.repartition(100).rdd.getNumPartitions()

staticDF = staticDF.repartition(100)

In [0]:
# 5c – check and set the number of shuffle partitions to 100.
print(spark.conf.get("spark.sql.shuffle.partitions"))
spark.conf.set("spark.sql.shuffle.partitions", 100)
spark.conf.get("spark.sql.shuffle.partitions")

200
Out[7]: '100'

## Task 6

In [0]:
# 6a – Read .json files in one at a time to mimic streaming data
streamingDF = spark.readStream \
 .schema(dataSchema) \
 .option("maxFilesPerTrigger", 1) \
 .json("dbfs:/databricks-datasets/definitive-guide/data/activity-data/*.json")

In [0]:
# 6b – Set up a streaming query to start processing the contents of the stream
streamingQuery = streamingDF \
 .writeStream \
 .queryName('OriginalData') \
 .format("memory") \
 .outputMode("append") \
 .start()

In [0]:
# run this later to stop the stream
streamingQuery.stop()

In [0]:
# 6c – View the output of the streaming query 5 different times
for i in range(0,5):
    spark.sql("SELECT * FROM OriginalData").show()
    sleep(5)

+-------------+-------------------+--------+-----+------+----+-----+-------------+-------------+-------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|            x|            y|            z|
+-------------+-------------------+--------+-----+------+----+-----+-------------+-------------+-------------+
|1424686735175|1424686733176178965|nexus4_1|   35|nexus4|   g|stand| 0.0014038086|    5.0354E-4|-0.0124053955|
|1424686735378|1424686733382813486|nexus4_1|   76|nexus4|   g|stand|-0.0039367676|  0.026138306|  -0.01133728|
|1424686735577|1424686733579072031|nexus4_1|  115|nexus4|   g|stand|  0.003540039| -0.034744263| -0.019882202|
|1424686735779|1424688581834321412|nexus4_2|  163|nexus4|   g|stand|  0.002822876|  0.005584717|  0.017318726|
|1424686735982|1424688582035859498|nexus4_2|  203|nexus4|   g|stand| 0.0017547607| -0.018981934| -0.022201538|
|1424686736186|1424686734188508066|nexus4_1|  236|nexus4|   g|stand| 0.0014038086|  0.010116577|  4.119873E-4|
|

## Task 7

In [0]:
# 7a – Obtain a new DataFrame by specifying a transformation on streamingDF to count the rows.
resultDF = streamingDF.groupby().count()

## Task 8

In [0]:
# 8a – Start streaming query by writing out to the counts of streaming data to memory as output sink.
mystreamingQuery = resultDF.writeStream \
 .queryName('CountData') \
 .format("memory") \
 .outputMode("complete") \
 .start()
# .awaitTermination()

In [0]:
# 8b – Awaiting termination for a streaming query (optional, comment out)
# see .awaitTermination() command above

## Task 9

In [0]:
# 9a – Print out some information about the streaming query while it is running
print(mystreamingQuery.status)
print(mystreamingQuery.id)
print(mystreamingQuery.name)
mystreamingQuery.isActive

{'message': 'Getting offsets from FileStreamSource[dbfs:/databricks-datasets/definitive-guide/data/activity-data/*.json]', 'isDataAvailable': True, 'isTriggerActive': True}
291fa60e-9c9d-4343-a59e-b7a19f6cc926
CountData
Out[51]: True

In [0]:
# 9b – View the streaming output at five different times (5 seconds apart) using a loop
for i in range(0,5):
    spark.sql("SELECT * FROM CountData").show()
    sleep(5)

+-----+
|count|
+-----+
|78011|
+-----+

+------+
| count|
+------+
|234035|
+------+

+------+
| count|
+------+
|312047|
+------+

+------+
| count|
+------+
|390059|
+------+

+------+
| count|
+------+
|546083|
+------+



In [0]:
# 9c – Stop the streaming query 
mystreamingQuery.stop()

## Task 10

In [0]:
# 10a - set up streaming query using groupby() as the aggregation
devActivityCount_groupbyDF = streamingDF.groupby('Device', 'gt').count()

DACQuery1 = devActivityCount_groupbyDF.writeStream \
 .queryName('DAC_groupby') \
 .format("memory") \
 .outputMode("complete") \
 .start()

In [0]:
# View the streaming output at 3 different times using a loop
for i in range(0,3):
    spark.sql("SELECT * FROM DAC_groupby").show()
    sleep(30)

+--------+----------+-----+
|  Device|        gt|count|
+--------+----------+-----+
|nexus4_2|      walk|19922|
|nexus4_1|      bike|14930|
|nexus4_1|      null|15793|
|nexus4_1|       sit|18726|
|nexus4_2|       sit|18203|
|nexus4_2|  stairsup|15553|
|nexus4_1|stairsdown|14293|
|nexus4_1|     stand|16605|
|nexus4_2|      null|15550|
|nexus4_1|  stairsup|15804|
|nexus4_2|      bike|17460|
|nexus4_1|      walk|19846|
|nexus4_2|stairsdown|13801|
|nexus4_2|     stand|17549|
+--------+----------+-----+

+--------+----------+-----+
|  Device|        gt|count|
+--------+----------+-----+
|nexus4_2|      walk|26726|
|nexus4_1|      bike|19936|
|nexus4_1|      null|21106|
|nexus4_1|       sit|25132|
|nexus4_2|       sit|24106|
|nexus4_2|  stairsup|20864|
|nexus4_1|stairsdown|19027|
|nexus4_1|     stand|22244|
|nexus4_2|      null|20685|
|nexus4_1|  stairsup|20945|
|nexus4_2|      bike|23251|
|nexus4_1|      walk|26298|
|nexus4_2|stairsdown|18432|
|nexus4_2|     stand|23295|
+--------+---------

In [0]:
# stop stream
DACQuery1.stop()

In [0]:
# 10b – set up streaming query using cube() as the aggregation
devActivityCount_cubeDF = streamingDF.cube('Device', 'gt').count()

DACQuery2 = devActivityCount_cubeDF.writeStream \
 .queryName('DA_cube') \
 .format("memory") \
 .outputMode("complete") \
 .start()

In [0]:
# View the streaming output at 3 different times using a loop
for i in range(0,3):
    spark.sql("SELECT * FROM DA_cube").show()
    sleep(50)

+--------+----------+------+
|  Device|        gt| count|
+--------+----------+------+
|nexus4_1|      null| 10445|
|    null|      walk| 26512|
|    null|      null|156023|
|nexus4_2|      null| 78640|
|nexus4_1|     stand| 11078|
|nexus4_1|stairsdown|  9496|
|nexus4_1|       sit| 12563|
|    null|      bike| 21593|
|    null|  stairsup| 20905|
|nexus4_1|      walk| 13193|
|    null|       sit| 24619|
|nexus4_1|      null| 77383|
|nexus4_2|      bike| 11555|
|nexus4_2|       sit| 12056|
|    null|     stand| 22769|
|nexus4_1|      bike| 10038|
|nexus4_1|  stairsup| 10570|
|nexus4_2|  stairsup| 10335|
|nexus4_2|stairsdown|  9233|
|    null|stairsdown| 18729|
+--------+----------+------+
only showing top 20 rows

+--------+----------+------+
|  Device|        gt| count|
+--------+----------+------+
|nexus4_1|      null| 21106|
|    null|      walk| 53024|
|    null|      null|312047|
|nexus4_2|      null|157359|
|nexus4_1|     stand| 22244|
|nexus4_1|stairsdown| 19027|
|nexus4_1|       

In [0]:
# stop stream
DACQuery2.stop()

In [0]:
# 10c – set up streaming query using rollup() as the aggregation
devActivityCount_rollupDF = streamingDF.rollup('Device', 'gt').count()

DACQuery3 = devActivityCount_rollupDF.writeStream \
 .queryName('DAC_rollup') \
 .format("memory") \
 .outputMode("complete") \
 .start()

In [0]:
# View the streaming output at 3 different times using a loop
for i in range(0,3):
    spark.sql("SELECT * FROM DAC_rollup").show()
    sleep(50)

+--------+----------+------+
|  Device|        gt| count|
+--------+----------+------+
|nexus4_1|      null| 10445|
|    null|      null|156023|
|nexus4_2|      null| 78640|
|nexus4_1|     stand| 11078|
|nexus4_1|stairsdown|  9496|
|nexus4_1|       sit| 12563|
|nexus4_1|      walk| 13193|
|nexus4_1|      null| 77383|
|nexus4_2|      bike| 11555|
|nexus4_2|       sit| 12056|
|nexus4_1|      bike| 10038|
|nexus4_1|  stairsup| 10570|
|nexus4_2|  stairsup| 10335|
|nexus4_2|stairsdown|  9233|
|nexus4_2|      walk| 13319|
|nexus4_2|     stand| 11691|
|nexus4_2|      null| 10451|
+--------+----------+------+

+--------+----------+------+
|  Device|        gt| count|
+--------+----------+------+
|nexus4_1|      null| 15793|
|    null|      null|234035|
|nexus4_2|      null|118038|
|nexus4_1|     stand| 16605|
|nexus4_1|stairsdown| 14293|
|nexus4_1|       sit| 18726|
|nexus4_1|      walk| 19846|
|nexus4_1|      null|115997|
|nexus4_2|      bike| 17460|
|nexus4_2|       sit| 18203|
|nexus4_1|   

In [0]:
# stop stream
DACQuery3.stop()

## Task 11

In [0]:
# 11a – Calculate the count of rows where the gt column contains ‘stairs’ only for User ‘a’ and ‘b’ on Device = ‘nexus4_1’
stairsDF = streamingDF.filter(streamingDF.gt.like("%stairs%") & (F.col('User') == 'a') | (F.col('User') == 'b') & (F.col('Device') == 'nexus4_1')).groupby('User').count()

stairsonlyDF = stairsDF.writeStream \
 .queryName('stairsonly') \
 .format("memory") \
 .outputMode("update") \
 .start()

In [0]:
# 11b – View the streaming output at 3 different times using a loop
for i in range(0,3):
    spark.sql("SELECT * FROM stairsonly").show()
    sleep(20)

+----+-----+
|User|count|
+----+-----+
|   b| 4533|
|   a| 2126|
|   b| 9083|
|   a| 4252|
+----+-----+

+----+-----+
|User|count|
+----+-----+
|   b| 4533|
|   a| 2126|
|   b| 9083|
|   a| 4252|
|   b|13647|
|   a| 6378|
|   b|18116|
|   a| 8504|
+----+-----+

+----+-----+
|User|count|
+----+-----+
|   b| 4533|
|   a| 2126|
|   b| 9083|
|   a| 4252|
|   b|13647|
|   a| 6378|
|   b|18116|
|   a| 8504|
|   b|22608|
|   a|10630|
|   b|27155|
|   a|12756|
|   b|31760|
|   a|14882|
+----+-----+



In [0]:
# 11c – stop stream
stairsonlyDF.stop()

## Task 12

In [0]:
# 12a – Compute the average of columns ‘x’, ‘y’, ‘z’ for each combination of values for ‘Model’ and ‘gt’ columns
statsDF = streamingDF.groupby('gt', 'Model').agg(F.mean('x').alias('x_avg'), F.mean('y').alias('y_avg'), F.mean('z').alias('z_avg'), F.count('Device').alias('devicecount'))

gtmodelstatsQuery = statsDF.writeStream \
 .queryName('statsQuery') \
 .format("memory") \
 .outputMode("complete") \
 .start()

In [0]:
# 12b – View the streaming output where x_avg > 0 at 3 different times using a loop
for i in range(0,3):
    spark.sql("SELECT * FROM statsQuery WHERE x_avg > 0").show()
    sleep(20)

+----------+------+--------------------+--------------------+--------------------+-----------+
|        gt| Model|               x_avg|               y_avg|               z_avg|devicecount|
+----------+------+--------------------+--------------------+--------------------+-----------+
|      bike|nexus4|0.020961064903431637|-0.00915222693840...|-0.08538726699652664|      21593|
|stairsdown|nexus4|0.025383337512333622| -0.0362580763332372| 0.12712694345857248|      18729|
+----------+------+--------------------+--------------------+--------------------+-----------+

+----------+------+--------------------+--------------------+--------------------+-----------+
|        gt| Model|               x_avg|               y_avg|               z_avg|devicecount|
+----------+------+--------------------+--------------------+--------------------+-----------+
|      bike|nexus4|0.022771987227411025| -0.0093323141311089|-0.08289858777656002|      43187|
|stairsdown|nexus4|  0.0245728458283162|-0.036680

In [0]:
# 12c – stop stream
gtmodelstatsQuery.stop()

## Task 13

In [0]:
# 13a – compute the historical averages of the sensor orientation columns ‘x’, ‘y’, ‘z’ calculated by grouping the columns ‘gt’ and ‘Model’ 
historicalAvg = staticDF.groupby('gt', 'Model').agg(F.mean('x').alias('x_avg1'), F.mean('y').alias('y_avg1'), F.mean('z').alias('z_avg1'))

In [0]:
# 13b – join the average values of the sensor orientation columns ‘x’, ‘y’, ‘z’ calculated by grouping the columns ‘gt’ and ‘Model’ for historical and streaming data, run the stream
joinStaticStreamingQuery = streamingDF.groupby('gt', 'Model').agg(F.mean('x').alias('x_avg2'), F.mean('y').alias('y_avg2'), F.mean('z').alias('z_avg2'))\
.join(historicalAvg, on = ['gt', 'Model'], how = 'inner')\
.writeStream\
.queryName('joinQuery')\
.format("memory")\
.outputMode("complete")\
.start()

In [0]:
# 13c – Display the streaming output 3 times.
for i in range(0,3):
    spark.sql("SELECT * FROM joinQuery").show()
    sleep(90)

+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        gt| Model|              x_avg2|              y_avg2|              z_avg2|              x_avg1|              y_avg1|              z_avg1|
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     stand|nexus4|-3.19865608911239...|4.070066943871052E-4|1.027885098598942...|-3.11082189691726...| 3.21846166597532E-4|2.141300040636481...|
|      bike|nexus4|0.020961064903431637|-0.00915222693840...|-0.08538726699652664| 0.02268875955086682|-0.00877912156368...|-0.08251001663412376|
|      walk|nexus4|-0.00371092404649...|0.003353258755646...|0.001107451361787...|-0.00390116006094...|0.001052508689953...|-6.95435553042996...|
|stairsdown|nexus4|0.025383337512333622| -0.0362580763332372| 0.12712694345857248|0.021613908669165297|-0.03249018824752619|

In [0]:
# 13d – stop stream
joinStaticStreamingQuery.stop()

In [0]:
# 13e

# the second set of averages (x_avg2, y_avg2, etc.) change over time because they are calculated from the streaming dataset, whereas the first set of averages (x_avg1, y_avg2, etc.) are historical.