In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, explode, arrays_zip, date_format, lit
from datetime import datetime
import os

StatementMeta(, 5e40a33a-8ce7-4f03-a56b-098266a8bce7, 3, Finished, Available, Finished)

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, arrays_zip, to_timestamp, date_format, lit
from datetime import datetime
import os

# Get the Spark session for this notebook: getOrCreate() hands back the
# already-running session when there is one, otherwise it builds a new one
# with the app name below and Hive support enabled.
spark = (
    SparkSession.builder
    .appName("Weather Data Processing")
    .enableHiveSupport()
    .getOrCreate()
)

# Read every *.json file in the folder into one DataFrame. "multiline" lets a
# single JSON document span several lines instead of one record per line.
# (Per the original note, this mirrors the loading logic of SchemaAdjustment.)
json_folder_path = "Files/Json"
json_glob = f"{json_folder_path}/*.json"
df = spark.read.option("multiline", "true").json(json_glob)
print("Initial data loaded")


StatementMeta(, 5e40a33a-8ce7-4f03-a56b-098266a8bce7, 13, Finished, Available, Finished)

Initial data loaded


In [12]:
# Display the DataFrame's repr (column names and types) to check the schema
# that Spark inferred from the JSON files.
df

StatementMeta(, 5e40a33a-8ce7-4f03-a56b-098266a8bce7, 14, Finished, Available, Finished)

DataFrame[elevation: double, generationtime_ms: double, hourly: struct<rain:array<double>,relative_humidity_2m:array<bigint>,soil_temperature_6cm:array<double>,temperature_2m:array<double>,time:array<string>,wind_speed_10m:array<double>>, hourly_units: struct<rain:string,relative_humidity_2m:string,soil_temperature_6cm:string,temperature_2m:string,time:string,wind_speed_10m:string>, latitude: double, longitude: double, timezone: string, timezone_abbreviation: string, utc_offset_seconds: bigint]

In [13]:
# Flatten the nested input: keep the per-location fields as they are and lift
# each array out of the `hourly` struct into its own top-level "*_array"
# column. Nothing is exploded here; every row still holds whole arrays.
location_fields = ["latitude", "longitude", "elevation", "timezone"]

# (field inside `hourly`, top-level column name), in output column order.
hourly_array_aliases = [
    ("time", "time_array"),
    ("temperature_2m", "temperature_array"),
    ("relative_humidity_2m", "humidity_array"),
    ("rain", "rain_array"),
    ("wind_speed_10m", "wind_speed_array"),
    ("soil_temperature_6cm", "soil_temp_array"),
]

hourly_df = df.select(
    *[col(field_name) for field_name in location_fields],
    *[
        col(f"hourly.{field_name}").alias(array_name)
        for field_name, array_name in hourly_array_aliases
    ],
)


StatementMeta(, 5e40a33a-8ce7-4f03-a56b-098266a8bce7, 15, Finished, Available, Finished)

In [14]:

# Turn the parallel hourly arrays into one row per hour.
#
# arrays_zip pairs the arrays element by element into an array of structs
# (struct fields are named after the zipped columns); explode then emits one
# row per struct. The final select unpacks the struct into flat columns that
# carry the original measurement names.
#
# Zipped array column -> output column name, in output column order.
array_to_measurement = {
    "time_array": "time",
    "temperature_array": "temperature_2m",
    "humidity_array": "relative_humidity_2m",
    "rain_array": "rain",
    "wind_speed_array": "wind_speed_10m",
    "soil_temp_array": "soil_temperature_6cm",
}

hourly_struct = explode(arrays_zip(*array_to_measurement.keys()))

exploded_df = (
    hourly_df
    .withColumn("hourly_data", hourly_struct)
    .select(
        "latitude", "longitude", "elevation", "timezone",
        *[
            col(f"hourly_data.{array_name}").alias(measurement_name)
            for array_name, measurement_name in array_to_measurement.items()
        ],
    )
)

StatementMeta(, 5e40a33a-8ce7-4f03-a56b-098266a8bce7, 16, Finished, Available, Finished)

In [15]:
# Derive three columns from each hourly row:
#   timestamp    - the `time` string parsed with the pattern yyyy-MM-dd'T'HH:mm
#   date         - that timestamp formatted back to a yyyy-MM-dd string
#   processed_at - an ISO-8601 string of the driver's clock, captured once when
#                  this cell runs, so every row of the batch shares one value
parsed_time = to_timestamp(col("time"), "yyyy-MM-dd'T'HH:mm")
batch_processed_at = datetime.now().isoformat()

processed_df = (
    exploded_df
    .withColumn("timestamp", parsed_time)
    .withColumn("date", date_format(col("timestamp"), "yyyy-MM-dd"))
    .withColumn("processed_at", lit(batch_processed_at))
)


StatementMeta(, 5e40a33a-8ce7-4f03-a56b-098266a8bce7, 17, Finished, Available, Finished)

In [16]:
# Write to Delta table.
#
# First run (table missing): create the managed Delta table from processed_df.
# Later runs (table present): upsert processed_df into it with MERGE.
output_table = "weather_data"

if spark.catalog.tableExists(output_table):
    # MERGE reads its source by name, so expose the batch as a temp view.
    processed_df.createOrReplaceTempView("new_weather_data")
    # A reading is identified by its location AND its hour. Matching on `time`
    # alone pairs rows from different locations: once more than one location
    # is loaded, every target row matches several source rows and Delta
    # rejects the merge (multiple source rows matched the same target row).
    # With a single location the extra conditions do not change the result.
    spark.sql(f"""
        MERGE INTO {output_table} target
        USING new_weather_data source
        ON target.latitude = source.latitude
           AND target.longitude = source.longitude
           AND target.time = source.time
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
    print(f"Updated Delta table {output_table} with new records")
else:
    # No explicit save mode: the default fails if the table already exists,
    # which cannot happen on this branch.
    processed_df.write.format("delta").saveAsTable(output_table)
    print(f"Created new Delta table {output_table}")

print("Data processing completed successfully")


StatementMeta(, 5e40a33a-8ce7-4f03-a56b-098266a8bce7, 18, Finished, Available, Finished)

Created new Delta table weather_data
Data processing completed successfully
