# Spark Notebook

### Create a Spark Session

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = (SparkSession.builder.appName("Activity Tracker").getOrCreate())


24/01/18 12:26:34 WARN Utils: Your hostname, Niharikas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.77 instead (on interface en0)
24/01/18 12:26:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/18 12:26:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/18 12:26:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Source the input

In [6]:
# in Python
static = spark.read.json("../dataflow/transformation/Data/Spark/data/activity-data/")
static.printSchema()
static.show()
dataSchema = static.schema



                                                                                

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)

+-------------+-------------------+--------+-----+------+----+-----+-------------+------------+------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|            x|           y|           z|
+-------------+-------------------+--------+-----+------+----+-----+-------------+------------+------------+
|1424686735090|1424686733090638193|nexus4_1|   18|nexus4|   g|stand|  3.356934E-4|-5.645752E-4|-0.018814087|
|1424686735292|1424688581345918092|nexus4_2|   66|nexus4|   g|stand| -0.005722046| 0.029083252| 0.005569458|
|1424686735500|1424686733498505625|nexus4_1|   99|nexus4|   g|stand|    0.0078125|-0.

In [7]:
streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
  .json("../dataflow/transformation/Data/Spark/data/activity-data/")

In [10]:
activityCounts = streaming.groupBy("gt").count()

### Partitions

In [11]:
spark.conf.set("spark.sql.shuffle.partitions", 5)

### Write the data

In [12]:
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
  .format("memory").outputMode("complete")\
  .start()

24/01/18 12:29:15 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/tw/0sj1sbk54zjft6x3s_xgp0r00000gp/T/temporary-03acb6d0-8c1f-4d62-9929-5aa401ea088c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/01/18 12:29:15 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [9]:
# activityQuery.awaitTermination()


### Validate the streams

In [13]:
spark.streams.active


[<pyspark.sql.streaming.query.StreamingQuery at 0x113b28b50>]

### Count using SQL

In [14]:
from time import sleep
for x in range(5):
    spark.sql("SELECT * FROM activity_counts").show()
    sleep(1)

+----------+------+
|        gt| count|
+----------+------+
|       sit|221543|
|     stand|204938|
|stairsdown|168545|
|      walk|238607|
|  stairsup|188165|
|      null|188061|
|      bike|194355|
+----------+------+

+----------+------+
|        gt| count|
+----------+------+
|       sit|283083|
|     stand|261861|
|stairsdown|215342|
|      walk|304887|
|  stairsup|240465|
|      null|240291|
|      bike|248340|
+----------+------+

+----------+------+
|        gt| count|
+----------+------+
|       sit|344623|
|     stand|318781|
|stairsdown|262143|
|      walk|371168|
|  stairsup|292771|
|      null|292518|
|      bike|302327|
+----------+------+

+----------+------+
|        gt| count|
+----------+------+
|       sit|393855|
|     stand|364317|
|stairsdown|299587|
|      walk|424192|
|  stairsup|334621|
|      null|334299|
|      bike|345515|
+----------+------+

+----------+------+
|        gt| count|
+----------+------+
|       sit|455395|
|     stand|421233|
|stairsdown|3463

In [15]:
from pyspark.sql.functions import expr
simpleTransform = streaming.withColumn("stairs", expr("gt like '%stairs%'"))\
  .where("stairs")\
  .where("gt is not null")\
  .select("gt", "model", "arrival_time", "creation_time")\
  .writeStream\
  .queryName("simple_transform")\
  .format("memory")\
  .outputMode("append")\
  .start()

24/01/18 12:29:27 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/tw/0sj1sbk54zjft6x3s_xgp0r00000gp/T/temporary-6f56744f-04ff-458e-80f6-e96de8f1eac2. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/01/18 12:29:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [16]:
deviceModelStats = streaming.cube("gt", "model").avg()\
  .drop("avg(Arrival_time)")\
  .drop("avg(Creation_Time)")\
  .drop("avg(Index)")\
  .writeStream.queryName("device_counts").format("memory")\
  .outputMode("complete")\
  .start()

24/01/18 12:29:29 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/tw/0sj1sbk54zjft6x3s_xgp0r00000gp/T/temporary-a0fe6e65-f7b1-44e4-9c48-e704d9b1bd97. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/01/18 12:29:29 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


### device count

In [18]:
spark.sql("SELECT * FROM device_counts").show()

+----------+------+--------------------+--------------------+--------------------+
|        gt| model|              avg(x)|              avg(y)|              avg(z)|
+----------+------+--------------------+--------------------+--------------------+
|       sit|  NULL| -5.4943324403959E-4|2.791446281700071E-4|-2.33994461689892...|
|      walk|nexus4|-0.00390116006094...|0.001052508689953...|-6.95435553042998...|
|      walk|  NULL|-0.00390116006094...|0.001052508689953...|-6.95435553042998...|
|  stairsup|  NULL|-0.02479965287771643|-0.00800392344379...|-0.10034088415060415|
|     stand|  NULL|-3.11082189691727...|3.218461665975321...|2.141300040636463...|
|      bike|  NULL|0.022688759550866838|-0.00877912156368...|-0.08251001663412372|
|  stairsup|nexus4|-0.02479965287771643|-0.00800392344379...|-0.10034088415060415|
|      NULL|nexus4|4.796918779024287E-4|-0.00601540958963...|-0.01013356489164...|
|      NULL|  NULL|4.796918779024287E-4|-0.00601540958963...|-0.01013356489164...|
|sta