In [1]:
!pip install pyspark

Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.9.5


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Build (or reuse) the local Spark session shared by every cell below.
# The PostgreSQL JDBC driver jar is registered so that a JDBC read can load
# org.postgresql.Driver.
POSTGRES_JDBC_JAR = "/home/jovyan/work/postgresql-42.5.0.jar"

spark = (
    SparkSession.builder
    # Only one application name can be in effect for a session.
    .appName("PySpark_Postgres_test")
    # Run Spark inside this process, using all available cores.
    .master("local[*]")
    # "spark.jars" (plural) is the property Spark reads for additional jars.
    .config("spark.jars", POSTGRES_JDBC_JAR)
    .config("spark.executor.extraClassPath", POSTGRES_JDBC_JAR)
    .getOrCreate()
)

# spark = SparkSession.builder.master('local[*]').\
#             appName('my-cool-app').\
#             getOrCreate()

In [3]:
# df = spark.read \
#     .format("jdbc") \
#     .options(
#     url = "jdbc:postgresql://localhost:5431/postgres",
#     dbtable = "sensor_value",
#     user = "postgres",
#     password = "mypass",
#     driver = "org.postgresql.Driver"
# ).load()

In [4]:
# Load the raw sensor readings; the first line of the file holds the column names.
# No schema is supplied or inferred, so every column is read as a string.
df = spark.read.option("header", "true").csv("sensor_value.csv")

In [5]:
# Print the column names and types of the raw CSV frame (all columns are strings at this point).
df.printSchema()

root
 |-- sensor_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- sensor_type: string (nullable = true)
 |-- reading: string (nullable = true)



In [6]:
# Preview the raw rows; truncate=False keeps the full timestamp text visible.
df.show(truncate=False)

+---------+--------------------------+-----------+-------+
|sensor_id|timestamp                 |sensor_type|reading|
+---------+--------------------------+-----------+-------+
|1        |2022-09-01 14:00:03.000000|Temperature|2000   |
|1        |2022-09-01 14:00:33.000000|Humidity   |60     |
|2        |2022-09-01 14:00:10.000000|Temperature|2500   |
|2        |2022-09-01 14:00:28.000000|Humidity   |75     |
|1        |2022-09-01 14:15:03.000000|Temperature|3500   |
|1        |2022-09-01 14:15:33.000000|Humidity   |45     |
|2        |2022-09-01 14:15:10.000000|Temperature|1000   |
|2        |2022-09-01 14:15:28.000000|Humidity   |19     |
|1        |2022-09-01 14:30:03.000000|Temperature|5500   |
|1        |2022-09-01 14:30:33.000000|Humidity   |999    |
|2        |2022-09-01 14:30:10.000000|Temperature|4500   |
|2        |2022-09-01 14:30:28.000000|Humidity   |56     |
|1        |2022-09-01 14:45:03.000000|Temperature|1500   |
|1        |2022-09-01 14:45:33.000000|Humidity   |80    

In [7]:
# Keep only humidity readings inside the valid range [0, 100]; values that are negative
# or greater than 100 are ignored. The bounds themselves are kept (`between` is inclusive).
# `reading` is read from the CSV as a string, so it is cast to double explicitly
# both for the comparison and for the resulting column.
df_humid = (
    df.filter(
        (df.sensor_type == "Humidity")
        & df.reading.cast("double").between(0, 100)
    )
    .withColumn("reading", df.reading.cast("double"))
)
df_humid.show(truncate=False)

+---------+--------------------------+-----------+-------+
|sensor_id|timestamp                 |sensor_type|reading|
+---------+--------------------------+-----------+-------+
|1        |2022-09-01 14:00:33.000000|Humidity   |60.0   |
|2        |2022-09-01 14:00:28.000000|Humidity   |75.0   |
|1        |2022-09-01 14:15:33.000000|Humidity   |45.0   |
|2        |2022-09-01 14:15:28.000000|Humidity   |19.0   |
|2        |2022-09-01 14:30:28.000000|Humidity   |56.0   |
|1        |2022-09-01 14:45:33.000000|Humidity   |80.0   |
|1        |2022-09-01 15:00:33.000000|Humidity   |11.0   |
|2        |2022-09-01 15:00:28.000000|Humidity   |64.0   |
|1        |2022-09-01 15:15:33.000000|Humidity   |85.0   |
|2        |2022-09-01 15:15:28.000000|Humidity   |46.0   |
|2        |2022-09-01 15:30:28.000000|Humidity   |56.0   |
|1        |2022-09-01 15:45:33.000000|Humidity   |80.0   |
+---------+--------------------------+-----------+-------+



In [8]:
# Temperature rows only, with the raw reading divided by 100 (a raw value of 2000 means 20).
df_temp = (
    df.where(df["sensor_type"] == "Temperature")
    .withColumn("reading", df["reading"] / 100)
)
df_temp.show(truncate=False)

+---------+--------------------------+-----------+-------+
|sensor_id|timestamp                 |sensor_type|reading|
+---------+--------------------------+-----------+-------+
|1        |2022-09-01 14:00:03.000000|Temperature|20.0   |
|2        |2022-09-01 14:00:10.000000|Temperature|25.0   |
|1        |2022-09-01 14:15:03.000000|Temperature|35.0   |
|2        |2022-09-01 14:15:10.000000|Temperature|10.0   |
|1        |2022-09-01 14:30:03.000000|Temperature|55.0   |
|2        |2022-09-01 14:30:10.000000|Temperature|45.0   |
|1        |2022-09-01 14:45:03.000000|Temperature|15.0   |
|2        |2022-09-01 14:45:10.000000|Temperature|56.0   |
|1        |2022-09-01 15:00:03.000000|Temperature|49.0   |
|2        |2022-09-01 15:00:10.000000|Temperature|29.0   |
|1        |2022-09-01 15:15:03.000000|Temperature|11.0   |
|2        |2022-09-01 15:15:10.000000|Temperature|22.0   |
|1        |2022-09-01 15:30:03.000000|Temperature|55.0   |
|2        |2022-09-01 15:30:10.000000|Temperature|45.0  

In [9]:
# Stack the temperature and humidity frames into one, ordered by timestamp and then sensor id.
# `union` matches columns by position; both inputs share the same column order.
df_all = (
    df_temp
    .union(df_humid)
    .sort("timestamp", "sensor_id")
)
df_all.show(truncate=False)

+---------+--------------------------+-----------+-------+
|sensor_id|timestamp                 |sensor_type|reading|
+---------+--------------------------+-----------+-------+
|1        |2022-09-01 14:00:03.000000|Temperature|20.0   |
|2        |2022-09-01 14:00:10.000000|Temperature|25.0   |
|2        |2022-09-01 14:00:28.000000|Humidity   |75.0   |
|1        |2022-09-01 14:00:33.000000|Humidity   |60.0   |
|1        |2022-09-01 14:15:03.000000|Temperature|35.0   |
|2        |2022-09-01 14:15:10.000000|Temperature|10.0   |
|2        |2022-09-01 14:15:28.000000|Humidity   |19.0   |
|1        |2022-09-01 14:15:33.000000|Humidity   |45.0   |
|1        |2022-09-01 14:30:03.000000|Temperature|55.0   |
|2        |2022-09-01 14:30:10.000000|Temperature|45.0   |
|2        |2022-09-01 14:30:28.000000|Humidity   |56.0   |
|1        |2022-09-01 14:45:03.000000|Temperature|15.0   |
|2        |2022-09-01 14:45:10.000000|Temperature|56.0   |
|1        |2022-09-01 14:45:33.000000|Humidity   |80.0  

In [10]:
# Calculate the half-hourly median temperature per sensor.
# Readings are grouped by sensor and by 30-minute time window; `percentile(reading, 0.5)`
# is Spark SQL's exact median (it interpolates between the two middle values when the
# group has an even number of readings).
# Output columns: temp_sensor_id, temp_time (the window struct), T (median temperature).
temp_median = (
    df_temp.groupBy("sensor_id", window("timestamp", "30 minutes"))
    .agg(expr("percentile(reading, 0.5)").alias("T"))
    .withColumnRenamed("window", "temp_time")
    .withColumnRenamed("sensor_id", "temp_sensor_id")
)
temp_median.show(truncate=False)

+--------------+------------------------------------------+----+
|temp_sensor_id|temp_time                                 |T   |
+--------------+------------------------------------------+----+
|2             |{2022-09-01 15:30:00, 2022-09-01 16:00:00}|50.5|
|1             |{2022-09-01 14:30:00, 2022-09-01 15:00:00}|35.0|
|2             |{2022-09-01 14:00:00, 2022-09-01 14:30:00}|17.5|
|2             |{2022-09-01 14:30:00, 2022-09-01 15:00:00}|50.5|
|2             |{2022-09-01 15:00:00, 2022-09-01 15:30:00}|25.5|
|1             |{2022-09-01 15:30:00, 2022-09-01 16:00:00}|35.0|
|1             |{2022-09-01 14:00:00, 2022-09-01 14:30:00}|27.5|
|1             |{2022-09-01 15:00:00, 2022-09-01 15:30:00}|30.0|
+--------------+------------------------------------------+----+



In [11]:
# Calculate the half-hourly median relative humidity per sensor.
# Readings are grouped by sensor and by 30-minute time window; `percentile(reading, 0.5)`
# is Spark SQL's exact median (it interpolates between the two middle values when the
# group has an even number of readings).
# Output columns: humid_sensor_id, humid_time (the window struct), RH (median humidity).
humid_median = (
    df_humid.groupBy("sensor_id", window("timestamp", "30 minutes"))
    .agg(expr("percentile(reading, 0.5)").alias("RH"))
    .withColumnRenamed("window", "humid_time")
    .withColumnRenamed("sensor_id", "humid_sensor_id")
)
humid_median.show(truncate=False)

+---------------+------------------------------------------+----+
|humid_sensor_id|humid_time                                |RH  |
+---------------+------------------------------------------+----+
|2              |{2022-09-01 15:30:00, 2022-09-01 16:00:00}|56.0|
|1              |{2022-09-01 14:30:00, 2022-09-01 15:00:00}|80.0|
|2              |{2022-09-01 14:00:00, 2022-09-01 14:30:00}|47.0|
|2              |{2022-09-01 14:30:00, 2022-09-01 15:00:00}|56.0|
|2              |{2022-09-01 15:00:00, 2022-09-01 15:30:00}|55.0|
|1              |{2022-09-01 15:30:00, 2022-09-01 16:00:00}|80.0|
|1              |{2022-09-01 14:00:00, 2022-09-01 14:30:00}|52.5|
|1              |{2022-09-01 15:00:00, 2022-09-01 15:30:00}|48.0|
+---------------+------------------------------------------+----+



In [12]:
# Dew point per sensor and 30-minute window, using the approximation
#     dew_point = T + (RH - 100) / 5
# The humidity and temperature aggregates are matched on sensor id and time window;
# the duplicate key columns from the temperature side are dropped afterwards.
dew_df = (
    humid_median.join(
        temp_median,
        (humid_median.humid_sensor_id == temp_median.temp_sensor_id)
        & (humid_median.humid_time == temp_median.temp_time),
        "inner",
    )
    .withColumn("dew_point", temp_median["T"] + (humid_median["RH"] - 100) / 5)
    .drop("temp_sensor_id", "temp_time")
    .withColumnRenamed("humid_sensor_id", "sensor_id")
    .withColumnRenamed("humid_time", "time")
)
dew_df.show(truncate=False)

+---------+------------------------------------------+----+----+---------+
|sensor_id|time                                      |RH  |T   |dew_point|
+---------+------------------------------------------+----+----+---------+
|2        |{2022-09-01 15:30:00, 2022-09-01 16:00:00}|56.0|50.5|41.7     |
|1        |{2022-09-01 14:30:00, 2022-09-01 15:00:00}|80.0|35.0|31.0     |
|2        |{2022-09-01 14:00:00, 2022-09-01 14:30:00}|47.0|17.5|6.9      |
|2        |{2022-09-01 14:30:00, 2022-09-01 15:00:00}|56.0|50.5|41.7     |
|2        |{2022-09-01 15:00:00, 2022-09-01 15:30:00}|55.0|25.5|16.5     |
|1        |{2022-09-01 15:30:00, 2022-09-01 16:00:00}|80.0|35.0|31.0     |
|1        |{2022-09-01 14:00:00, 2022-09-01 14:30:00}|52.5|27.5|18.0     |
|1        |{2022-09-01 15:00:00, 2022-09-01 15:30:00}|48.0|30.0|19.6     |
+---------+------------------------------------------+----+----+---------+



In [15]:
# Persist the dew-point table as a CSV file, without the pandas index column.
# The Spark frame is collected to the driver as a pandas DataFrame first.
# NOTE(review): `time` is a struct column (window start/end) — confirm that its CSV
# representation is what downstream consumers expect.
dew_point_pdf = dew_df.toPandas()
dew_point_pdf.to_csv('dew_point.csv', index=False)