## ICRI Envirosensor Data Analysis

#### 1. Download dependencies

In [1]:
#Import json
import json

#Import ijson to read large json files iteratively
import ijson

#Import Time module
import time

#Import Apache Spark
from pyspark.sql import SparkSession
from pyspark.sql import *

from pyspark.sql import functions

#Import Pandas for data analysis 
import pandas as pd
from pandas import DataFrame
from pandas.io.json import json_normalize

#### 2. Identify data location

In [2]:
from os import path

#In windows the 'r' preceding the file name string indcates that it is a raw sting so that slashes are interpreted correctly
#filename = path.expanduser(r'data\here_east_envirosensors.json')

#Test file
filename = path.expanduser(r'data\Envirosensor_TEST.json')

#Check that the name has been assigned to the variable correctly
print(filename)

data\Envirosensor_TEST.json


#### 3. Check file size in KB to ensure it can be loaded into memory

In [3]:
path.getsize(filename) / (1<<10)

5.390625

## Parse JSON

#### 4. Parse JSON data iteratively with ijson

In [4]:
#Define function to iteratively parse json file
def iterativeParse(json):
    with open(json, 'r') as json_file:
        #Use the items method in ijson to extract a list of objects specifying the file and key path to to the list
        objects = ijson.items(json_file, 'item.data')
        #The items fuction returns a generator which we turn into a list of payloads with the list function 
        parsedData = list(objects)

    return parsedData

#Time execution of the iterativeParse function and assign the result to payloads variable
start = time.time()
payloads = iterativeParse(filename)
end = time.time()

print(f'\nTime to complete: {end - start:.2f}s\n')

#Check the first item in the payloads list
print(payloads[0])


Time to complete: 0.03s

{ "DeviceID": "8010", "DeviceType": "Envirosensor", "Event": "event", "Time": "2018-06-03 20:40:41.629620", "Data": { "TMP": "36.187", "OPT": "4.17", "BAT": "36.58", "HDT": "36.32", "BAR": "1016.17", "HDH": "22.65" } }


#### 5. Display number of payloads

In [5]:
#Count items in the payloads list
print('Total Sensor Payloads =', len(payloads))

Total Sensor Payloads = 11


## Pandas Dataframe

#### 6. Loop through payloads list and add each to an empty Pandas dataframe

In [6]:
#Create an empty dataframe
df = DataFrame()

#Define a function to loop through the collection of payloads and add each one to the dataframe
def normaliseDataFrame(x):
    tempdf = DataFrame()
    for i in x:
        row = json_normalize(json.loads(i))
        tempdf = tempdf.append(row, sort=False)
        
        #Display individual rows that are added to the dataframe for debugging
        #print(row)
        
    return tempdf

#Time execution of the normaliseDataFrame function and assign the result to payloads variable
start = time.time()
df = normaliseDataFrame(payloads)
end = time.time()

print(f'\nTime to complete: {end - start:.2f}s\n')
    


Time to complete: 0.03s



#### 7. Display dataframe summary information

In [7]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 11 entries, 0 to 0
Data columns (total 10 columns):
Data.BAR      11 non-null object
Data.BAT      11 non-null object
Data.HDH      11 non-null object
Data.HDT      11 non-null object
Data.OPT      11 non-null object
Data.TMP      11 non-null object
DeviceID      11 non-null object
DeviceType    11 non-null object
Event         11 non-null object
Time          11 non-null object
dtypes: object(10)
memory usage: 968.0+ bytes


#### 8. Display dataframe

In [8]:
df

Unnamed: 0,Data.BAR,Data.BAT,Data.HDH,Data.HDT,Data.OPT,Data.TMP,DeviceID,DeviceType,Event,Time
0,1016.17,36.58,22.65,36.32,4.17,36.187,8010,Envirosensor,event,2018-06-03 20:40:41.629620
0,1016.14,35.06,28.68,34.76,0.0,34.562,8017,Envirosensor,event,2018-06-01 04:22:22.881564
0,1017.23,35.49,26.23,35.06,6.17,34.781,8008,Envirosensor,event,2018-06-01 13:04:56.152669
0,1013.88,34.07,21.58,34.14,73.04,33.843,8019,Envirosensor,event,2018-05-29 22:48:18.852665
0,1017.59,35.05,27.41,34.77,6.67,34.625,8010,Envirosensor,event,2018-06-01 13:19:15.175496
0,1017.23,36.19,18.9,36.06,4.89,35.906,8004,Envirosensor,event,2018-06-02 23:12:12.230449
0,1018.17,36.51,21.66,36.07,8.5,35.812,8008,Envirosensor,event,2018-06-03 07:33:16.774434
0,1013.19,34.93,20.34,35.01,74.0,34.718,8019,Envirosensor,event,2018-05-30 09:25:24.052756
0,1018.45,35.87,22.71,35.52,108.72,35.281,8005,Envirosensor,event,2018-06-03 00:30:56.678239
0,1018.45,37.08,17.05,36.84,0.24,36.5,8018,Envirosensor,event,2018-06-03 09:34:58.436492


## Apache Spark

#### Setup Apache Spark

In [9]:
#Create a Spark session
spark = SparkSession \
    .builder \
    .appName("Envirosensor Data Analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Parse JSON Directly into Dataframe

#### Read JSON data directly into Apache Spark data frame

In [10]:
#Place JSON into a Spark dataframe and time execution
start = time.time()
df = spark.read.json(filename, multiLine=True)
end = time.time()

print(f'\nTime to complete: {end - start:.2f}s\n')


Time to complete: 1.69s



#### Display the dataframe

In [11]:
# Display the content of the DataFrame
df.show()

+--------------------+--------+------------+---------+------+--------------------+--------------------+
|                data|deviceId|  deviceType|eventType|format|    json_featuretype|           timestamp|
+--------------------+--------+------------+---------+------+--------------------+--------------------+
|{ "DeviceID": "80...|    8010|Envirosensor|    event|  json|iotp_kfb22t_envir...|2018-06-03T20:40:...|
|{ "DeviceID": "80...|    8017|Envirosensor|    event|  json|iotp_kfb22t_envir...|2018-06-01T04:22:...|
|{ "DeviceID": "80...|    8008|Envirosensor|    event|  json|iotp_kfb22t_envir...|2018-06-01T13:04:...|
|{ "DeviceID": "80...|    8019|Envirosensor|    event|  json|iotp_kfb22t_envir...|2018-05-29T22:48:...|
|{ "DeviceID": "80...|    8010|Envirosensor|    event|  json|iotp_kfb22t_envir...|2018-06-01T13:19:...|
|{ "DeviceID": "80...|    8004|Envirosensor|    event|  json|iotp_kfb22t_envir...|2018-06-02T23:12:...|
|{ "DeviceID": "80...|    8008|Envirosensor|    event|  json|iot

## Spark Dataframe

#### Establish sensor data payload schema by loading a sample into a data frame

In [12]:
payloadSchema = spark.createDataFrame(json_normalize(json.loads(payloads[0])))
payloadSchema.printSchema()
payloadSchema.schema

root
 |-- Data.BAR: string (nullable = true)
 |-- Data.BAT: string (nullable = true)
 |-- Data.HDH: string (nullable = true)
 |-- Data.HDT: string (nullable = true)
 |-- Data.OPT: string (nullable = true)
 |-- Data.TMP: string (nullable = true)
 |-- DeviceID: string (nullable = true)
 |-- DeviceType: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Time: string (nullable = true)



StructType(List(StructField(Data.BAR,StringType,true),StructField(Data.BAT,StringType,true),StructField(Data.HDH,StringType,true),StructField(Data.HDT,StringType,true),StructField(Data.OPT,StringType,true),StructField(Data.TMP,StringType,true),StructField(DeviceID,StringType,true),StructField(DeviceType,StringType,true),StructField(Event,StringType,true),StructField(Time,StringType,true)))

#### Import relevant data types from pyspark

In [13]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

#### Specify data schema

In [14]:
schema = StructType([
    StructField('Data.BAR', StringType(), True),
    StructField('Data.BAT', StringType(), True),
    StructField('Data.HDH', StringType(), True),
    StructField('Data.HDT', StringType(), True),
    StructField('Data.OPT', StringType(), True),
    StructField('Data.TMP', StringType(), True),
    StructField('DeviceID', StringType(), True),
    StructField('DeviceType', StringType(), True),
    StructField('Event', StringType(), True),
    StructField('Time', TimestampType(), True)])

#### Loop through payloads list and add each to an empty Spark dataframe

In [15]:
#Define a function to loop through the collection of payloads and add each one to the dataframe
def normaliseSparkDataFrame(x):    
    #Read first payload element and use to create dataframe
    tempdf = spark.createDataFrame(json_normalize(json.loads(payloads[0])))
    #Skip the first element and loop through the rest adding each to the dataframe
    for i in x [1:]:
        row = json_normalize(json.loads(i))
        tempdf = tempdf.union(spark.createDataFrame(row))
        
        #Display individual rows that are added to the dataframe for debugging
        #print(row)
    
    return tempdf

#Time execution of the normaliseDataFrame function and assign the result to payloads variable
start = time.time()
df = normaliseSparkDataFrame(payloads)
end = time.time()

print(f'\nTime to complete: {end - start:.2f}s\n')



Time to complete: 0.28s



#### Display the Dataframe

In [16]:
# Display the content of the DataFrame
df.show()

+--------+--------+--------+--------+--------+--------+--------+------------+-----+--------------------+
|Data.BAR|Data.BAT|Data.HDH|Data.HDT|Data.OPT|Data.TMP|DeviceID|  DeviceType|Event|                Time|
+--------+--------+--------+--------+--------+--------+--------+------------+-----+--------------------+
| 1016.17|   36.58|   22.65|   36.32|    4.17|  36.187|    8010|Envirosensor|event|2018-06-03 20:40:...|
| 1016.14|   35.06|   28.68|   34.76|    0.00|  34.562|    8017|Envirosensor|event|2018-06-01 04:22:...|
| 1017.23|   35.49|   26.23|   35.06|    6.17|  34.781|    8008|Envirosensor|event|2018-06-01 13:04:...|
| 1013.88|   34.07|   21.58|   34.14|   73.04|  33.843|    8019|Envirosensor|event|2018-05-29 22:48:...|
| 1017.59|   35.05|   27.41|   34.77|    6.67|  34.625|    8010|Envirosensor|event|2018-06-01 13:19:...|
| 1017.23|   36.19|   18.90|   36.06|    4.89|  35.906|    8004|Envirosensor|event|2018-06-02 23:12:...|
| 1018.17|   36.51|   21.66|   36.07|    8.50|  35.812|

## Spark Resilient Distributed Datasets (RDD's)

#### Load payloads into RDD's

In [17]:
#Time execution of loading payloads into RDD's
start = time.time()
sc = spark.sparkContext
payloadsRDD = sc.parallelize(payloads)
end = time.time()

print(f'\nTime to complete: {end - start:.2f}s\n')


Time to complete: 0.01s



#### Convert RDD to dataframe

In [18]:
processedData = spark.read.json(payloadsRDD)

#### Display the Dataframe

In [19]:
# Display the content of the DataFrame
processedData.show()

+--------------------+--------+------------+-----+--------------------+
|                Data|DeviceID|  DeviceType|Event|                Time|
+--------------------+--------+------------+-----+--------------------+
|[1016.17, 36.58, ...|    8010|Envirosensor|event|2018-06-03 20:40:...|
|[1016.14, 35.06, ...|    8017|Envirosensor|event|2018-06-01 04:22:...|
|[1017.23, 35.49, ...|    8008|Envirosensor|event|2018-06-01 13:04:...|
|[1013.88, 34.07, ...|    8019|Envirosensor|event|2018-05-29 22:48:...|
|[1017.59, 35.05, ...|    8010|Envirosensor|event|2018-06-01 13:19:...|
|[1017.23, 36.19, ...|    8004|Envirosensor|event|2018-06-02 23:12:...|
|[1018.17, 36.51, ...|    8008|Envirosensor|event|2018-06-03 07:33:...|
|[1013.19, 34.93, ...|    8019|Envirosensor|event|2018-05-30 09:25:...|
|[1018.45, 35.87, ...|    8005|Envirosensor|event|2018-06-03 00:30:...|
|[1018.45, 37.08, ...|    8018|Envirosensor|event|2018-06-03 09:34:...|
|[1018.69, 36.04, ...|    8016|Envirosensor|event|2018-06-03 07:

In [20]:
processedData.show(1,False)

+--------------------------------------------+--------+------------+-----+--------------------------+
|Data                                        |DeviceID|DeviceType  |Event|Time                      |
+--------------------------------------------+--------+------------+-----+--------------------------+
|[1016.17, 36.58, 22.65, 36.32, 4.17, 36.187]|8010    |Envirosensor|event|2018-06-03 20:40:41.629620|
+--------------------------------------------+--------+------------+-----+--------------------------+
only showing top 1 row



In [21]:
processedData.select('Data').show(5, False)

+---------------------------------------------+
|Data                                         |
+---------------------------------------------+
|[1016.17, 36.58, 22.65, 36.32, 4.17, 36.187] |
|[1016.14, 35.06, 28.68, 34.76, 0.00, 34.562] |
|[1017.23, 35.49, 26.23, 35.06, 6.17, 34.781] |
|[1013.88, 34.07, 21.58, 34.14, 73.04, 33.843]|
|[1017.59, 35.05, 27.41, 34.77, 6.67, 34.625] |
+---------------------------------------------+
only showing top 5 rows



In [61]:
splitCol = functions.split(processedData['Data'],',')
processedData = processedData.withColumn('NAME1', splitCol.getItem(0))
processedData = processedData.withColumn('NAME1', splitCol.getItem(0))
processedData.show()

AnalysisException: "cannot resolve 'split(`Data`, ',')' due to data type mismatch: argument 1 requires string type, however, '`Data`' is of struct<BAR:string,BAT:string,HDH:string,HDT:string,OPT:string,TMP:string> type.;;\n'Project [Data#2992, DeviceID#2993, DeviceType#2994, Event#2995, Time#2996, split(Data#2992, ,)[0] AS NAME1#3034]\n+- LogicalRDD [Data#2992, DeviceID#2993, DeviceType#2994, Event#2995, Time#2996], false\n"

In [37]:
processedData.printSchema()
processedData.schema

root
 |-- Data: struct (nullable = true)
 |    |-- BAR: string (nullable = true)
 |    |-- BAT: string (nullable = true)
 |    |-- HDH: string (nullable = true)
 |    |-- HDT: string (nullable = true)
 |    |-- OPT: string (nullable = true)
 |    |-- TMP: string (nullable = true)
 |-- DeviceID: string (nullable = true)
 |-- DeviceType: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Time: string (nullable = true)



StructType(List(StructField(Data,StructType(List(StructField(BAR,StringType,true),StructField(BAT,StringType,true),StructField(HDH,StringType,true),StructField(HDT,StringType,true),StructField(OPT,StringType,true),StructField(TMP,StringType,true))),true),StructField(DeviceID,StringType,true),StructField(DeviceType,StringType,true),StructField(Event,StringType,true),StructField(Time,StringType,true)))