In [0]:
import requests
import json
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Endpoint that every weather request in this notebook is sent to.
weather_api_url = "https://api.open-meteo.com/v1/forecast"

# Cities to ingest; each entry carries the coordinates passed to the API.
locations = [
    dict(city="Roma", latitude=41.9028, longitude=12.4964),
    dict(city="Milano", latitude=45.4643, longitude=9.1895),
    dict(city="Firenze", latitude=43.7696, longitude=11.2558),
    dict(city="Torino", latitude=45.0703, longitude=7.6869),
]

In [0]:
def fetch_weather_data(web_url: str, lat: float, lon: float, timeout: float = 30.0) -> dict:
    """Request the current weather for one coordinate pair.

    Args:
        web_url: URL the GET request is sent to.
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would hang the
            notebook on a stalled connection.

    Returns:
        The response body parsed from JSON.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    params = {
        "latitude": lat,
        "longitude": lon,
        "current_weather": True,
        "timezone": "UTC"
    }

    response = requests.get(web_url, params=params, timeout=timeout)
    # Fail loudly on HTTP errors instead of parsing an error payload as data.
    response.raise_for_status()
    return response.json()

In [0]:
# Collect one record per configured city: the API payload serialized as a
# JSON string, plus the city name, the ingestion time and the source URL.
raw_data = []

for loc in locations:
    # Fetch before building the record so the timestamp is taken after the
    # response has been received.
    raw_weather_data = fetch_weather_data(
        web_url=weather_api_url,
        lat=loc["latitude"],
        lon=loc["longitude"],
    )

    record = dict(
        city=loc["city"],
        ingestion_ts=datetime.now(),
        raw_weather_data=json.dumps(raw_weather_data),
        source=weather_api_url,
    )
    raw_data.append(record)

In [0]:
# Explicit schema for the four keys each ingested record carries. Declaring it
# keeps the column types fixed between runs and lets the DataFrame be created
# even when no records were collected (inference fails on an empty list).
BRONZE_WEATHER_SCHEMA = (
    "city STRING, ingestion_ts TIMESTAMP, raw_weather_data STRING, source STRING"
)

raw_weather_df = spark.createDataFrame(raw_data, schema=BRONZE_WEATHER_SCHEMA)
raw_weather_df.printSchema()

In [0]:
# Render the ingested records as an interactive table in the notebook.
display(raw_weather_df)

In [0]:
# Persist the ingested records as Delta files, replacing whatever a previous
# run left at this location.
bronze_delta_path = "/Volumes/weather/bronze/data_files/weather/"
delta_writer = raw_weather_df.write.format("delta").mode("overwrite")
delta_writer.save(bronze_delta_path)

In [0]:
%sql

-- Rebuild the bronze table from the Delta files on every run. The files at
-- this path are overwritten by the write step, so a CREATE TABLE IF NOT EXISTS
-- would copy them only once and leave the table stale on later runs.
CREATE OR REPLACE TABLE weather.bronze.bronze_weather
AS
SELECT *
FROM DELTA.`/Volumes/weather/bronze/data_files/weather/`;

-- Preview the refreshed table.
SELECT *
FROM weather.bronze.bronze_weather;