### Spark Setup

In [1]:
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
import os
from pyspark.sql.functions import from_json, expr
# Connector jars pulled from Maven when PySpark launches its JVM. This cell must
# therefore run before the SparkSession is created. Keep the Spark/Scala versions
# (3.3.1 / 2.12) in step with the installed PySpark.
SPARK_PACKAGES = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
    "org.apache.spark:spark-avro_2.12:3.3.1",
]
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages " + ",".join(SPARK_PACKAGES) + " pyspark-shell"

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Start (or reuse) the notebook's SparkSession. The connector packages listed in
# PYSPARK_SUBMIT_ARGS are resolved (Ivy output below) when the session first starts.
spark = (
    SparkSession.builder
    .appName("Spark-Jupyter")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c2d657ff-4e7e-4bf6-87bc-9907646eb019;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.common

24/05/08 18:32:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### Electricity data

In [3]:
# JSON layout of one message on the `energy_data` topic. All fields are nullable,
# so records missing a field parse to null instead of failing.
schema_energy = StructType([
    T.StructField("Minutes1UTC", TimestampType()),
    T.StructField("SolarPower", DoubleType()),
    T.StructField("OnshoreWindPower", DoubleType()),
])

In [4]:
# Streaming source over the `energy_data` topic, replayed from the earliest offset.
# Two bootstrap addresses are listed, presumably so the notebook works both from
# the host (localhost) and from inside the Docker network (broker) — TODO confirm.
# Checkpointing is configured on the writeStream sinks. A `checkpointLocation`
# option on a Kafka *source* is ignored, so none is set here.
df_kafka_raw_energy = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
    .option("subscribe", "energy_data") \
    .option("startingOffsets", "earliest") \
    .load()

In [5]:
# Decode the Kafka value bytes as a JSON string and flatten it into the
# schema_energy columns (Minutes1UTC, SolarPower, OnshoreWindPower).
energy_json = from_json(F.col("value").cast("string"), schema_energy)
df_parsed_energy = (
    df_kafka_raw_energy
    .select(energy_json.alias("data"))
    .select("data.*")
)

In [6]:
# Persist the parsed energy stream as CSV files under tmp_energy/. Its own
# checkpoint directory lets the query resume where it stopped after a restart.
energy_csv_writer = df_parsed_energy.writeStream.format("csv")
save_to_csv_energy = energy_csv_writer.options(
    path="tmp_energy",
    checkpointLocation="checkpoint_energy",
).start()

24/05/08 18:33:08 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

#### Price data

In [None]:
# JSON layout of one message on the `prices_data` topic. All fields are nullable.
schema_prices = StructType([
    T.StructField("HourUTC", TimestampType()),
    T.StructField("PriceArea", StringType()),
    T.StructField("SpotPriceEUR", DoubleType()),
])

In [None]:
# Streaming source over the `prices_data` topic, replayed from the earliest offset.
# Same broker list as the energy source.
# Checkpointing is configured on the writeStream sink. A `checkpointLocation`
# option on a Kafka *source* is ignored, so none is set here.
df_kafka_raw_prices = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
    .option("subscribe", "prices_data") \
    .option("startingOffsets", "earliest") \
    .load()

In [None]:
# Decode the Kafka value bytes as a JSON string and flatten it into the
# schema_prices columns (HourUTC, PriceArea, SpotPriceEUR). The record key is
# not used downstream, so only the value is decoded.
df_parsed_prices = df_kafka_raw_prices \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", schema_prices).alias("data")) \
    .select("data.*")

#### Analytics

In [None]:
# Average solar power over 1-hour tumbling windows on the Minutes1UTC event time.
# WindowStart is kept next to the hour-of-day label. Without it, windows from
# different days that share the same hour print as indistinguishable rows.
from pyspark.sql.functions import window, hour
average_solar_power_per_hour = df_parsed_energy \
    .groupBy(window("Minutes1UTC", "1 hour")) \
    .agg(F.avg("SolarPower").alias("AverageSolarPower")) \
    .select(
        F.col("window.start").alias("WindowStart"),
        hour("window.start").alias("Hour"),
        "AverageSolarPower",
    )

# "update" output mode prints only the windows whose average changed in each trigger.
# NOTE(review): no watermark is set, so state for every window is retained
# indefinitely — consider .withWatermark("Minutes1UTC", ...) for a long-lived stream.
query = average_solar_power_per_hour \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

In [None]:
# Persist the parsed price stream as CSV files under tmp_prices/. Its own
# checkpoint directory lets the query resume where it stopped after a restart.
prices_csv_writer = df_parsed_prices.writeStream.format("csv")
save_to_csv_prices = prices_csv_writer.options(
    path="tmp_prices",
    checkpointLocation="checkpoint_prices",
).start()

In [None]:
# Raw Kafka records with key and value decoded to strings, for schema inspection.
# Built from the energy stream. Use df_kafka_raw_prices instead to inspect the prices topic.
df_kafka_encoded = df_kafka_raw_energy.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [None]:
# Print the column names and types of the decoded Kafka frame (key, value as strings).
df_kafka_encoded.printSchema()