# Systems for Processing Big Data
## TP1 - Energy Meter Monitoring




The sensor data corresponds to regular readings from 11 residential energy meters. The data covers the month of February 2024.

Each data sample has the following schema:

timestamp | sensor_id | energy
----------|-------------|-----------
timestamp | string  | float

Each energy value (kWh) corresponds to the accumulated value of the meter at the time of measurement. As such,
each meter is expected to produce a monotonically increasing series of pairs of timestamp and energy consumed up to that moment.

The meters do not start at zero or at the same value.
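
Because each meter reports an accumulated value, the energy consumed over an interval is the difference between the readings at its two ends, whatever the meter's starting offset. The following is a minimal plain-Python sketch of that idea; the sample values and the helper names `is_monotonic` and `consumed` are illustrative assumptions, not part of the assignment.

```python
from datetime import datetime

# Hypothetical cumulative readings (timestamp, kWh) for a single meter.
readings = [
    (datetime(2024, 2, 1, 0, 0, 25), 650.5),
    (datetime(2024, 2, 1, 0, 5, 34), 650.5),
    (datetime(2024, 2, 1, 2, 0, 10), 650.6),
]

def is_monotonic(series):
    """True if the energy values never decrease when ordered by timestamp."""
    values = [energy for _, energy in sorted(series)]
    return all(a <= b for a, b in zip(values, values[1:]))

def consumed(series):
    """Energy consumed between the first and the last reading of the series."""
    ordered = sorted(series)
    return ordered[-1][1] - ordered[0][1]

print(is_monotonic(readings))
print(round(consumed(readings), 1))
```

For a monotonic series, the first and last readings coincide with the minimum and maximum values, which is why an aggregation over `min` and `max` can stand in for "first" and "last".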


## Questions

The following questions should be answered for the month of February and only for this month.

### For the group of sensors:

1. Compute the total energy consumed.

2. Compute the running total energy consumed so far for each day, inclusive.

Note: You can approximate the result by using the last reading of each day from each sensor.

### For each sensor, separately:

3. Compute the total energy consumed and the average energy consumption per day.

4. Compute the day of the month with minimum and maximum energy consumption.

Note: You can approximate the result by using the last reading of each day from each sensor.

### For each sensor, separately, with estimations:

**Assumptions:**

+ Readings may be missing for extended periods due to communication problems with the sensors.

+ Readings do not fall precisely "on the hour". They are collected and recorded at arbitrary times.

+ For more precise results, estimate the value of the meter at a precise timestamp using linear interpolation between the nearest readings.

5. Compute the **estimated** value of each sensor meter for every hour and day of the month (in ascending order).

6. Compute the **estimated** running total of the energy consumed so far. The value should be updated every hour.

## Requirements

Solve each question using Structured Spark, either DataFrames or SQL or both.

---
### Colab Setup


In [None]:
#@title Install PySpark
!pip install pyspark findspark --quiet

In [None]:
#@title Download the dataset

!wget -q -O energy-readings.csv https://raw.githubusercontent.com/smduarte/spbd-2425/refs/heads/main/docs/labs/projs/energy-readings.csv
!head -2 energy-readings.csv

date;sensor;energy
2024-02-01 00:00:00;D;2615.0


In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('energy').getOrCreate()

sc = spark.sparkContext
try :
    readings = spark.read.csv('energy-readings.csv',
                             sep =';', header=True, inferSchema=True)

    readings.printSchema()


    readings.show(5)
except Exception as err:
    print(err)

root
 |-- date: timestamp (nullable = true)
 |-- sensor: string (nullable = true)
 |-- energy: double (nullable = true)

+-------------------+------+------+
|               date|sensor|energy|
+-------------------+------+------+
|2024-02-01 00:00:00|     D|2615.0|
|2024-02-01 00:00:18|     C|1098.8|
|2024-02-01 00:00:25|     A| 650.5|
|2024-02-01 00:00:33|     J| 966.7|
|2024-02-01 00:00:42|     H|2145.4|
+-------------------+------+------+
only showing top 5 rows



## Extract February readings

Before starting any task, we filter the data by timestamp to keep only the readings from February and store them in a separate DataFrame for further use.

For most tasks this is sufficient. However, tasks 5 and 6 need additional data to perform the interpolation at the edges of the month.
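
The February filter is a half-open interval: the first instant of February is included and the first instant of March is excluded. A small plain-Python sketch of the same test follows; the name `in_february` and the sample timestamps are assumptions made for illustration.

```python
from datetime import datetime

START = datetime(2024, 2, 1)   # inclusive lower bound
END = datetime(2024, 3, 1)     # exclusive upper bound

def in_february(ts):
    """Half-open interval test: START <= ts < END."""
    return START <= ts < END

# Hypothetical timestamps around both edges of the month.
samples = [
    datetime(2024, 1, 31, 23, 59, 59),
    datetime(2024, 2, 1, 0, 0, 0),
    datetime(2024, 2, 29, 23, 59, 59),
    datetime(2024, 3, 1, 0, 0, 0),
]

kept = [ts for ts in samples if in_february(ts)]
print(kept)
```

Using an exclusive upper bound avoids having to spell out the last second of the month, which matters in a leap year such as 2024.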

In [None]:
# extract readings from February
readings_february = readings.filter("date >= date '2024-02-01' AND date < date '2024-03-01'")
readings_february.show(5)

# save results
readings_february.write.option("header", True) \
    .option("delimiter", ",") \
    .mode("overwrite") \
    .csv("readings_february.csv")

+-------------------+------+------+
|               date|sensor|energy|
+-------------------+------+------+
|2024-02-01 00:00:00|     D|2615.0|
|2024-02-01 00:00:18|     C|1098.8|
|2024-02-01 00:00:25|     A| 650.5|
|2024-02-01 00:00:33|     J| 966.7|
|2024-02-01 00:00:42|     H|2145.4|
+-------------------+------+------+
only showing top 5 rows



## TASK 1: For the group of sensors: Compute the total energy consumed

Our approach to this task was to take the readings from February and, for each sensor, calculate the amount of energy used during the month from its first and last reading. Because the meters are monotonic, these are the minimum and maximum values of the month. We then summed the per-sensor results to obtain the total energy consumption in February.
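
The same aggregation can be sketched in plain Python to make the two steps explicit (per-sensor difference, then sum). The readings below are made-up values and `total_consumption` is a hypothetical helper, not the Spark code used in this notebook.

```python
# Hypothetical (sensor, kWh) readings; the values are invented for illustration.
rows = [
    ("A", 650.5), ("A", 655.0), ("A", 660.5),
    ("B", 627.5), ("B", 630.0),
]

def total_consumption(rows):
    """Return (per-sensor consumption, group total) using max - min per sensor."""
    lowest, highest = {}, {}
    for sensor, energy in rows:
        lowest[sensor] = min(energy, lowest.get(sensor, energy))
        highest[sensor] = max(energy, highest.get(sensor, energy))
    per_sensor = {s: highest[s] - lowest[s] for s in lowest}
    return per_sensor, sum(per_sensor.values())

per_sensor, total = total_consumption(rows)
print(per_sensor, total)
```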

In [None]:
# calculate the total energy consumption in February
total_energy_consumption_february = readings_february.groupBy('sensor')\
                                                     .agg(round((max('energy') - min('energy')), 1).alias('energy_used'))\
                                                     .orderBy('sensor')\
                                                     .select(round(sum('energy_used'), 2).alias('total_energy_consumption_february'))

total_energy_consumption_february.show()

# save results
total_energy_consumption_february.write.option("header", True).option("delimiter", ",").mode("overwrite").csv("T1_total_energy_consumption_february.csv")

+---------------------------------+
|total_energy_consumption_february|
+---------------------------------+
|                           3106.8|
+---------------------------------+



## TASK 2: For the group of sensors: Compute the running total energy consumed so far for each day, inclusive

This task was solved in two steps.

Firstly, we extracted the initial energy that was measured on each sensor at the beginning of February (taken as the first reading from that month). This marks our starting point for all further energy consumption calculations.
The obtained DataFrame is shown below:

```
+------+---------------+
|sensor|starting_energy|
+------+---------------+
|     A|          650.5|
|     B|          627.5|
|     C|         1098.8|
|     D|         2615.0|
|     E|         1874.0|
|     F|          748.0|
|     G|          833.7|
|     H|         2145.4|
|     I|          927.2|
|     J|          966.7|
|     K|          841.2|
+------+---------------+
```

After that, for each sensor and each day, we calculated the end-of-day energy consumption, i.e. the value accumulated over that day and all previous days of February.
Finally, for each day, we summed the values of all sensors to obtain the running daily energy consumption.
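
As a rough illustration of these two steps, here is a plain-Python sketch with invented readings. The helper name `running_daily_totals` is an assumption, and the sketch mirrors the logic only, not the Spark execution.

```python
from collections import defaultdict
from datetime import date

# Hypothetical (sensor, day, kWh) readings.
rows = [
    ("A", date(2024, 2, 1), 100.0), ("A", date(2024, 2, 1), 104.0),
    ("A", date(2024, 2, 2), 109.0),
    ("B", date(2024, 2, 1), 50.0), ("B", date(2024, 2, 1), 51.0),
    ("B", date(2024, 2, 2), 53.5),
]

def running_daily_totals(rows):
    start = {}         # starting level of each sensor
    last_of_day = {}   # (day, sensor) -> highest level seen that day
    for sensor, day, energy in rows:
        start[sensor] = min(energy, start.get(sensor, energy))
        key = (day, sensor)
        last_of_day[key] = max(energy, last_of_day.get(key, energy))
    totals = defaultdict(float)
    for (day, sensor), level in last_of_day.items():
        # consumption of this sensor from the start of the month up to this day
        totals[day] += level - start[sensor]
    return dict(sorted(totals.items()))

totals = running_daily_totals(rows)
print(totals)
```

In this sketch a sensor with no reading on a given day contributes nothing to that day's sum, so a day with missing sensors would show a lower running total than expected.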

In [None]:
# get each sensor's starting level (for a monotonic meter the minimum reading is the first reading)
starting_energy = readings_february.groupBy("sensor")\
                                   .agg(min("energy").alias("starting_energy"))\
                                   .orderBy("sensor")

# calculate daily running energy consumption in February
#         -> calculate each sensor's end-of-the-day energy, subtract the starting level and sum all sensors' results for each day
daily_running_energy_consumption_february = readings_february.join(starting_energy, "sensor", "left")\
                                                             .withColumn("day", to_date("date"))\
                                                             .groupBy("day", "sensor")\
                                                             .agg((max("energy")-first("starting_energy")).alias("end-of-the-day_energy"))\
                                                             .groupBy("day")\
                                                             .agg(round(sum("end-of-the-day_energy"), 1).alias("running_daily_energy_february"))\
                                                             .orderBy("day")

daily_running_energy_consumption_february.show(25)


# save results
daily_running_energy_consumption_february.write.option("header", True).option("delimiter", ",").mode("overwrite").csv("T2_daily_running_energy_consumption_february.csv")

+----------+-----------------------------+
|       day|running_daily_energy_february|
+----------+-----------------------------+
|2024-02-01|                        119.7|
|2024-02-02|                        189.9|
|2024-02-09|                       1104.9|
|2024-02-10|                       1219.4|
|2024-02-11|                       1337.3|
|2024-02-12|                       1448.1|
|2024-02-13|                       1560.8|
|2024-02-14|                       1654.2|
|2024-02-15|                       1735.4|
|2024-02-16|                       1783.2|
|2024-02-18|                       2023.4|
|2024-02-19|                       2103.1|
|2024-02-20|                       2187.4|
|2024-02-21|                       2270.4|
|2024-02-22|                       2347.0|
|2024-02-23|                       2431.8|
|2024-02-24|                       2575.4|
|2024-02-25|                       2675.1|
|2024-02-26|                       2767.7|
|2024-02-27|                       2860.8|
|2024-02-28

## TASK 3: For each sensor, separately: Compute the total energy consumed and the average energy consumption per day

Our approach to this task was to calculate the energy consumed per day for each sensor by grouping the data by day and sensor and then computing max(energy) - min(energy) for each group.

To obtain the average consumption per day for each sensor, we took the results from the first step and averaged them over all days of February for each sensor.

The result therefore contains the energy consumed by each sensor on each day of February and, in an additional column, the sensor's average daily consumption.

**NOTE** that the average is the same value for every row of a given sensor, since it represents that sensor's average consumption per day in February.

This makes it easy to see on which days of the month a sensor consumed more or less energy than its average.
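
The two-step aggregation can be sketched in plain Python as follows. The readings are invented and `daily_and_average` is a hypothetical helper used only to illustrate the grouping.

```python
# Hypothetical (sensor, day, kWh) readings for one sensor over two days.
rows = [
    ("A", "2024-02-01", 100.0), ("A", "2024-02-01", 108.0),
    ("A", "2024-02-02", 108.5), ("A", "2024-02-02", 112.5),
]

def daily_and_average(rows):
    lowest, highest = {}, {}
    for sensor, day, energy in rows:
        key = (sensor, day)
        lowest[key] = min(energy, lowest.get(key, energy))
        highest[key] = max(energy, highest.get(key, energy))
    # step 1: consumption per sensor and day
    daily = {key: highest[key] - lowest[key] for key in lowest}
    # step 2: average of the daily values per sensor
    per_sensor = {}
    for (sensor, _), value in daily.items():
        per_sensor.setdefault(sensor, []).append(value)
    average = {s: sum(v) / len(v) for s, v in per_sensor.items()}
    return daily, average

daily, average = daily_and_average(rows)
print(daily, average)
```

Note that in this sketch the 0.5 kWh consumed between the last reading of the first day (108.0) and the first reading of the second day (108.5) is attributed to neither day. That is a property of the per-day max - min approximation.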


The daily average per sensor for February looks as follows:

```
+------+----------------+
|sensor|avg_daily_energy|
+------+----------------+
|     A|             5.1|
|     B|             3.3|
|     C|             8.0|
|     D|            15.5|
|     E|            13.0|
|     F|             4.4|
|     G|             4.8|
|     H|            13.2|
|     I|             9.4|
|     J|             6.4|
|     K|             6.7|
+------+----------------+
```

In [None]:
# Compute the daily consumption per sensor
daily_total_energy = readings_february\
                          .withColumn("day", to_date("date"))\
                          .groupBy("day", "sensor")\
                          .agg(round(max("energy") - min("energy"), 1).alias("daily_energy"))\
                          .orderBy("day", "sensor")

# compute the daily average consumption
daily_avg_energy = daily_total_energy\
                          .groupBy("sensor")\
                          .agg(round(avg("daily_energy"), 1)\
                          .alias("avg_daily_energy"))\
                          .orderBy("sensor")


# Join the results to obtain a single DataFrame containing the daily consumption as well as the average consumption
joined_results = daily_total_energy\
                          .join(daily_avg_energy, "sensor")\
                          .orderBy("day", "sensor")

joined_results.show(20)

# save results
joined_results.write.option("header", True).option("delimiter", ",").mode("overwrite").csv("T3_daily_total_and_average_energy.csv")

+------+----------+------------+----------------+
|sensor|       day|daily_energy|avg_daily_energy|
+------+----------+------------+----------------+
|     A|2024-02-01|         8.1|             5.1|
|     B|2024-02-01|         4.2|             3.3|
|     C|2024-02-01|         9.7|             8.0|
|     D|2024-02-01|        12.9|            15.5|
|     E|2024-02-01|        16.1|            13.0|
|     F|2024-02-01|        11.0|             4.4|
|     G|2024-02-01|         3.3|             4.8|
|     H|2024-02-01|        23.5|            13.2|
|     I|2024-02-01|        19.9|             9.4|
|     J|2024-02-01|         2.6|             6.4|
|     K|2024-02-01|         8.4|             6.7|
|     A|2024-02-02|         4.8|             5.1|
|     B|2024-02-02|         4.6|             3.3|
|     C|2024-02-02|         1.6|             8.0|
|     D|2024-02-02|         5.9|            15.5|
|     E|2024-02-02|        10.1|            13.0|
|     F|2024-02-02|         6.5|             4.4|


## TASK 4: For each sensor, separately: Compute the day of the month with minimum and maximum energy consumption

Our approach to this task is to first compute the daily consumption per sensor as in the previous task, and then determine the minimum and maximum daily consumption for each sensor.

The aggregation discards the day on which each extreme occurred, so we recover it by joining the result back with the daily consumption data.

A sensor can appear more than once in the result. This happens when several days share the same minimum or maximum energy consumption for that sensor.


```
+------+----------+--------------+----------+--------------+
|sensor|max_energy|max_energy_day|min_energy|min_energy_day|
+------+----------+--------------+----------+--------------+
|     A|       8.1|    2024-02-01|       0.8|    2024-02-24|
|     B|       9.9|    2024-02-12|       0.1|    2024-02-09|
|     C|      14.0|    2024-02-11|       1.6|    2024-02-23|
|     C|      14.0|    2024-02-11|       1.6|    2024-02-02|
|   ...|       ...|           ...|       ...|           ...|
|     I|      20.7|    2024-02-29|       0.5|    2024-02-18|
|     J|      10.0|    2024-02-11|       1.7|    2024-02-18|
|     K|      10.2|    2024-02-11|       1.2|    2024-02-16|
+------+----------+--------------+----------+--------------+
```
For sensor C, two days share the minimum energy consumption. That is why the sensor appears twice in the result.
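
The tie behaviour can be reproduced with a small plain-Python sketch. The dictionary below holds a subset of the daily values of sensor C that appear in the outputs of this notebook; `extreme_days` is a hypothetical helper.

```python
# Subset of sensor C's daily consumption (kWh), taken from the tables in this notebook.
daily = {
    "2024-02-01": 9.7,
    "2024-02-02": 1.6,
    "2024-02-11": 14.0,
    "2024-02-23": 1.6,
}

def extreme_days(daily):
    """Return ((min value, days), (max value, days)), keeping every tied day."""
    lowest, highest = min(daily.values()), max(daily.values())
    min_days = sorted(d for d, v in daily.items() if v == lowest)
    max_days = sorted(d for d, v in daily.items() if v == highest)
    return (lowest, min_days), (highest, max_days)

minimum, maximum = extreme_days(daily)
print(minimum, maximum)
```

Keeping all tied days, rather than picking one arbitrarily, corresponds to the duplicate rows produced by the join.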

In [None]:
# compute the daily energy consumption for each sensor
daily_total_energy = readings_february\
                          .withColumn("day", to_date("date"))\
                          .groupBy("day", "sensor")\
                          .agg(round(max("energy") - min("energy"), 1).alias("daily_energy"))\
                          .orderBy("day", "sensor")

# compute the minimum and maximum daily energy consumption for each sensor
# Renaming the sensor column is necessary to avoid naming conflicts when joining
min_max_energy = daily_total_energy\
                          .groupBy("sensor")\
                          .agg(
                              min("daily_energy").alias("min_energy"),
                              max("daily_energy").alias("max_energy"),
                          ).withColumnRenamed("sensor", "sensor_min_max")

# Join back with the daily data to obtain the dates for min and max
# Selecting only the necessary columns is important to avoid naming conflicts between the DataFrames.
joined_date_min_max = min_max_energy\
                          .join(daily_total_energy,
                            (min_max_energy.max_energy == daily_total_energy.daily_energy) &
                            (min_max_energy.sensor_min_max == daily_total_energy.sensor), "left"
                          )\
                          .dropDuplicates()\
                          .select("sensor_min_max", "max_energy", "day", "min_energy")\
                          .withColumnRenamed("day", "max_energy_day")\
                          .join(daily_total_energy,
                            (min_max_energy.min_energy == daily_total_energy.daily_energy) &
                            (min_max_energy.sensor_min_max == daily_total_energy.sensor), "left"
                          )\
                          .dropDuplicates()\
                          .select("sensor", "max_energy", "max_energy_day", "min_energy", "day")\
                          .withColumnRenamed("day", "min_energy_day")\
                          .orderBy("sensor")

joined_date_min_max.show()

# save results
joined_date_min_max.write.option("header", True).option("delimiter", ",").mode("overwrite").csv("T4_date_of_min_max_energy_consumption.csv")

+------+----------+--------------+----------+--------------+
|sensor|max_energy|max_energy_day|min_energy|min_energy_day|
+------+----------+--------------+----------+--------------+
|     A|       8.1|    2024-02-01|       0.8|    2024-02-24|
|     B|       9.9|    2024-02-12|       0.1|    2024-02-09|
|     C|      14.0|    2024-02-11|       1.6|    2024-02-23|
|     C|      14.0|    2024-02-11|       1.6|    2024-02-02|
|     D|      26.4|    2024-02-29|       5.7|    2024-02-16|
|     E|      20.6|    2024-02-13|       4.7|    2024-02-23|
|     F|      12.9|    2024-02-29|       0.8|    2024-02-18|
|     G|       9.3|    2024-02-10|       0.7|    2024-02-09|
|     H|      26.1|    2024-02-28|       2.1|    2024-02-26|
|     I|      20.7|    2024-02-29|       0.5|    2024-02-18|
|     J|      10.0|    2024-02-11|       1.7|    2024-02-18|
|     K|      10.2|    2024-02-11|       1.2|    2024-02-16|
+------+----------+--------------+----------+--------------+



## TASK 5 and 6: Setup and utility functions


### Utility functions:
**interpolate (UDF):** interpolates the energy value between two given dates and values. The interpolation is based on a simple linear interpolation function.

```math
E_{\text{interpolated}} = E_1 + \frac{\Delta t_{\text{target}} \cdot (E_2 - E_1)}{\Delta t_{\text{total}}}
```

where:

+ $E_1$ (`energy_measure_1`) is the energy reading before the target timestamp
+ $\Delta t_{\text{target}}$ (`target_seconds`) is the distance in seconds between the timestamp of the reading before and the timestamp we want to interpolate
+ $E_2$ (`energy_measure_2`) is the energy reading after the target timestamp
+ $\Delta t_{\text{total}}$ (`total_seconds`) is the distance in seconds between the timestamps of `energy_measure_1` and `energy_measure_2`
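
To make the formula concrete, here is a minimal plain-Python sketch applied to one pair of sensor B readings that appears later in this notebook (11:58:38 with 628.6 kWh and 12:01:12 with 628.7 kWh). The function name `lerp_energy` is an assumption for illustration and is separate from the UDF defined below.

```python
from datetime import datetime

def lerp_energy(t1, e1, t2, e2, t):
    """Linear interpolation of the meter value at time t between (t1, e1) and (t2, e2)."""
    total_seconds = (t2 - t1).total_seconds()
    target_seconds = (t - t1).total_seconds()
    return e1 + target_seconds * (e2 - e1) / total_seconds

t1 = datetime(2024, 2, 1, 11, 58, 38)   # reading before the full hour
t2 = datetime(2024, 2, 1, 12, 1, 12)    # reading after the full hour
t = datetime(2024, 2, 1, 12, 0, 0)      # full hour to estimate

# total_seconds = 154, target_seconds = 82
estimate = lerp_energy(t1, 628.6, t2, 628.7, t)
print(round(estimate, 1))
```

Here 82 of the 154 seconds between the two readings have elapsed at 12:00:00, so a bit more than half of the 0.1 kWh difference is added to the earlier reading.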


**generate_hourly_timestamps:** Generates a series of equidistant timestamps between a given start and end date. The timestamps are always produced for each full hour. E.g.
```
start_time: date = 2024-02-01 00:00:00
end_time: date = 2024-02-02 00:00:00

the function would produce a DataFrame with the following rows:

    2024-02-01 00:00:00
    2024-02-01 01:00:00
    ...
    2024-02-01 23:00:00
    2024-02-02 00:00:00
```
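
For comparison, the same series can be generated in plain Python. This is only a sketch of the behaviour described above, with both ends inclusive; `hourly_timestamps` is a hypothetical name and not the Spark helper used in this notebook.

```python
from datetime import datetime, timedelta

def hourly_timestamps(start, end):
    """All full-hour timestamps from start to end, both ends inclusive."""
    current = start.replace(minute=0, second=0, microsecond=0)
    stamps = []
    while current <= end:
        stamps.append(current)
        current += timedelta(hours=1)
    return stamps

one_day = hourly_timestamps(datetime(2024, 2, 1), datetime(2024, 2, 2))
whole_month = hourly_timestamps(datetime(2024, 2, 1), datetime(2024, 3, 1))
print(len(one_day), len(whole_month))
```

Because both ends are included, one day yields 25 timestamps and the 29 days of February 2024 yield 697.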

### Interpretation of tasks 5 and 6
**5.** We understand task 5 as follows: we have to show the interpolated value of each sensor for each full hour of every day in February.

**6.** We understand task 6 as follows: we are asked to provide the running total for each sensor and hour based on the interpolated data. The output is one DataFrame showing the running total for every hour of every day in February.

Therefore, we can reuse the DataFrame obtained in task 5 for further processing.

**Difference:** The key difference between the tasks is that task 5 only requires interpolating the meter values at full hours, while task 6 asks for the running total for February.


## Approach Task 5:
The overall approach to this task was to use the utility functions described above to create artificial readings for each full hour, and then interpolate their energy values from the closest real measurements.

### 1. step:
Define the start and end time of the interpolation period. In this case it is the whole month of February for each distinct sensor.

```
+------+-------------------+-------------------+
|sensor|         start_time|           end_time|
+------+-------------------+-------------------+
|     K|2024-02-01 00:00:00|2024-03-01 00:00:00|
|     F|2024-02-01 00:00:00|2024-03-01 00:00:00|
|     E|2024-02-01 00:00:00|2024-03-01 00:00:00|
|     B|2024-02-01 00:00:00|2024-03-01 00:00:00|
|     D|2024-02-01 00:00:00|2024-03-01 00:00:00|
|     C|2024-02-01 00:00:00|2024-03-01 00:00:00|
|     J|2024-02-01 00:00:00|2024-03-01 00:00:00|
|     A|2024-02-01 00:00:00|2024-03-01 00:00:00|
|     G|2024-02-01 00:00:00|2024-03-01 00:00:00|
|     I|2024-02-01 00:00:00|2024-03-01 00:00:00|
|     H|2024-02-01 00:00:00|2024-03-01 00:00:00|
+------+-------------------+-------------------+
```
### 2. step: creating equidistant timestamps
Then, we generated the timestamps for each full hour and exploded them into one row per timestamp. This is necessary to obtain an estimate for every full hour of every day. The energy value of these rows is always set to NULL, since this is what we want to estimate.

### 3. step: union with real measurements
In the next step we formed the union of the real measurements and the generated equidistant timestamps. This yields a DataFrame that contains all real measurements with their timestamps as well as all generated rows, which have energy = NULL.

### 4. step: define a window function
We defined a window specification that partitions by sensor and orders by timestamp. This is necessary to interpolate the data for each sensor separately while keeping the timestamps in order.

### 5. step: apply window function to obtain valid previous and next reading
We applied the window so that every row receives 4 new columns. First we added the timestamp and energy value of the last reading before the current row, with the constraint that its energy value must not be NULL. This ensures that each row gets the closest preceding real reading. We did the same for the closest following real reading. Then we kept only the generated rows (those with energy = NULL), which gives a DataFrame with the following information:

```
+-------------------+------+------+--------------------+--------------------+-----------------+-----------------+
|               date|sensor|energy|prev_valid_timestamp|next_valid_timestamp|prev_energy_level|next_energy_level|
+-------------------+------+------+--------------------+--------------------+-----------------+-----------------+
|2024-02-01 00:00:00|     B|  NULL|                NULL| 2024-02-01 00:03:32|             NULL|            627.5|
|2024-02-01 01:00:00|     B|  NULL| 2024-02-01 00:57:48| 2024-02-01 01:02:56|            627.6|            627.6|
|2024-02-01 02:00:00|     B|  NULL| 2024-02-01 01:55:45| 2024-02-01 02:00:53|            627.8|            627.8|
|2024-02-01 03:00:00|     B|  NULL| 2024-02-01 02:59:33| 2024-02-01 03:04:41|            627.9|            627.9|
|2024-02-01 04:00:00|     B|  NULL| 2024-02-01 03:56:24| 2024-02-01 04:01:32|            628.0|            628.0|
|2024-02-01 05:00:00|     B|  NULL| 2024-02-01 04:55:26| 2024-02-01 05:00:34|            628.2|            628.2|
|2024-02-01 06:00:00|     B|  NULL| 2024-02-01 05:57:03| 2024-02-01 06:02:11|            628.2|            628.2|
|2024-02-01 07:00:00|     B|  NULL| 2024-02-01 06:58:40| 2024-02-01 07:03:48|            628.2|            628.2|
|2024-02-01 08:00:00|     B|  NULL| 2024-02-01 07:55:30| 2024-02-01 08:00:38|            628.3|            628.3|
|2024-02-01 09:00:00|     B|  NULL| 2024-02-01 08:57:07| 2024-02-01 09:02:15|            628.3|            628.3|
|2024-02-01 10:00:00|     B|  NULL| 2024-02-01 09:58:43| 2024-02-01 10:03:51|            628.3|            628.3|
|2024-02-01 11:00:00|     B|  NULL| 2024-02-01 10:57:02| 2024-02-01 11:02:10|            628.5|            628.5|
|2024-02-01 12:00:00|     B|  NULL| 2024-02-01 11:58:38| 2024-02-01 12:01:12|            628.6|            628.7|
|2024-02-01 13:00:00|     B|  NULL| 2024-02-01 12:55:51| 2024-02-01 13:00:59|            628.8|            628.8|
|2024-02-01 14:00:00|     B|  NULL| 2024-02-01 13:57:49| 2024-02-01 14:02:57|            629.0|            629.0|
|2024-02-01 15:00:00|     B|  NULL| 2024-02-01 14:55:02| 2024-02-01 15:00:10|            629.1|            629.1|
|2024-02-01 16:00:00|     B|  NULL| 2024-02-01 15:58:06| 2024-02-01 16:03:15|            629.3|            629.3|
|2024-02-01 17:00:00|     B|  NULL| 2024-02-01 16:08:23| 2024-02-01 17:11:41|            629.5|            629.6|
|2024-02-01 18:00:00|     B|  NULL| 2024-02-01 17:56:04| 2024-02-01 18:01:12|            630.7|            630.7|
|2024-02-01 19:00:00|     B|  NULL| 2024-02-01 18:58:47| 2024-02-01 19:02:27|            630.9|            631.0|
+-------------------+------+------+--------------------+--------------------+-----------------+-----------------+
```
prev_valid_timestamp is the timestamp of the closest real reading before the current timestamp, and next_valid_timestamp is the timestamp of the closest real reading after it. prev_energy_level and next_energy_level are the corresponding measurements. date is the timestamp we want to interpolate. This DataFrame contains all the information needed to interpolate the missing energy values at full hours.

**NOTE** Since the data has no measurements before February, the first timestamp in the data is in February. This means that there is no previous timestamp for 2024-02-01 00:00:00, which shows up in the table as NULL in prev_valid_timestamp and prev_energy_level.

The interpolation function catches these edge cases and returns the reading at next_valid_timestamp instead. This means that the value at 2024-02-01 00:00:00 is approximated by the very first reading of February.

### 6. step: interpolate
Now that we have collected all the data needed, we can estimate each missing energy value from the closest previous and closest next real reading and their timestamps. This yields the interpolated value for every hour of every day of February.
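
The effect of the two windowed lookups can be sketched in plain Python as a forward pass and a backward pass over one sensor's rows. The rows below reuse sensor B values from the table above; `attach_neighbours` is a hypothetical helper, and the sketch is not the Spark window implementation itself.

```python
# One sensor's rows ordered by timestamp; None marks a generated full-hour row.
rows = [
    ("2024-02-01 00:00:00", None),
    ("2024-02-01 00:03:32", 627.5),
    ("2024-02-01 00:57:48", 627.6),
    ("2024-02-01 01:00:00", None),
    ("2024-02-01 01:02:56", 627.6),
]

def attach_neighbours(rows):
    """For each generated row, return (timestamp, previous real reading, next real reading)."""
    n = len(rows)
    prev_valid = [None] * n
    next_valid = [None] * n

    last_seen = None
    for i in range(n):                 # forward pass: closest real reading strictly before row i
        prev_valid[i] = last_seen
        if rows[i][1] is not None:
            last_seen = rows[i]

    last_seen = None
    for i in range(n - 1, -1, -1):     # backward pass: closest real reading strictly after row i
        next_valid[i] = last_seen
        if rows[i][1] is not None:
            last_seen = rows[i]

    return [(rows[i][0], prev_valid[i], next_valid[i])
            for i in range(n) if rows[i][1] is None]

bounds = attach_neighbours(rows)
for row in bounds:
    print(row)
```

The first generated row has no previous reading, which corresponds to the NULL values in the first line of the table.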

In [None]:
# Define a UDF as interpolation function between two dates
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, FloatType
from datetime import datetime, timedelta

def interpolate(date_1, energy_measure_1, date_2, energy_measure_2, date_to_interpolate):
    """
    Interpolates the value at a specific time between two timestamps and corresponding energy values.

    Parameters:
    date_1: Timestamp of the closest real reading before the target timestamp
    energy_measure_1: The energy reading recorded at date_1
    date_2: Timestamp of the closest real reading after the target timestamp
    energy_measure_2: The energy reading recorded at date_2
    date_to_interpolate: The timestamp at which the energy value is estimated

    e.g:
    date_1 = datetime.strptime('2019-02-01-23-50-00', '%Y-%m-%d-%H-%M-%S')
    date_2 = datetime.strptime('2019-02-02-00-10-00', '%Y-%m-%d-%H-%M-%S')
    target = datetime.strptime('2019-02-02-00-00-00', '%Y-%m-%d-%H-%M-%S')


    energy_measure_1 = 1
    energy_measure_2 = 3

    expected return = interpolate(date_1, energy_measure_1, date_2, energy_measure_2, target) = 2

    Returns:
    Interpolated value at date_to_interpolate. If only one neighbouring reading exists,
    its value is returned; if neither exists, None is returned.
    """
    # no neighbouring reading at all: nothing to interpolate from
    if date_1 is None and date_2 is None: return None
    # only one neighbouring reading: fall back to its value
    if date_1 is None: return energy_measure_2
    if date_2 is None: return energy_measure_1

    # ensure that date_1 is not after date_2
    if date_1 > date_2: raise ValueError("Date 1 cannot be greater than date 2!")

    # if the target coincides with one of the readings, return that reading directly
    if date_2 == date_to_interpolate: return energy_measure_2
    if date_1 == date_to_interpolate: return energy_measure_1

    # Calculate total seconds between the two given measurements
    total_seconds = (date_2 - date_1).total_seconds()

    # Calculate the seconds from date_1 to the date to be interpolated
    target_seconds = (date_to_interpolate - date_1).total_seconds()

    # Perform linear interpolation
    interpolated_value = energy_measure_1 + (target_seconds * (energy_measure_2 - energy_measure_1) / total_seconds)

    return interpolated_value

# Convert the interpolation function to a User Defined Function (UDF).
# The return type is DoubleType because the function returns a numeric energy value.
from pyspark.sql.types import DoubleType
interpolate_udf = udf(interpolate, DoubleType())


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, sequence, date_trunc, col, expr, lit

def generate_hourly_timestamps(df, start_col, end_col):
    """
    Generate and explode hourly timestamps, always at the exact hour, between start and end time (both inclusive).
    e.g:
    2024-02-03 00:00:00
    2024-02-03 01:00:00
    2024-02-03 02:00:00
    ...
    2024-02-03 23:00:00
    """
    return df.withColumn('date', explode(
            sequence(
                date_trunc('hour', col(start_col)), date_trunc('hour', col(end_col)), expr('INTERVAL 1 HOUR')
            )
        )
    ).withColumn('energy', lit(None))

To handle edge cases, the February readings used in the previous tasks are not sufficient. Therefore, a new extended February DataFrame is created that also includes data from outside February to ensure proper interpolation.

In [None]:
# extract extended readings from February
readings_february_extended = readings\
                .filter("date >= date '2024-01-01' AND date <= date '2024-03-05'")\
                .orderBy("sensor", "date")

readings_february_extended.show(5)

# save results
readings_february_extended.write.option("header", True) \
    .option("delimiter", ",") \
    .mode("overwrite") \
    .csv("readings_february_extended.csv")

+-------------------+------+------+
|               date|sensor|energy|
+-------------------+------+------+
|2024-02-01 00:00:25|     A| 650.5|
|2024-02-01 00:05:34|     A| 650.5|
|2024-02-01 00:10:42|     A| 650.5|
|2024-02-01 00:15:50|     A| 650.5|
|2024-02-01 00:20:58|     A| 650.5|
+-------------------+------+------+
only showing top 5 rows



In [None]:
#@title 5. For each sensor, separately, with estimations: Compute the estimated value of each sensor meter for every hour and day of the month (in ascending order)


# STEP 1: generate a DataFrame that contains each sensor once and add the start and end time for February
distinct_sensors = readings_february_extended.select("sensor")\
                .distinct()\
                .withColumn('start_time', lit(datetime(2024,2,1,00,00,00)))\
                .withColumn('end_time', lit(datetime(2024,3,1,00,00,00)))


# STEP 2: generate intervals of 1 hour always to the full hour
df_hourly = generate_hourly_timestamps(distinct_sensors, "start_time", "end_time")\
                .select("date", "sensor", "energy")

# STEP 3: add the interval to the original energy dataframe
unified_df = readings_february_extended.union(df_hourly).orderBy("sensor", 'date')


# STEP 4: Define the window specification ordered by timestamp and partitioned by sensor
window_spec = Window.partitionBy("sensor").orderBy("date")

# STEP 5: Apply window to create new columns with previous and next non-null energy readings and timestamps
df_with_bounds = unified_df\
                .withColumn('prev_valid_timestamp', last(when(col('energy').isNotNull(), col('date')), ignorenulls=True)\
                            .over(window_spec.rowsBetween(Window.unboundedPreceding, -1)))\
                .withColumn('next_valid_timestamp',first(when(col('energy').isNotNull(), col('date')),ignorenulls=True)\
                            .over(window_spec.rowsBetween(1, Window.unboundedFollowing)))\
                .withColumn('prev_energy_level',last('energy', ignorenulls=True)\
                            .over(window_spec.rowsBetween(Window.unboundedPreceding, -1)))\
                .withColumn('next_energy_level', first('energy', ignorenulls=True)\
                            .over(window_spec.rowsBetween(1, Window.unboundedFollowing)))\
                .filter(col('energy').isNull())

# STEP 6: Add interpolation
interpolated_df = df_with_bounds.withColumn(
                'interpolated_energy', round(interpolate_udf(
                    col('prev_valid_timestamp'),
                    col('prev_energy_level'),
                    col('next_valid_timestamp'),
                    col('next_energy_level'),
                    col('date')
                ), 1))\
                .drop(
                    'prev_valid_timestamp',
                    'next_valid_timestamp',
                    'prev_energy_level',
                    'next_energy_level',
                    'energy'
                )\
                .orderBy("date", "sensor")

# show results
interpolated_df.show()

# save results
interpolated_df.write.option("header", True).option("delimiter", ",").mode("overwrite").csv("T5_interpolated_energy_readings.csv")

+-------------------+------+-------------------+
|               date|sensor|interpolated_energy|
+-------------------+------+-------------------+
|2024-02-01 00:00:00|     A|              650.5|
|2024-02-01 00:00:00|     B|              627.5|
|2024-02-01 00:00:00|     C|             1098.8|
|2024-02-01 00:00:00|     D|             2615.0|
|2024-02-01 00:00:00|     E|             1874.0|
|2024-02-01 00:00:00|     F|              748.0|
|2024-02-01 00:00:00|     G|              833.7|
|2024-02-01 00:00:00|     H|             2145.4|
|2024-02-01 00:00:00|     I|              927.2|
|2024-02-01 00:00:00|     J|              966.7|
|2024-02-01 00:00:00|     K|              841.2|
|2024-02-01 01:00:00|     A|              650.5|
|2024-02-01 01:00:00|     B|              627.6|
|2024-02-01 01:00:00|     C|             1098.8|
|2024-02-01 01:00:00|     D|             2616.5|
|2024-02-01 01:00:00|     E|             1875.0|
|2024-02-01 01:00:00|     F|              748.7|
|2024-02-01 01:00:00

## TASK 6: Compute the estimated running total of the energy consumed so far. The value should be updated every hour.

Our approach to this task was to reuse the interpolation results of the previous task and process them further to obtain the estimated running total for each hour.

As a first step, we again determined the initial February value for each sensor, as in task 2, this time from the interpolated data.

Then we joined the interpolated data with the initial values and subtracted the initial value from every energy value in the joined DataFrame.

The outcome is an estimated (interpolated) running total for the month of February.
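
The subtraction step can be sketched in plain Python using the first hourly estimates of sensor A shown in the output of this task. The helper `running_totals` is a hypothetical name used only for this illustration.

```python
# Hourly interpolated meter values (sensor, timestamp, kWh) for sensor A,
# taken from the output table of this task.
hourly = [
    ("A", "2024-02-01 00:00:00", 650.5),
    ("A", "2024-02-01 01:00:00", 650.5),
    ("A", "2024-02-01 02:00:00", 650.6),
    ("A", "2024-02-01 05:00:00", 650.9),
]

def running_totals(hourly):
    """Subtract each sensor's initial (minimum) value from every hourly estimate."""
    initial = {}
    for sensor, _, energy in hourly:
        initial[sensor] = min(energy, initial.get(sensor, energy))
    return [(sensor, ts, round(energy - initial[sensor], 1))
            for sensor, ts, energy in hourly]

result = running_totals(hourly)
for row in result:
    print(row)
```

Rounding after the subtraction keeps floating-point noise (for example 0.39999...) out of the reported totals.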

In [None]:
# get the first reading for each sensor
initial_reading_df = interpolated_df.groupBy("sensor").agg(min("interpolated_energy").alias("initial_energy"))

# reuse the interpolated data from the previous task (task 5 must run first)
interpolated_running_df = interpolated_df.join(initial_reading_df, "sensor", "left")\
                .withColumn("interpolated_running_total_february", round(col("interpolated_energy") - col("initial_energy"), 1))\
                .drop("initial_energy")\
                .orderBy("sensor", "date")

# show results
interpolated_running_df.show()

# save results
interpolated_running_df.write.option("header", True).option("delimiter", ",").mode("overwrite").csv("T6_interpolated_running_total.csv")

+------+-------------------+-------------------+-----------------------------------+
|sensor|               date|interpolated_energy|interpolated_running_total_february|
+------+-------------------+-------------------+-----------------------------------+
|     A|2024-02-01 00:00:00|              650.5|                                0.0|
|     A|2024-02-01 01:00:00|              650.5|                                0.0|
|     A|2024-02-01 02:00:00|              650.6|                                0.1|
|     A|2024-02-01 03:00:00|              650.6|                                0.1|
|     A|2024-02-01 04:00:00|              650.6|                                0.1|
|     A|2024-02-01 05:00:00|              650.9|                                0.4|
|     A|2024-02-01 06:00:00|              651.1|                                0.6|
|     A|2024-02-01 07:00:00|              651.1|                                0.6|
|     A|2024-02-01 08:00:00|              651.1|                 