In [2]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DataType, ArrayType, FloatType, LongType, TimestampType

In [3]:
# Create (or reuse) a local Spark session running with two worker threads.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Define Custom Schema")
    .getOrCreate()
)

2023-04-20 12:16:17,111 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [1]:
!wget -P /home/train/datasets/ \
https://github.com/erkansirin78/datasets/raw/master/sensors_instrumented_in_an_office_building_dataset.zip

--2023-04-20 11:42:07--  https://github.com/erkansirin78/datasets/raw/master/sensors_instrumented_in_an_office_building_dataset.zip
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/erkansirin78/datasets/master/sensors_instrumented_in_an_office_building_dataset.zip [following]
--2023-04-20 11:42:07--  https://raw.githubusercontent.com/erkansirin78/datasets/master/sensors_instrumented_in_an_office_building_dataset.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.111.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 78299066 (75M) [application/zip]
Saving to: ‘/home/train/datasets/sensors_instrumented_in_an_office_building_dataset.zip’


2023-04-20 

In [4]:
# (column name, Spark type) pairs for the sensor DataFrame, in output column order.
# file_name, room, file_path, file_name_type and event_ts_min are overwritten with
# derived values by the read cell; every column is declared nullable.
_schema_columns = [
    ("ts_min_bignt", LongType()),
    ("value", FloatType()),
    ("file_name", StringType()),
    ("room", StringType()),
    ("file_path", StringType()),
    ("file_name_type", StringType()),
    ("event_ts_min", TimestampType()),
]
programmatical_schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _schema_columns]
)

In [5]:
# Load every file matched by KETI/* with the predefined schema, then derive the
# metadata columns from the path of the file each row was read from:
#   file_path      - full input path
#   room           - second-to-last path segment (the parent directory name)
#   file_name      - last path segment
#   file_name_type - file name up to the first dot
# Rows coming from README files are dropped.
df = (
    spark.read
    .option("sep", ",")
    .schema(programmatical_schema)
    .csv("file:///home/train/datasets/KETI/*")
    .withColumn("file_path", F.input_file_name())
    .withColumn("room", F.element_at(F.reverse(F.split(F.col("file_path"), "/")), 2))
    .withColumn("file_name", F.element_at(F.reverse(F.split(F.col("file_path"), "/")), 1))
    .withColumn("file_name_type", F.split(F.col("file_name"), "\\.").getItem(0))
    .filter("file_name_type not in ('README')")
    # from_unixtime() returns a formatted *string*; cast it so event_ts_min is a real
    # timestamp (as the schema declares) instead of relying on implicit casts later.
    .withColumn("event_ts_min", F.from_unixtime(F.col("ts_min_bignt")).cast(TimestampType()))
)

                                                                                

In [6]:
# Preview the first 10 rows as a pandas DataFrame.
df.limit(10).toPandas()

Unnamed: 0,ts_min_bignt,value,file_name,room,file_path,file_name_type,event_ts_min
0,1377299103,2287.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-08-24 02:05:03
1,1377299105,1977.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-08-24 02:05:05
2,1377299126,2208.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-08-24 02:05:26
3,1377299130,2267.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-08-24 02:05:30
4,1377299133,2097.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-08-24 02:05:33
5,1377299138,2111.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-08-24 02:05:38
6,1377299143,2257.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-08-24 02:05:43
7,1377299148,2200.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-08-24 02:05:48
8,1377299153,2164.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-08-24 02:05:53
9,1377299158,2183.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-08-24 02:05:58


In [7]:
# Count the rows of df (a Spark action, so it executes the read pipeline).
df.count()

                                                                                

29882394

In [8]:
# Latest and earliest event timestamp in the raw data.
# A global aggregate always yields exactly one row, so no limit() is needed.
df.agg(F.max("event_ts_min"), F.min("event_ts_min")).toPandas()

                                                                                

Unnamed: 0,max(event_ts_min),min(event_ts_min)
0,2013-09-01 09:58:59,2013-08-23 18:00:00


In [9]:
# Spot check: keep only the rows whose event_ts_min lies between 2013-09-01 and 2013-09-02.
date_range_predicate = "event_ts_min between to_date('2013-09-01','yyyy-MM-dd') and to_date('2013-09-02','yyyy-MM-dd')"
df_test1 = df.filter(date_range_predicate)

In [10]:
# Preview up to 5 rows of the date-filtered subset.
df_test1.limit(5).toPandas()

                                                                                

Unnamed: 0,ts_min_bignt,value,file_name,room,file_path,file_name_type,event_ts_min
0,1378001479,1950.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-09-01 05:11:19
1,1378001504,2149.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-09-01 05:11:44
2,1378001589,2141.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-09-01 05:13:09
3,1378001724,2300.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-09-01 05:15:24
4,1378001769,2050.0,light.csv,668,file:/home/train/datasets/KETI/668/light.csv,light,2013-09-01 05:16:09


In [18]:
# Average each sensor's readings per room over one-minute windows of event_ts_min.
# window.start is already a timestamp, so it is selected directly: the date-format
# patterns previously passed to to_timestamp()/unix_timestamp() were never applied
# to it (and were inconsistent with each other), which made the cell misleading.
df2 = (
    df.groupBy("file_name_type", "room", F.window("event_ts_min", "1 minutes"))
    .agg(F.avg("value").alias("value"))
    .select(
        F.col("window.start").alias("event_ts_min"),
        # Window start as epoch seconds.
        F.unix_timestamp(F.col("window.start")).alias("ts_min_bignt"),
        F.col("file_name_type"),
        F.col("room"),
        F.col("value"),
    )
)

In [19]:
# Preview the first 10 per-minute averages.
df2.limit(10).toPandas()

                                                                                

Unnamed: 0,event_ts_min,ts_min_bignt,file_name_type,room,value
0,2013-08-24 07:23:00,1377318180,light,668,2137.916667
1,2013-08-24 07:28:00,1377318480,light,668,2212.416667
2,2013-08-24 08:17:00,1377321420,light,668,2135.333333
3,2013-08-24 12:11:00,1377335460,light,668,2172.25
4,2013-08-24 12:45:00,1377337500,light,668,2228.166667
5,2013-08-24 15:34:00,1377347640,light,668,2186.083333
6,2013-08-24 16:23:00,1377350580,light,668,2142.916667
7,2013-08-24 17:42:00,1377355320,light,668,2224.75
8,2013-08-24 19:44:00,1377362640,light,668,2185.916667
9,2013-08-24 20:57:00,1377367020,light,668,2281.5


In [20]:
# Turn the per-sensor rows into one row per (minute, room) with one column per sensor.
# The pivot values are listed explicitly so Spark does not have to run an extra job
# to discover the distinct file_name_type values; they are the five sensor columns
# this notebook works with downstream.
sensor_types = ["co2", "humidity", "light", "pir", "temperature"]
df3 = (
    df2.groupBy("event_ts_min", "ts_min_bignt", "room")
    .pivot("file_name_type", sensor_types)
    .mean("value")
)

                                                                                

In [21]:
# Preview up to 20 rows of the pivoted DataFrame.
df3.limit(20).toPandas()

                                                                                

Unnamed: 0,event_ts_min,ts_min_bignt,room,co2,humidity,light,pir,temperature
0,2013-08-26 12:32:00,1377509520,562,238.083333,61.282499,2.416667,0.0,21.9125
1,2013-08-30 12:22:00,1377854520,776,518.25,58.419999,4.4,0.0,23.419
2,2013-08-31 01:12:00,1377900720,722,437.25,55.070001,4.916667,0.0,23.231666
3,2013-08-28 08:40:00,1377668400,415,484.25,57.557501,3.75,0.0,23.2375
4,2013-08-28 08:34:00,1377668040,746,514.833333,60.747501,5.0,0.0,22.293333
5,2013-08-26 03:10:00,1377475800,562,227.583333,63.6,3.416667,0.0,22.054166
6,2013-08-30 20:29:00,1377883740,717,444.0,61.295,124.0,0.0,22.504167
7,2013-08-26 15:43:00,1377520980,734,291.25,54.875,5.583333,0.0,23.279167
8,2013-08-26 14:18:00,1377515880,562,232.0,60.862501,2.416667,0.0,21.916667
9,2013-08-28 10:53:00,1377676380,448,362.5,57.805001,4.416667,0.0,23.288334


In [22]:
# Number of (minute, room) rows after pivoting.
df3.count()

                                                                                

602667

In [23]:
# Inspect the rows that have no co2 reading for their minute.
df3.filter("co2 is null").show()



+-------------------+------------+----+----+-----------------+------------------+-------+------------------+
|       event_ts_min|ts_min_bignt|room| co2|         humidity|             light|    pir|       temperature|
+-------------------+------------+----+----+-----------------+------------------+-------+------------------+
|2013-08-28 01:31:00|  1377642660| 417|null|             null|              null|    0.0|              null|
|2013-09-01 08:58:00|  1378015080| 562|null|             null|              null|    0.0|              null|
|2013-09-01 05:44:00|  1378003440| 648|null|             null|              null|    0.0|              null|
|2013-09-01 04:25:00|  1377998700| 721|null|             null|              null|    0.0|              null|
|2013-09-01 03:13:00|  1377994380| 664|null|             null|              null|    0.0|              null|
|2013-09-01 01:50:00|  1377989400| 454|null|             null|              null|    0.0|              null|
|2013-08-31 22:35:0

                                                                                

In [25]:
# Spot check a single (minute, room) row of the pivoted data.
# ts_min_bignt is a long column, so it is compared with an integer literal instead of
# a quoted string that Spark would have to cast implicitly; room is a string column.
df_test2 = df3.where((F.col("ts_min_bignt") == 1377299100) & (F.col("room") == "413"))

In [26]:
# Display the spot-check row(s) from the pivoted DataFrame.
df_test2.limit(5).toPandas()

                                                                                

Unnamed: 0,event_ts_min,ts_min_bignt,room,co2,humidity,light,pir,temperature
0,2013-08-24 02:05:00,1377299100,413,494.727273,45.330001,96.555556,0.0,23.927778


In [27]:
# Print column names, types and nullability of the pivoted DataFrame.
df3.printSchema()

root
 |-- event_ts_min: timestamp (nullable = true)
 |-- ts_min_bignt: long (nullable = true)
 |-- room: string (nullable = true)
 |-- co2: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- light: double (nullable = true)
 |-- pir: double (nullable = true)
 |-- temperature: double (nullable = true)



In [29]:
# Replace missing sensor averages with 0 and rename "temperature" to "temp".
# Each pair is (source column in df3, output column name), in output order.
sensor_column_aliases = [
    ("co2", "co2"),
    ("light", "light"),
    ("temperature", "temp"),
    ("humidity", "humidity"),
    ("pir", "pir"),
]
df4 = df3.select(
    "event_ts_min",
    "ts_min_bignt",
    "room",
    *[
        F.coalesce(F.col(source_name), F.lit(0)).alias(output_name)
        for source_name, output_name in sensor_column_aliases
    ],
)

In [30]:
# Print the schema after null replacement (the sensor columns become non-nullable).
df4.printSchema()

root
 |-- event_ts_min: timestamp (nullable = true)
 |-- ts_min_bignt: long (nullable = true)
 |-- room: string (nullable = true)
 |-- co2: double (nullable = false)
 |-- light: double (nullable = false)
 |-- temp: double (nullable = false)
 |-- humidity: double (nullable = false)
 |-- pir: double (nullable = false)



In [31]:
# Row count of df4; the select above does not filter, so it should equal df3's count.
df4.count()

                                                                                

602667

In [32]:
# Latest and earliest window start in the final DataFrame.
# A global aggregate always yields exactly one row, so no limit() is needed.
df4.agg(F.max("event_ts_min"), F.min("event_ts_min")).toPandas()

                                                                                

Unnamed: 0,max(event_ts_min),min(event_ts_min)
0,2013-09-01 09:58:00,2013-08-23 18:00:00


In [33]:
# Spot check a single (minute, room) row of the final data.
# ts_min_bignt is a long column, so it is compared with an integer literal instead of
# a quoted string that Spark would have to cast implicitly; room is a string column.
df_test3 = df4.where((F.col("ts_min_bignt") == 1377299100) & (F.col("room") == "413"))

In [34]:
# Display the spot-check row(s) from the final DataFrame.
df_test3.limit(5).toPandas()

                                                                                

Unnamed: 0,event_ts_min,ts_min_bignt,room,co2,light,temp,humidity,pir
0,2013-08-24 02:05:00,1377299100,413,494.727273,96.555556,23.927778,45.330001,0.0


In [None]:
# Write the final DataFrame as CSV with a header row; repartition(1) collapses the
# data into a single partition before writing.
# mode("overwrite") keeps the cell re-runnable: with the default save mode the write
# fails as soon as the output directory already exists from a previous run.
df4.repartition(1).write.mode("overwrite").csv("file:///home/train/project_sensors_output/", header=True)

