In [15]:
import os
import logging
from pathlib import Path
from dotenv import load_dotenv
import hashlib
import requests
import pandas as pd
import json
import time
import pyarrow as pa
from datetime import datetime, timezone
from deltalake import write_deltalake
from tqdm import tqdm

In [16]:
# Configure the root logger once for the whole notebook: INFO level, with a
# timestamp / logger name / level prefix on every record.
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
logging.info("Initialising configuration...")

2025-11-21 12:50:09,001 - root - INFO - Initialising configuration...


In [17]:
# Load key/value pairs from a .env file into the process environment so the
# os.getenv() lookups in the configuration cell can find the credentials.
load_dotenv()

True

In [18]:
# The input CSV is located relative to the kernel's working directory:
# <cwd>/../resources/vietnam_provinces_geocoded.csv
project_root = Path.cwd()
resources_dir = project_root / ".." / "resources"
input_csv_path = resources_dir / "vietnam_provinces_geocoded.csv"
# Displayed for a visual check only; the un-resolved path is what stays in input_csv_path.
input_csv_path.resolve()

PosixPath('/home/tan/geo-weather-lake/resources/vietnam_provinces_geocoded.csv')

In [19]:
# --- Object storage endpoint and credentials (read from the environment) ---
MINIO_ENDPOINT = "http://localhost:9000"
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
WEATHERBIT_API_KEY = os.getenv("WEATHERBIT_API_KEY")

# os.getenv() returns None for unset variables. Report them here, by name only,
# so a missing secret is visible immediately instead of surfacing later as an
# opaque failure inside the fetch or the Delta write.
_missing_env_vars = [
    name
    for name, value in (
        ("AWS_ACCESS_KEY_ID", AWS_ACCESS_KEY_ID),
        ("AWS_SECRET_ACCESS_KEY", AWS_SECRET_ACCESS_KEY),
        ("WEATHERBIT_API_KEY", WEATHERBIT_API_KEY),
    )
    if not value
]
if _missing_env_vars:
    logging.warning(
        "Missing environment variables: %s. Dependent steps will fail until they are set.",
        ", ".join(_missing_env_vars),
    )

# Options handed to write_deltalake() for the S3-compatible endpoint.
storage_options = {
    "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
    "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
    "AWS_ENDPOINT_URL": MINIO_ENDPOINT,
    # The endpoint above is plain http, so non-TLS access must be allowed explicitly.
    "AWS_ALLOW_HTTP": "true",
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true"
}

# --- Bronze layer target ---
BRONZE_BUCKET = 'weather-bronze'
BRONZE_TABLE_PATH = f's3://{BRONZE_BUCKET}/weather_events'

logging.info("Configuration loaded for historical ingestion.")

2025-11-21 12:50:09,022 - root - INFO - Configuration loaded for historical ingestion.


In [21]:
# Read the geocoded province list and turn every CSV row into a plain dict,
# which is the record shape the fetch step iterates over.
logging.info("Reading geocoded locations from %s...", input_csv_path)
df_locations = pd.read_csv(input_csv_path)
locations_to_fetch = df_locations.to_dict(orient="records")
logging.info(
    "Successfully loaded %d locations to fetch historical data for.",
    len(locations_to_fetch),
)

2025-11-21 12:50:09,034 - root - INFO - Reading geocoded locations from /home/tan/geo-weather-lake/notebooks/../resources/vietnam_provinces_geocoded.csv...
2025-11-21 12:50:09,037 - root - INFO - Successfully loaded 34 locations to fetch historical data for.


In [22]:
# Display the loaded location records (dicts with 'name', 'latitude' and
# 'longitude') as a visual sanity check.
locations_to_fetch

[{'name': 'Hải Phòng', 'latitude': 20.8623278, 'longitude': 106.6799266},
 {'name': 'Hồ Chí Minh', 'latitude': 10.7755254, 'longitude': 106.7021047},
 {'name': 'Cà Mau', 'latitude': 9.0180177, 'longitude': 105.0869724},
 {'name': 'Gia Lai', 'latitude': 14.0201373, 'longitude': 108.6354524},
 {'name': 'Đồng Tháp', 'latitude': 10.425183, 'longitude': 105.9271362},
 {'name': 'Cần Thơ', 'latitude': 10.0362046, 'longitude': 105.7872656},
 {'name': 'Lâm Đồng', 'latitude': 11.6614957, 'longitude': 108.1335279},
 {'name': 'An Giang', 'latitude': 10.3188672, 'longitude': 105.0432488},
 {'name': 'Quảng Ngãi', 'latitude': 14.9953739, 'longitude': 108.691729},
 {'name': 'Quảng Trị', 'latitude': 17.2166964, 'longitude': 106.9548246},
 {'name': 'Hưng Yên', 'latitude': 20.6065846, 'longitude': 106.2843471},
 {'name': 'Cao Bằng', 'latitude': 22.7426936, 'longitude': 106.1060926},
 {'name': 'Lào Cai', 'latitude': 22.3069302, 'longitude': 104.1829592},
 {'name': 'Ninh BÌnh', 'latitude': 20.2421142, 'lon

In [23]:
def fetch_historical_data(api_key: str, locations: list, start_date: str, end_date: str):
    """
    Fetch hourly historical weather observations from Weatherbit for each location.

    One request is issued per location. Every observation in a response is
    flattened into a single dict: the response-level metadata (all top-level
    keys except 'data') merged with the observation's own fields, plus a
    'place_name' key taken from the location record.

    Args:
        api_key: Weatherbit API key.
        locations: dicts with 'name', 'latitude' and 'longitude' keys.
        start_date: passed through as the API 'start_date' parameter.
        end_date: passed through as the API 'end_date' parameter.

    Returns:
        A list of flattened observation dicts. Locations whose request failed,
        returned a non-200 status or an unusable body contribute nothing; the
        problem is logged and the loop moves on.

    Raises:
        ValueError: if api_key is empty or None.
    """
    if not api_key:
        logging.error("Weatherbit API Key is missing.")
        raise ValueError("API Key not provided.")

    # HTTPS: the API key travels in the query string and must not go over plain HTTP.
    BASE_URL = 'https://api.weatherbit.io/v2.0/history/hourly'
    all_historical_observations = []

    logging.info(f"Fetching historical data from {start_date} to {end_date} for {len(locations)} locations.")

    for location in tqdm(locations, desc="Fetching data for locations"):
        params = {
            'key': api_key,
            'lat': location['latitude'],
            'lon': location['longitude'],
            'start_date': start_date,
            'end_date': end_date
        }

        try:
            response = requests.get(BASE_URL, params=params, timeout=20)

            if response.status_code != 200:
                logging.error(
                    f"Failed to fetch data for '{location['name']}'. "
                    f"Status Code: {response.status_code}, Response: {response.text}"
                )
                continue

            try:
                # Parse the body exactly once. ValueError is the common base of the
                # JSON decode errors (json / simplejson / requests variants).
                data = response.json()
            except ValueError:
                logging.warning(f"Received non-JSON response for '{location['name']}'. Skipping.")
                continue

            if not isinstance(data, dict):
                logging.warning(f"Unexpected response payload for '{location['name']}'. Skipping.")
                continue

            metadata = {key: value for key, value in data.items() if key != 'data'}

            # 'data' may be absent or null; treat both as "no observations".
            for obs in data.get('data') or []:
                # Observation fields override metadata keys of the same name.
                all_historical_observations.append(
                    {**metadata, **obs, 'place_name': location['name']}
                )

        except requests.exceptions.RequestException as e:
            logging.error(f"A request error occurred for '{location['name']}': {e}")
        finally:
            # Pause between requests on every path (success, skip or failure) so
            # error responses do not cause the next call to fire immediately.
            time.sleep(1.1)

    logging.info(f"Total historical records fetched: {len(all_historical_observations)}")

    return all_historical_observations

In [24]:
# Smoke test: fetch a short date range for the first location only, to check
# the request works before launching the full run.
print("Running a test run:")
start_date_str, end_date_str = "2025-10-30", "2025-10-31"

test_res = fetch_historical_data(
    api_key=WEATHERBIT_API_KEY,
    locations=locations_to_fetch[:1],
    start_date=start_date_str,
    end_date=end_date_str,
)

2025-11-21 12:50:09,061 - root - INFO - Fetching historical data from 2025-10-30 to 2025-10-31 for 1 locations.


Running a test run:


Fetching data for locations: 100%|██████████| 1/1 [00:01<00:00,  1.74s/it]
2025-11-21 12:50:10,808 - root - INFO - Total historical records fetched: 24


In [25]:
# Pretty-print the raw test observations; ensure_ascii=False keeps non-ASCII
# place names readable instead of \u-escaped.
test_res_json = json.dumps(test_res, ensure_ascii=False, indent=2)
print(test_res_json)

[
  {
    "city_id": "1581298",
    "city_name": "Haiphong",
    "country_code": "VN",
    "lat": 20.8623278,
    "lon": 106.6799266,
    "sources": [
      "488251-99999",
      "imerg",
      "era5",
      "sat",
      "radar"
    ],
    "state_code": "13",
    "station_id": "488251-99999",
    "timezone": "Asia/Ho_Chi_Minh",
    "app_temp": 23.8,
    "azimuth": 110.9,
    "clouds": 100,
    "datetime": "2025-10-30:00",
    "dewpt": 22,
    "dhi": 64,
    "dni": 574,
    "elev_angle": 13.7,
    "ghi": 193,
    "h_angle": null,
    "pod": "d",
    "precip": 0,
    "pres": 1015,
    "revision_status": "final",
    "revision_version": "2.1",
    "rh": 94,
    "slp": 1016,
    "snow": 0,
    "solar_rad": 70,
    "temp": 23,
    "timestamp_local": "2025-10-30T07:00:00",
    "timestamp_utc": "2025-10-30T00:00:00",
    "ts": 1761782400,
    "uv": 0.7,
    "vis": 16,
    "weather": {
      "icon": "c04d",
      "code": 804,
      "description": "Overcast clouds"
    },
    "wind_dir": 10,
  

In [26]:
def make_observation_id(source: str, lat: float, lon: float, ts: int) -> "str | None":
    """
    Build a deterministic observation ID from source, coordinates and timestamp.

    The ID is the SHA-256 hex digest of the string "source|lat|lon|ts", where
    lat and lon are rendered with the ".6" format spec.

    - source: the data source (e.g. weatherbit)
    - lat, lon: coordinates of the observation
    - ts: unix timestamp of the observation

    Returns None when lat, lon or ts is None, since no stable ID can be derived.
    """
    if lat is None or lon is None or ts is None:
        return None

    # Coordinates parsed from JSON can arrive as ints (e.g. 21 rather than 21.0),
    # and a precision in the format spec raises ValueError for ints. Promote
    # them to float; float inputs are untouched, so their IDs do not change.
    if isinstance(lat, int):
        lat = float(lat)
    if isinstance(lon, int):
        lon = float(lon)

    # NOTE(review): ".6" is 6 significant digits, not 6 decimal places
    # (106.6799266 renders as "106.68"). ".6f" may have been intended, but
    # changing it would change every ID already produced - confirm before touching.
    key = f"{source}|{lat:.6}|{lon:.6}|{ts}"
    return hashlib.sha256(key.encode()).hexdigest()

In [27]:
def transform_to_bronze_df_historical(observations: list) -> pd.DataFrame:
    """
    Transform raw historical observation dicts into a DataFrame with the Bronze columns.

    Each observation is normalised so its payload lines up with the current-weather
    payload: 'wind_gust_spd' is renamed to 'gust', and 'app_temp' / 'aqi' are added
    as None when absent. The normalised dict is stored as a JSON string in
    'raw_payload'. The input dicts are not modified.

    Args:
        observations: list of observation dicts, as returned by the fetch step.

    Returns:
        A DataFrame with columns observation_id, source, raw_payload, lat, lon,
        _ingested_at (one UTC timestamp shared by the whole batch) and
        ingest_date (that timestamp's date). An empty DataFrame with no columns
        is returned when observations is empty.
    """
    if not observations:
        logging.info("No new observations to transform.")
        return pd.DataFrame()

    logging.info("Transforming raw data to Bronze schema...")

    # One timestamp for the whole batch, so every row lands in the same partition.
    ingested_at = datetime.now(timezone.utc)
    ingest_date = ingested_at.date()

    transformed_records = []
    for raw_obs in observations:
        # Work on a shallow copy: renaming and adding keys must not leak back
        # into the caller's list, which may be inspected or transformed again.
        obs_data = dict(raw_obs)

        if 'wind_gust_spd' in obs_data:
            obs_data['gust'] = obs_data.pop('wind_gust_spd')

        obs_data.setdefault('app_temp', None)
        obs_data.setdefault('aqi', None)

        lat = obs_data.get('lat')
        lon = obs_data.get('lon')
        ts = obs_data.get('ts')

        transformed_records.append({
            "observation_id": make_observation_id("weatherbit_historical", lat, lon, ts),
            "source": "weatherbit_historical",
            "raw_payload": json.dumps(obs_data, ensure_ascii=False),
            "lat": lat,
            "lon": lon,
            "_ingested_at": ingested_at,
            "ingest_date": ingest_date,
        })

    df = pd.DataFrame(transformed_records)

    # Guarantee a timezone-aware datetime column regardless of constructor inference.
    df['_ingested_at'] = pd.to_datetime(df['_ingested_at'])

    return df

In [28]:
# Run the transform on the small test batch and inspect the resulting frame.
print("Tesing the transformation function...")
bronze_df_test = transform_to_bronze_df_historical(test_res)

row_count = len(bronze_df_test)
print(f"Transformed into a DataFrame with {row_count} rows.")
bronze_df_test.info()

# Round-trip the first row's JSON payload to confirm it parses and is readable.
print("Sample of the first transformed record's raw_payload:")
first_payload_json = bronze_df_test['raw_payload'].iloc[0]
sample_payload = json.loads(first_payload_json)
print(json.dumps(sample_payload, indent=2, ensure_ascii=False))

2025-11-21 12:50:10,838 - root - INFO - Transforming raw data to Bronze schema...


Tesing the transformation function...
Transformed into a DataFrame with 24 rows.
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 24 entries, 0 to 23
Data columns (total 7 columns):
 #   Column          Non-Null Count  Dtype              
---  ------          --------------  -----              
 0   observation_id  24 non-null     object             
 1   source          24 non-null     object             
 2   raw_payload     24 non-null     object             
 3   lat             24 non-null     float64            
 4   lon             24 non-null     float64            
 5   _ingested_at    24 non-null     datetime64[ns, UTC]
 6   ingest_date     24 non-null     object             
dtypes: datetime64[ns, UTC](1), float64(2), object(4)
memory usage: 1.4+ KB
Sample of the first transformed record's raw_payload:
{
  "city_id": "1581298",
  "city_name": "Haiphong",
  "country_code": "VN",
  "lat": 20.8623278,
  "lon": 106.6799266,
  "sources": [
    "488251-99999",
    "imerg",
    "e

In [29]:
def write_to_bronze(df: pd.DataFrame):
    """
    Append a DataFrame to the Bronze Delta table on MinIO, partitioned by ingest_date.

    The target location and connection settings come from the module-level
    BRONZE_TABLE_PATH and storage_options. The write uses mode="append" with
    schema_mode="merge", so columns present in df are merged into the table schema.

    Args:
        df: rows to append; must contain an 'ingest_date' column. An empty
            DataFrame is skipped without touching storage.

    Returns:
        None.
    """
    if df.empty:
        logging.info("DataFrame is empty. Skipping write to MinIO.")
        return

    logging.info(f"Writing {len(df)} records to Delta table: {BRONZE_TABLE_PATH}")

    # Drop the pandas index explicitly: a non-default index (e.g. after filtering)
    # would otherwise become an '__index_level_0__' column and, with
    # schema_mode="merge", be added to the table schema permanently.
    arrow_table = pa.Table.from_pandas(df, preserve_index=False)

    write_deltalake(
        table_or_uri=BRONZE_TABLE_PATH,
        data=arrow_table,
        mode="append",
        partition_by=['ingest_date'],
        schema_mode="merge",
        storage_options=storage_options,
    )

    logging.info("Write operation completed successfully.")

In [30]:
# Switch for the full ingestion; set to False to keep this cell from calling the API
# and appending to the Bronze table when re-running the notebook.
run_full_pipeline = True

# NOTE(review): the recorded runs suggest end_date is exclusive (the
# 2025-10-30 -> 2025-10-31 test returned 24 hourly rows for one location), so this
# range would stop before 2025-09-30 - confirm whether that day should be included.
start_date_str = "2025-09-01"
end_date_str = "2025-09-30"

if not run_full_pipeline:
    print("The pipeline run was skipped. Set 'run_full_pipeline = True' to execute.")
elif not locations_to_fetch:
    # Distinguish "disabled" from "nothing to do" so the message points at the real cause.
    logging.warning("The pipeline run was skipped: no locations were loaded.")
else:
    logging.info("Starting full pipeline...")

    raw_observations = fetch_historical_data(WEATHERBIT_API_KEY, locations_to_fetch, start_date_str, end_date_str)

    if raw_observations:
        bronze_df = transform_to_bronze_df_historical(raw_observations)
        write_to_bronze(bronze_df)

        logging.info("The historical ingestion pipeline completed successfully!")
    else:
        # Every request failed or returned no rows; make that outcome visible.
        logging.warning("No observations were fetched; nothing was written to the Bronze table.")

2025-11-21 12:50:10,862 - root - INFO - Stating full pipeline...
2025-11-21 12:50:10,863 - root - INFO - Fetching historical data from 2025-09-01 to 2025-09-30 for 34 locations.
Fetching data for locations: 100%|██████████| 34/34 [01:19<00:00,  2.35s/it]
2025-11-21 12:51:30,765 - root - INFO - Total historical records fetched: 23664
2025-11-21 12:51:30,767 - root - INFO - Transforming raw data to Bronze schema...
2025-11-21 12:51:31,056 - root - INFO - Writing 23664 records to Delta table: s3://weather-bronze/weather_events
2025-11-21 12:51:31,283 - root - INFO - Write operation completed successfully.
2025-11-21 12:51:31,283 - root - INFO - The historical ingestion pipeline completed successfully!
