## How to Process IoT Device JSON Data Using Dataset

#### Reading JSON as a Dataset

In Scala, a case class such as *DeviceIoTData* can be used to read the JSON device data into a typed Dataset; this notebook uses PySpark, so the data is read into a DataFrame instead. Each device entry includes GeoIP information:
* IP address
* ISO-3166-1 two- and three-letter country codes
* Country name
* Latitude and longitude

With these attributes as part of the device data, we can map and visualize the devices as needed. For each IP associated with a *device_id*, I obtained the above attributes from the web service at http://freegeoip.net/csv/ip

*{"device_id": 198164, "device_name": "sensor-pad-198164owomcJZ", "ip": "80.55.20.25", "cca2": "PL", "cca3": "POL", "cn": "Poland", "latitude": 53.080000, "longitude": 18.620000, "scale": "Celsius", "temp": 21, "humidity": 65, "battery_level": 8, "c02_level": 1408, "lcd": "red", "timestamp" :1458081226051 }*
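To see which column types Spark is likely to infer from a record like this, here is a minimal sketch using Python's standard `json` module. The mapping from Python types to Spark SQL type names (int to long, float to double, str to string) is an assumption chosen to match the schema that `df.printSchema()` prints later; Spark's real inference scans the whole file, not a single record.

```python
import json

# The sample record shown above (verbatim)
record = ('{"device_id": 198164, "device_name": "sensor-pad-198164owomcJZ", "ip": "80.55.20.25", '
          '"cca2": "PL", "cca3": "POL", "cn": "Poland", "latitude": 53.080000, "longitude": 18.620000, '
          '"scale": "Celsius", "temp": 21, "humidity": 65, "battery_level": 8, "c02_level": 1408, '
          '"lcd": "red", "timestamp" :1458081226051 }')

# Assumed correspondence between parsed Python types and Spark SQL type names
SPARK_TYPE = {int: "long", float: "double", str: "string"}

def infer_schema(json_text):
    """Return {field: spark_type_name} for one JSON object, ordered by field name
    (printSchema() later in this notebook also lists the fields alphabetically)."""
    obj = json.loads(json_text)
    return {name: SPARK_TYPE[type(value)] for name, value in sorted(obj.items())}

schema = infer_schema(record)
for name, spark_type in schema.items():
    print(f"{name}: {spark_type}")
```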

This dataset is available from the public S3 bucket //databricks-public-datasets/data/iot or from https://github.com/dmatrix/examples/blob/master/spark/databricks/notebooks/py/data/iot_devices.json

In [0]:
%fs ls "FileStore/tables/"

path,name,size
dbfs:/FileStore/tables/iot_devices.json,iot_devices.json,62509052


In [0]:
# read the JSON file and create the DataFrame

file_location = "/FileStore/tables/iot_devices.json"
file_type = "json"

# Spark's JSON reader expects one JSON object per line by default and infers the
# schema from the data; the CSV-only options (inferSchema, header, sep) would be
# ignored for JSON, so they are left out.
df = spark.read.format(file_type).load(file_location)

display(df)



battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
8,868,US,USA,United States,1,meter-gauge-1xbYRYcj,51,68.161.225.1,38.0,green,-97.0,Celsius,34,1458444054093
7,1473,NO,NOR,Norway,2,sensor-pad-2n2Pea,70,213.161.254.1,62.47,red,6.15,Celsius,11,1458444054119
2,1556,IT,ITA,Italy,3,device-mac-36TWSKiT,44,88.36.5.1,42.83,red,12.83,Celsius,19,1458444054120
6,1080,US,USA,United States,4,sensor-pad-4mzWkz,32,66.39.173.154,44.06,yellow,-121.32,Celsius,28,1458444054121
4,931,PH,PHL,Philippines,5,therm-stick-5gimpUrBB,62,203.82.41.9,14.58,green,120.97,Celsius,25,1458444054122
3,1210,US,USA,United States,6,sensor-pad-6al7RTAobR,51,204.116.105.67,35.93,yellow,-85.46,Celsius,27,1458444054122
3,1129,CN,CHN,China,7,meter-gauge-7GeDoanM,26,220.173.179.1,22.82,yellow,108.32,Celsius,18,1458444054123
0,1536,JP,JPN,Japan,8,sensor-pad-8xUD6pzsQI,35,210.173.177.1,35.69,red,139.69,Celsius,27,1458444054123
3,807,JP,JPN,Japan,9,device-mac-9GcjZ2pw,85,118.23.68.227,35.69,green,139.69,Celsius,13,1458444054124
7,1470,US,USA,United States,10,sensor-pad-10BsywSYUF,56,208.109.163.218,33.61,red,-111.89,Celsius,26,1458444054125
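Spark's JSON data source expects JSON Lines by default: one complete JSON object per line, with no enclosing array. A file holding a single pretty-printed JSON array would need `.option("multiLine", "true")` on the reader. The stdlib sketch below parses a two-line excerpt, built from the first two rows displayed above, in the same one-object-per-line way; it illustrates the file format only, not Spark's reader.

```python
import json

# Two records in JSON Lines form (one object per line), taken from the rows displayed above
jsonl_text = (
    '{"device_id": 1, "device_name": "meter-gauge-1xbYRYcj", "cca3": "USA", "temp": 34, "c02_level": 868}\n'
    '{"device_id": 2, "device_name": "sensor-pad-2n2Pea", "cca3": "NOR", "temp": 11, "c02_level": 1473}\n'
)

def read_json_lines(text):
    """Parse JSON Lines: each non-blank line is one independent JSON object."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

rows = read_json_lines(jsonl_text)
print(len(rows), rows[0]["device_name"], rows[1]["c02_level"])
```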


In [0]:
# Create a view or table

temp_table_name = "iot_devices_json"

df.createOrReplaceTempView(temp_table_name)

In [0]:
df.count()

Out[661]: 198164

Displaying your Dataset

In [0]:
display(df)

battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
8,868,US,USA,United States,1,meter-gauge-1xbYRYcj,51,68.161.225.1,38.0,green,-97.0,Celsius,34,1458444054093
7,1473,NO,NOR,Norway,2,sensor-pad-2n2Pea,70,213.161.254.1,62.47,red,6.15,Celsius,11,1458444054119
2,1556,IT,ITA,Italy,3,device-mac-36TWSKiT,44,88.36.5.1,42.83,red,12.83,Celsius,19,1458444054120
6,1080,US,USA,United States,4,sensor-pad-4mzWkz,32,66.39.173.154,44.06,yellow,-121.32,Celsius,28,1458444054121
4,931,PH,PHL,Philippines,5,therm-stick-5gimpUrBB,62,203.82.41.9,14.58,green,120.97,Celsius,25,1458444054122
3,1210,US,USA,United States,6,sensor-pad-6al7RTAobR,51,204.116.105.67,35.93,yellow,-85.46,Celsius,27,1458444054122
3,1129,CN,CHN,China,7,meter-gauge-7GeDoanM,26,220.173.179.1,22.82,yellow,108.32,Celsius,18,1458444054123
0,1536,JP,JPN,Japan,8,sensor-pad-8xUD6pzsQI,35,210.173.177.1,35.69,red,139.69,Celsius,27,1458444054123
3,807,JP,JPN,Japan,9,device-mac-9GcjZ2pw,85,118.23.68.227,35.69,green,139.69,Celsius,13,1458444054124
7,1470,US,USA,United States,10,sensor-pad-10BsywSYUF,56,208.109.163.218,33.61,red,-111.89,Celsius,26,1458444054125


#### Data exploration

Top five entries

In [0]:
df.take(5)

Out[663]: [Row(battery_level=8, c02_level=868, cca2='US', cca3='USA', cn='United States', device_id=1, device_name='meter-gauge-1xbYRYcj', humidity=51, ip='68.161.225.1', latitude=38.0, lcd='green', longitude=-97.0, scale='Celsius', temp=34, timestamp=1458444054093),
 Row(battery_level=7, c02_level=1473, cca2='NO', cca3='NOR', cn='Norway', device_id=2, device_name='sensor-pad-2n2Pea', humidity=70, ip='213.161.254.1', latitude=62.47, lcd='red', longitude=6.15, scale='Celsius', temp=11, timestamp=1458444054119),
 Row(battery_level=2, c02_level=1556, cca2='IT', cca3='ITA', cn='Italy', device_id=3, device_name='device-mac-36TWSKiT', humidity=44, ip='88.36.5.1', latitude=42.83, lcd='red', longitude=12.83, scale='Celsius', temp=19, timestamp=1458444054120),
 Row(battery_level=6, c02_level=1080, cca2='US', cca3='USA', cn='United States', device_id=4, device_name='sensor-pad-4mzWkz', humidity=32, ip='66.39.173.154', latitude=44.06, lcd='yellow', longitude=-121.32, scale='Celsius', temp=28, tim

For all relational expressions, the [Catalyst Optimizer](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html) formulates an optimized logical and physical plan for execution, and the [Tungsten](https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html) engine optimizes the generated code. In Scala, a typed Dataset of *DeviceIoTData* additionally uses Spark's encoders to keep records in a compact binary internal representation, which reduces memory use, minimizes the bytes transferred over the network between nodes, and speeds up execution. PySpark has no typed Dataset API, but DataFrame operations get the same Catalyst and Tungsten optimizations; code that drops down to `.rdd` with Python lambdas does not.

For instance, let's first filter the device dataset on *temp* and *humidity* attributes with a predicate and display the first 10 items.

In [0]:
# filter the DataFrame on temperature and humidity

from pyspark.sql.functions import col
TempFilter = df.filter(col("temp") > 30).filter(col("humidity") > 70)
display(TempFilter)


battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
0,1466,US,USA,United States,17,meter-gauge-17zb8Fghhl,98,161.188.212.254,39.95,red,-75.16,Celsius,31,1458444054129
9,986,FR,FRA,France,48,sensor-pad-48jt4eL,97,90.37.208.1,43.88,green,4.9,Celsius,31,1458444054151
8,1436,US,USA,United States,54,sensor-pad-5410CWPrNb6,73,204.15.64.249,32.89,red,-117.13,Celsius,34,1458444054155
4,1090,US,USA,United States,63,device-mac-63GL4xSaZbj,91,66.198.198.1,44.56,yellow,-105.67,Celsius,31,1458444054162
4,1072,PH,PHL,Philippines,81,device-mac-81nsKomrRe,90,222.127.71.1,14.55,yellow,121.04,Celsius,31,1458444054172
3,1076,FR,FRA,France,82,sensor-pad-82HJm6yP,76,213.162.50.33,48.86,yellow,2.35,Celsius,32,1458444054172
9,1221,DE,DEU,Germany,83,meter-gauge-83lLWufdrzWE,96,62.214.32.222,51.0,yellow,9.0,Celsius,31,1458444054173
2,1182,US,USA,United States,108,sensor-pad-108NG6gl2jPi,82,208.35.184.254,34.2,yellow,-118.82,Celsius,34,1458444054187
6,852,US,USA,United States,109,meter-gauge-109PooBS,80,24.29.148.73,38.0,green,-97.0,Celsius,32,1458444054188
4,1188,DK,DNK,Denmark,144,sensor-pad-144T0J4k,87,212.242.41.50,55.68,yellow,12.57,Celsius,31,1458444054211


Use filter() to keep the DataFrame rows that meet the temperature and humidity predicate, and take the first 10 of them.

In [0]:
# keep rows that meet the temperature and humidity predicate; take(10) returns a Python list of Row objects
TempFilter10 = df.filter(col("temp") > 30).filter(col("humidity") > 70).take(10)
TempFilter10

Out[665]: [Row(battery_level=0, c02_level=1466, cca2='US', cca3='USA', cn='United States', device_id=17, device_name='meter-gauge-17zb8Fghhl', humidity=98, ip='161.188.212.254', latitude=39.95, lcd='red', longitude=-75.16, scale='Celsius', temp=31, timestamp=1458444054129),
 Row(battery_level=9, c02_level=986, cca2='FR', cca3='FRA', cn='France', device_id=48, device_name='sensor-pad-48jt4eL', humidity=97, ip='90.37.208.1', latitude=43.88, lcd='green', longitude=4.9, scale='Celsius', temp=31, timestamp=1458444054151),
 Row(battery_level=8, c02_level=1436, cca2='US', cca3='USA', cn='United States', device_id=54, device_name='sensor-pad-5410CWPrNb6', humidity=73, ip='204.15.64.249', latitude=32.89, lcd='red', longitude=-117.13, scale='Celsius', temp=34, timestamp=1458444054155),
 Row(battery_level=4, c02_level=1090, cca2='US', cca3='USA', cn='United States', device_id=63, device_name='device-mac-63GL4xSaZbj', humidity=91, ip='66.198.198.1', latitude=44.56, lcd='yellow', longitude=-105.67,
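Chaining two `filter()` calls keeps only the rows that satisfy both predicates, so it is equivalent to a single `df.filter((col("temp") > 30) & (col("humidity") > 70))`; Catalyst typically merges adjacent filters into one anyway. The plain-Python sketch below checks that equivalence on a handful of (device_id, temp, humidity) triples copied from rows displayed in this notebook.

```python
# (device_id, temp, humidity) triples copied from rows displayed in this notebook
rows = [
    (1, 34, 51), (2, 11, 70), (4, 28, 32), (17, 31, 98),
    (48, 31, 97), (54, 34, 73), (109, 32, 80), (170, 32, 63),
]

def chained(rows):
    """Two successive filters, like df.filter(temp > 30).filter(humidity > 70)."""
    hot = [r for r in rows if r[1] > 30]
    return [r for r in hot if r[2] > 70]

def combined(rows):
    """One filter with both conditions joined by AND."""
    return [r for r in rows if r[1] > 30 and r[2] > 70]

print([r[0] for r in chained(rows)])
```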

In [0]:
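# take() returned a Python list, which has no toDF(); rebuild a DataFrame before displaying it
display(spark.createDataFrame(TempFilter10))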

In [0]:
# map each row to a tuple of four fields: temp, device_name, device_id, cca3
dfTempMap = df.where((col("temp") > 25)).rdd.map(lambda d: (d.temp, d.device_name, d.device_id, d.cca3))
dfTempMap

Out[667]: PythonRDD[4718] at RDD at PythonRDD.scala:58

In [0]:
display(dfTempMap.toDF())

_1,_2,_3,_4
34,meter-gauge-1xbYRYcj,1,USA
28,sensor-pad-4mzWkz,4,USA
27,sensor-pad-6al7RTAobR,6,USA
27,sensor-pad-8xUD6pzsQI,8,JPN
26,sensor-pad-10BsywSYUF,10,USA
31,meter-gauge-17zb8Fghhl,17,USA
31,sensor-pad-18XULN9Xv,18,CHN
29,meter-gauge-19eg1BpfCO,19,USA
30,device-mac-21sjz5h,21,AUT
28,sensor-pad-24PytzD00Cp,24,CAN


Now use the filter() method, which is equivalent to the where() method used above.

In [0]:
dfTemp25 = df.filter(col("temp") > 25).rdd.map(lambda d: (d.temp, d.device_name, d.device_id, d.cca3))

display(dfTemp25.toDF())

_1,_2,_3,_4
34,meter-gauge-1xbYRYcj,1,USA
28,sensor-pad-4mzWkz,4,USA
27,sensor-pad-6al7RTAobR,6,USA
27,sensor-pad-8xUD6pzsQI,8,JPN
26,sensor-pad-10BsywSYUF,10,USA
31,meter-gauge-17zb8Fghhl,17,USA
31,sensor-pad-18XULN9Xv,18,CHN
29,meter-gauge-19eg1BpfCO,19,USA
30,device-mac-21sjz5h,21,AUT
28,sensor-pad-24PytzD00Cp,24,CAN


Use select() and where() to keep rows whose battery_level is greater than 6, then sort in ascending order on c02_level.

In [0]:
display(df.select("battery_level", "c02_level", "device_name").where(col("battery_level") > 6).sort(col("c02_level")))

battery_level,c02_level,device_name
7,800,device-mac-155337F0m78fHJKl
9,800,meter-gauge-99539mTrCQUvNdX
7,800,sensor-pad-218641sokniqN
8,800,meter-gauge-98761Rxrdzc5
8,800,meter-gauge-512894c2QTl
8,800,sensor-pad-75450LgmIGmUA
8,800,sensor-pad-5102ZPSPa4Olg
8,800,sensor-pad-131258au5Zw7o
8,800,sensor-pad-17905617CdtV7
8,800,meter-gauge-70190voazFbC
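All ten rows shown have c02_level = 800, and their relative order is not guaranteed: Spark does not promise a stable order for rows with equal sort keys, so repeated runs may list them differently. Adding a secondary key, for example `.sort(col("c02_level"), col("device_name"))`, makes the output reproducible. The sketch below mimics that two-key ordering in plain Python on four of the rows shown above.

```python
# (battery_level, c02_level, device_name) rows copied from the output above
rows = [
    (7, 800, "device-mac-155337F0m78fHJKl"),
    (9, 800, "meter-gauge-99539mTrCQUvNdX"),
    (7, 800, "sensor-pad-218641sokniqN"),
    (8, 800, "meter-gauge-98761Rxrdzc5"),
]

# keep battery_level > 6, then sort by c02_level ascending with device_name as the tie-breaker
ordered = sorted((r for r in rows if r[0] > 6), key=lambda r: (r[1], r[2]))
for row in ordered:
    print(row)
```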


Let's see how to use groupBy() and avg().
Take all temperature readings > 25, along with the corresponding devices' humidity, group by the cca3 country code, and compute the averages. Plot the resulting Dataset.

In [0]:
from pyspark.sql.functions import avg

# after rdd.map(...).toDF(), the tuple columns are named _1 (temp), _2 (humidity) and _3 (cca3)
dfAvgTmp = df.filter(col("temp") > 25).rdd.map(lambda d: (d.temp, d.humidity, d.cca3)).toDF().groupBy("_3").agg(avg("_1"), avg("_2"))


display(dfAvgTmp)


_3,avg(_1),avg(_2)
PSE,30.88888888888889,62.22222222222222
POL,29.92957746478873,62.04527162977868
LVA,29.7218045112782,63.29323308270677
BRB,29.63157894736842,61.21052631578947
ZMB,30.0,60.0
BRA,30.09396551724138,61.12672413793104
MOZ,29.8,67.8
JOR,31.142857142857142,66.42857142857143
CUB,30.875,56.25
FRA,29.97747055811572,61.053763440860216
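The `rdd.map(...).toDF()` round trip works, but it ships rows out to Python lambdas and produces the generic column names `_1`, `_2`, `_3`. Staying in the DataFrame API, the same aggregation can be written as `df.filter(col("temp") > 25).groupBy("cca3").agg(avg("temp"), avg("humidity"))`. To make clear what that computes, the plain-Python sketch below applies the same filter, grouping, and averaging to the first ten rows displayed earlier; it is a toy sample, so the numbers will not match the full table above.

```python
from collections import defaultdict

# (cca3, temp, humidity) for the first ten rows displayed earlier
rows = [
    ("USA", 34, 51), ("NOR", 11, 70), ("ITA", 19, 44), ("USA", 28, 32), ("PHL", 25, 62),
    ("USA", 27, 51), ("CHN", 18, 26), ("JPN", 27, 35), ("JPN", 13, 85), ("USA", 26, 56),
]

def avg_by_country(rows, min_temp=25):
    """filter(temp > min_temp) -> groupBy(cca3) -> agg(avg(temp), avg(humidity))."""
    groups = defaultdict(list)
    for cca3, temp, humidity in rows:
        if temp > min_temp:
            groups[cca3].append((temp, humidity))
    return {
        cca3: (sum(t for t, _ in vals) / len(vals), sum(h for _, h in vals) / len(vals))
        for cca3, vals in groups.items()
    }

print(avg_by_country(rows))
```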


#### Visualizing datasets

**Finally, the fun bit!**

Data without visualization, and without a narrative arc to infer insights or spot a trend, is of little use. We always want to make sense of the results.

By registering the DataFrame as a temporary view, I can issue complex SQL queries against it and visualize the results using the notebook's many plotting options.

In [0]:
df.createOrReplaceTempView("iot_device_data")

Count the distinct devices in each country and map them

In [0]:
%sql select cca3, count(distinct device_id) as device_id from iot_device_data group by cca3 order by device_id desc limit 100

cca3,device_id
USA,70405
CHN,14455
JPN,12100
KOR,11879
DEU,7942
GBR,6486
CAN,6041
RUS,5989
FRA,5305
BRA,3224
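The query aliases `count(distinct device_id)` back to `device_id`, so `order by device_id` sorts by the count rather than by the original column; a separate alias such as `num_devices` reads less ambiguously. As an illustration only, the sketch below runs an equivalent query with Python's built-in `sqlite3` module (a stand-in, not Spark SQL) against the first ten rows displayed earlier, using that alias plus a `cca3` tie-breaker so equal counts come back in a fixed order.

```python
import sqlite3

# (device_id, cca3) for the first ten rows displayed earlier
rows = [(1, "USA"), (2, "NOR"), (3, "ITA"), (4, "USA"), (5, "PHL"),
        (6, "USA"), (7, "CHN"), (8, "JPN"), (9, "JPN"), (10, "USA")]

conn = sqlite3.connect(":memory:")
conn.execute("create table iot_device_data (device_id integer, cca3 text)")
conn.executemany("insert into iot_device_data values (?, ?)", rows)

# Same shape as the %sql cell, with an unambiguous alias and a deterministic tie-breaker
result = conn.execute(
    "select cca3, count(distinct device_id) as num_devices "
    "from iot_device_data group by cca3 "
    "order by num_devices desc, cca3 limit 100"
).fetchall()
conn.close()
print(result)
```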


Let's visualize the results as a pie chart and as a distribution of devices in the countries where CO2 levels are high (c02_level > 1400).

In [0]:
%sql select cca3, c02_level from iot_device_data where c02_level > 1400 order by c02_level desc

cca3,c02_level
MEX,1599
CHN,1599
CZE,1599
TWN,1599
USA,1599
PHL,1599
POL,1599
USA,1599
SWE,1599
SWE,1599


Select all countries' devices with high CO2 levels (LCD status 'red'), group by cca3, and order by the number of devices.

In [0]:
%sql select cca3, count(distinct device_id) as device_id from iot_device_data where lcd == 'red' group by cca3 order by device_id desc limit 100

cca3,device_id
USA,17489
CHN,3616
KOR,2942
JPN,2935
DEU,1966
GBR,1660
CAN,1564
RUS,1508
FRA,1353
BRA,856
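In every row displayed in this notebook, the `lcd` value lines up with a `c02_level` band: green below roughly 1000, yellow in between, and red from roughly 1400 up. That suggests `lcd` is derived from `c02_level`, which is why filtering on `lcd == 'red'` behaves like a high-CO2 filter. The cut points used below (1000 and 1400) are assumptions inferred from the sample, not documented thresholds; the sketch checks the hypothesis against pairs copied from the outputs above.

```python
# (c02_level, lcd) pairs copied from rows displayed in this notebook
observed = [
    (868, "green"), (1473, "red"), (1556, "red"), (1080, "yellow"), (931, "green"),
    (1210, "yellow"), (1129, "yellow"), (1536, "red"), (807, "green"), (1470, "red"),
    (1036, "yellow"), (1445, "red"), (1387, "yellow"), (835, "green"), (976, "green"),
    (1021, "yellow"), (986, "green"), (1436, "red"), (1597, "red"), (1259, "yellow"),
]

def lcd_band(c02_level, low=1000, high=1400):
    """Hypothesised mapping; `low` and `high` are assumed cut points, not documented ones."""
    if c02_level < low:
        return "green"
    if c02_level < high:
        return "yellow"
    return "red"

mismatches = [(c, lcd) for c, lcd in observed if lcd_band(c) != lcd]
print(f"{len(observed)} rows checked, {len(mismatches)} mismatches")
```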


Find the number of devices in each country whose batteries need replacing (battery_level == 0).

In [0]:
%sql select cca3, count(distinct device_id) as device_id from iot_device_data where battery_level == 0 group by cca3 order by device_id desc limit 100

cca3,device_id
USA,7043
CHN,1415
KOR,1217
JPN,1210
DEU,760
GBR,650
CAN,612
RUS,600
FRA,582
BRA,374


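Converting a Dataset to an RDD.

In [0]:
deviceEvents = df.select("device_name","cca3","c02_level").where(col("c02_level") > 1300)

# .rdd exposes the DataFrame as an RDD of Row objects; take(10) on its own returns a Python list, not an RDD
eventsRDD = deviceEvents.rdd
firstEvents = eventsRDD.take(10)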


In [0]:
display(deviceEvents)

device_name,cca3,c02_level
sensor-pad-2n2Pea,NOR,1473
device-mac-36TWSKiT,ITA,1556
sensor-pad-8xUD6pzsQI,JPN,1536
sensor-pad-10BsywSYUF,USA,1470
meter-gauge-11dlMTZty,ITA,1544
sensor-pad-14QL93sBR0j,NOR,1346
sensor-pad-16aXmIJZtdO,USA,1425
meter-gauge-17zb8Fghhl,USA,1466
meter-gauge-19eg1BpfCO,USA,1531
sensor-pad-22oWV2D,JPN,1522


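#### 1. Explain the main differences between RDDs, DataFrames and Datasets (4 marks)

Each API has its place: some scenarios call for RDDs, others for DataFrames and Datasets.

#### Resilient Distributed Dataset (RDD) is a low-level API

1) Distributed data abstraction
2) Resilient and immutable
Spark records the lineage of transformations that produced each RDD, so any partition can be recomputed at any point during execution. Going from one RDD to a second and then a third creates new RDDs and records each step; the original RDD remains unaltered.
3) Compile-time type-safe (in Scala and Java, an `RDD[T]` is typed by its element type)
4) Data can be unstructured or structured
5) Lazily evaluated: transformations run only when an action such as `count()` or `collect()` is called
6) No schema is imposed on the data

RDDs offer control and flexibility through a low-level API. Use them when dealing with a lot of unstructured data (media streams or text) and when you don't care about the schema or structure of the data. Because Spark cannot look inside the functions passed to RDD operations, Catalyst cannot optimize them.

#### DataFrames & Datasets

These are high-level APIs with a Domain Specific Language (DSL), and they are easy to use and read. Use them when you care about 1) a structured data schema, 2) code optimization and performance, and 3) space efficiency.

A DataFrame organizes data into named columns with a schema (in Scala, a DataFrame is an alias for `Dataset[Row]`); column names and types are checked only at runtime. A Dataset adds compile-time type safety by binding rows to a JVM class, such as a Scala case class, through encoders. The typed Dataset API exists only in Scala and Java; in PySpark you work with DataFrames.

Both are optimized by Catalyst and Tungsten, and the schema can be inferred automatically from self-describing sources such as JSON. DataFrames provide an easy API for aggregation operations. Datasets are generally faster than RDDs but can be a bit slower than DataFrames, because typed lambdas are opaque to the optimizer and require converting records to and from JVM objects.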
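The lineage and lazy-evaluation points above can be illustrated with a toy model. The `ToyRDD` class below is a hypothetical, single-machine stand-in, not Spark's implementation: each transformation returns a new object that only records its parent and a function (so the parent stays unaltered), nothing runs until the `collect()` action, and results can always be recomputed from the source by replaying the recorded lineage.

```python
class ToyRDD:
    """Hypothetical toy model of an immutable, lazily evaluated dataset with lineage."""

    def __init__(self, source=None, parent=None, op=None, label="source"):
        self._source = source    # original data (only set on the root)
        self._parent = parent    # the ToyRDD this one was derived from
        self._op = op            # function applied to the parent's records
        self.label = label

    def map(self, fn):
        # Transformation: returns a new ToyRDD and computes nothing yet
        return ToyRDD(parent=self, op=lambda recs: [fn(r) for r in recs], label="map")

    def filter(self, pred):
        return ToyRDD(parent=self, op=lambda recs: [r for r in recs if pred(r)], label="filter")

    def lineage(self):
        """Chain of operations from the source to this dataset."""
        return ([] if self._parent is None else self._parent.lineage()) + [self.label]

    def collect(self):
        # Action: replay the lineage from the source to materialise the records
        if self._parent is None:
            return list(self._source)
        return self._op(self._parent.collect())


calls = []

def is_hot(t):
    calls.append(t)  # record each evaluation so laziness is observable
    return t > 25

temps = ToyRDD(source=[34, 11, 19, 28, 25])
kelvin = temps.filter(is_hot).map(lambda t: t + 273)  # integer Kelvin approximation

before = len(calls)         # nothing has been evaluated yet
result = kelvin.collect()   # the action triggers the whole chain
after = len(calls)
print(kelvin.lineage(), before, result, after)
```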

In [0]:
display(df.head(1000))


battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
8,868,US,USA,United States,1,meter-gauge-1xbYRYcj,51,68.161.225.1,38.0,green,-97.0,Celsius,34,1458444054093
7,1473,NO,NOR,Norway,2,sensor-pad-2n2Pea,70,213.161.254.1,62.47,red,6.15,Celsius,11,1458444054119
2,1556,IT,ITA,Italy,3,device-mac-36TWSKiT,44,88.36.5.1,42.83,red,12.83,Celsius,19,1458444054120
6,1080,US,USA,United States,4,sensor-pad-4mzWkz,32,66.39.173.154,44.06,yellow,-121.32,Celsius,28,1458444054121
4,931,PH,PHL,Philippines,5,therm-stick-5gimpUrBB,62,203.82.41.9,14.58,green,120.97,Celsius,25,1458444054122
3,1210,US,USA,United States,6,sensor-pad-6al7RTAobR,51,204.116.105.67,35.93,yellow,-85.46,Celsius,27,1458444054122
3,1129,CN,CHN,China,7,meter-gauge-7GeDoanM,26,220.173.179.1,22.82,yellow,108.32,Celsius,18,1458444054123
0,1536,JP,JPN,Japan,8,sensor-pad-8xUD6pzsQI,35,210.173.177.1,35.69,red,139.69,Celsius,27,1458444054123
3,807,JP,JPN,Japan,9,device-mac-9GcjZ2pw,85,118.23.68.227,35.69,green,139.69,Celsius,13,1458444054124
7,1470,US,USA,United States,10,sensor-pad-10BsywSYUF,56,208.109.163.218,33.61,red,-111.89,Celsius,26,1458444054125


In [0]:
# take(101) fetches only the first 101 rows instead of collecting all 198,164 rows to the driver
display(df.take(101)[100])

Row(battery_level=4, c02_level=854, cca2='US', cca3='USA', cn='United States', device_id=101, device_name='meter-gauge-101LT6cP', humidity=91, ip='66.83.105.10', latitude=36.07, lcd='green', longitude=-79.79, scale='Celsius', temp=11, timestamp=1458444054182)

#### 2.1 How many sensor pads are reported to be from Poland (2 marks)

In [0]:
Poland = df.select("*").where(col("cn") == 'Poland')

PolandSample = Poland.take(10)  # take() returns a Python list of Row objects, not an RDD

In [0]:
display(Poland)

battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
7,1036,PL,POL,Poland,170,sensor-pad-1703fywiW,63,212.87.11.82,52.25,yellow,21.0,Celsius,32,1458444054223
0,1445,PL,POL,Poland,219,device-mac-219BL7ifYGb0,63,62.21.99.202,52.23,red,21.02,Celsius,10,1458444054246
0,1572,PL,POL,Poland,378,sensor-pad-378SgqAQkyHZG,71,83.238.122.21,52.23,red,21.02,Celsius,15,1458444054306
3,1067,PL,POL,Poland,566,sensor-pad-566TPTBfbc08,31,149.156.8.73,50.08,yellow,19.92,Celsius,20,1458444054360
1,1387,PL,POL,Poland,603,device-mac-603raWbHGSEa,67,193.111.38.102,52.23,yellow,21.02,Celsius,33,1458444054369
8,1556,PL,POL,Poland,790,sensor-pad-790HqVUEJ7aUj,61,79.139.16.1,50.26,red,19.03,Celsius,24,1458444054409
2,835,PL,POL,Poland,794,sensor-pad-7944KFOZraAwh,66,83.142.138.254,52.23,green,21.02,Celsius,31,1458444054409
2,976,PL,POL,Poland,825,device-mac-825VceMEqYoK,72,83.151.32.114,51.1,green,17.03,Celsius,10,1458444054415
7,1021,PL,POL,Poland,877,meter-gauge-877qysuK12sAo,83,212.85.100.128,52.23,yellow,21.02,Celsius,14,1458444054424
4,975,PL,POL,Poland,892,sensor-pad-892TMLKM,45,86.111.204.1,52.23,green,21.02,Celsius,24,1458444054427


In [0]:
display(Poland.count())

2744
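Note that the question asks about *sensor pads*, while the count of 2744 above includes every device type from Poland (the ten rows shown already include `device-mac` and `meter-gauge` devices). Restricting to sensor pads needs an extra predicate on the device name, for example `df.where((col("cn") == "Poland") & col("device_name").startswith("sensor-pad")).count()`; that count is not shown in this notebook. The plain-Python sketch below applies the same two predicates to the ten Polish rows displayed above.

```python
# (cn, device_name) for the ten Polish rows displayed above
rows = [
    ("Poland", "sensor-pad-1703fywiW"), ("Poland", "device-mac-219BL7ifYGb0"),
    ("Poland", "sensor-pad-378SgqAQkyHZG"), ("Poland", "sensor-pad-566TPTBfbc08"),
    ("Poland", "device-mac-603raWbHGSEa"), ("Poland", "sensor-pad-790HqVUEJ7aUj"),
    ("Poland", "sensor-pad-7944KFOZraAwh"), ("Poland", "device-mac-825VceMEqYoK"),
    ("Poland", "meter-gauge-877qysuK12sAo"), ("Poland", "sensor-pad-892TMLKM"),
]

all_polish = sum(1 for cn, _ in rows if cn == "Poland")
sensor_pads = sum(1 for cn, name in rows if cn == "Poland" and name.startswith("sensor-pad"))
print(all_polish, sensor_pads)  # 10 6
```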

#### 2.2 How many different LCDs are present in the dataset (2 marks)

In [0]:
%sql select count(distinct lcd) as lcd from iot_device_data 

lcd
3


In [0]:
display(df.head(10))

battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
8,868,US,USA,United States,1,meter-gauge-1xbYRYcj,51,68.161.225.1,38.0,green,-97.0,Celsius,34,1458444054093
7,1473,NO,NOR,Norway,2,sensor-pad-2n2Pea,70,213.161.254.1,62.47,red,6.15,Celsius,11,1458444054119
2,1556,IT,ITA,Italy,3,device-mac-36TWSKiT,44,88.36.5.1,42.83,red,12.83,Celsius,19,1458444054120
6,1080,US,USA,United States,4,sensor-pad-4mzWkz,32,66.39.173.154,44.06,yellow,-121.32,Celsius,28,1458444054121
4,931,PH,PHL,Philippines,5,therm-stick-5gimpUrBB,62,203.82.41.9,14.58,green,120.97,Celsius,25,1458444054122
3,1210,US,USA,United States,6,sensor-pad-6al7RTAobR,51,204.116.105.67,35.93,yellow,-85.46,Celsius,27,1458444054122
3,1129,CN,CHN,China,7,meter-gauge-7GeDoanM,26,220.173.179.1,22.82,yellow,108.32,Celsius,18,1458444054123
0,1536,JP,JPN,Japan,8,sensor-pad-8xUD6pzsQI,35,210.173.177.1,35.69,red,139.69,Celsius,27,1458444054123
3,807,JP,JPN,Japan,9,device-mac-9GcjZ2pw,85,118.23.68.227,35.69,green,139.69,Celsius,13,1458444054124
7,1470,US,USA,United States,10,sensor-pad-10BsywSYUF,56,208.109.163.218,33.61,red,-111.89,Celsius,26,1458444054125


In [0]:
display(df.count())

198164

In [0]:
# keep only devices whose name contains "mac"
macDevices = df.filter(col("device_name").like("%mac%"))
display(macDevices)

battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
2,1556,IT,ITA,Italy,3,device-mac-36TWSKiT,44,88.36.5.1,42.83,red,12.83,Celsius,19,1458444054120
3,807,JP,JPN,Japan,9,device-mac-9GcjZ2pw,85,118.23.68.227,35.69,green,139.69,Celsius,13,1458444054124
9,1259,US,USA,United States,15,device-mac-15se6mZ,70,67.185.72.1,47.41,yellow,-122.0,Celsius,13,1458444054128
5,939,AT,AUT,Austria,21,device-mac-21sjz5h,44,193.200.142.254,48.2,green,16.37,Celsius,30,1458444054131
5,1597,KR,KOR,Republic of Korea,27,device-mac-27P5wf2,73,218.239.168.1,37.57,red,126.98,Celsius,10,1458444054135
3,835,RU,RUS,Russia,33,device-mac-33B94GfPzi,30,178.23.147.134,55.75,green,37.62,Celsius,15,1458444054139
2,908,CN,CHN,China,39,device-mac-39iklYVtvBT,84,218.7.15.1,45.75,green,126.65,Celsius,17,1458444054144
4,985,IT,ITA,Italy,45,device-mac-45fN2COZF4cM,74,213.140.20.230,42.83,green,12.83,Celsius,26,1458444054149
5,921,GB,GBR,United Kingdom,51,device-mac-51iy02vXU,64,146.97.40.113,51.5,green,-0.13,Celsius,13,1458444054153
6,1503,US,USA,United States,57,device-mac-57tRB0uMlvSb,40,64.246.146.13,42.66,red,-73.75,Celsius,22,1458444054158


#### 2.3 Find 5 countries that have the largest number of MAC devices used (2 marks)

In [0]:
Mac = df.where(df.device_name.like('%mac%')).groupBy('cn').count()
display(Mac.sort(col("count").desc()).limit(5))

cn,count
United States,11509
China,2300
Japan,2002
Republic of Korea,1999
Germany,1314
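`like('%mac%')` matches "mac" anywhere in the name, including the random suffix, so a hypothetical name such as `sensor-pad-42macQ` would be counted as a MAC device. Anchoring the match to the prefix, for example `df.where(col("device_name").startswith("device-mac"))`, avoids that; whether any such false positives actually occur in this dataset has not been checked here. The plain-Python sketch below contrasts the two rules; the last two names are made up for illustration.

```python
names = [
    "device-mac-36TWSKiT",   # from the output above
    "sensor-pad-2n2Pea",     # from the output above
    "sensor-pad-42macQ",     # hypothetical: "mac" appears only in the random suffix
    "meter-gauge-7macZ",     # hypothetical
]

substring_matches = [n for n in names if "mac" in n]               # like '%mac%'
prefix_matches = [n for n in names if n.startswith("device-mac")]  # startswith('device-mac')
print(substring_matches)
print(prefix_matches)
```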


#### 2.4 Propose and try an interesting statistical test or machine learning model you could use to gain insight from this dataset (2 marks)

I will predict 'c02_level', using it as the label, to find out which features influence it. Most of the features are of type long or string. First, I will use only three features (battery_level, humidity and temp) to see the results, and then I will apply one-hot encoding to the categorical features.
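One-hot encoding turns a categorical column such as `lcd` into one 0/1 indicator per category. In PySpark this is usually done with `StringIndexer` followed by `OneHotEncoder` from `pyspark.ml.feature`; the plain-Python sketch below shows only the idea and does not reproduce those classes' exact output (Spark's encoder, for instance, drops the last category by default). One caution: in every row shown in this notebook, `lcd` lines up with `c02_level` bands, so encoding it as a feature would likely leak the label.

```python
def one_hot(values):
    """Map each categorical value to a 0/1 indicator vector.
    Categories are ordered alphabetically here (an arbitrary choice for this sketch)."""
    categories = sorted(set(values))
    index = {cat: i for i, cat in enumerate(categories)}
    vectors = []
    for v in values:
        vec = [0] * len(categories)
        vec[index[v]] = 1
        vectors.append(vec)
    return categories, vectors

# lcd values of the first five rows displayed earlier
categories, vectors = one_hot(["green", "red", "red", "yellow", "green"])
print(categories)  # ['green', 'red', 'yellow']
print(vectors)     # [[1, 0, 0], [0, 1, 0], [0, 1, 0], [0, 0, 1], [1, 0, 0]]
```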

In [0]:
display(df.head(50))

battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
8,868,US,USA,United States,1,meter-gauge-1xbYRYcj,51,68.161.225.1,38.0,green,-97.0,Celsius,34,1458444054093
7,1473,NO,NOR,Norway,2,sensor-pad-2n2Pea,70,213.161.254.1,62.47,red,6.15,Celsius,11,1458444054119
2,1556,IT,ITA,Italy,3,device-mac-36TWSKiT,44,88.36.5.1,42.83,red,12.83,Celsius,19,1458444054120
6,1080,US,USA,United States,4,sensor-pad-4mzWkz,32,66.39.173.154,44.06,yellow,-121.32,Celsius,28,1458444054121
4,931,PH,PHL,Philippines,5,therm-stick-5gimpUrBB,62,203.82.41.9,14.58,green,120.97,Celsius,25,1458444054122
3,1210,US,USA,United States,6,sensor-pad-6al7RTAobR,51,204.116.105.67,35.93,yellow,-85.46,Celsius,27,1458444054122
3,1129,CN,CHN,China,7,meter-gauge-7GeDoanM,26,220.173.179.1,22.82,yellow,108.32,Celsius,18,1458444054123
0,1536,JP,JPN,Japan,8,sensor-pad-8xUD6pzsQI,35,210.173.177.1,35.69,red,139.69,Celsius,27,1458444054123
3,807,JP,JPN,Japan,9,device-mac-9GcjZ2pw,85,118.23.68.227,35.69,green,139.69,Celsius,13,1458444054124
7,1470,US,USA,United States,10,sensor-pad-10BsywSYUF,56,208.109.163.218,33.61,red,-111.89,Celsius,26,1458444054125


In [0]:
df.cache()
df.printSchema()

root
 |-- battery_level: long (nullable = true)
 |-- c02_level: long (nullable = true)
 |-- cca2: string (nullable = true)
 |-- cca3: string (nullable = true)
 |-- cn: string (nullable = true)
 |-- device_id: long (nullable = true)
 |-- device_name: string (nullable = true)
 |-- humidity: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- lcd: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- scale: string (nullable = true)
 |-- temp: long (nullable = true)
 |-- timestamp: long (nullable = true)



In [0]:
from pyspark.sql.types import IntegerType
df = df.withColumn("battery_level", df["battery_level"].cast(IntegerType()))
df = df.withColumn("c02_level", df["c02_level"].cast(IntegerType()))
df = df.withColumn("temp", df["temp"].cast(IntegerType()))
df = df.withColumn("humidity", df["humidity"].cast(IntegerType()))

In [0]:
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
battery_level,198164,4.4997678690377665,2.8733916884106137,0,9
c02_level,198164,1199.7639429967098,231.0600256290077,800,1599
cca2,198164,,,AD,ZW
cca3,198164,,,ABW,ZWE
cn,198164,,,,Åland
device_id,198164,99082.5,57205.1637092317,1,198164
device_name,198164,,,device-mac-100005kiWNTBu,therm-stick-99995kaQv94y
humidity,198164,61.99212773258513,21.672313062313982,25,99
ip,198164,,,108.57.128.215,99.64.14.90


In [0]:
df.show()

+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|           cn|device_id|         device_name|humidity|             ip|latitude|   lcd|longitude|  scale|temp|    timestamp|
+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|            8|      868|  US| USA|United States|        1|meter-gauge-1xbYRYcj|      51|   68.161.225.1|    38.0| green|    -97.0|Celsius|  34|1458444054093|
|            7|     1473|  NO| NOR|       Norway|        2|   sensor-pad-2n2Pea|      70|  213.161.254.1|   62.47|   red|     6.15|Celsius|  11|1458444054119|
|            2|     1556|  IT| ITA|        Italy|        3| device-mac-36TWSKiT|      44|      88.36.5.1|   42.83|   red|    12.83|Celsius|  19|1458444054120|
|            6|     1080|  US| USA|United Stat

In [0]:
splits = df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [0]:
df.printSchema()

root
 |-- battery_level: integer (nullable = true)
 |-- c02_level: integer (nullable = true)
 |-- cca2: string (nullable = true)
 |-- cca3: string (nullable = true)
 |-- cn: string (nullable = true)
 |-- device_id: long (nullable = true)
 |-- device_name: string (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- ip: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- lcd: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- scale: string (nullable = true)
 |-- temp: integer (nullable = true)
 |-- timestamp: long (nullable = true)



In [0]:
import pandas as pd
pd.DataFrame(df.take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
battery_level,8,7,2,6,4
c02_level,868,1473,1556,1080,931
cca2,US,NO,IT,US,PH
cca3,USA,NOR,ITA,USA,PHL
cn,United States,Norway,Italy,United States,Philippines
device_id,1,2,3,4,5
device_name,meter-gauge-1xbYRYcj,sensor-pad-2n2Pea,device-mac-36TWSKiT,sensor-pad-4mzWkz,therm-stick-5gimpUrBB
humidity,51,70,44,32,62
ip,68.161.225.1,213.161.254.1,88.36.5.1,66.39.173.154,203.82.41.9
latitude,38.0,62.47,42.83,44.06,14.58


In [0]:
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
battery_level,198164,4.4997678690377665,2.8733916884106137,0,9
c02_level,198164,1199.7639429967098,231.0600256290077,800,1599
humidity,198164,61.99212773258513,21.672313062313982,25,99
temp,198164,22.012787388223895,7.209848253887109,10,34


In [0]:
df_features = df.select('battery_level', 'humidity', 'temp','c02_level')
cols = df_features.columns
df_features.printSchema()

root
 |-- battery_level: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- temp: integer (nullable = true)
 |-- c02_level: integer (nullable = true)



In [0]:
from pyspark.ml.feature import VectorAssembler

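# NOTE: c02_level is listed as an input feature and is also the label, which leaks the target into the features
vectorAssembler = VectorAssembler(inputCols = ['battery_level', 'humidity', 'temp','c02_level'], outputCol = 'features')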
IOT_df = vectorAssembler.transform(df_features)
IOT_df = IOT_df.select(['features', 'c02_level'])
IOT_df.show(3)

+--------------------+---------+
|            features|c02_level|
+--------------------+---------+
|[8.0,51.0,34.0,86...|      868|
|[7.0,70.0,11.0,14...|     1473|
|[2.0,44.0,19.0,15...|     1556|
+--------------------+---------+
only showing top 3 rows



In [0]:
splits = IOT_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='c02_level', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.0,0.0,0.0,0.9987019290500025]
Intercept: 1.5573932115228986


In [0]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 0.299922
r2: 0.999998
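The near-perfect fit (r2 ≈ 1, RMSE ≈ 0.3) signals target leakage rather than a good model: `c02_level` is both the label and the fourth entry of the `features` vector (compare the output of `IOT_df.take(1)` and the coefficients `[0.0, 0.0, 0.0, 0.9987…]`), so the model simply copies it, and the decision tree's `featureImportances` points the same way. To test whether battery_level, humidity and temp explain `c02_level`, the `VectorAssembler` would need `inputCols=['battery_level', 'humidity', 'temp']`. The stdlib sketch below reproduces the effect with ordinary least squares on the first ten rows displayed earlier: regressing `c02_level` on itself gives R² = 1, while `temp` alone explains almost nothing in this tiny sample, which is far too small to draw real conclusions from.

```python
def ols_r2(x, y):
    """Fit y = a + b*x by ordinary least squares and return R^2."""
    n = len(x)
    x_mean, y_mean = sum(x) / n, sum(y) / n
    sxy = sum((xi - x_mean) * (yi - y_mean) for xi, yi in zip(x, y))
    sxx = sum((xi - x_mean) ** 2 for xi in x)
    slope = sxy / sxx
    intercept = y_mean - slope * x_mean
    ss_res = sum((yi - (intercept + slope * xi)) ** 2 for xi, yi in zip(x, y))
    ss_tot = sum((yi - y_mean) ** 2 for yi in y)
    return 1 - ss_res / ss_tot

# temp and c02_level of the first ten rows displayed earlier
temp = [34, 11, 19, 28, 25, 27, 18, 27, 13, 26]
c02 = [868, 1473, 1556, 1080, 931, 1210, 1129, 1536, 807, 1470]

leaky_r2 = ols_r2(c02, c02)  # label used as its own feature
temp_r2 = ols_r2(temp, c02)  # a genuine candidate feature
print(f"R2 with c02_level as a feature: {leaky_r2:.3f}")
print(f"R2 with temp only: {temp_r2:.3f}")
```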


In [0]:
train_df.describe().show()

+-------+------------------+
|summary|         c02_level|
+-------+------------------+
|  count|            138314|
|   mean|1199.7751059184175|
| stddev|231.05303853234736|
|    min|               800|
|    max|              1599|
+-------+------------------+



In [0]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","c02_level","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="c02_level",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+---------+--------------------+
|        prediction|c02_level|            features|
+------------------+---------+--------------------+
|1337.8205742804262|     1338|[0.0,25.0,11.0,13...|
|1489.6232674960265|     1490|[0.0,25.0,12.0,14...|
|1360.7907186485763|     1361|[0.0,25.0,13.0,13...|
|1366.7829302228763|     1367|[0.0,25.0,13.0,13...|
|1083.1515823726756|     1083|[0.0,25.0,15.0,10...|
+------------------+---------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.999998


In [0]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 0.299953


In [0]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","c02_level","features").show()

+------------------+---------+--------------------+
|        prediction|c02_level|            features|
+------------------+---------+--------------------+
|1337.8205742804262|     1338|[0.0,25.0,11.0,13...|
|1489.6232674960265|     1490|[0.0,25.0,12.0,14...|
|1360.7907186485763|     1361|[0.0,25.0,13.0,13...|
|1366.7829302228763|     1367|[0.0,25.0,13.0,13...|
|1083.1515823726756|     1083|[0.0,25.0,15.0,10...|
| 868.4306676269251|      868|[0.0,25.0,19.0,86...|
|1138.0801884704258|     1138|[0.0,25.0,19.0,11...|
| 829.4812923939751|      829|[0.0,25.0,21.0,82...|
|1099.1308132374756|     1099|[0.0,25.0,21.0,10...|
|1496.6141809993767|     1497|[0.0,25.0,29.0,14...|
|1504.6037964317766|     1505|[0.0,25.0,29.0,15...|
|1155.0581212642758|     1155|[0.0,25.0,31.0,11...|
| 910.3761486470253|      910|[0.0,25.0,34.0,91...|
|1156.0568231933257|     1156|[0.0,25.0,34.0,11...|
|1162.0490347676257|     1162|[0.0,26.0,11.0,11...|
|1018.2359569844256|     1018|[0.0,26.0,12.0,10...|
|1516.588219

In [0]:
# Decision tree regression

In [0]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'c02_level')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="c02_level", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 7.34512


In [0]:
dt_model.featureImportances

Out[704]: SparseVector(4, {3: 1.0})

In [0]:
IOT_df.take(1)

Out[705]: [Row(features=DenseVector([8.0, 51.0, 34.0, 868.0]), c02_level=868)]

In [0]:
# Gradient-boosted tree regression

In [0]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'c02_level', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'c02_level', 'features').show(5)

+------------------+---------+--------------------+
|        prediction|c02_level|            features|
+------------------+---------+--------------------+
|1333.0331695766927|     1338|[0.0,25.0,11.0,13...|
|1485.9938754699408|     1490|[0.0,25.0,12.0,14...|
|1358.6226957724027|     1361|[0.0,25.0,13.0,13...|
|1358.6226957724027|     1367|[0.0,25.0,13.0,13...|
| 1079.492145144914|     1083|[0.0,25.0,15.0,10...|
+------------------+---------+--------------------+
only showing top 5 rows



In [0]:
gbt_evaluator = RegressionEvaluator(
    labelCol="c02_level", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 7.28565
