In [0]:
from configs.adls_config import adls_config
from configs.urls import BRONZE_LAYER_PATH, SILVER_LAYER_PATH, OPENWEATHER_API_BASE_URL
from configs.logger import get_logger
import requests
import json
import pandas as pd
from datetime import datetime
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, LongType, FloatType, DoubleType
from pyspark.sql.functions import col, udf, from_json, explode, current_timestamp, from_unixtime

In [0]:
# Name under which this notebook's log records are emitted.
LOGGER_NAME = "openweather_logger"
logger = get_logger(LOGGER_NAME)

In [0]:
def get_newest_path(base_path):
    """Walk down from ``base_path`` to the newest leaf folder, stopping at a Delta table.

    At each level the visible sub-directory with the lexicographically greatest
    name is chosen. For zero-padded date folders such as ``YYYY/MM/DD`` this is
    the newest one.

    Args:
        base_path: Directory URI to start from, as accepted by ``dbutils.fs.ls``.

    Returns:
        The first path in one of these situations:
        - it contains a ``_delta_log`` entry;
        - it has no visible sub-directories;
        - it cannot be listed (e.g. it does not exist).
        An unlistable path is returned unchanged, so the caller's subsequent
        read reports the real error.
    """
    current_path = base_path
    while True:
        try:
            contents = dbutils.fs.ls(current_path)
        except Exception:
            # Best-effort: hand an unlistable path back to the caller as-is.
            return current_path
        if any('_delta_log' in entry.path for entry in contents):
            return current_path
        # Only descend into directories; dbutils.fs.ls reports directory names
        # with a trailing "/".
        # Files must never be chosen: listing a file yields the file itself,
        # which would loop forever.
        # Hidden "_"/"." entries (e.g. _SUCCESS) are skipped as well: "_" sorts
        # after digits and would otherwise win max() over the date folders.
        subdirs = [
            entry
            for entry in contents
            if entry.name.endswith("/") and not entry.name.startswith(("_", "."))
        ]
        if not subdirs:
            return current_path
        current_path = max(subdirs, key=lambda entry: entry.name).path

In [0]:
# Presumably configures ADLS access on the session (see configs.adls_config), then
# loads the most recent snapshot of the cleaned city data from the Silver layer.
adls_config(spark, dbutils)
cities_base_path = f"{SILVER_LAYER_PATH}city_data_cleaned/"
latest_date_path = get_newest_path(cities_base_path)
try:
    df_cities = spark.read.format("delta").load(latest_date_path)
except Exception as e:
    logger.error(f"Error loading Delta table: {e}")
    # Re-raise: later cells need df_cities. Swallowing the error would log a false
    # success message and then fail later with a confusing NameError.
    raise

logger.info("City data loaded from Silver Layer.")

In [0]:
api_key = dbutils.secrets.get(scope="openweathermap", key="api-key")

# (connect, read) timeout in seconds for each API call. Without a timeout, requests
# waits forever on a stalled connection and the Spark task running the UDF never ends.
WEATHER_REQUEST_TIMEOUT = (10, 30)


def _redact_api_key(text):
    """Return ``text`` with the API key masked so it cannot leak into the logs."""
    return text.replace(api_key, "***") if api_key else text


def _fetch_weather(latitude, longitude):
    """Fetch the OpenWeather response for one coordinate pair as a JSON string.

    Args:
        latitude: Value of the ``latitude`` column for the row.
        longitude: Value of the ``longitude`` column for the row.

    Returns:
        On HTTP 200, the response body re-serialized with ``json.dumps``.
        Otherwise ``None``, with the failure logged. This covers missing
        coordinates, non-200 responses, network errors and non-JSON bodies.
    """
    if latitude is None or longitude is None:
        # Skip the request: the API cannot answer without coordinates.
        logger.error(f"Missing coordinates: lat={latitude}, lon={longitude}")
        return None
    # Passing the query via `params` lets requests URL-encode the values.
    query = {"lat": latitude, "lon": longitude, "appid": api_key}
    try:
        response = requests.get(
            OPENWEATHER_API_BASE_URL, params=query, timeout=WEATHER_REQUEST_TIMEOUT
        )
        if response.status_code == 200:
            return json.dumps(response.json())
        logger.error(f"API Error for lat={latitude}, lon={longitude}: {response.status_code} - {response.text}")
        return None
    except Exception as e:
        # requests exception messages embed the full request URL, including appid.
        logger.error(f"Error: {_redact_api_key(str(e))}")
        return None


# Each invocation is a live HTTP request, so the UDF is marked non-deterministic.
# This stops Spark's optimizer from duplicating or reordering the calls.
get_weather = udf(_fetch_weather, StringType()).asNondeterministic()

df_with_json = df_cities.withColumn("weather_json_raw", get_weather(col("latitude"), col("longitude")))

# Schema of one element of the "weather" array inside each hourly record.
weather_schema = ArrayType(
    StructType([
        StructField("id", LongType(), True),
        StructField("main", StringType(), True),
        StructField("description", StringType(), True),
        StructField("icon", StringType(), True),
    ])
)

# Schema of the "hourly" array; fields not listed here are dropped by from_json.
hourly_schema = ArrayType(
    StructType([
        StructField("dt", LongType(), True),
        StructField("temp", DoubleType(), True),
        StructField("feels_like", DoubleType(), True),
        StructField("pressure", LongType(), True),
        StructField("humidity", LongType(), True),
        StructField("wind_speed", DoubleType(), True),
        StructField("wind_deg", LongType(), True),
        StructField("clouds", LongType(), True),
        StructField("rain", StructType([StructField("1h", FloatType(), True)]), True),
        StructField("weather", weather_schema, True),
    ])
)

# Top-level response fields used below.
# "timezone" is an IANA zone name string in the One Call response
# (e.g. "America/Chicago"), so it cannot be parsed as a LongType.
# "timezone_offset" is the integer shift from UTC in seconds.
onecall_schema = StructType([
    StructField("hourly", hourly_schema),
    StructField("timezone", StringType(), True),
    StructField("timezone_offset", LongType(), True),
])

df_raw_weather = (
    df_with_json
    .withColumn("weather_records", from_json(col("weather_json_raw"), onecall_schema))
    # explode() emits one row per hourly record. A city whose JSON is null
    # (failed API call) or lacks "hourly" produces no rows at all.
    .withColumn("record", explode(col("weather_records.hourly")))
    # Rendered in the Spark session time zone, not the city's local time zone.
    .withColumn("dt_iso", from_unixtime(col("record.dt"), "yyyy-MM-dd'T'HH:mm:ss"))
    .select(
        col("country"),
        col("state"),
        col("latitude"),
        col("longitude"),
        col("weather_records.timezone"),
        col("weather_records.timezone_offset"),
        col("dt_iso"),
        col("record.temp"),
        col("record.feels_like"),
        col("record.pressure"),
        col("record.humidity"),
        col("record.wind_speed"),
        col("record.wind_deg"),
        col("record.clouds"),
        # Only the first weather condition of each hour is kept.
        col("record.weather").getItem(0).getField("main").alias("weather_main"),
        col("record.weather").getItem(0).getField("description").alias("weather_description"),
        col("record.rain.1h").alias("rain"),
        current_timestamp().alias("ingestion_timestamp"),  # Add ingestion_timestamp
    )
)
logger.info("Raw weather data extracted from API.")

In [0]:
# --- Save raw data to Bronze Layer ---
# Every run appends to a folder keyed by the driver's local date: .../dt=YYYY/MM/DD/
current_date_path = datetime.now().strftime("%Y/%m/%d")
bronze_output_path = "{}openweathermap_hourly_raw/dt={}/".format(
    BRONZE_LAYER_PATH, current_date_path
)

bronze_writer = df_raw_weather.write.format("delta").mode("append")
bronze_writer.save(bronze_output_path)

for message in (
    "Raw weather data saved to Delta Lake in Bronze Layer.",
    f"Data saved to: {bronze_output_path}",
):
    logger.info(message)