In [0]:
# Imports
import json
import os
from datetime import datetime

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import *
     

In [0]:


# Load UK airport geocode data.
# Maps each source column of the geocode table to the name used by the rest of
# the notebook; the selected rows are collected to the driver as a list.
airport_column_aliases = {
    "iata": "airport_code",
    "airport": "airport_name",
    "latitude_decimal_degrees": "latitude",
    "longitude_decimal_degrees": "longitude",
}

uk_airports_df = spark.table("dataexpert.kanak_uk_airport_geocode")
uk_airports = uk_airports_df.select(
    *[
        col(source_name).alias(alias_name)
        for source_name, alias_name in airport_column_aliases.items()
    ]
).collect()

# OpenWeatherMap API key.
# Read from the OPENWEATHERMAP_API_KEY environment variable so the real key does
# not have to be pasted into (and saved with) the notebook. When the variable is
# unset or empty the original placeholder is used, so existing behaviour is kept.
api_key = os.environ.get("OPENWEATHERMAP_API_KEY") or "your api key"

# Schema definition: one nullable column per forecast attribute, declared as
# (column name, Spark type) pairs in output order.
forecast_columns = [
    ("datetime", StringType()),
    ("temperature", DoubleType()),
    ("feels_like", DoubleType()),
    ("temp_min", DoubleType()),
    ("temp_max", DoubleType()),
    ("pressure", IntegerType()),
    ("humidity", IntegerType()),
    ("weather_main", StringType()),
    ("weather_description", StringType()),
    ("wind_speed", DoubleType()),
    ("wind_deg", DoubleType()),
    ("wind_gust", DoubleType()),
    ("clouds_all", IntegerType()),
    ("rain_3h", DoubleType()),
    ("pop", DoubleType()),
    ("airport_code", StringType()),
]

schema = StructType(
    [StructField(column_name, data_type, True) for column_name, data_type in forecast_columns]
)

# Collect forecasts from all airports.
#
# For every airport row the OpenWeatherMap forecast endpoint is queried with the
# airport's coordinates, and each entry of the returned "list" is flattened into
# one dict whose keys match the fields of `schema`. An airport whose request
# fails (network error, non-200 status, unparsable body) is reported and skipped
# so that a single bad airport does not abort the whole run.
FORECAST_URL = "https://api.openweathermap.org/data/2.5/forecast"
# Without a timeout a stalled connection would block the notebook indefinitely.
REQUEST_TIMEOUT_SECONDS = 10

all_airport_forecasts = []

for row in uk_airports:
    lat = row["latitude"]
    lon = row["longitude"]
    airport_code = row["airport_code"]

    try:
        # `params` lets requests build and URL-encode the query string instead
        # of interpolating raw values into the URL by hand.
        response = requests.get(
            FORECAST_URL,
            params={"lat": lat, "lon": lon, "units": "metric", "appid": api_key},
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException as exc:
        # Only the exception type is printed: the full message can embed the
        # request URL, which carries the API key.
        print(f"❌ Failed for {airport_code} - Request error: {type(exc).__name__}")
        continue

    if response.status_code != 200:
        print(f"❌ Failed for {airport_code} - Status code: {response.status_code}")
        continue

    try:
        forecast_list = response.json().get("list") or []
    except ValueError:
        print(f"❌ Failed for {airport_code} - Response body is not valid JSON")
        continue

    for entry in forecast_list:
        # The `or` fallbacks cover a missing key as well as an explicit null or
        # an empty "weather" list, any of which would otherwise raise.
        main = entry.get("main") or {}
        weather = (entry.get("weather") or [{}])[0]
        wind = entry.get("wind") or {}
        rain = (entry.get("rain") or {}).get("3h", 0.0)
        clouds = (entry.get("clouds") or {}).get("all", None)

        # NOTE: absent numeric fields fall back to 0 / 0.0 and absent text
        # fields to "", so a missing reading is stored as zero / empty.
        all_airport_forecasts.append({
            "datetime": entry.get("dt_txt"),
            "temperature": float(main.get("temp", 0.0)),
            "feels_like": float(main.get("feels_like", 0.0)),
            "temp_min": float(main.get("temp_min", 0.0)),
            "temp_max": float(main.get("temp_max", 0.0)),
            "pressure": int(main.get("pressure", 0)),
            "humidity": int(main.get("humidity", 0)),
            "weather_main": weather.get("main", ""),
            "weather_description": weather.get("description", ""),
            "wind_speed": float(wind.get("speed", 0.0)),
            "wind_deg": float(wind.get("deg", 0.0)),
            "wind_gust": float(wind.get("gust", 0.0)),
            "clouds_all": int(clouds if clouds is not None else 0),
            "rain_3h": float(rain),
            "pop": float(entry.get("pop", 0.0)),
            "airport_code": airport_code
        })

# Create Spark DataFrame once with all data and persist it.
#
# Refuse to continue when nothing was collected (e.g. every request failed):
# mode("overwrite") below would otherwise replace the existing table with an
# empty one.
if not all_airport_forecasts:
    raise RuntimeError(
        "No forecast data was collected; refusing to overwrite "
        "dataexpert.uk_airports_5day_forecast with an empty table."
    )

# "datetime" is declared as a string in `schema`, so it is cast to a timestamp
# after the DataFrame has been built.
forecast_df = spark.createDataFrame(all_airport_forecasts, schema=schema).withColumn(
    "datetime", col("datetime").cast(TimestampType())
)

forecast_df.write.mode("overwrite").saveAsTable("dataexpert.uk_airports_5day_forecast")
