<a href="https://colab.research.google.com/github/tigureis/PySpark_for_Sensor_Data_Transformation_and_Storage/blob/main/PySpark_for_Sensor_Data_Transformation_and_Storage.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install and import PySpark

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

Initiates a SparkSession

In [3]:
spark=SparkSession.builder.appName('sojaSensor').getOrCreate()

Load the data from a CSV file into a SparkDF

In [4]:
df=spark.read.format('csv').option('header',True).load('/content/sensores-iot.csv')

Check the DF

In [5]:
df.show(20)

+---+-----------+-----------+--------+--------------------+-----------+-----------+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|
+---+-----------+-----------+--------+--------------------+-----------+-----------+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|
|  5|sensor-2821|         15|      52|2000-10-27 23:32:...|-55.3025525| -26.657210|
|  6|sensor-9805|         12|      37|2022-08-09 22:08:...| -25.912786| 122.693490|
|  7|sensor-1230|         10|      60|1970-02-02 17:32:...|-88.9300035|  33.377804|
|  8|sensor-4472|         11|      75|2014-02-22 06:42:...|-89.0426855| 120.

In [6]:
df

DataFrame[_c0: string, device_id: string, temperature: string, humidity: string, timestamp: string, latitude: string, longitude: string]

In [7]:
df.count()

1000000

Create a column with the date, extracted from the *timestamp* column

In [8]:
df=df.withColumn('date',F.to_date(F.col('timestamp')))

Extract the date and separate in 3 distinct columns (yyyy;mm;dd)

In [9]:
df=(df.withColumn('year', F.year('date'))
  .withColumn('month', F.month('date'))
  .withColumn('day', F.dayofmonth('date')))
df.show(5)

+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
only showi

Calculate and store the average of temperature and humidity

In [10]:
mean_temp=df.select(F.mean(F.col('temperature'))).collect()[0][0]
mean_humidity=df.select(F.mean(F.col('humidity'))).collect()[0][0]

Use the calculated variables to fill the missing data

In [11]:
df.fillna({'temperature':mean_temp, 'humidity':mean_humidity}).show()

+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|
|  5|sensor-2821|         15|      52|2000-10-27 23:32:...|-55.3025525| -26.657210|2000-10-27|2000|   10| 27|
|  6|senso

Replace *temperature* outliers with the *mean temperature*

In [12]:
df.withColumn('temperature',
              F.when(F.col('temperature').between(0,40),
                     F.col('temperature')).
              otherwise(F.lit(mean_temp)))

DataFrame[_c0: string, device_id: string, temperature: string, humidity: string, timestamp: string, latitude: string, longitude: string, date: date, year: int, month: int, day: int]

# Save the DF into a [Parquet](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html)

In [13]:
df.write.parquet('parquet_tradicional_sensores.parquet')

Comprees using `snappy `to reduce the file size with a faster read/write operations

In [14]:
df.write.option('compression','snappy').parquet('parquet_snappy_sensores.parquet')

Comprees using `gzip` to get a smaller file size with a slight slower read/write operations

In [15]:
df.write.option('compression','gzip').parquet('parquet_gzip_sensores.parquet')

Use `partitionBy` organizes the DF into separate folders based on `devicer_id`, `temperature` and `humidity`. Compare the processing time of the operations.

In [16]:
df.write.partitionBy('device_id').parquet('parquet_device_sensores.parquet')

In [18]:
df.write.partitionBy('temperature').parquet('parquet_temperature.parquet')

In [19]:
df.write.partitionBy('humidity').parquet('humidity')

Use `read.parquet` to retrive the data from the parquet file

In [20]:
spark.read.parquet('parquet_tradicional_sensores.parquet').show(5)

+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
only showi

In [21]:
spark.read.parquet('parquet_snappy_sensores.parquet').show(5)

+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
only showi

In [22]:
spark.read.parquet('parquet_gzip_sensores.parquet').show(5)

+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
only showi