In [0]:
# Databricks notebook source
import requests
import pandas as pd
from datetime import datetime

# API configuration
CITY = "Hanoi"
URL = "https://api.open-meteo.com/v1/forecast?latitude=21.0285&longitude=105.8542&current_weather=true"
# Upper bound (seconds) for connecting to / reading from the API. Without a
# timeout, requests.get can block the job indefinitely on a stalled connection.
REQUEST_TIMEOUT_SECONDS = 30

# Fetch data from the API.
# Failures raise instead of calling dbutils.notebook.exit(): exit() ends the run
# without a failure status, which would let downstream tasks proceed against
# whatever (stale) rows are still in temp_weather_raw.
try:
    response = requests.get(URL, timeout=REQUEST_TIMEOUT_SECONDS)
    response.raise_for_status()  # Raise exception for HTTP errors
    data = response.json()
except requests.exceptions.RequestException as e:
    raise RuntimeError(f"API request failed: {str(e)}") from e
except ValueError as e:
    # Body was not valid JSON (older requests versions raise a plain ValueError).
    raise RuntimeError(f"An error occurred: {str(e)}") from e

# Extract current weather data. Reject a response without it rather than
# persisting a row whose weather fields are all NULL.
current_weather = data.get("current_weather") if isinstance(data, dict) else None
if not current_weather:
    raise RuntimeError(f"API response for {CITY} has no 'current_weather' data")

# One-row frame: "timestamp" is this run's naive, cluster-local wall-clock time
# in ISO-8601 form; "api_timestamp" is the observation time reported by the API.
weather_data = {
    "city": [CITY],
    "timestamp": [datetime.now().isoformat()],
    "api_timestamp": [current_weather.get("time")],
    "temperature": [current_weather.get("temperature")],
    "windspeed": [current_weather.get("windspeed")],
    "winddirection": [current_weather.get("winddirection")],
    "is_day": [current_weather.get("is_day")],
    "weathercode": [current_weather.get("weathercode")],
    "latitude": [data.get("latitude")],
    "longitude": [data.get("longitude")],
    "elevation": [data.get("elevation")]
}

df = pd.DataFrame(weather_data)

# Persist as a managed table, replaced on every run, for the transform task.
# Spark errors here propagate and fail the task.
spark_df = spark.createDataFrame(df)
spark_df.write.mode("overwrite").saveAsTable("temp_weather_raw")

# Kept outside any try/except so no generic handler can intercept the exit call.
dbutils.notebook.exit("Successfully fetched weather data for " + CITY)

In [0]:
# Databricks notebook source
from pyspark.sql.functions import col, from_unixtime, unix_timestamp, lit
from pyspark.sql.functions import when
from pyspark.sql.functions import date_format, to_timestamp

# Read raw data written by the ingestion task.
raw_df = spark.table("temp_weather_raw")

# "timestamp" holds Python isoformat() text such as "2024-05-17T14:30:05.123456".
# unix_timestamp()'s default pattern "yyyy-MM-dd HH:mm:ss" rejects the "T"
# separator (yielding NULL, or an error under ANSI mode). Parse with
# to_timestamp(), which follows cast rules and accepts ISO-8601, then render in
# the same "yyyy-MM-dd HH:mm:ss" string form that from_unixtime() produces.
processed_timestamp = date_format(to_timestamp(col("timestamp")), "yyyy-MM-dd HH:mm:ss")

# Map WMO weather codes to human-readable descriptions; any code not listed
# (including NULL) becomes "Unknown".
# NOTE(review): codes 1, 2 and 3 all map to "Partly cloudy"; the WMO table
# describes 3 as "Overcast" -- confirm this grouping is intended.
weather_condition = (
    when(col("weathercode") == 0, "Clear sky")
    .when(col("weathercode").isin([1, 2, 3]), "Partly cloudy")
    .when(col("weathercode").isin([45, 48]), "Fog")
    .when(col("weathercode").isin([51, 53, 55]), "Drizzle")
    .when(col("weathercode").isin([56, 57]), "Freezing drizzle")
    .when(col("weathercode").isin([61, 63, 65]), "Rain")
    .when(col("weathercode").isin([66, 67]), "Freezing rain")
    .when(col("weathercode").isin([71, 73, 75]), "Snow fall")
    .when(col("weathercode").isin([77]), "Snow grains")
    .when(col("weathercode").isin([80, 81, 82]), "Rain showers")
    .when(col("weathercode").isin([85, 86]), "Snow showers")
    .when(col("weathercode") == 95, "Thunderstorm")
    .when(col("weathercode").isin([96, 99]), "Thunderstorm with hail")
    .otherwise("Unknown")
)

# Transform data
transformed_df = (
    raw_df
    .withColumn("processed_timestamp", processed_timestamp)
    .withColumn("weather_condition", weather_condition)
    .select(
        "city",
        "processed_timestamp",
        "temperature",
        "windspeed",
        "winddirection",
        "is_day",
        "weather_condition",
        "weathercode",
        "latitude",
        "longitude"
    )
)

transformed_df.display()

# Persist as a managed table (replaced on every run) for the next task.
transformed_df.write.mode("overwrite").saveAsTable("temp_weather_transformed")

# COMMAND ----------
dbutils.notebook.exit("Data transformation completed successfully")

In [0]:
# Databricks notebook source
from datetime import datetime

# Load the output of the transformation task.
transformed_df = spark.table("temp_weather_transformed")

# One folder per run, laid out as year/month/day/HHMM of the current time,
# e.g. /mnt/weather_data/hanoi/2024/05/17/1430.
run_stamp = format(datetime.now(), "%Y/%m/%d/%H%M")
output_path = "/mnt/weather_data/hanoi/" + run_stamp

# Write Parquet; a second run within the same minute replaces the earlier output.
transformed_df.write.mode("overwrite").parquet(output_path)

# COMMAND ----------
dbutils.notebook.exit(f"Weather data saved to DBFS at {output_path}")