In [4]:
import findspark
# Point findspark at the local Spark installation (/opt/manual/spark/) so the
# `pyspark` imports below resolve against it; this call must stay above them.
findspark.init ("/opt/manual/spark/")

from pyspark.sql import SparkSession, functions as F
import pandas as pd
import warnings
# NOTE(review): these two names are not referenced in the visible cells — the
# code calls F.input_file_name() / F.to_timestamp() instead; candidates for removal.
from pyspark.sql.functions import input_file_name, to_timestamp

In [5]:
# Silence Python warnings for the rest of the session.
warnings.simplefilter(action='ignore')

# pandas display: never truncate columns or rows when a frame is rendered.
for _opt in ('display.max_columns', 'display.max_rows'):
    pd.set_option(_opt, None)

In [6]:
# Local Spark session (2 worker threads) with Hive support; reuses an existing
# session if one is already running in this kernel.
spark = (
    SparkSession.builder
    .appName("final_project")
    .master("local[2]")
    .enableHiveSupport()
    .getOrCreate()
)

# Only surface ERROR-level messages from Spark in the notebook output.
spark.sparkContext.setLogLevel('ERROR')

2023-07-04 21:14:00,149 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [7]:
# Headerless sensor CSVs laid out as KETI/<room>/<sensor>.csv; each row is
# (_c0 = epoch seconds, _c1 = reading).
KETI_GLOB = "file:///home/train/datasets/KETI/*/*.csv"

# Source path split on "/" and reversed: element 1 is "<sensor>.csv",
# element 2 is the "<room>" directory.
path_parts = F.reverse(F.split(F.input_file_name(), "/"))

# Long-format readings: one row per (file, line) with columns
# timestamp, ts_min_bignt, value, room, sensor.
df = spark.read.format("csv") \
    .option("header", False) \
    .option("inferSchema", True) \
    .option("sep", ",") \
    .load(KETI_GLOB) \
    .select(
        F.to_timestamp("_c0").alias("timestamp"),
        F.col("_c0").alias("ts_min_bignt"),
        F.col("_c1").alias("value"),
        F.element_at(path_parts, 2).alias("room"),
        # Escaped and anchored: strip only a trailing ".csv" extension.
        F.regexp_replace(F.element_at(path_parts, 1), r"\.csv$", "").alias("sensor"),
    )
    

                                                                                

In [8]:
df_pir = df.filter("sensor == 'pir'") \
    .withColumn("pir",F.col("value")) \
    .withColumn("room_pir",F.col("room")) \
    .withColumn("time_pir",F.col("timestamp")) \
    .withColumn("bignt_pir",F.col("ts_min_bignt")) \
    .drop("sensor","value","room","timestamp","ts_min_bignt")


In [9]:
df_light = df.filter("sensor == 'light'") \
    .withColumn("light",F.col("value")) \
    .withColumn("room_light",F.col("room")) \
    .withColumn("time_light",F.col("timestamp")) \
    .withColumn("bignt_light",F.col("ts_min_bignt")) \
    .drop("sensor","value","room","timestamp","ts_min_bignt")

In [10]:
df_humidity = df.filter("sensor == 'humidity'") \
    .withColumn("humidity",F.col("value")) \
    .withColumn("room_humidity",F.col("room")) \
    .withColumn("time_humidity",F.col("timestamp")) \
    .withColumn("bignt_humidity",F.col("ts_min_bignt")) \
    .drop("sensor","value","room","timestamp","ts_min_bignt")

In [11]:
df_co2 = df.filter("sensor == 'co2'") \
    .withColumn("co2",F.col("value")) \
    .withColumn("room_co2",F.col("room")) \
    .withColumn("time_co2",F.col("timestamp")) \
    .withColumn("bignt_co2",F.col("ts_min_bignt")) \
    .drop("sensor","value","room","timestamp","ts_min_bignt")

In [12]:
df_temp = df.filter("sensor == 'temperature'") \
    .withColumn("temp",F.col("value")) \
    .withColumn("room_temp",F.col("room")) \
    .withColumn("time_temp",F.col("timestamp")) \
    .withColumn("bignt_temp",F.col("ts_min_bignt")) \
    .drop("sensor","value","room","timestamp","ts_min_bignt")

In [16]:
# Inner-join every other sensor onto the PIR readings on (room, timestamp, raw
# epoch value); a row survives only if all five sensors have a matching reading.
df2 = df_pir
for other, suffix in [
    (df_co2, "co2"),
    (df_temp, "temp"),
    (df_light, "light"),
    (df_humidity, "humidity"),
]:
    same_reading = (
        (df_pir["room_pir"] == other[f"room_{suffix}"])
        & (df_pir["time_pir"] == other[f"time_{suffix}"])
        & (df_pir["bignt_pir"] == other[f"bignt_{suffix}"])
    )
    df2 = df2.join(other, same_reading, "inner")

In [17]:
# Wide table: one row per (room, instant). Keep a single copy of the join keys
# (taken from the PIR side) under their final names, then the five readings.
df3 = df2.select(
    F.col("time_pir").alias("event_ts_min"),
    F.col("bignt_pir").alias("ts_min_bignt"),
    F.col("room_pir").alias("room"),
    "co2", "light", "temp", "humidity", "pir",
)

In [18]:
df3.show(n=4)  # peek at the first few joined rows

[Stage 9:>                                                          (0 + 1) / 1]

+-------------------+------------+----+-----+-----+-----+--------+----+
|       event_ts_min|ts_min_bignt|room|  co2|light| temp|humidity| pir|
+-------------------+------------+----+-----+-----+-----+--------+----+
|2013-08-27 11:01:03|  1377590463| 413|451.0|  4.0|24.12|   45.28| 0.0|
|2013-08-26 13:12:43|  1377511963| 415|473.0| 17.0|23.37|   55.23| 0.0|
|2013-08-26 20:21:35|  1377537695| 415|472.0| 91.0| 23.1|   55.42| 0.0|
|2013-08-26 23:29:36|  1377548976| 415|867.0| 51.0|24.04|   55.14|25.0|
+-------------------+------------+----+-----+-----+-----+--------+----+
only showing top 4 rows



                                                                                

In [19]:
# Chronological order; ties (several rooms at the same instant) are broken by
# room so the exported row order is reproducible across runs.
df4 = df3.orderBy("event_ts_min", "room")

In [20]:
df5 = df4.na.drop()  # discard rows with a null in any column

In [21]:
df5.show(n=4)  # first rows of the cleaned, time-ordered table



+-------------------+------------+----+-----+-----+-----+--------+----+
|       event_ts_min|ts_min_bignt|room|  co2|light| temp|humidity| pir|
+-------------------+------------+----+-----+-----+-----+--------+----+
|2013-08-24 02:04:53|  1377299093| 511|387.0|252.0|22.62|   52.75| 0.0|
|2013-08-24 02:04:57|  1377299097| 644|465.0|165.0| 22.8|    52.4| 0.0|
|2013-08-24 02:04:57|  1377299097| 648|175.0|191.0|23.32|   50.32| 0.0|
|2013-08-24 02:04:57|  1377299097|656A|579.0|176.0|24.37|    49.9|30.0|
+-------------------+------------+----+-----+-----+-----+--------+----+
only showing top 4 rows



                                                                                

In [22]:
# Write the final table as a single CSV part file with a header row,
# replacing any earlier output in the target directory.
(
    df5.coalesce(1)
    .write
    .csv("file:///home/train/datasets/final/dffinal", mode="overwrite", header="true")
)

                                                                                