Objective: Analyzing IoT Data with Spark SQL
The objective of this case study is to analyze sensor data, which is presented in JSON format, using Spark SQL.
Dataset (iot_devices.json): The dataset has the following attributes:
Device ID
Device Name
IP Address
Cca2 – two-letter country code
Cca3 – three-letter country code
Cn – full country name
Latitude
Longitude
Scale
Temperature
Humidity
Battery Level
CO2 level
LCD Status
Timestamp

1. Read the data into a DataFrame.

In [40]:
from pyspark.sql import SparkSession

# Step 1: Read the data into a DataFrame
spark = SparkSession.builder.appName("IoTAnalysis").getOrCreate()
# By default, spark.read.json expects one JSON object per line and infers the schema
df = spark.read.json("iot_devices.json")
df

DataFrame[battery_level: bigint, c02_level: bigint, cca2: string, cca3: string, cn: string, device_id: bigint, device_name: string, humidity: bigint, ip: string, latitude: double, lcd: string, longitude: double, scale: string, temp: bigint, timestamp: bigint]

2. Register the DataFrame as a temporary view called iot.

In [41]:
df.createOrReplaceTempView("iot")

3. Count how many devices there are in each country and display the output.

In [64]:
# Step 3: Count the devices in each country and display the output
count_by_country = spark.sql("""
    SELECT cca3 as CountryCode, COUNT(*) as DeviceCount
    FROM iot
    GROUP BY cca3
""")
count_by_country.show()

+-----------+-----------+
|CountryCode|DeviceCount|
+-----------+-----------+
|        HTI|         12|
|        PSE|         25|
|        POL|       2744|
|        LVA|        358|
|        BRB|         38|
|        JAM|         44|
|        BRA|       3224|
|        ARM|         34|
|        MOZ|         22|
|        JOR|         46|
|        CUB|         15|
|        FRA|       5305|
|        ABW|          8|
|        TCA|          1|
|        BRN|         19|
|        BOL|        108|
|        URY|        117|
|        LBY|         16|
|        ATG|         60|
|        ITA|       2915|
+-----------+-----------+
only showing top 20 rows



4. Display all the countries whose carbon dioxide level is more than 1400. Sort the output in descending order.

In [56]:
# Step 4: Display all the countries whose carbon dioxide level is more than 1400. Sort the output in descending order
spark.sql("SELECT DISTINCT cca3, c02_level FROM iot WHERE c02_level > 1400 ORDER BY c02_level DESC").show()

+----+---------+
|cca3|c02_level|
+----+---------+
| PHL|     1599|
| HKG|     1599|
| CZE|     1599|
| BRA|     1599|
| IND|     1599|
| KOR|     1599|
| ESP|     1599|
| GBR|     1599|
| KAZ|     1599|
| FRA|     1599|
| MYS|     1599|
| POL|     1599|
| HUN|     1599|
| CAN|     1599|
| ROU|     1599|
| CHN|     1599|
| RUS|     1599|
| MEX|     1599|
| VNM|     1599|
| ARE|     1599|
+----+---------+
only showing top 20 rows



5. Select the devices in every country with high CO2 levels, group them by cca3, and order them by device_id (Hint: for high CO2 levels, the LCD status is red).

In [62]:
# Step 5: Select devices with high CO2 levels (LCD status 'red') in every country, ordered by device_id
# Sort by the device_id column; a quoted 'DeviceID' would be a string constant and impose no order
high_co2_devices = spark.sql("""
    SELECT cca3 AS CountryCode, device_id
    FROM iot
    WHERE c02_level > 1400 AND lcd = 'red'
    ORDER BY device_id
""")
high_co2_devices.show()

+-----------+---------+
|CountryCode|device_id|
+-----------+---------+
|        NOR|        2|
|        ITA|        3|
|        JPN|        8|
|        USA|       10|
|        ITA|       11|
|        USA|       16|
|        USA|       17|
|        USA|       19|
|        JPN|       22|
|        CAN|       24|
|        KOR|       27|
|        KOR|       28|
|        UKR|       47|
|        SWE|       53|
|        USA|       54|
|        USA|       57|
|        USA|       64|
|        CZE|       66|
|        IND|       77|
|        KOR|       78|
+-----------+---------+
only showing top 20 rows



6. Find all devices, by country, whose batteries need replacement.

In [60]:
# Step 6: Find all devices, by country, whose batteries need replacement
devices_with_low_battery = spark.sql("""
    SELECT *
    FROM iot
    WHERE battery_level < 20
""")
devices_with_low_battery.show()

+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|           cn|device_id|         device_name|humidity|             ip|latitude|   lcd|longitude|  scale|temp|    timestamp|
+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|            8|      868|  US| USA|United States|        1|meter-gauge-1xbYRYcj|      51|   68.161.225.1|    38.0| green|    -97.0|Celsius|  34|1458444054093|
|            7|     1473|  NO| NOR|       Norway|        2|   sensor-pad-2n2Pea|      70|  213.161.254.1|   62.47|   red|     6.15|Celsius|  11|1458444054119|
|            2|     1556|  IT| ITA|        Italy|        3| device-mac-36TWSKiT|      44|      88.36.5.1|   42.83|   red|    12.83|Celsius|  19|1458444054120|
|            6|     1080|  US| USA|United Stat