### Config

In [None]:
# Azure Blob Storage account used by every wasbs:// path in this notebook.
# NOTE(review): the access key is a plain literal here; consider reading it
# from a secret store instead of keeping it in the notebook source.
storage_account_name = "your_storage"
storage_account_access_key = "your_key"

# Hand the account key to the Spark session under the account-specific
# "fs.azure.account.key.<account>.blob.core.windows.net" setting.
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_access_key,
)

In [None]:
# Smoke test: list the root of the bronze container and print each entry name.
try:
    files = dbutils.fs.ls(f"wasbs://bronze@{storage_account_name}.blob.core.windows.net/")
    print("Connection to Bronze layer successful. Files in container:")
    for entry in files:
        print(entry.name)
except Exception as err:
    # Best-effort check: report the problem and let the notebook continue.
    print("Connection failed:", str(err))

In [None]:
# Smoke test: list the root of the silver container and print each entry name.
try:
    files = dbutils.fs.ls(f"wasbs://silver@{storage_account_name}.blob.core.windows.net/")
    print("Connection to Silver layer successful. Files in container:")
    for entry in files:
        print(entry.name)
except Exception as err:
    # Best-effort check: report the problem and let the notebook continue.
    print("Connection failed:", str(err))

### Transformations

In [None]:
# import necessary packages for creating Spark session, schema def, and transform
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, MapType, ArrayType, IntegerType
from pyspark.sql.functions import *

#### Schemas

In [None]:
# Schemas for the current-weather payload.
# Every nested JSON object gets its own StructType; the pieces are assembled
# into weather_curr_schema at the bottom of this cell.

# coord: nested object holding the observation's longitude / latitude
coord_schema = (
    StructType()
    .add("lon", DoubleType(), False)
    .add("lat", DoubleType(), False)
)

# weather: array of condition objects on the top-level payload
weather_schema = ArrayType(
    StructType()
    .add("id", IntegerType(), False)
    .add("main", StringType(), True)
    .add("description", StringType(), True)
    .add("icon", StringType(), True),
    True,
)

# main: nested object with temperature, pressure and humidity readings
main_schema = (
    StructType()
    .add("temp", DoubleType(), False)
    .add("feels_like", DoubleType(), True)
    .add("temp_min", DoubleType(), True)
    .add("temp_max", DoubleType(), True)
    .add("pressure", IntegerType(), True)
    .add("humidity", IntegerType(), True)
    .add("sea_level", IntegerType(), True)
    .add("grnd_level", IntegerType(), True)
)

# wind: nested object
wind_schema = (
    StructType()
    .add("speed", DoubleType(), True)
    .add("deg", IntegerType(), True)
    .add("gust", DoubleType(), True)
)

# rain: nested object with both the 1h and 3h fields
rain_schema = (
    StructType()
    .add("1h", DoubleType(), True)
    .add("3h", DoubleType(), True)
)

# snow: nested object with both the 1h and 3h fields
snow_schema = (
    StructType()
    .add("1h", DoubleType(), True)
    .add("3h", DoubleType(), True)
)

# clouds: nested object with a single "all" field
cloud_schema = StructType().add("all", IntegerType(), True)

# sys: nested object, including the optional "message" field
sys_schema = (
    StructType()
    .add("type", IntegerType(), True)
    .add("id", IntegerType(), False)
    .add("message", DoubleType(), True)
    .add("country", StringType(), True)
    .add("sunrise", LongType(), True)
    .add("sunset", LongType(), True)
)

# Top-level schema: one record per current-weather response
weather_curr_schema = (
    StructType()
    .add("coord", coord_schema, True)
    .add("weather", weather_schema, True)
    .add("base", StringType(), True)
    .add("main", main_schema, True)
    .add("visibility", IntegerType(), True)
    .add("wind", wind_schema, True)
    .add("rain", rain_schema, True)
    .add("snow", snow_schema, True)
    .add("clouds", cloud_schema, True)
    .add("dt", LongType(), True)
    .add("sys", sys_schema, True)
    .add("timezone", IntegerType(), True)
    .add("id", IntegerType(), False)
    .add("name", StringType(), True)
    .add("cod", IntegerType(), True)
)


In [None]:
# Schema for the 3-hour forecast API payload.
#
# NOTE: weather_schema, wind_schema, rain_schema, sys_schema and coord_schema
# rebind names that the current-weather schema cell also assigns.
# weather_curr_schema already holds references to the earlier objects, so it
# is unaffected as long as the cells run top to bottom.

# list.main schema (nested object within list); unlike the current-weather
# "main" object it carries the forecast-only temp_kf field
list_main_schema = StructType([
    StructField("temp", DoubleType(), False),
    StructField("feels_like", DoubleType(), True),
    StructField("temp_min", DoubleType(), True),
    StructField("temp_max", DoubleType(), True),
    StructField("pressure", IntegerType(), True),
    StructField("sea_level", IntegerType(), True),
    StructField("grnd_level", IntegerType(), True),
    StructField("humidity", IntegerType(), True),
    StructField("temp_kf", DoubleType(), True)
])

# list.weather (nested array within list containing an object)
weather_schema = ArrayType(StructType([
    StructField("id", IntegerType(), True),
    StructField("main", StringType(), True),
    StructField("description", StringType(), True),
    StructField("icon", StringType(), True)
]))

# list.clouds schema (nested object within list)
clouds_schema = StructType([
    StructField("all", IntegerType(), True)
])

# list.wind schema (nested object within list)
wind_schema = StructType([
    StructField("speed", DoubleType(), True),
    StructField("deg", IntegerType(), True),
    StructField("gust", DoubleType(), True)
])

# list.rain schema (nested object within list)
rain_schema = StructType([
    StructField("3h", DoubleType(), True)
])

# list.sys schema (nested object within list)
sys_schema = StructType([
    StructField("pod", StringType(), True)
])

# list schema within main object (array of forecast entries)
list_schema = ArrayType(StructType([
    StructField("dt", LongType(), True),
    # Use the forecast-specific struct defined above; the current-weather
    # main_schema has no temp_kf, so that field would be silently dropped.
    StructField("main", list_main_schema, True),
    StructField("weather", weather_schema, True),
    StructField("clouds", clouds_schema, True),
    StructField("wind", wind_schema, True),
    StructField("visibility", IntegerType(), True),
    StructField("pop", DoubleType(), True),
    StructField("rain", rain_schema, True),
    StructField("sys", sys_schema, True),
    StructField("dt_txt", StringType(), True)
]))

# "coord" object nested inside "city"
coord_schema = StructType([
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True)
])

# "city" object describing the forecast location
city_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("coord", coord_schema, True),
    StructField("country", StringType(), True),
    StructField("population", IntegerType(), True),
    StructField("timezone", IntegerType(), True),
    StructField("sunrise", LongType(), True),
    StructField("sunset", LongType(), True)
])

# Top-level schema: one record per forecast response
weather_3_hr_schema = StructType([
    StructField("cod", StringType(), True),
    StructField("message", IntegerType(), True),
    StructField("cnt", IntegerType(), True),
    StructField("list", list_schema, True),
    StructField("city", city_schema, True)
])

#### Load Datasets

In [None]:
# Geocode reference data: loaded with a one-off batch read (do NOT use
# readStream here). No schema is supplied, so Spark infers it from the files.
geocode_path = f"wasbs://bronze@{storage_account_name}.blob.core.windows.net/geocode/Batch-GeocodingAPI/"
geocode_df = spark.read.json(path=geocode_path)

In [None]:
# STATIC TEST (disabled)
# The snippet below is wrapped in a bare string literal, so none of it runs.
# It lists the current-weather folder and reads it with a batch (non-streaming)
# reader using weather_curr_schema; strip the triple quotes to run it by hand.

"""
weather_curr_path = f"wasbs://bronze@{storage_account_name}.blob.core.windows.net/weather/RT-CurrentAPI/"
files = dbutils.fs.ls(weather_curr_path)

# Print file names
for file in files:
    print(file.name)

weather_curr_df = spark.read.schema(weather_curr_schema).json(weather_curr_path)
"""

In [None]:
# Current-weather JSON is read as a stream from the bronze container, using
# the explicit weather_curr_schema rather than schema inference.
weather_curr_path = f"wasbs://bronze@{storage_account_name}.blob.core.windows.net/weather/RT-CurrentAPI/"
weather_curr_df = (
    spark.readStream
    .schema(weather_curr_schema)
    .json(weather_curr_path)
)

In [None]:
# fetch 3 hr forecast json --> parse and read into spark df
#weather_3_hr_path = f"wasbs://bronze@{storage_account_name}.blob.core.windows.net/weather/RT-3HrForecastAPI/"
#weather_3_hr_df = spark.readStream.schema(weather_3_hr_schema).json(weather_3_hr_path)

#### Functions

In [None]:
#temperature
def to_celcius(column):
    """Convert a Kelvin temperature to degrees Celsius, rounded to 2 decimals.

    Callers in this notebook pass Spark Column expressions; `round` then
    resolves to the pyspark.sql.functions version via the star import.
    """
    kelvin_offset = 273.15
    celsius = column - kelvin_offset
    return round(celsius, 2)

def to_fahrenheit(column):
    """Convert a Kelvin temperature to degrees Fahrenheit, rounded to 2 decimals.

    Uses F = (K - 273.15) * 9/5 + 32. The previous expression,
    K * 9/5 - 273.15, mixed the Celsius offset into the Fahrenheit scale and
    produced wrong values (e.g. 218.52 instead of 32 for 273.15 K).

    Callers in this notebook pass Spark Column expressions; `round` then
    resolves to the pyspark.sql.functions version via the star import.
    """
    return round((column - 273.15) * 9 / 5 + 32, 2)

In [None]:
#time
def secs_to_hrs(column):
    """Convert a duration in seconds to hours, rounded to 2 decimals."""
    seconds_per_hour = 3600
    hours = column / seconds_per_hour
    return round(hours, 2)

#### Geocode Transformations

In [None]:
# Prepare the geocode lookup table:
#   * rename "name" to "city"
#   * floor lat / lon to 2 decimals (floor goes toward negative infinity, so
#     negative coordinates move away from zero); the outer round(..., 2) is
#     presumably there to tidy floating-point noise after the division
#   * drop the local_names column
geocode_df = (
    geocode_df
    .withColumnRenamed("name", "city")
    .withColumn("lat", round(floor(col("lat") * 100) / 100, 2))
    .withColumn("lon", round(floor(col("lon") * 100) / 100, 2))
    .drop("local_names")
)

In [None]:
# Preview the transformed geocode rows (works because geocode_df is a batch
# DataFrame, not a stream).
geocode_df.show()

#### Current Weather Transformations

In [None]:
# Lift coord.lat / coord.lon to top-level "lat" / "lon" columns (the keys used
# for the geocode join), keep every other column, then drop the leftovers.
# NOTE(review): "city" is not a field of weather_curr_schema, and the nested
# path "weather.element.icon" is probably not removed by drop(); the icon goes
# away later when the whole "weather" struct is dropped. Confirm before pruning.
weather_curr_df = (
    weather_curr_df
    .select(
        col("coord.lat").alias("lat"),
        col("coord.lon").alias("lon"),
        weather_curr_df["*"],
    )
    .drop("coord", "name", "city", "weather.element.icon")
)

In [None]:
# Explode the "weather" array (one output row per array element), lift the
# id / main / description fields to top-level columns, then drop the struct.
# The icon field is discarded together with the struct.
weather_curr_df = (
    weather_curr_df
    .withColumn("weather", explode("weather"))
    .select(
        "*",
        col("weather.id").alias("weather_condition_id"),
        col("weather.main").alias("weather_type"),
        col("weather.description").alias("weather_description"),
    )
    .drop("weather")
)
                                
                                


In [None]:
# Flatten the "main" struct:
#   * pressure / humidity / sea_level / grnd_level are copied through as-is
#   * each temperature field (Kelvin, per the original note) gets a Celsius
#     and a Fahrenheit column
#   * the struct itself is dropped at the end
# NOTE(review): the output names mix "Celsius" and "Celcius"; they are kept
# unchanged because downstream readers may already depend on them.
weather_curr_df = (
    weather_curr_df
    .select(
        "*",
        col("main.pressure").alias("pressure"),
        col("main.humidity").alias("humidity"),
        col("main.sea_level").alias("sea_level"),
        col("main.grnd_level").alias("grnd_level"),
    )
    .withColumn("temp_Celsius", to_celcius(col("main.temp")))
    .withColumn("temp_Fahrenheit", to_fahrenheit(col("main.temp")))
    .withColumn("feels_like_Celcius", to_celcius(col("main.feels_like")))
    .withColumn("feels_like_Fahrenheit", to_fahrenheit(col("main.feels_like")))
    .withColumn("temp_min_Celcius", to_celcius(col("main.temp_min")))
    .withColumn("temp_min_Fahrenheit", to_fahrenheit(col("main.temp_min")))
    .withColumn("temp_max_Celcius", to_celcius(col("main.temp_max")))
    .withColumn("temp_max_Fahrenheit", to_fahrenheit(col("main.temp_max")))
    .drop("main")
)


In [None]:
# Flatten the wind / rain / snow / clouds structs into top-level columns and
# drop the original structs afterwards.
weather_curr_df = (
    weather_curr_df
    .select(
        "*",
        col("wind.speed").alias("wind_speed"),
        col("wind.deg").alias("wind_direction"),
        col("wind.gust").alias("wind_burst"),
        col("rain.1h").alias("rain_1h"),
        col("rain.3h").alias("rain_3h"),
        col("snow.1h").alias("snow_1h"),
        col("snow.3h").alias("snow_3h"),
        col("clouds.all").alias("cloudiness_%"),
    )
    .drop("wind", "rain", "clouds", "snow")
)

In [None]:
# Convert epoch-second fields to timestamp strings and derive the UTC offset.
#   * dt, sys.sunrise, sys.sunset -> datetime / sunrise / sunset via from_unixtime
#   * timezone (offset in seconds) -> utc_offset in hours, plus a text label
# NOTE(review): from_unixtime formats in the Spark session time zone, not the
# city's own offset -- confirm that is the intended meaning of these columns.
weather_curr_df = (
    weather_curr_df
    .withColumn("datetime", from_unixtime(col("dt")))
    .withColumn("sunrise", from_unixtime(col("sys.sunrise")))
    .withColumn("sunset", from_unixtime(col("sys.sunset")))
    .withColumn("utc_offset", secs_to_hrs(col("timezone")))
    .withColumn(
        "utc_offset_label",
        concat(
            # A negative offset already carries its own "-" sign, so only add
            # the "+" for zero / positive offsets. Previously the prefix was
            # always "UTC+", which produced labels such as "UTC+-5.0".
            when(col("timezone") < 0, lit("UTC")).otherwise(lit("UTC+")),
            secs_to_hrs(col("timezone")).cast("string"),
        ),
    )
    .drop("sys")
)

In [None]:
# Derive a "date" column from "datetime"; the streaming sink partitions on it.
weather_curr_df = weather_curr_df.withColumn(
    "date",
    to_date(col("datetime")),
)

#### 3-Hr Forecast Transformations

#### Fast-Slow Joins

In [None]:
# Enrich the current-weather stream with geocode attributes.
#   * left join: every weather row is kept, matched or not
#   * broadcast hint: geocode is a small static dataset, so it is copied to
#     every worker instead of being shuffled
# NOTE(review): the join keys are doubles compared for exact equality; verify
# that the weather coordinates line up with the 2-decimal floored geocode ones.
enriched_curr_weather_df = weather_curr_df.join(
    broadcast(geocode_df),
    on=["lat", "lon"],
    how="left",
)


In [None]:
# Inspect the joined schema before writing it to the silver layer.
enriched_curr_weather_df.printSchema()

### Write to Silver


#### Static Parquet Output

In [None]:
# Disabled manual batch write: the snippet below is wrapped in a bare string
# literal, so none of it runs. It overwrites the silver destination with
# parquet partitioned by "date" and then lists the written files.
"""

#manual
weather_curr_destination = f"wasbs://silver@{storage_account_name}.blob.core.windows.net/enriched-weather/current/"
enriched_curr_weather_df.write.mode("overwrite").partitionBy("date").parquet(weather_curr_destination)

files = dbutils.fs.ls(weather_curr_destination)
for file in files:
    print(file.name)

"""

#### Stream

In [None]:
# Stream the enriched current-weather data to the silver layer as parquet.
#   * the checkpoint location stores stream state / metadata so the query can
#     resume after a failure or restart
#   * output is partitioned by "date" (the day-level column, not "datetime")
weather_curr_destination = f"wasbs://silver@{storage_account_name}.blob.core.windows.net/enriched-weather/current/"
weather_curr_checkpoint = f"wasbs://silver@{storage_account_name}.blob.core.windows.net/checkpoints/enriched-weather-checkpoints/current-checkpoints/"

# Echo both paths so they can be verified before the query starts
print("Weather Current Destination Path:", weather_curr_destination)
print("Weather Current Checkpoint Path:", weather_curr_checkpoint)

write_weather_current = (
    enriched_curr_weather_df.writeStream
    .format("parquet")
    .partitionBy("date")
    .option("path", weather_curr_destination)
    .option("checkpointLocation", weather_curr_checkpoint)
    .trigger(processingTime="10 minutes")
    .start()
)

# Block this cell until the query stops or is cancelled manually
write_weather_current.awaitTermination()