##### Import Package

In [1]:
from pyspark.sql.types import StringType, StructType, StructField

##### Import Raw Data

In [2]:
# Path to our 20 JSON files
inputPath = "D:/Data/Files/spark/spark-streaming-sample-data/"

##### Set Schema

In [3]:
# Explicitly set schema
schema = StructType([ StructField("time", StringType(), True),
                      StructField("customer", StringType(), True),
                      StructField("action", StringType(), True),
                      StructField("device", StringType(), True)])

##### Read Raw Data

In [4]:
# Create DataFrame representing data in the JSON files
inputDF = (
  spark
    .read
    .schema(schema)
    .json(inputPath)
)

##### Display Data

In [5]:
#display(inputDF)
inputDF.show()

+---------------+--------------------+-----------+--------------------+
|           time|            customer|     action|              device|
+---------------+--------------------+-----------+--------------------+
|           null|                null|       null|                null|
|10:59:46.000 AM|Stafford Blakebrough|   power on|  GreenIQ Controller|
| 6:26:36.000 PM|      Alex Woolcocks|   power on|Nest T3021US Ther...|
| 4:46:28.000 AM|      Clarice Nayshe|   power on|Footbot Air Quali...|
| 8:58:43.000 AM|      Killie Pirozzi|  power off|Footbot Air Quali...|
| 4:20:49.000 PM|    Lynne Dymidowicz|   power on|Footbot Air Quali...|
| 3:41:33.000 AM|       Shaina Dowyer|   power on|             ecobee4|
|10:40:24.000 PM|       Barbee Melato|low battery| August Doorbell Cam|
|11:13:38.000 PM|        Clem Westcot|  power off|Nest T3021US Ther...|
|10:12:15.000 PM|       Kerri Galfour|  power off|         Amazon Echo|
|11:04:41.000 AM|        Trev Ashmore|low battery|  GreenIQ Cont

##### Clean1 : Drop N/A

In [6]:
# Remove empty rows
inputDF = inputDF.dropna()
inputDF.show()

+---------------+--------------------+-----------+--------------------+
|           time|            customer|     action|              device|
+---------------+--------------------+-----------+--------------------+
|10:59:46.000 AM|Stafford Blakebrough|   power on|  GreenIQ Controller|
| 6:26:36.000 PM|      Alex Woolcocks|   power on|Nest T3021US Ther...|
| 4:46:28.000 AM|      Clarice Nayshe|   power on|Footbot Air Quali...|
| 8:58:43.000 AM|      Killie Pirozzi|  power off|Footbot Air Quali...|
| 4:20:49.000 PM|    Lynne Dymidowicz|   power on|Footbot Air Quali...|
| 3:41:33.000 AM|       Shaina Dowyer|   power on|             ecobee4|
|10:40:24.000 PM|       Barbee Melato|low battery| August Doorbell Cam|
|11:13:38.000 PM|        Clem Westcot|  power off|Nest T3021US Ther...|
|10:12:15.000 PM|       Kerri Galfour|  power off|         Amazon Echo|
|11:04:41.000 AM|        Trev Ashmore|low battery|  GreenIQ Controller|
| 3:06:31.000 AM|      Coral Jahnisch|   power on| August Doorbe

##### Create View name "iot_sensor"

In [7]:
# Create temp table named 'iot_sensor'
inputDF.createOrReplaceTempView("iot_sensor")

##### Query Unique Customer Name From "iot_sensor"
##### Set variable as "customerName"

In [8]:
customersName = spark.sql("select distinct(customer) from iot_sensor sort by customer asc ")
customersName.show()

+------------------+
|          customer|
+------------------+
|   Aaren Senescall|
|    Abelard Bliven|
|  Ailbert Laverick|
|    Ailis Killoran|
|  Alphonse Dooland|
|   Ambrosio Rayson|
|     Arther Drever|
|  Ashlee Delacroix|
|Batsheva Olrenshaw|
|     Beulah Leitch|
|      Boyce Lorand|
|   Brewster Currum|
|   Calley Wheildon|
|      Carilyn Rolf|
|       Carl Hughes|
|   Claudia Puttock|
|Constance McGunley|
|       Cordy Pluck|
|  Corinna Fruchter|
|   Courtnay Wilber|
+------------------+
only showing top 20 rows



##### Query number of Customer
##### Set variable as "customerNum"

In [9]:
customersNum = spark.sql("select customer as `Customer Name`,count(customer) as `Number of Customer` from iot_sensor group by 1 order by 2 desc")
customersNum.show()

+--------------------+------------------+
|       Customer Name|Number of Customer|
+--------------------+------------------+
|     Shirleen Kinvig|                 2|
|Cleveland Southerden|                 1|
|     Sinclare Golson|                 1|
|            Fan Gude|                 1|
|   Kiersten O'Fihily|                 1|
|      Yelena Whewell|                 1|
|Adamo Oosthout de...|                 1|
|        Ilise Dufaur|                 1|
|      Dulcie Agutter|                 1|
|          Fan Kenney|                 1|
|          Ryon Rubel|                 1|
|       Corrie Thomas|                 1|
|    Delbert Ferretti|                 1|
|         Lemmie Oake|                 1|
|       Pinchas Isaac|                 1|
|          Car Newlin|                 1|
|         Haven Maase|                 1|
|    Kakalina Lehrian|                 1|
|     Blair Grinikhin|                 1|
|    Sissie Harrowell|                 1|
+--------------------+------------

In [10]:
customersUniq = spark.sql("select count(distinct(customer)) as `Number of Customer` from iot_sensor")
customersUniq.show()

+------------------+
|Number of Customer|
+------------------+
|             19979|
+------------------+



##### Query Device Name 
##### Set variable as "deviceName"

In [11]:
deviceName = spark.sql("select distinct(device) from iot_sensor")
deviceName.show()

+--------------------+
|              device|
+--------------------+
|Nest T3021US Ther...|
|Footbot Air Quali...|
|             ecobee4|
|  GreenIQ Controller|
|         Amazon Echo|
| August Doorbell Cam|
+--------------------+



##### Query number of Device
##### Set variable as "deviceNum"

In [12]:
deviceNum = spark.sql("select device as `Device Name`, count(device) as `Number of Device` from iot_sensor group by device")
deviceNum.show()

+--------------------+----------------+
|         Device Name|Number of Device|
+--------------------+----------------+
|Nest T3021US Ther...|            3222|
|Footbot Air Quali...|            3399|
|             ecobee4|            3175|
|  GreenIQ Controller|            3445|
|         Amazon Echo|            3308|
| August Doorbell Cam|            3431|
+--------------------+----------------+



In [13]:
# Create streaming equivalent of `inputDF` using .readStream
streamingDF = (
  spark
    .readStream
    .schema(schema)
    .option("maxFilesPerTrigger", 1)
    .json(inputPath)
)

In [14]:
# Stream `streamingDF` while aggregating by action
streamingActionCountsDF = (
  streamingDF
    .groupBy(
      streamingDF.action
    )
    .count()
)

In [15]:
# Is `streamingActionCountsDF` actually streaming?
streamingActionCountsDF.isStreaming

True