
# Sistemas para Processamento de Big Data
## TP1 - Energy Meter Monitoring




The sensor data corresponds to regular readings from 11 residential energy meters. The data covers the month of February 2024.

Each data sample has the following schema:

timestamp | sensor_id | energy
----------|-------------|-----------
timestamp | string  | float

Each energy value (kWh) is the accumulated value of the meter at the time of measurement. As such,
each meter is expected to produce a monotonically increasing series of pairs of timestamp and energy consumed up to that moment.

The meters do not start at zero or at the same value.
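
Because every reading is a cumulative counter, the energy consumed over a period is the difference between the last and the first reading in that period, not the sum of the readings. A minimal sketch in plain Python (no Spark), using invented readings and a hypothetical helper named `consumption`:

```python
from datetime import datetime

# Hypothetical cumulative readings (timestamp, kWh) for one meter; values are invented.
readings = [
    (datetime(2024, 2, 1, 0, 5), 1200.0),
    (datetime(2024, 2, 1, 12, 40), 1203.5),
    (datetime(2024, 2, 2, 23, 55), 1210.0),
]

def consumption(samples):
    """Energy consumed between the earliest and latest sample of a cumulative meter."""
    ordered = sorted(samples, key=lambda s: s[0])
    return ordered[-1][1] - ordered[0][1]

total = consumption(readings)
print(total)  # 10.0
```

The starting value of the meter cancels out in the subtraction, which is why it does not matter that the meters start at different values.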


## Questions

The following questions should be answered for the month of February and only for this month.

### For the group of sensors:

1. Compute the total energy consumed.

2. Compute the running total energy consumed so far for each day, inclusive.

Note: You can approximate the result by using the last reading of each day from each sensor.

### For each sensor, separately:

3. Compute the total energy consumed and the average energy consumption per day.

4. Compute the day of the month with minimum and maximum energy consumption.

Note: You can approximate the result by using the last reading of each day from each sensor.
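
One possible reading of this approximation, sketched in plain Python rather than Spark: take the last reading of each day and subtract the previous day's last reading. The readings and the helper name `daily_consumption` are illustrative assumptions, not part of the assignment.

```python
from datetime import date

# Hypothetical last reading of each day (kWh) for one meter; values are invented.
last_reading_per_day = {
    date(2024, 2, 1): 1203.5,
    date(2024, 2, 2): 1210.0,
    date(2024, 2, 3): 1212.5,
}

def daily_consumption(last_by_day):
    """Difference between consecutive end-of-day readings, keyed by the later day."""
    days = sorted(last_by_day)
    return {
        today: last_by_day[today] - last_by_day[yesterday]
        for yesterday, today in zip(days, days[1:])
    }

per_day = daily_consumption(last_reading_per_day)
min_day = min(per_day, key=per_day.get)  # day with the lowest consumption
max_day = max(per_day, key=per_day.get)  # day with the highest consumption
print(per_day, min_day, max_day)
```

In this sketch the first day has no previous reading and is therefore left out; handling it would need either the last reading of January 31 or the first reading of February 1.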

### For each sensor, separately, with estimations:

**Assumptions:**

+ Readings may be missing for extended periods due to communication problems with the sensors.

+ Readings do not fall precisely "on the hour". They are collected and recorded at any time.

+ For more precise results, estimate the value of the meter at a precise timestamp, using linear interpolation from the nearest readings.

5. Compute the **estimated** value of each sensor meter for every hour and day of the month (in ascending order).

6. Compute the **estimated** running total of the energy consumed so far. The value should be updated every hour.
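
The linear interpolation mentioned in the assumptions can be sketched as follows. This is a plain-Python illustration with invented readings and a hypothetical function name `interpolate`; it is not the Spark solution used later in the notebook.

```python
from datetime import datetime

def interpolate(t, t0, v0, t1, v1):
    """Linearly interpolate the meter value at time t between (t0, v0) and (t1, v1)."""
    span = (t1 - t0).total_seconds()
    if span == 0:
        return v0  # both readings share the same timestamp
    fraction = (t - t0).total_seconds() / span
    return v0 + (v1 - v0) * fraction

# Hypothetical readings just before and just after 10:00.
before = (datetime(2024, 2, 1, 9, 45), 100.0)
after = (datetime(2024, 2, 1, 10, 15), 102.0)

estimate = interpolate(datetime(2024, 2, 1, 10, 0), *before, *after)
print(estimate)  # 101.0
```

The estimate weights the two neighbouring readings by how close each one is to the target timestamp, so a reading 15 minutes away on each side contributes equally.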

## Requirements

Solve each question using Structured Spark, either Dataframes or SQL or both.

## Other Grading Criteria

+ Grading will also take into account the general clarity of the programming and of the presentation report (notebook).




### Deadline
+ November 10, 23h59

Late submissions incur a ***0.5 / day penalty***. The penalty accumulates until the grade of the assignment reaches 8.0.

---
### Colab Setup


In [None]:
#@title Install PySpark
!pip install pyspark findspark --quiet


In [None]:
#@title Download the dataset

!wget -q -O energy-readings.csv https://raw.githubusercontent.com/smduarte/spbd-2425/refs/heads/main/docs/labs/projs/energy-readings.csv
!head -2 energy-readings.csv

In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]') \
    .appName('energy').getOrCreate()

sc = spark.sparkContext

try:
    readings = spark.read.csv('energy-readings.csv', sep=';', header=True, inferSchema=True)

    # 0. Filter the data to keep only February 2024
    readings_february = readings.filter((month("date") == 2) & (year("date") == 2024))

except Exception as err:
    print(err)

**Question 4** - For each sensor separately, compute the day of the month with minimum and maximum energy consumption.

In [None]:
# 4.0 Compute each sensor's daily consumption (difference between the day's highest and lowest meter readings)
daily_consumption_per_sensor = readings_february.withColumn("day", to_date("date")) \
  .groupBy("day", "sensor") \
  .agg(round(max("energy") - min("energy"),3).alias("daily_consumption")) \
  .orderBy("day", "sensor")

# 4.1 Compute the minimum and maximum daily consumption for each sensor
min_consumption_day = daily_consumption_per_sensor.groupBy("sensor") \
    .agg(min("daily_consumption").alias("min_daily_consumption"))

max_consumption_day = daily_consumption_per_sensor.groupBy("sensor") \
    .agg(max("daily_consumption").alias("max_daily_consumption"))

# max_consumption_day.show()

# 4.2 Join back to find the day(s) on which consumption was at its minimum
min_day = daily_consumption_per_sensor.alias("dcps").join(
    min_consumption_day.alias("min_day"),
    (col("dcps.sensor") == col("min_day.sensor")) &
    (col("dcps.daily_consumption") == col("min_day.min_daily_consumption")),"inner"
).select(
    col("dcps.sensor"),
    col("dcps.day").alias("min_consumption_day"),
    col("dcps.daily_consumption").alias("min_daily_consumption")
).orderBy("sensor")

# 4.3 Join back to find the day(s) on which consumption was at its maximum
max_day = daily_consumption_per_sensor.alias("dcps").join(
    max_consumption_day.alias("max_day"),
    (col("dcps.sensor") == col("max_day.sensor")) &
    (col("dcps.daily_consumption") == col("max_day.max_daily_consumption")),"inner"
).select(
    col("dcps.sensor"),
    col("dcps.day").alias("max_consumption_day"),
    col("dcps.daily_consumption").alias("max_daily_consumption")
).orderBy("sensor")

# 4.4 Show the results
min_day.show()
max_day.show()

+------+-------------------+---------------------+
|sensor|min_consumption_day|min_daily_consumption|
+------+-------------------+---------------------+
|     A|         2024-02-24|                 0.77|
|     B|         2024-02-09|                  0.1|
|     C|         2024-02-02|                  1.6|
|     C|         2024-02-23|                  1.6|
|     D|         2024-02-16|                  5.7|
|     E|         2024-02-23|                  4.7|
|     F|         2024-02-18|                  0.8|
|     G|         2024-02-09|                  0.7|
|     H|         2024-02-26|                  2.1|
|     I|         2024-02-18|                  0.5|
|     J|         2024-02-18|                  1.7|
|     K|         2024-02-16|                  1.2|
+------+-------------------+---------------------+

+------+-------------------+---------------------+
|sensor|max_consumption_day|max_daily_consumption|
+------+-------------------+---------------------+
|     A|         2024-02-01|  

**Question 5** - Compute the estimated value of each sensor meter for each day of the month (in ascending order).

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta

# 5.0 Linear interpolation function, adapted from the Stack Overflow question "Pyspark : Interpolation of missing values in pyspark dataframe observed"
# Link: https://stackoverflow.com/questions/53077639/pyspark-interpolation-of-missing-values-in-pyspark-dataframe-observed
def fill_linear_interpolation(df, id_cols, order_col, value_col):
    w = Window.partitionBy(id_cols).orderBy(order_col)
    new_df = df.withColumn('rn', F.row_number().over(w))
    new_df = new_df.withColumn('rn_not_null', F.when(F.col(value_col).isNotNull(), F.col('rn')))

    w_start = Window.partitionBy(id_cols).orderBy(order_col).rowsBetween(Window.unboundedPreceding, -1)
    new_df = new_df.withColumn('start_val', F.last(value_col, True).over(w_start))
    new_df = new_df.withColumn('start_rn', F.last('rn_not_null', True).over(w_start))

    w_end = Window.partitionBy(id_cols).orderBy(order_col).rowsBetween(0, Window.unboundedFollowing)
    new_df = new_df.withColumn('end_val', F.first(value_col, True).over(w_end))
    new_df = new_df.withColumn('end_rn', F.first('rn_not_null', True).over(w_end))

    if not isinstance(id_cols, list):
        id_cols = [id_cols]

    new_df = new_df.withColumn('diff_rn', F.col('end_rn') - F.col('start_rn'))
    new_df = new_df.withColumn('curr_rn', F.col('diff_rn') - (F.col('end_rn') - F.col('rn')))

    lin_interp_func = (F.col('start_val') + (F.col('end_val') - F.col('start_val')) / F.col('diff_rn') * F.col('curr_rn'))
    new_df = new_df.withColumn(value_col, F.when(F.col(value_col).isNull(), lin_interp_func).otherwise(F.col(value_col)))

    return new_df.drop('rn', 'rn_not_null', 'start_val', 'end_val', 'start_rn', 'end_rn', 'diff_rn', 'curr_rn')

# 5.1 Build a series of dates covering all of February 2024
date_range_df = spark.createDataFrame([(datetime(2024, 2, 1), datetime(2024, 2, 29))], ["start_date", "end_date"])
date_df = date_range_df.select(explode(sequence("start_date", "end_date")).alias("day"))
date_df = date_df.withColumn("day", to_date("day"))

# 5.2 Get the maximum daily reading for each sensor
readings_february = readings_february.withColumn("day", to_date("date"))
max_daily_readings = readings_february \
    .groupBy("sensor", "day") \
    .agg(max("energy").alias("max_energy")) \
    .orderBy("sensor", "day")

# 5.3 Cross join so that every day of February is present for every sensor
sensors_df = max_daily_readings.select("sensor").distinct()
all_dates_for_sensors = sensors_df.crossJoin(date_df)

# 5.4 Left join to fill in actual data where available and null where there are no readings
complete_daily_data = all_dates_for_sensors.join(
    max_daily_readings,
    on=["day", "sensor"],
    how="left"
).orderBy("day", "sensor")

# 5.5 Apply the linear interpolation function
interpolated_daily_data = fill_linear_interpolation(complete_daily_data, id_cols="sensor", order_col="day", value_col="max_energy")

# 5.6 Round the interpolated values to 3 decimal places
interpolated_daily_data = interpolated_daily_data.withColumn("max_energy", round("max_energy",3))

# 5.7 Show the final result with the interpolated values
interpolated_daily_data.show()
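
The idea behind `fill_linear_interpolation` is to interpolate by row position, which is only equivalent to interpolating by time because the cross join makes the rows evenly spaced (one per day). The following plain-Python sketch shows the same idea on a single list; the function name `fill_gaps` and the values are hypothetical.

```python
def fill_gaps(values):
    """Fill None entries by linear interpolation between the nearest known neighbours.

    Assumes the entries are evenly spaced (for example, one per day). Leading and
    trailing gaps stay None because they have a known neighbour on one side only.
    """
    filled = list(values)
    known = [i for i, v in enumerate(values) if v is not None]
    for left, right in zip(known, known[1:]):
        step = (values[right] - values[left]) / (right - left)
        for i in range(left + 1, right):
            filled[i] = values[left] + step * (i - left)
    return filled

series = [None, 10.0, None, None, 16.0, 17.0, None]
print(fill_gaps(series))  # [None, 10.0, 12.0, 14.0, 16.0, 17.0, None]
```

As in this sketch, the Spark version appears to leave a null wherever a gap has no known reading on one side, since either `start_val` or `end_val` is then null.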

**Question 6** - For each sensor, separately, with estimations, compute the estimated running total of the energy consumed so far. The value should be updated every hour.

In [None]:
from pyspark.sql import functions as F
from datetime import datetime


# 1. Keep only the data for February 2024
readings_february = readings.filter((F.month("date") == 2) & (F.year("date") == 2024)).orderBy("sensor", "date")

# 2. Truncate `date` to the hour (YYYY-MM-DD HH:00:00) so readings can be grouped by sensor and hour
readings_february = readings_february.withColumn("hour", F.date_trunc("hour", "date")).orderBy("sensor", "hour")

# 3. For each sensor and hour, keep the maximum energy value, discarding multiple readings within the same hour
max_hourly_readings = readings_february.groupBy("sensor", "hour").agg(F.max("energy").alias("energy"))

# 4. Create a sequence of timestamps, one per hour, for February 2024
start_timestamp = datetime(2024, 2, 1, 0, 0, 0)
end_timestamp = datetime(2024, 2, 29, 23, 0, 0)

# Generate one timestamp per hour
timestamp_df = spark.createDataFrame([(start_timestamp, end_timestamp)], ["start_timestamp", "end_timestamp"]) \
    .select(F.explode(F.sequence("start_timestamp", "end_timestamp", F.expr("INTERVAL 1 HOUR"))).alias("timestamp"))

# 5. Cross join the sensors with the February timestamps to guarantee a complete series
sensors_df = max_hourly_readings.select("sensor").distinct()
all_timestamps_for_sensors = sensors_df.crossJoin(timestamp_df)

# 6. Left join to fill in actual data where available and `null` where readings are missing
complete_hourly_data = all_timestamps_for_sensors.join(
    max_hourly_readings,
    on=(all_timestamps_for_sensors["sensor"] == max_hourly_readings["sensor"]) &
       (all_timestamps_for_sensors["timestamp"] == max_hourly_readings["hour"]),
    how="left"
)

# 7. Select the final columns explicitly to remove ambiguity
complete_hourly_data = complete_hourly_data.select(
    all_timestamps_for_sensors["sensor"],
    all_timestamps_for_sensors["timestamp"],
    max_hourly_readings["energy"]
).orderBy("sensor", "timestamp")


# Show the result for confirmation
#complete_hourly_data.show(50)

interpolated_hourly_data = fill_linear_interpolation(complete_hourly_data, id_cols="sensor", order_col="timestamp", value_col="energy")

readings_february_interpolated = interpolated_hourly_data.filter((F.month("timestamp") == 2) & (F.year("timestamp") == 2024))


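# The meter is cumulative, so the smallest and largest non-null estimates are the
# first and last values of the month. Unlike first()/last() after a groupBy,
# min()/max() do not depend on row order.
consumption_february = readings_february_interpolated.groupBy("sensor").agg(
    F.min("energy").alias("start_energy"),
    F.max("energy").alias("end_energy")
)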

consumption_february = consumption_february.withColumn("total_consumption", round(F.col("end_energy") - F.col("start_energy"),3))

result = consumption_february.select("sensor", "total_consumption").orderBy("sensor")

result.show()

+------+-----------------+
|sensor|total_consumption|
+------+-----------------+
|     A|           166.38|
|     B|           129.71|
|     C|           257.22|
|     D|            485.9|
|     E|           447.76|
|     F|           159.71|
|     G|           168.37|
|     H|            478.1|
|     I|           351.41|
|     J|           230.05|
|     K|            225.5|
+------+-----------------+
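
The table above gives one monthly total per sensor, whereas the question also asks for a value updated every hour. One way to derive it, sketched here in plain Python with invented estimates and a hypothetical helper `running_total`, is to subtract the first hourly estimate of the month from each later estimate:

```python
from datetime import datetime, timedelta

# Hypothetical hourly meter estimates (kWh) for one sensor; values are invented.
start = datetime(2024, 2, 1, 0, 0)
hourly_estimates = [500.0, 500.5, 501.5, 501.5, 503.0]

def running_total(estimates):
    """Energy consumed so far at each hour, relative to the first estimate."""
    baseline = estimates[0]
    return [value - baseline for value in estimates]

totals = running_total(hourly_estimates)
for offset, total in enumerate(totals):
    print(start + timedelta(hours=offset), total)
```

In Spark the same subtraction could be expressed with a window partitioned by sensor and ordered by timestamp; that variant is not shown here.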

