In [1]:
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    countDistinct,
    hour,
    avg,
    when,
    lit,
    col,
    explode,
    create_map,
    broadcast
)

from pyspark.sql.types import (
    StructType as st,
    StructField as sf,
    StringType as srt,
)

Create a SparkSession with Delta Lake support:  
- Master is set to `spark://spark-master:7077`  
- Additional configs enable DeltaLake integration

In [2]:
builder = (
    SparkSession.builder
        .appName("Analysis")
        .master("spark://spark-master:7077") 
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

In [3]:
sprk = configure_spark_with_delta_pip(builder).getOrCreate()
sprk.sparkContext.setLogLevel("WARN")

Path to the Delta table created by the Kafka subscriber

In [4]:
fp = "/data/delta_output"

Read the Delta table into a DataFrame

In [6]:
df = sprk.read.format("delta").load(fp)

In [7]:
df.printSchema()

root
 |-- signal_date: date (nullable = true)
 |-- signal_ts: timestamp (nullable = true)
 |-- create_date: date (nullable = true)
 |-- create_ts: timestamp (nullable = true)
 |-- signals: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [8]:
df.show(5, truncate=False)

+-----------+-------------------+-----------+-----------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|signal_date|signal_ts          |create_date|create_ts              |signals                                                                                                                                                                   |
+-----------+-------------------+-----------+-----------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2018-10-27 |2018-10-27 00:40:00|2025-03-26 |2025-03-26 04:33:55.367|{LV ActivePower (kW) -> 2329.56201171875, Wind Speed (m/s) -> 9.68965244293212, Theoretical_Power_Curve (KWh) -> 2583.53686803962, Wind Direction (°) -> 194.844802856445}|
|2018-10-27 |2018-10-27 12:20:00|202

1) Calculate number of distinct `signal_ts` per day

In [9]:
dts = (
    df.groupBy("signal_date")
      .agg(countDistinct("signal_ts").alias("distinct_ts_count"))
)

In [10]:
dts.show()

+-----------+-----------------+
|signal_date|distinct_ts_count|
+-----------+-----------------+
| 2018-08-10|               96|
| 2018-05-28|              100|
| 2018-06-06|               87|
| 2018-03-17|              144|
| 2018-09-01|               82|
| 2018-08-11|               91|
| 2018-11-02|              104|
| 2018-06-26|               96|
| 2018-08-08|               91|
| 2018-09-09|              101|
| 2018-10-05|               97|
| 2018-06-30|              100|
| 2018-11-01|              102|
| 2018-05-26|               92|
| 2018-09-08|               92|
| 2018-03-23|              144|
| 2018-08-31|              108|
| 2018-04-18|               89|
| 2018-01-23|              144|
| 2018-02-26|              144|
+-----------+-----------------+
only showing top 20 rows



2) Calculate average value of all signals per hour  
Extract columns from the `signals` map

In [11]:
lv_active_power_col = col("signals")["LV ActivePower (kW)"].cast("double")
wind_speed_col      = col("signals")["Wind Speed (m/s)"].cast("double")
theoretical_col     = col("signals")["Theoretical_Power_Curve (KWh)"].cast("double")
wind_dir_col        = col("signals")["Wind Direction (°)"].cast("double")

Group by `signal_date` and hour, then compute averages

In [12]:
grouped = (
    df.groupBy(
        "signal_date",
        hour("signal_ts").alias("hour_of_day")
    )
    .agg(
        avg(lv_active_power_col).alias("avg_active_power"),
        avg(wind_speed_col).alias("avg_wind_speed"),
        avg(theoretical_col).alias("avg_theoretical_power_curve"),
        avg(wind_dir_col).alias("avg_wind_direction")
    )
)

In [13]:
grouped.show(10, truncate=False)

+-----------+-----------+------------------+------------------+---------------------------+------------------+
|signal_date|hour_of_day|avg_active_power  |avg_wind_speed    |avg_theoretical_power_curve|avg_wind_direction|
+-----------+-----------+------------------+------------------+---------------------------+------------------+
|2018-05-04 |23         |0.0               |2.780018746852867 |11.238955820468625         |309.849052429199  |
|2018-06-27 |22         |3250.36800130208  |12.156673431396399|3539.962337927693          |233.46033223470033|
|2018-11-01 |18         |2637.257226562496 |10.86811008453366 |3212.6447386998707         |71.50665283203122 |
|2018-10-17 |18         |603.2187881469723 |6.164311408996578 |685.6376820250624          |59.78288269042962 |
|2018-07-14 |17         |1211.8605346679658|7.797745863596593 |1421.5998856059098         |29.295638402302984|
|2018-08-26 |15         |194.559397379557  |4.66117095947265  |257.0749616740943          |48.432759602864564|
|

3) Add `generation_indicator` column based on avg_active_power  
If <200 => Low, 200-600 => Medium, 600-1000 => High, >=1000 => Exceptional

In [14]:
gen_indicator = (
    grouped.withColumn(
        "generation_indicator",
        when(col("avg_active_power") < 200, "Low")
        .when((col("avg_active_power") >= 200) & (col("avg_active_power") < 600), "Medium")
        .when((col("avg_active_power") >= 600) & (col("avg_active_power") < 1000), "High")
        .otherwise("Exceptional")
    )
)

In [15]:
gen_indicator.show(10, truncate=False)

+-----------+-----------+------------------+------------------+---------------------------+------------------+--------------------+
|signal_date|hour_of_day|avg_active_power  |avg_wind_speed    |avg_theoretical_power_curve|avg_wind_direction|generation_indicator|
+-----------+-----------+------------------+------------------+---------------------------+------------------+--------------------+
|2018-05-04 |23         |0.0               |2.780018746852867 |11.238955820468625         |309.849052429199  |Low                 |
|2018-06-27 |22         |3250.36800130208  |12.156673431396399|3539.962337927693          |233.46033223470033|Exceptional         |
|2018-11-01 |18         |2637.257226562496 |10.86811008453366 |3212.6447386998707         |71.50665283203122 |Exceptional         |
|2018-10-17 |18         |603.2187881469723 |6.164311408996578 |685.6376820250624          |59.78288269042962 |High                |
|2018-07-14 |17         |1211.8605346679658|7.797745863596593 |1421.59988560

4) Create a new DataFrame with JSON-based signal mapping and do broadcast join  
This simulates a small lookup table for signal names

In [16]:
data = [
    ("LV ActivePower (kW)", "active_power_average"),
    ("Wind Speed (m/s)", "wind_speed_average"),
    ("Theoretical_Power_Curve (KWh)", "theo_power_curve_average"),
    ("Wind Direction (°)", "wind_direction_average")
]

In [17]:
schema = st([
    sf("sig_name", srt(), True),
    sf("sig_mapping_name", srt(), True)
])

In [18]:
new_df = sprk.createDataFrame(data, schema)

Transform columns into a map -> explode it -> join with new_df to rename

In [19]:
formatted_df = (
    gen_indicator
    .select(
       "signal_date",
       "hour_of_day",
       "generation_indicator",
       create_map(
         lit("LV ActivePower (kW)"), col("avg_active_power"),
         lit("Wind Speed (m/s)"), col("avg_wind_speed"),
         lit("Theoretical_Power_Curve (KWh)"), col("avg_theoretical_power_curve"),
         lit("Wind Direction (°)"), col("avg_wind_direction")
       ).alias("metrics")
    )
    .select(
       "signal_date",
       "hour_of_day",
       "generation_indicator",
       explode(col("metrics")).alias("sig_name", "value")
    )
)

Perform a broadcast join to replace `sig_name` with `sig_mapping_name`

In [20]:
joined_df = (
    formatted_df
    .join(broadcast(new_df), on="sig_name", how="left")
    .select(
       "signal_date",
       "hour_of_day",
       "generation_indicator",
       "sig_name",
       "sig_mapping_name",
       "value"
    )
)

In [21]:
joined_df.show(20, truncate=False)

+-----------+-----------+--------------------+-----------------------------+------------------------+------------------+
|signal_date|hour_of_day|generation_indicator|sig_name                     |sig_mapping_name        |value             |
+-----------+-----------+--------------------+-----------------------------+------------------------+------------------+
|2018-05-04 |23         |Low                 |LV ActivePower (kW)          |active_power_average    |0.0               |
|2018-05-04 |23         |Low                 |Wind Speed (m/s)             |wind_speed_average      |2.780018746852867 |
|2018-05-04 |23         |Low                 |Theoretical_Power_Curve (KWh)|theo_power_curve_average|11.238955820468625|
|2018-05-04 |23         |Low                 |Wind Direction (°)           |wind_direction_average  |309.849052429199  |
|2018-06-27 |22         |Exceptional         |LV ActivePower (kW)          |active_power_average    |3250.36800130208  |
|2018-06-27 |22         |Excepti

In [22]:
sprk.stop()