Import required libraries

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [2]:
## Importing SparkSession and creating a spark object
spark = SparkSession.builder.master("local").appName("iot data").getOrCreate()
sc = spark.sparkContext

In [3]:
spark

# 1.Read json file data into a Data Frame

In [4]:
## Reading the data into DataFrame
df = spark.read.format("json").load('iot_devices.json', header=True,inferSchema=True)
df

DataFrame[battery_level: bigint, c02_level: bigint, cca2: string, cca3: string, cn: string, device_id: bigint, device_name: string, humidity: bigint, ip: string, latitude: double, lcd: string, longitude: double, scale: string, temp: bigint, timestamp: bigint]

In [5]:
# Checking Schema
df.printSchema()

root
 |-- battery_level: long (nullable = true)
 |-- c02_level: long (nullable = true)
 |-- cca2: string (nullable = true)
 |-- cca3: string (nullable = true)
 |-- cn: string (nullable = true)
 |-- device_id: long (nullable = true)
 |-- device_name: string (nullable = true)
 |-- humidity: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- lcd: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- scale: string (nullable = true)
 |-- temp: long (nullable = true)
 |-- timestamp: long (nullable = true)



In [6]:
# Checking Columns
df.columns
len(df.columns)

15

In [7]:
# Checking the count of records in the dataframe
df.count()

198164

In [8]:
# Printing Top 5 rows 
df.show(10)

+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|           cn|device_id|         device_name|humidity|             ip|latitude|   lcd|longitude|  scale|temp|    timestamp|
+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|            8|      868|  US| USA|United States|        1|meter-gauge-1xbYRYcj|      51|   68.161.225.1|    38.0| green|    -97.0|Celsius|  34|1458444054093|
|            7|     1473|  NO| NOR|       Norway|        2|   sensor-pad-2n2Pea|      70|  213.161.254.1|   62.47|   red|     6.15|Celsius|  11|1458444054119|
|            2|     1556|  IT| ITA|        Italy|        3| device-mac-36TWSKiT|      44|      88.36.5.1|   42.83|   red|    12.83|Celsius|  19|1458444054120|
|            6|     1080|  US| USA|United Stat

# 2.Convert the Dataframe into a temporary view called iot.

In [9]:
df.createOrReplaceTempView("iot")
df

DataFrame[battery_level: bigint, c02_level: bigint, cca2: string, cca3: string, cn: string, device_id: bigint, device_name: string, humidity: bigint, ip: string, latitude: double, lcd: string, longitude: double, scale: string, temp: bigint, timestamp: bigint]

# 3.Count how many devices are there from each country and display the output.

In [11]:
spark.sql('''
select cn, count(device_id) as count
from iot
group by cn
order by count desc
''').show()

+-----------------+-----+
|               cn|count|
+-----------------+-----+
|    United States|68545|
|            China|14455|
|            Japan|12100|
|Republic of Korea|11879|
|          Germany| 7942|
|   United Kingdom| 6486|
|           Canada| 6041|
|           Russia| 5989|
|           France| 5305|
|           Brazil| 3224|
|        Australia| 3119|
|            Italy| 2915|
|           Sweden| 2880|
|           Poland| 2744|
|      Netherlands| 2488|
|            Spain| 2310|
|           Taiwan| 2128|
|            India| 1867|
|                 | 1810|
|   Czech Republic| 1507|
+-----------------+-----+
only showing top 20 rows



# 4. Display all the countries whose carbon dioxide level is more than 1400. Sort the output in descending order.

In [12]:
spark.sql('''
select cn,c02_level 
from iot
where c02_level > 1400
group by cn,c02_level
order by c02_level desc
''').show()
#spark.sql("select cn from Data where c02_level > 1400").show()

+--------------------+---------+
|                  cn|c02_level|
+--------------------+---------+
|              Turkey|     1599|
|            Bulgaria|     1599|
|             Bermuda|     1599|
|      United Kingdom|     1599|
|           Australia|     1599|
|              Russia|     1599|
|           Argentina|     1599|
|Saint Kitts and N...|     1599|
|United Arab Emirates|     1599|
|        South Africa|     1599|
|                    |     1599|
|            Malaysia|     1599|
|              Mexico|     1599|
|              Poland|     1599|
|              Sweden|     1599|
|         Philippines|     1599|
|             Germany|     1599|
|             Finland|     1599|
|             Ukraine|     1599|
|              Latvia|     1599|
+--------------------+---------+
only showing top 20 rows



# 5.Select all countries' devices with high-levels of C02 and group by cca3 and order by device_ids (Hint: For high CO2 level, the LCD status will be RED).

In [13]:
spark.sql('''
select cn,cca3,device_name,device_id
from iot
where lcd = "red"
group by cn,cca3,device_name,device_id
order by device_id asc
''').show()

+-----------------+----+--------------------+---------+
|               cn|cca3|         device_name|device_id|
+-----------------+----+--------------------+---------+
|           Norway| NOR|   sensor-pad-2n2Pea|        2|
|            Italy| ITA| device-mac-36TWSKiT|        3|
|            Japan| JPN|sensor-pad-8xUD6p...|        8|
|    United States| USA|sensor-pad-10Bsyw...|       10|
|            Italy| ITA|meter-gauge-11dlM...|       11|
|    United States| USA|sensor-pad-16aXmI...|       16|
|    United States| USA|meter-gauge-17zb8...|       17|
|    United States| USA|meter-gauge-19eg1...|       19|
|            Japan| JPN|  sensor-pad-22oWV2D|       22|
|           Canada| CAN|sensor-pad-24Pytz...|       24|
|Republic of Korea| KOR|  device-mac-27P5wf2|       27|
|Republic of Korea| KOR|sensor-pad-28Tsud...|       28|
|          Ukraine| UKR|meter-gauge-47WsF9s8|       47|
|           Sweden| SWE|meter-gauge-534fD...|       53|
|    United States| USA|sensor-pad-5410CW...|   

# 6.find out all devices in countries whose batteries need replacements.

#Devices with battery level less than 5 are considered low and have to be replaced

In [14]:

spark.sql('''
select device_name,cn,battery_level
from iot
where battery_level < 5
''').show()

+--------------------+-----------------+-------------+
|         device_name|               cn|battery_level|
+--------------------+-----------------+-------------+
| device-mac-36TWSKiT|            Italy|            2|
|therm-stick-5gimp...|      Philippines|            4|
|sensor-pad-6al7RT...|    United States|            3|
|meter-gauge-7GeDoanM|            China|            3|
|sensor-pad-8xUD6p...|            Japan|            0|
| device-mac-9GcjZ2pw|            Japan|            3|
|meter-gauge-11dlM...|            Italy|            3|
|sensor-pad-12Y2kIm0o|    United States|            0|
|sensor-pad-14QL93...|           Norway|            1|
|sensor-pad-16aXmI...|    United States|            4|
|meter-gauge-17zb8...|    United States|            0|
|sensor-pad-18XULN9Xv|            China|            4|
|therm-stick-25kK6...|    United States|            4|
|sensor-pad-28Tsud...|Republic of Korea|            3|
|device-mac-33B94G...|           Russia|            3|
|sensor-pa

In [15]:
# Stopping Spark
spark.stop()