In [0]:
from pyspark.sql.functions import window, avg
from pyspark.sql.types import *

In [0]:
# Auto Loader input directory containing the CSV files, and the directory where
# Auto Loader keeps its schema-tracking metadata for this stream.
source_path = "/Volumes/streaming_demo/weather_stream/weather_stream_volume/source/live_weather"
schema_location = "/Volumes/streaming_demo/weather_stream/weather_stream_volume/source/schemas/_live_weather_schema"

# The schema is declared up front because CSV stores every value as text.
# Without an explicit TimestampType, `timestamp` would be read as a string,
# which breaks event-time operations (watermarks, windows) on the stream.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("event_id", StringType()),
        ("timestamp", TimestampType()),
        ("city", StringType()),
        ("temperature_c", DoubleType()),
        ("humidity_percent", IntegerType()),
        ("wind_speed_kmh", DoubleType()),
    )
])

# Incremental CSV ingestion via Auto Loader (`cloudFiles`) using the schema above.
# NOTE(review): no `header` option is set, so the CSV reader's default applies;
# if the source files carry a header row, confirm whether `header` should be "true".
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", schema_location)
    .schema(schema)
    .load(source_path)
)

In [0]:
# Event-time settings for the aggregation, named rather than inlined so they
# are easy to find and tune.
# Watermark delay: state for windows older than (max event time seen - delay)
# may be finalized, and events arriving later than that may be dropped.
WATERMARK_DELAY = "30 seconds"
# Window length; with no slide duration given, `window` produces tumbling windows.
WINDOW_DURATION = "60 seconds"

# Per-city averages over fixed event-time windows. Each aggregate is aliased so
# the output has readable column names instead of auto-generated ones such as
# `avg(temperature_c)`.
weather_window_avg = (
    df.withWatermark("timestamp", WATERMARK_DELAY)
    .groupBy(window("timestamp", WINDOW_DURATION), "city")
    .agg(
        avg("temperature_c").alias("avg_temperature_c"),
        avg("humidity_percent").alias("avg_humidity_percent"),
        avg("wind_speed_kmh").alias("avg_wind_speed_kmh"),
    )
)

weather_window_avg.display()