<a href="https://colab.research.google.com/github/trfrancisco/PStr_P1/blob/main/ps2024_tp1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Processamento de Streams 2024
## TP1 - Energy Meter Monitoring


The sensor data corresponds to (periodic) readings from 11 residential energy meters. The data covers the month of February 2024, and is streamed off Kafka.

Each data sample has the following schema:

timestamp | sensor_id | energy
----------|-------------|-----------
timestamp | string  | float

Each energy value (kWh) corresponds to the accumulated value of the meter at the time of measurement. As such,
each meter is expected to produce a monotonically increasing series of pairs of timestamp and energy consumed up to that moment.

The meters do not start at zero or at the same value.
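
Because the readings are cumulative, the energy a meter consumed over an interval is the difference between its last and first reading in that interval. A minimal sketch in plain Python, using a few of the sample readings printed by the debugging client further down; the helper name `consumed_per_sensor` is illustrative and not part of the assignment:

```python
def consumed_per_sensor(readings):
    """Return {sensor_id: max_reading - min_reading} for cumulative readings.

    `readings` is an iterable of (timestamp, sensor_id, energy) tuples.
    For a monotonically increasing meter, max - min equals last - first.
    """
    lowest, highest = {}, {}
    for _, sensor, energy in readings:
        lowest[sensor] = min(energy, lowest.get(sensor, energy))
        highest[sensor] = max(energy, highest.get(sensor, energy))
    # Round to hide floating-point noise in the subtraction.
    return {s: round(highest[s] - lowest[s], 3) for s in lowest}


sample = [
    ("2024-02-01 00:00:00", "D", 2615.0),
    ("2024-02-01 00:00:54", "E", 1874.0),
    ("2024-02-01 00:01:52", "K", 841.2),
    ("2024-02-01 00:02:00", "E", 1874.1),
    ("2024-02-01 00:02:36", "K", 841.3),
]
print(consumed_per_sensor(sample))
```

A sensor with a single reading yields 0.0, since no consumption can be inferred from one sample.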

The contracted energy provider is [SU Eletricidade](https://sueletricidade.pt/en/home)

The cost of energy varies depending on the time of day, according to the table below:

vazio | super-vazio | cheias | ponta |
------|-------------|--------|-------|
0.1072€| 0.1072€ | 0.1741€ | 0.2400€|

The plan corresponds to the [daily schedule tariff](https://sueletricidade.pt/en/schedules/546/daily-and-weekly-timetable), so the schedule is the same
for all days of the week.
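
As a rough sketch of how the tariff turns consumption into expense: each kWh is priced by the band in which it was consumed. The prices below come from the table above, but the hour boundaries in `BANDS` are hypothetical placeholders chosen only for illustration; the real boundaries must be taken from the provider's timetable.

```python
# Price per kWh for each band, from the tariff table above.
PRICE = {"vazio": 0.1072, "super-vazio": 0.1072, "cheias": 0.1741, "ponta": 0.2400}

# HYPOTHETICAL band boundaries: (start hour inclusive, end hour exclusive, band).
# Placeholders only; replace with the provider's actual daily schedule.
BANDS = [
    (0, 8, "vazio"),
    (8, 18, "cheias"),
    (18, 21, "ponta"),
    (21, 22, "cheias"),
    (22, 24, "vazio"),
]


def price_at(hour):
    """Return the price per kWh for an hour of the day (0-23)."""
    for start, end, band in BANDS:
        if start <= hour < end:
            return PRICE[band]
    raise ValueError(f"hour out of range: {hour}")


def cost(consumption_by_hour):
    """Total cost for a {hour: kWh consumed during that hour} mapping."""
    return sum(kwh * price_at(hour) for hour, kwh in consumption_by_hour.items())


print(round(cost({3: 1.0, 9: 2.0, 19: 0.5}), 4))
```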

## Questions

For each sensor, separately:

1. Compute the running total energy consumed so far, for the month. The value should be updated every 5 minutes. (Sorted in descending order by value and sensor.)

2. Compute the running total energy consumed so far, for the day. The value should be updated every 5 minutes. (Sorted in descending order by value and sensor.)

3. For the current day, compute the total energy used in each half hour period. The value should be updated every 5 minutes. (Sorted by period; a column for each sensor)

4. Compute the running total expense for the day. The value should be updated every minute. (Sorted in descending order by value and sensor.)



## Requirements

Solve each question using Spark Structured Streaming.

## Other Grading Criteria

+ Grading will also take into account the general clarity of the programming and of the presentation report (notebook).




### Deadline

26th April + 1/2 day - ***no penalty***

For each day late, ***0.5 / day penalty***. Penalty accumulates until the grade of the assignment reaches 8.0.

---
### Colab Setup


In [None]:
#@title Mount Google Drive (Optional)
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#@title Install PySpark
!pip install pyspark findspark --quiet
import findspark
findspark.init()
findspark.find()


'/usr/local/lib/python3.10/dist-packages/pyspark'

In [None]:
#@title Install & Launch Kafka
%%bash
KAFKA_VERSION=3.7.0
KAFKA=kafka_2.12-$KAFKA_VERSION
wget -q -O /tmp/$KAFKA.tgz https://dlcdn.apache.org/kafka/$KAFKA_VERSION/$KAFKA.tgz
tar xfz /tmp/$KAFKA.tgz
wget -q -O $KAFKA/config/server1.properties https://github.com/smduarte/ps2024/raw/main/colab/server1.properties

UUID=`$KAFKA/bin/kafka-storage.sh random-uuid`
$KAFKA/bin/kafka-storage.sh format -t $UUID -c $KAFKA/config/server1.properties
$KAFKA/bin/kafka-server-start.sh -daemon $KAFKA/config/server1.properties

metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
Formatting /tmp/kraft-combined-logs with metadata.version 3.7-IV4.


### Energy sensor data publisher
This is a small Python Kafka client that publishes a continuous stream of text lines, obtained from the periodic output of the sensors.

* The Kafka server is accessible @localhost:9092
* The events are published to the `energy` topic
* Events are published 60x faster than realtime relative to the timestamp
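
The 60x speedup matters when choosing trigger intervals, because processing time and event time advance at different rates. A small sketch of the conversion (the function names are illustrative):

```python
SPEEDUP = 60  # value passed to the publisher via --speedup


def wall_clock_seconds(event_seconds, speedup=SPEEDUP):
    """Wall-clock time needed to replay a span of event time."""
    return event_seconds / speedup


def event_seconds(wall_seconds, speedup=SPEEDUP):
    """Event time covered by a span of wall-clock time."""
    return wall_seconds * speedup


print(wall_clock_seconds(5 * 60))     # 5 minutes of event time, in seconds
print(wall_clock_seconds(24 * 3600))  # one day of event time, in seconds
print(event_seconds(5 * 60) / 3600)   # hours of event time per 5-minute trigger
```

Under this replay rate, one day of readings arrives in 24 minutes of wall-clock time, and a 5-minute processing-time trigger spans 5 hours of event time.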


In [5]:
#@title Start Kafka Publisher
%%bash
pip install kafka-python dataclasses --quiet
wget -q -O - https://github.com/smduarte/ps2024/raw/main/colab/kafka-tp1-logsender.tgz | tar xfz - 2> /dev/null
wget -q -O data-sorted.csv https://github.com/smduarte/ps2024/raw/main/tp1/data-sorted.csv

nohup python kafka-tp1-logsender/publisher.py --filename data-sorted.csv --topic energy  --speedup 60 2> kafka-publisher-error.log > kafka-publisher-out.log &

In [None]:
#@title Python Kafka client (For Debugging)
!pip -q install confluent-kafka
from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'localhost:9092',
        'group.id': '*',
        'enable.auto.commit': False,
        'auto.offset.reset': 'earliest'}

try:
  consumer = Consumer(conf)
  consumer.subscribe(['energy'])

  while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None: continue
    print(msg.value())
finally:
  consumer.close()

b'{"timestamp": "2024-02-01 00:00:00", "sensor_id": "D", "energy": 2615.0}'
b'{"timestamp": "2024-02-01 00:00:18", "sensor_id": "C", "energy": 1098.8}'
b'{"timestamp": "2024-02-01 00:00:25", "sensor_id": "A", "energy": 650.5}'
b'{"timestamp": "2024-02-01 00:00:33", "sensor_id": "J", "energy": 966.7}'
b'{"timestamp": "2024-02-01 00:00:42", "sensor_id": "H", "energy": 2145.4}'
b'{"timestamp": "2024-02-01 00:00:54", "sensor_id": "E", "energy": 1874.0}'
b'{"timestamp": "2024-02-01 00:01:52", "sensor_id": "K", "energy": 841.2}'
b'{"timestamp": "2024-02-01 00:02:00", "sensor_id": "E", "energy": 1874.1}'
b'{"timestamp": "2024-02-01 00:02:20", "sensor_id": "I", "energy": 927.2}'
b'{"timestamp": "2024-02-01 00:02:36", "sensor_id": "K", "energy": 841.3}'
b'{"timestamp": "2024-02-01 00:03:24", "sensor_id": "G", "energy": 833.7}'
b'{"timestamp": "2024-02-01 00:03:32", "senso

KeyboardInterrupt: 

The Python code below shows the basics needed to process JSON data from a Kafka source using PySpark.

The PySpark streaming API reference is available [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.streaming.html).
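
Each Kafka message value is a JSON object with the three fields of the schema. Before involving Spark, it can help to check the parsing in plain Python. The sketch below decodes one of the messages printed by the debugging client above; `parse_reading` is an illustrative helper, not part of the assignment code.

```python
import json
from datetime import datetime

raw = b'{"timestamp": "2024-02-01 00:00:00", "sensor_id": "D", "energy": 2615.0}'


def parse_reading(payload):
    """Decode one Kafka message value into (timestamp, sensor_id, energy)."""
    record = json.loads(payload)  # json.loads accepts bytes as well as str
    timestamp = datetime.strptime(record["timestamp"], "%Y-%m-%d %H:%M:%S")
    return timestamp, record["sensor_id"], float(record["energy"])


print(parse_reading(raw))
```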

---
#### PySpark Kafka Stream Example


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

def dumpBatchDF(df, epoch_id):
    df.show(20, False)

spark = SparkSession \
    .builder \
    .appName('Kafka Spark Structured Streaming Example') \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1') \
    .getOrCreate()

lines = spark \
  .readStream \
  .format('kafka') \
  .option('kafka.bootstrap.servers', 'localhost:9092') \
  .option('subscribe', 'energy') \
  .option('startingOffsets', 'earliest') \
  .load() \
  .selectExpr('CAST(value AS STRING)')


schema = StructType([StructField('timestamp', TimestampType(), True),
                     StructField('sensor_id', StringType(), True),
                     StructField('energy', FloatType(), True)])

lines = lines.select( from_json(col('value'), schema).alias('data')).select('data.*')

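# Example aggregation (disabled): in 'append' mode a streaming aggregation needs a watermark, e.g.
# lines = lines.withWatermark('timestamp', '1 minute').groupBy(window(col('timestamp'), '5 minutes')).count()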

query = lines \
    .writeStream \
    .outputMode('append') \
    .foreachBatch(dumpBatchDF) \
    .start()

query.awaitTermination(600)
query.stop()
spark.stop()

+-------------------+---------+------+
|timestamp          |sensor_id|energy|
+-------------------+---------+------+
|2024-02-01 00:00:00|D        |2615.0|
|2024-02-01 00:00:18|C        |1098.8|
|2024-02-01 00:00:25|A        |650.5 |
|2024-02-01 00:00:33|J        |966.7 |
|2024-02-01 00:00:42|H        |2145.4|
|2024-02-01 00:00:54|E        |1874.0|
|2024-02-01 00:01:52|K        |841.2 |
|2024-02-01 00:02:00|E        |1874.1|
|2024-02-01 00:02:20|I        |927.2 |
|2024-02-01 00:02:36|K        |841.3 |
|2024-02-01 00:03:24|G        |833.7 |
|2024-02-01 00:03:32|B        |627.5 |
|2024-02-01 00:04:24|D        |2615.1|
|2024-02-01 00:04:40|F        |748.0 |
|2024-02-01 00:04:44|H        |2145.5|
|2024-02-01 00:05:26|C        |1098.8|
|2024-02-01 00:05:34|A        |650.5 |
|2024-02-01 00:05:42|J        |966.7 |
|2024-02-01 00:05:46|F        |748.1 |
|2024-02-01 00:06:26|J        |966.8 |
+-------------------+---------+------+
only showing top 20 rows


+-------------------+---------+------+
|timestamp          |sensor_id|energy|
+-------------------+---------+------+
|2024-02-01 08:34:11|K        |843.8 |
|2024-02-01 08:34:15|G        |834.5 |
+-------------------+---------+------+



Py4JError: An error occurred while calling o54.awaitTermination

### 1. Compute the running total energy consumed so far, for the month. The value should be updated every 5 minutes. (Sorted in descending order by value and sensor.)

In [None]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window

def dumpBatchDF(df, epoch_id):
  joined_df = df \
          .select("sensor_id", \
                (col("max_energy") - col("min_energy")).alias("consumed"))

  # Descending by consumed value, ties broken by sensor (also descending).
  ordered_joined_df = joined_df.orderBy(col('consumed').desc(), col('sensor_id').desc())
  ordered_joined_df.show(truncate=False)


spark = SparkSession \
    .builder \
    .appName('Kafka Spark Structured Streaming Example') \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1') \
    .getOrCreate()


lines = spark \
  .readStream \
  .format('kafka') \
  .option('kafka.bootstrap.servers', 'localhost:9092') \
  .option('subscribe', 'energy') \
  .option('startingOffsets', 'earliest') \
  .load() \
  .selectExpr('CAST(value AS STRING)')


schema = StructType([StructField('timestamp', TimestampType(), True),
                     StructField('sensor_id', StringType(), True),
                     StructField('energy', FloatType(), True)])

lines = lines.select( from_json(col('value'), schema).alias('data')).select('data.*')



# Per sensor, keep the lowest and highest cumulative reading seen so far;
# dumpBatchDF derives the consumption as max_energy - min_energy.
lines_aggregated = lines.withWatermark('timestamp', '1 second') \
    .groupBy('sensor_id') \
    .agg(
        max("energy").alias("max_energy"),
        min("energy").alias("min_energy")
    )




query = lines_aggregated \
    .writeStream \
    .outputMode('complete') \
    .trigger(processingTime='5 minutes') \
    .foreachBatch(dumpBatchDF) \
    .start()

query.awaitTermination(600)
query.stop()
spark.stop()

+---------+---------+
|sensor_id|consumed |
+---------+---------+
|H        |15.5     |
|E        |9.400024 |
|F        |6.799988 |
|D        |5.6000977|
|I        |4.5      |
|K        |4.200012 |
|A        |3.9000244|
|J        |2.0999756|
|C        |1.5999756|
|B        |1.5999756|
|G        |1.2000122|
+---------+---------+

+---------+---------+
|sensor_id|consumed |
+---------+---------+
|H        |15.600098|
|E        |9.800049 |
|F        |6.799988 |
|D        |5.699951 |
|K        |4.799988 |
|I        |4.5      |
|A        |4.0      |
|J        |2.0999756|
|C        |2.0      |
|B        |1.5999756|
|G        |1.2000122|
+---------+---------+



### 2. Compute the running total energy consumed so far, for the day. The value should be updated every 5 minutes. (Sorted in descending order by value and sensor.)
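
One caveat when a day's consumption is computed as the maximum minus the minimum of that day's readings: energy used between the last reading of the previous day and the first reading of the current day is attributed to neither day. The sketch below contrasts the two baselines in plain Python, using sensor I's values from the output of the cell below (947.1 is its maximum on 2024-02-01; 947.2 and 959.7 are its minimum and maximum on 2024-02-02). The function name is illustrative, and using the previous day's maximum as the baseline is an alternative suggested here, not the approach taken in the notebook.

```python
def daily_consumption(day_max, day_min, prev_day_max=None):
    """Consumption for a day from cumulative readings.

    Without `prev_day_max`, the baseline is the day's first (minimum) reading.
    With it, the baseline is the previous day's last (maximum) reading, so the
    energy used across midnight is counted in the current day.
    """
    baseline = day_min if prev_day_max is None else prev_day_max
    return round(day_max - baseline, 3)


print(daily_consumption(959.7, 947.2))         # max - min within the day
print(daily_consumption(959.7, 947.2, 947.1))  # baseline: previous day's max
```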

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window

def dumpBatchDF(df, epoch_id):

  # most_recent_date = df.select(max("date")).collect()[0][0]
  # df = df.filter(col("date") == most_recent_date)

  joined_df = df \
          .select("date", "sensor_id","min_energy", "max_energy",  \
                (col("max_energy") - col("min_energy")).alias("consumed"))

  ordered_joined_df = joined_df.orderBy(col("date"),col('consumed').desc(), col('sensor_id'))
  ordered_joined_df.show(100,truncate=False)


spark = SparkSession \
    .builder \
    .appName('Kafka Spark Structured Streaming Example') \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1') \
    .getOrCreate()


lines = spark \
  .readStream \
  .format('kafka') \
  .option('kafka.bootstrap.servers', 'localhost:9092') \
  .option('subscribe', 'energy') \
  .option('startingOffsets', 'earliest') \
  .load() \
  .selectExpr('CAST(value AS STRING)')


schema = StructType([StructField('timestamp', TimestampType(), True),
                     StructField('sensor_id', StringType(), True),
                     StructField('energy', FloatType(), True)])

lines = lines.select( from_json(col('value'), schema).alias('data')).select('data.*')

lines_with_date = lines.withColumn("date", to_date(col("timestamp")))

lines_aggregated = lines_with_date.withWatermark('timestamp', '1 second') \
    .groupBy( 'date', 'sensor_id') \
    .agg( \
        max("energy").alias("max_energy"),
        min("energy").alias("min_energy")
    )






query = lines_aggregated \
    .writeStream \
    .outputMode('complete') \
    .trigger(processingTime='5 minutes') \
    .foreachBatch(dumpBatchDF) \
    .start()

query.awaitTermination(600)
query.stop()
spark.stop()

+----------+---------+----------+----------+---------+
|date      |sensor_id|min_energy|max_energy|consumed |
+----------+---------+----------+----------+---------+
|2024-02-01|H        |2145.4    |2168.9    |23.5     |
|2024-02-01|I        |927.2     |947.1     |19.899963|
|2024-02-01|E        |1874.0    |1890.1    |16.099976|
|2024-02-01|D        |2615.0    |2627.9    |12.899902|
|2024-02-01|F        |748.0     |759.0     |11.0     |
|2024-02-01|C        |1098.8    |1108.5    |9.699951 |
|2024-02-01|K        |841.2     |849.6     |8.399963 |
|2024-02-01|A        |650.5     |658.6     |8.099976 |
|2024-02-01|B        |627.5     |631.7     |4.200012 |
|2024-02-01|G        |833.7     |837.0     |3.2999878|
|2024-02-01|J        |966.7     |969.3     |2.5999756|
|2024-02-02|I        |947.2     |959.7     |12.5     |
|2024-02-02|E        |1890.2    |1900.3    |10.100098|
|2024-02-02|K        |849.7     |857.5     |7.799988 |
|2024-02-02|F        |759.1     |765.6     |6.5      |
|2024-02-0


Py4JError: An error occurred while calling o74.awaitTermination

### 3. For the current day, compute the total energy used in each half hour period. The value should be updated every 5 minutes. (Sorted by period; a column for each sensor)
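
The solution below labels every reading with the end of the half-hour period it falls in. The same bucketing can be sketched with the standard library, which makes it easy to check the boundary cases (end of hour, end of day, end of month). `period_end` is an illustrative name, and treating periods as half-open intervals `[start, end)` is an assumption.

```python
from datetime import datetime, timedelta

HALF_HOUR = timedelta(minutes=30)


def period_end(ts):
    """Return the end of the half-hour period [start, end) that contains ts."""
    day_start = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    # Number of complete half hours elapsed since midnight.
    completed = (ts - day_start) // HALF_HOUR
    # timedelta arithmetic rolls over hour, day and month boundaries.
    return day_start + (completed + 1) * HALF_HOUR


print(period_end(datetime(2024, 2, 1, 0, 3, 32)))
print(period_end(datetime(2024, 2, 29, 23, 45, 0)))
```

A reading taken exactly on a boundary, such as 00:30:00, belongs to the period that starts there and is therefore labeled 01:00:00.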


In [1]:
from operator import attrgetter

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.sql.window import Window

def resample(schema, freq, timestamp_col = "timestamp",**kwargs):
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))),
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        pdf.set_index(timestamp_col, inplace=True)
        pdf = pdf.resample(freq).interpolate()
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        pdf.sort_index(axis=1, inplace=True)
        return pdf
    return _


def processBatchDF(df, epoch_id):

    df_pivoted = df.groupBy( "timestamp") \
        .pivot("sensor_id") \
        .agg(first("max_energy")) \
        .orderBy("timestamp", ascending=False)

    windowSpec = Window.orderBy("timestamp")

    # Consumption per period = difference between consecutive cumulative readings.
    diff_df = df_pivoted.select(
        col("timestamp"),
        *[
            (col(column) - lag(col(column), 1).over(windowSpec)).alias(column)
            for column in df_pivoted.columns
            if column != "timestamp"
        ]
    )


    diff_df.show(1000, truncate=False)

spark = SparkSession \
    .builder \
    .appName('Kafka Spark Structured Streaming Example') \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1') \
    .getOrCreate()

lines = spark \
    .readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'energy') \
    .option('startingOffsets', 'earliest') \
    .load() \
    .selectExpr('CAST(value AS STRING)')

schema = StructType([StructField('timestamp', TimestampType(), True),
                     StructField('sensor_id', StringType(), True),
                     StructField('energy', FloatType(), True)])

lines = lines.select(from_json(col('value'), schema).alias('data')).select('data.*')


# Label each reading with the END of its half-hour period. Epoch arithmetic
# rolls over hour, day and month boundaries (day(timestamp) + 1 is not a valid
# day on the last day of a month).
lines = lines.withColumn("timestamp",
                   timestamp_seconds((floor(unix_timestamp("timestamp") / 1800) + 1) * 1800))



results = lines \
      .groupBy("timestamp", "sensor_id") \
      .agg(max("energy").alias("max_energy"), min("energy").alias("min_energy")) \
      .withColumn("consumed", col("max_energy") - col("min_energy")) \
      .orderBy("timestamp", "sensor_id") \
      .drop("consumed", "min_energy", "energy")

schema = StructType([StructField('timestamp', TimestampType(), True),
                     StructField('sensor_id', StringType(), True),
                     StructField('max_energy', FloatType(), True)])

results = results.groupBy("sensor_id").apply(resample(schema, "1800S"))


query = results \
    .writeStream \
    .outputMode('complete') \
    .trigger(processingTime='5 minutes') \
    .foreachBatch(lambda df, epoch_id: processBatchDF(df, epoch_id)) \
    .start()

query.awaitTermination(600)
query.stop()
spark.stop()



+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|timestamp          |A_diff     |B_diff     |C_diff     |D_diff     |E_diff     |F_diff     |G_diff     |H_diff     |I_diff     |J_diff     |K_diff     |
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|2024-02-01 00:30:00|NULL       |NULL       |NULL       |NULL       |NULL       |NULL       |NULL       |NULL       |NULL       |NULL       |NULL       |
|2024-02-01 01:00:00|0.0        |0.099975586|0.0        |1.0        |0.5        |0.4000244  |0.0        |0.7998047  |0.0        |0.5999756  |0.7000122  |
|2024-02-01 01:30:00|0.099975586|0.2000122  |0.0        |0.39990234 |0.4000244  |0.2999878  |0.0        |0.9001465  |0.0        |0.5        |0.099975586|
|2024-02-01 02:00:00|0.0        |0.0        |0.0        |0.100097656|0.5    

In [None]:
from operator import attrgetter
from pyspark.sql.types import StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('Kafka Spark Structured Streaming Example') \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1') \
    .getOrCreate()

def resample(schema, freq, timestamp_col = "timestamp",**kwargs):
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))),
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        pdf.set_index(timestamp_col, inplace=True)
        pdf = pdf.resample(freq).interpolate()
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        pdf.sort_index(axis=1, inplace=True)
        return pdf
    return _

from pyspark.sql.functions import to_timestamp

df = spark.createDataFrame([
    ("John",   "2018-02-01 03:00:03", 60),
    ("John",   "2018-02-01 03:03:05", 66),
    ("John",   "2018-02-01 03:05:10", 70),
    ("John",   "2018-02-01 03:08:14", 76),
    ("Mo",     "2017-06-04 01:05:06", 10),
    ("Mo",     "2017-06-04 01:07:07", 20),
    ("Mo",     "2017-06-04 01:10:21", 35),
    ("Mo",     "2017-06-04 01:11:23", 40),
], ("webID", "timestamp", "counts")).withColumn(
  "timestamp", to_timestamp("timestamp")
)

df.groupBy("webID").apply(resample(df.schema, "60S")).show()



+------+-------------------+-----+
|counts|          timestamp|webID|
+------+-------------------+-----+
|    60|2018-02-01 03:00:00| John|
|    62|2018-02-01 03:01:00| John|
|    64|2018-02-01 03:02:00| John|
|    66|2018-02-01 03:03:00| John|
|    68|2018-02-01 03:04:00| John|
|    70|2018-02-01 03:05:00| John|
|    72|2018-02-01 03:06:00| John|
|    74|2018-02-01 03:07:00| John|
|    76|2018-02-01 03:08:00| John|
|    10|2017-06-04 01:05:00|   Mo|
|    15|2017-06-04 01:06:00|   Mo|
|    20|2017-06-04 01:07:00|   Mo|
|    25|2017-06-04 01:08:00|   Mo|
|    30|2017-06-04 01:09:00|   Mo|
|    35|2017-06-04 01:10:00|   Mo|
|    40|2017-06-04 01:11:00|   Mo|
+------+-------------------+-----+



### 4. Compute the running total expense for the day. The value should be updated every minute. (Sorted in descending order by value and sensor.)

In [None]:
from operator import attrgetter

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession \
    .builder \
    .appName('Kafka Spark Structured Streaming Example') \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1') \
    .getOrCreate()

schema = StructType([
    StructField("time", StringType(), True),
    StructField("price", FloatType(), True)
])

times = [
    "0:0:0", "0:30:0",
    "1:0:0", "1:30:0",
    "2:0:0", "2:30:0",
    "3:0:0", "3:30:0",
    "4:0:0", "4:30:0",
    "5:0:0", "5:30:0",
    "6:0:0", "6:30:0",
    "7:0:0", "7:30:0",
    "8:0:0", "8:30:0",
    "9:0:0", "9:30:0",
    "10:0:0", "10:30:0",
    "11:0:0", "11:30:0",
    "12:0:0", "12:30:0",
    "13:0:0", "13:30:0",
    "14:0:0", "14:30:0",
    "15:0:0", "15:30:0",
    "16:0:0", "16:30:0",
    "17:0:0", "17:30:0",
    "18:0:0", "18:30:0",
    "19:0:0", "19:30:0",
    "20:0:0", "20:30:0",
    "21:0:0", "21:30:0",
    "22:0:0", "22:30:0",
    "23:0:0", "23:30:0"
]

values = [
    0.1072, 0.1072,
    0.1072, 0.1072,
    0.1072, 0.1072,
    0.1072, 0.1072,
    0.1072, 0.1072,
    0.1072, 0.1072,
    0.1072, 0.1072,
    0.1072, 0.1072,
    0.1741, 0.1741,
    0.2400, 0.2400,
    0.2400, 0.1741,
    0.1741, 0.1741,
    0.1741, 0.1741,
    0.1741, 0.1741,
    0.1741, 0.1741,
    0.1741, 0.1741,
    0.1741, 0.1741,
    0.1741, 0.1741,
    0.2400, 0.2400,
    0.2400, 0.2400,
    0.2400, 0.1741,
    0.1741, 0.1741,
    0.1072, 0.1072,
    0.1072, 0.1072
]
data = list(zip(times, values))

prices = spark.createDataFrame(data, schema)

def resample(schema, freq, timestamp_col = "timestamp",**kwargs):
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))),
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        pdf.set_index(timestamp_col, inplace=True)
        pdf = pdf.resample(freq).interpolate()
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        pdf.sort_index(axis=1, inplace=True)
        return pdf
    return _


def processBatchDF(df, epoch_id):
    df_pivoted = df.groupBy( "timestamp") \
            .pivot("sensor_id") \
            .agg(first("max_energy")) \
            .orderBy("timestamp", ascending=False)

    windowSpec = Window.orderBy("timestamp")


    # Consumption per period = difference between consecutive cumulative readings.
    diff_df = df_pivoted.select(
        col("timestamp"),
        *[
            (col(column) - lag(col(column), 1).over(windowSpec)).alias(column)
            for column in df_pivoted.columns
            if column != "timestamp"
        ]
    )

    # Back to long format: one (sensor_id, consumed) row per sensor and period.
    unpivoted_df = diff_df.select(
        "timestamp",
        expr("""
            stack(
                11,
                'A', A,
                'B', B,
                'C', C,
                'D', D,
                'E', E,
                'F', F,
                'G', G,
                'H', H,
                'I', I,
                'J', J,
                'K', K
            ) as (sensor_id, consumed)
        """)
    )

    # Time-of-day key (e.g. '0:30:0') used to look up the price of the period.
    unpivoted_df = unpivoted_df.withColumn("timestamp_2",
                        expr("concat(hour(timestamp), ':', minute(timestamp), ':', second(timestamp))"))

    # Midnight of the reading's day, used as the per-day grouping key.
    unpivoted_df = unpivoted_df.withColumn("timestamp_3",
                        expr("make_timestamp(year(timestamp), month(timestamp), day(timestamp), 0, 0, 0)"))


    join_df = unpivoted_df.join(prices, unpivoted_df.timestamp_2 == prices.time)

    join_df = join_df.withColumn("cost", expr("consumed * price"))

    join_df = join_df \
          .groupBy("timestamp_3", "sensor_id") \
          .agg(sum("cost").alias("sum")) \
          .orderBy("timestamp_3", "sensor_id")

    join_df.show(1000, truncate=False)



lines = spark \
    .readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'energy') \
    .option('startingOffsets', 'earliest') \
    .load() \
    .selectExpr('CAST(value AS STRING)')

schema = StructType([StructField('timestamp', TimestampType(), True),
                     StructField('sensor_id', StringType(), True),
                     StructField('energy', FloatType(), True)])

lines = lines.select(from_json(col('value'), schema).alias('data')).select('data.*')


# Label each reading with the END of its half-hour period. Epoch arithmetic
# rolls over hour, day and month boundaries (day(timestamp) + 1 is not a valid
# day on the last day of a month).
lines = lines.withColumn("timestamp",
                   timestamp_seconds((floor(unix_timestamp("timestamp") / 1800) + 1) * 1800))




results = lines \
      .groupBy("timestamp", "sensor_id") \
      .agg(max("energy").alias("max_energy"), min("energy").alias("min_energy")) \
      .withColumn("consumed", col("max_energy") - col("min_energy")) \
      .orderBy("timestamp", "sensor_id") \
      .drop("consumed", "min_energy", "energy")

schema = StructType([StructField('timestamp', TimestampType(), True),
                     StructField('sensor_id', StringType(), True),
                     StructField('max_energy', FloatType(), True)])
# Interpolation: fill in missing half-hour periods for each sensor
results = results.groupBy("sensor_id").apply(resample(schema, "1800S"))






query = results \
    .writeStream \
    .outputMode('complete') \
    .trigger(processingTime='1 minute') \
    .foreachBatch(lambda df, epoch_id: processBatchDF(df, epoch_id)) \
    .start()

query.awaitTermination(600)
query.stop()
spark.stop()



+-------------------+---------+-------------------+
|timestamp_3        |sensor_id|sum                |
+-------------------+---------+-------------------+
|2024-02-01 00:00:00|A        |-16.4403066188097  |
|2024-02-01 00:00:00|B        |-13.202925531193614|
|2024-02-01 00:00:00|C        |-26.154103722423315|
|2024-02-01 00:00:00|D        |-50.374582595191896|
|2024-02-01 00:00:00|E        |-45.889638017863035|
|2024-02-01 00:00:00|F        |-15.89534199051559 |
|2024-02-01 00:00:00|G        |-17.479860108345747|
|2024-02-01 00:00:00|H        |-48.18638587370515 |
|2024-02-01 00:00:00|I        |-34.117031319998205|
|2024-02-01 00:00:00|J        |-24.39481687080115 |
|2024-02-01 00:00:00|K        |-22.878203536383808|
|2024-02-02 00:00:00|A        |1.039206170476973  |
|2024-02-02 00:00:00|B        |1.1604456370696425 |
|2024-02-02 00:00:00|C        |0.6120753288269043 |
|2024-02-02 00:00:00|D        |1.5572994500398636 |
|2024-02-02 00:00:00|E        |2.116235690191388  |
|2024-02-02 