# In [None]:
import yaml

# Load connection settings from config.yml. yaml.load() given a plain string
# parses that string itself as a YAML document (yielding just "config.yml"),
# so the file must be opened and its contents parsed. safe_load also avoids
# constructing arbitrary Python objects from the file.
with open("config.yml", encoding="utf-8") as config_file:
    config = yaml.safe_load(config_file)

confluent_config = config["confluent"]

# Replace with your Confluent Cloud Kafka cluster details.
# Spark expects "kafka.bootstrap.servers" as ONE comma-separated
# "host:port[,host:port...]" string; PySpark str()-converts option values, so a
# Python list would arrive as "['host:port']". Accept either form from config.
bootstrap_servers = confluent_config["bootstrap_servers"]  # e.g. 'pkc-xxxxxx.us-east-1.aws.confluent.cloud:9092'
if isinstance(bootstrap_servers, (list, tuple)):
    bootstrap_servers = ",".join(bootstrap_servers)


# Configure Spark to read from Kafka
kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "subscribe": "weather-readings",
    "startingOffsets": "earliest",          # only applies when no checkpoint exists yet
    "kafka.security.protocol": "SASL_SSL",  # Confluent Cloud
    "kafka.sasl.mechanism": "PLAIN",        # Confluent Cloud
}

# SASL/PLAIN needs client credentials. Only added when both values are present,
# so a JVM-wide JAAS configuration keeps working unchanged.
# NOTE(review): assumes the 'confluent' config section names them 'api_key' /
# 'api_secret' — TODO confirm. On Databricks the login module class is shaded
# as 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule'.
api_key = confluent_config.get("api_key")
api_secret = confluent_config.get("api_secret")
if api_key and api_secret:
    kafka_options["kafka.sasl.jaas.config"] = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{api_key}" password="{api_secret}";'
    )

# Read streaming data from Kafka
weather_stream = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)

# Parse JSON data and convert timestamp
from pyspark.sql.functions import from_json, col, window, to_timestamp, expr
from pyspark.sql.types import StructType,StructField,StringType,DoubleType

# JSON layout expected in each Kafka message value. Every field is nullable
# (the StructType.add default); "timestamp" arrives as a string and is
# converted to a timestamp below.
schema = (
    StructType()
    .add("sensor_id", StringType())
    .add("temperature", DoubleType())
    .add("humidity", DoubleType())
    .add("wind_speed", DoubleType())
    .add("timestamp", StringType())
)

# Cast the Kafka value to a string, parse it against `schema`, flatten the
# resulting struct into top-level columns, then overwrite the string
# "timestamp" column with a real timestamp column.
_value_as_text = col("value").cast("string")
parsed_stream = (
    weather_stream
    .select(from_json(_value_as_text, schema).alias("data"))
    .select("data.*")
    .withColumn("timestamp", to_timestamp(col("timestamp")))
)

# Per-sensor averages computed for each window.
_window_averages = [
    expr("avg(temperature)").alias("avg_temperature"),
    expr("avg(humidity)").alias("avg_humidity"),
    expr("avg(wind_speed)").alias("avg_wind_speed"),
]

# Group into 5-minute tumbling event-time windows per sensor. The 10-minute
# watermark bounds how long window state is retained; data arriving later than
# that may be dropped. The window struct is then split into flat start/end
# columns.
windowed_stats = (
    parsed_stream
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window("timestamp", "5 minutes").alias("time_window"), "sensor_id")
    .agg(*_window_averages)
    .select(
        "sensor_id",
        col("time_window.start").alias("window_start"),
        col("time_window.end").alias("window_end"),
        "avg_temperature",
        "avg_humidity",
        "avg_wind_speed",
    )
)

# Write to Delta Lake
# Write the per-sensor window averages to the Delta table "weather_stats".
# PySpark's DataStreamWriter has no .table() method (that exists only on
# DataStreamReader); .toTable() (Spark 3.1+) starts the streaming query and
# returns its StreamingQuery handle.
# NOTE(review): /tmp is usually not durable storage. If this checkpoint is
# lost, the query starts fresh from the Kafka "startingOffsets" ("earliest")
# and can re-append window rows that were already written — consider a
# persistent checkpoint path.
query = (
    windowed_stats.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoint/weather_stats")
    .toTable("weather_stats")
)