# 🥉 Bronze Layer — Raw Ingestion Notebook
## Notebook: 01_bronze_ingestion

**What this notebook does:**
- Calls the OpenAQ v3 API for 20 global cities
- Collects raw air quality measurements (PM2.5, PM10, NO2, O3, CO, SO2)
- Lands the raw data into the Bronze Delta table with only minimal flattening and typing (the original API payload of every measurement is preserved in `raw_json`)
- Logs ingestion results per city

**Run after:** `00_setup`  
**Run before:** `02_silver_transformation`

In [0]:
# ============================================================
# PROJECT CONFIGURATION — Updated for OpenAQ v3
# ============================================================

import os

DATABASE_NAME = "air_quality_db"

BRONZE_TABLE     = f"{DATABASE_NAME}.bronze_raw_measurements"
SILVER_TABLE     = f"{DATABASE_NAME}.silver_clean_measurements"
GOLD_TABLE_CITY  = f"{DATABASE_NAME}.gold_city_rankings"
GOLD_TABLE_TREND = f"{DATABASE_NAME}.gold_pollutant_trends"
GOLD_TABLE_AQI   = f"{DATABASE_NAME}.gold_aqi_summary"

API_BASE_URL = "https://api.openaq.org/v3"

# The OpenAQ API key is a credential and must never be committed with the
# notebook. Provide it through the OPENAQ_API_KEY environment variable (on
# Databricks, e.g. a cluster environment variable that references a secret
# scope). A key was previously hardcoded in this cell: treat it as exposed and
# rotate it.
OPENAQ_API_KEY = os.environ.get("OPENAQ_API_KEY", "").strip()
if not OPENAQ_API_KEY:
    # Fail fast: without a key every request is rejected, and the ingestion
    # loop would otherwise spend minutes logging every city as NO_LOCATIONS.
    raise RuntimeError(
        "OPENAQ_API_KEY is not set. Provide it as an environment variable "
        "(e.g. backed by a Databricks secret) before running this notebook."
    )

# Cities with coordinates for accurate filtering
# Format: (city_label, latitude, longitude, radius_meters)
TARGET_CITIES = [
    ("Delhi",       28.6139,   77.2090,  25000),
    ("Mumbai",      19.0760,   72.8777,  25000),
    ("Beijing",     39.9042,  116.4074,  25000),
    ("Shanghai",    31.2304,  121.4737,  25000),
    ("Lahore",      31.5204,   74.3587,  25000),
    ("Dhaka",       23.8103,   90.4125,  25000),
    ("Karachi",     24.8607,   67.0011,  25000),
    ("Lima",       -12.0464,  -77.0428,  25000),
    ("Jakarta",     -6.2088,  106.8456,  25000),
    ("Bangkok",     13.7563,  100.5018,  25000),
    ("London",      51.5074,   -0.1278,  25000),
    ("Paris",       48.8566,    2.3522,  25000),
    ("New York",    40.7128,  -74.0060,  25000),
    ("Los Angeles", 34.0522, -118.2437,  25000),
    ("Tokyo",       35.6762,  139.6503,  25000),
    ("Seoul",       37.5665,  127.0050,  25000),
    ("Mexico City", 19.4326,  -99.1332,  25000),
    ("Cairo",       30.0444,   31.2357,  25000),
    ("Lagos",        6.5244,    3.3792,  25000),
    ("Nairobi",     -1.2921,   36.8219,  25000),
]

# OpenAQ parameter names kept by the ingestion loop; any other sensor is skipped.
TARGET_POLLUTANTS = ["pm25", "pm10", "no2", "o3", "co", "so2"]

# Concentration bands per AQI category (not used in this notebook; presumably
# consumed by the Silver/Gold notebooks).
# NOTE(review): these look like the US EPA 24-h PM2.5 breakpoints in µg/m³
# (pre-2024 revision) — confirm. The bands are closed intervals with 0.1-wide
# gaps between them (e.g. 12.0 → 12.1), so an unrounded reading such as 12.05
# matches no band unless the consumer rounds to one decimal first.
AQI_CATEGORIES = {
    "Good":                  (0.0,   12.0),
    "Moderate":              (12.1,  35.4),
    "Unhealthy (Sensitive)": (35.5,  55.4),
    "Unhealthy":             (55.5,  150.4),
    "Very Unhealthy":        (150.5, 250.4),
    "Hazardous":             (250.5, 9999.0)
}

spark.sql(f"USE {DATABASE_NAME}")

print("✅ Configuration reloaded.")
print(f"   Targeting {len(TARGET_CITIES)} cities via coordinates")

✅ Configuration reloaded.
   Targeting 20 cities via coordinates


In [0]:
# ============================================================
# API HELPER FUNCTIONS — With retry on rate limit
# ============================================================

import requests
import time

# Sent on every OpenAQ request: ask for JSON and authenticate with the API key.
API_HEADERS = {"Accept": "application/json", "X-API-Key": OPENAQ_API_KEY}

def api_get_with_retry(url, params, retries=3, backoff=60):
    """
    GET ``url`` with the shared ``API_HEADERS``, retrying transient failures.

    Retried conditions: HTTP 429 (rate limit), HTTP 5xx (server error), and
    network-level errors raised by ``requests`` (timeouts, connection resets).
    Between attempts it sleeps ``backoff`` seconds; it does NOT sleep after the
    final attempt.

    Args:
        url:     Full endpoint URL.
        params:  Query-string parameters forwarded to ``requests.get``.
        retries: Maximum number of attempts.
        backoff: Seconds to wait before the next attempt.

    Returns:
        The ``requests.Response`` on HTTP 200, or immediately on any other
        non-retryable status (e.g. 400/401/404) so the caller can inspect it;
        ``None`` when every attempt ended in a retryable failure.
    """
    for attempt in range(1, retries + 1):
        try:
            response = requests.get(url, headers=API_HEADERS, params=params, timeout=15)
        except requests.RequestException as e:
            # Timeouts / dropped connections are usually transient: retry them.
            print(f"   ❌ Exception: {str(e)} (attempt {attempt}/{retries})")
        else:
            if response.status_code == 200:
                return response
            if response.status_code == 429:
                print(f"   ⏳ Rate limit hit (attempt {attempt}/{retries}).")
            elif response.status_code >= 500:
                print(f"   ⚠️  Server error {response.status_code} (attempt {attempt}/{retries}).")
            else:
                # Client errors will not succeed on retry; hand the response back.
                print(f"   ⚠️  API error {response.status_code}")
                return response

        # Only pause if another attempt will actually follow.
        if attempt < retries:
            print(f"   ⏳ Waiting {backoff}s before retry...")
            time.sleep(backoff)

    print(f"   ❌ All {retries} retries exhausted.")
    return None


def get_locations_by_coordinates(city_name, lat, lon, radius=25000, limit=3):
    """
    Find up to ``limit`` OpenAQ monitoring locations within ``radius`` metres
    of the point (``lat``, ``lon``) via ``/locations``.

    Args:
        city_name: Label of the city being searched. Not sent to the API; kept
                   for call-site readability.
        lat, lon:  Search centre in decimal degrees.
        radius:    Search radius in metres. NOTE(review): the v3 API is believed
                   to cap this at 25 000 m — confirm before raising it.
        limit:     Maximum number of locations requested.

    Returns:
        The list under ``results`` in the JSON response, or ``[]`` when the
        request fails, the body is not valid JSON, or ``results`` is
        missing/null — so callers can always iterate the result.
    """
    url = f"{API_BASE_URL}/locations"
    params = {
        "coordinates": f"{lat},{lon}",   # "latitude,longitude", as in TARGET_CITIES
        "radius"     : radius,
        "limit"      : limit,
    }
    response = api_get_with_retry(url, params)
    if not (response and response.status_code == 200):
        return []

    try:
        payload = response.json()
    except ValueError:  # body is not JSON (e.g. an HTML error page)
        print(f"   ⚠️  Unparseable JSON from {url}")
        return []

    results = payload.get("results") if isinstance(payload, dict) else None
    return results if isinstance(results, list) else []


def get_measurements_by_sensor(sensor_id, limit=50, datetime_from=None, datetime_to=None):
    """
    Fetch up to ``limit`` measurements for one sensor from
    ``/sensors/{sensor_id}/measurements``.

    Args:
        sensor_id:     OpenAQ v3 sensor id.
        limit:         Page size, sent as the ``limit`` query parameter.
        datetime_from: Optional lower time bound (ISO-8601 string or
                       ``datetime``), sent as ``datetime_from`` when given.
        datetime_to:   Optional upper time bound, sent as ``datetime_to``.

    NOTE(review): without bounds the API chooses which slice of the sensor's
    history is returned. Rows ingested by this notebook have measured_at values
    from 2018, so the default slice may be the oldest readings. Pass
    ``datetime_from`` to target recent data.

    Returns:
        The list under ``results`` in the JSON response, or ``[]`` when the
        request fails, the body is not valid JSON, or ``results`` is missing/null.
    """
    url = f"{API_BASE_URL}/sensors/{sensor_id}/measurements"
    params = {"limit": limit}
    for key, bound in (("datetime_from", datetime_from), ("datetime_to", datetime_to)):
        if bound is not None:
            # Accept datetime/date objects as well as preformatted strings.
            params[key] = bound.isoformat() if hasattr(bound, "isoformat") else str(bound)

    response = api_get_with_retry(url, params)
    if not (response and response.status_code == 200):
        return []

    try:
        payload = response.json()
    except ValueError:  # body is not JSON (e.g. an HTML error page)
        print(f"   ⚠️  Unparseable JSON from {url}")
        return []

    results = payload.get("results") if isinstance(payload, dict) else None
    return results if isinstance(results, list) else []

print("✅ API helpers with retry logic defined.")

✅ API helpers with retry logic defined.


In [0]:
# ============================================================
# RECORD BUILDER — Updated for OpenAQ v3 response structure
# ============================================================

import json
from datetime import datetime, timezone

def build_bronze_record(city, measurement, location, sensor):
    """
    Flatten one OpenAQ v3 measurement, plus its location and sensor metadata,
    into a dict keyed by the Bronze table's column names.

    Args:
        city:        City label from TARGET_CITIES.
        measurement: One item of a sensor's measurements ``results`` list.
        location:    The location dict the sensor was found under.
        sensor:      One entry of ``location["sensors"]``.

    Returns:
        dict with keys city, country, pollutant, value, unit, location_name,
        latitude, longitude, measured_at, ingested_at, source_url, raw_json.
        Missing or unparseable numbers/timestamps become ``None`` (text fields
        become ``""``) instead of invented values. In particular, a missing
        reading yields ``value=None`` rather than ``0.0``, so it cannot pass
        for clean air downstream. The untouched measurement is always kept in
        ``raw_json``.
    """

    def as_dict(obj):
        # Treat anything that is not a JSON object (None, str, list, ...) as
        # empty, so chained .get() calls never raise AttributeError.
        return obj if isinstance(obj, dict) else {}

    def to_float(raw):
        # float(raw), or None when raw is None or not numeric.
        if raw is None:
            return None
        try:
            return float(raw)
        except (TypeError, ValueError):
            return None

    period    = as_dict(measurement.get("period"))
    parameter = as_dict(sensor.get("parameter"))
    coords    = as_dict(location.get("coordinates"))

    # Timestamp — v3 uses period.datetimeTo.utc; fall back to a top-level date.utc.
    measured_at_str = (
        as_dict(period.get("datetimeTo")).get("utc")
        or as_dict(measurement.get("date")).get("utc")
    )
    measured_at = None
    if measured_at_str:
        try:
            # fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
            measured_at = datetime.fromisoformat(str(measured_at_str).replace("Z", "+00:00"))
        except ValueError:
            measured_at = None

    # Country: either a {"code", "name"} object or a bare string.
    country_data = location.get("country")
    if isinstance(country_data, dict):
        country = country_data.get("code") or country_data.get("name") or ""
    elif isinstance(country_data, str):
        country = country_data
    else:
        country = ""

    # Pollutant name and unit come from the sensor's parameter metadata.
    pollutant = parameter.get("name") or sensor.get("name") or ""
    unit      = parameter.get("units") or ""

    # Reading: either a plain number or a dict carrying "avg"/"value".
    raw_value = measurement.get("value")
    if isinstance(raw_value, dict):
        # Explicit None test so a genuine 0.0 average is not skipped over.
        raw_value = raw_value["avg"] if raw_value.get("avg") is not None else raw_value.get("value")
    value = to_float(raw_value)

    return {
        "city"         : city,
        "country"      : country,
        "pollutant"    : str(pollutant).lower().strip(),
        "value"        : value,
        "unit"         : str(unit),
        "location_name": str(location.get("name") or ""),
        "latitude"     : to_float(coords.get("latitude")),
        "longitude"    : to_float(coords.get("longitude")),
        "measured_at"  : measured_at,
        "ingested_at"  : datetime.now(timezone.utc),
        "source_url"   : f"{API_BASE_URL}/sensors/{sensor.get('id', '')}/measurements",
        "raw_json"     : json.dumps(measurement)
    }

print("✅ Updated record builder defined.")

✅ Updated record builder defined.


In [0]:
# ============================================================
# MAIN INGESTION LOOP — With rate limit safe delays
# ============================================================

from pyspark.sql.types import (
    StructType, StructField, StringType,
    DoubleType, TimestampType
)

# Explicit Bronze schema. Column names mirror the keys returned by
# build_bronze_record, and every column is nullable.
BRONZE_SCHEMA = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("city",          StringType()),
        ("country",       StringType()),
        ("pollutant",     StringType()),
        ("value",         DoubleType()),
        ("unit",          StringType()),
        ("location_name", StringType()),
        ("latitude",      DoubleType()),
        ("longitude",     DoubleType()),
        ("measured_at",   TimestampType()),
        ("ingested_at",   TimestampType()),
        ("source_url",    StringType()),
        ("raw_json",      StringType()),
    )
])

all_records   = []
ingestion_log = []
total_cities  = len(TARGET_CITIES)

# Fetch sizes
STATIONS_PER_CITY       = 3    # max monitoring locations requested per city
MEASUREMENTS_PER_SENSOR = 50   # `limit` for each sensor's measurements request

# Delay settings
SENSOR_DELAY = 1    # seconds between each sensor call
CITY_DELAY   = 5    # seconds between each city

# Upper-bound estimate of the deliberate pauses, assuming at most one sensor per
# target pollutant per station. HTTP latency and rate-limit back-offs come on top.
max_pause_seconds = total_cities * (
    CITY_DELAY + STATIONS_PER_CITY * len(TARGET_POLLUTANTS) * SENSOR_DELAY
)

print(f"🚀 Starting Bronze ingestion for {total_cities} cities...")
print(f"   Sensor delay : {SENSOR_DELAY}s")
print(f"   City delay   : {CITY_DELAY}s")
print(f"   Estimated time: up to ~{(max_pause_seconds + 59) // 60} minutes (plus API latency)\n")

for idx, (city, lat, lon, radius) in enumerate(TARGET_CITIES, start=1):
    print(f"[{idx:02d}/{total_cities}] Processing: {city}")
    city_record_count = 0

    # One bad payload or unexpected error must not discard the cities that
    # still follow; the failure is logged with status "ERROR" instead.
    try:
        # Step 1 — Find stations by coordinates
        locations = get_locations_by_coordinates(city, lat, lon, radius, limit=STATIONS_PER_CITY)

        if not locations:
            print(f"   ⚠️  No stations found near {city}. Skipping.\n")
            status = "NO_LOCATIONS"
        else:
            print(f"   📍 Found {len(locations)} station(s)")

            # Step 2 — Loop stations → sensors → measurements
            for location in locations:
                for sensor in location.get("sensors") or []:
                    sensor_id = sensor.get("id")
                    if not sensor_id:
                        continue

                    # `or {}` / `or ""` also cover keys present with a JSON null.
                    parameter = sensor.get("parameter") or {}
                    pollutant_name = str(parameter.get("name") or "").lower()
                    if pollutant_name not in TARGET_POLLUTANTS:
                        continue

                    measurements = get_measurements_by_sensor(sensor_id, limit=MEASUREMENTS_PER_SENSOR)

                    for measurement in measurements:
                        all_records.append(build_bronze_record(city, measurement, location, sensor))
                        city_record_count += 1

                    time.sleep(SENSOR_DELAY)  # pause between sensors

            print(f"   ✅ {city_record_count} records collected\n")
            status = "SUCCESS" if city_record_count > 0 else "NO_DATA"
    except Exception as exc:
        # Records appended before the failure are kept (raw landing zone).
        print(f"   ❌ {city} failed after {city_record_count} records: {exc}\n")
        status = "ERROR"

    ingestion_log.append({
        "city"   : city,
        "status" : status,
        "records": city_record_count
    })

    if idx < total_cities:
        time.sleep(CITY_DELAY)  # pause between cities (not needed after the last)

print(f"✅ Ingestion loop complete.")
print(f"   Total raw records collected: {len(all_records)}")

🚀 Starting Bronze ingestion for 20 cities...
   Sensor delay : 1s
   City delay   : 5s
   Estimated time: ~3 minutes

[01/20] Processing: Delhi
   📍 Found 3 station(s)
   ✅ 100 records collected

[02/20] Processing: Mumbai
   📍 Found 3 station(s)
   ✅ 1100 records collected

[03/20] Processing: Beijing
   📍 Found 3 station(s)
   ✅ 350 records collected

[04/20] Processing: Shanghai
   📍 Found 3 station(s)
   ✅ 400 records collected

[05/20] Processing: Lahore
   📍 Found 3 station(s)
   ✅ 200 records collected

[06/20] Processing: Dhaka
   📍 Found 3 station(s)
   ✅ 150 records collected

[07/20] Processing: Karachi
   📍 Found 3 station(s)
   ✅ 250 records collected

[08/20] Processing: Lima
   📍 Found 3 station(s)
   ✅ 1500 records collected

[09/20] Processing: Jakarta
   📍 Found 3 station(s)
   ✅ 150 records collected

[10/20] Processing: Bangkok
   📍 Found 3 station(s)
   ✅ 250 records collected

[11/20] Processing: London
   📍 Found 3 station(s)
   ✅ 450 records collected

[12/20] P

In [0]:
# ============================================================
# WRITE TO BRONZE DELTA TABLE
# Convert collected records to Spark DataFrame and append
# ============================================================

if not all_records:
    print("❌ No records to write. Check API responses above.")
else:
    # The explicit schema avoids type inference over the Python dicts.
    bronze_df = spark.createDataFrame(all_records, schema=BRONZE_SCHEMA)

    # NOTE(review): plain append — re-running this notebook lands the same
    # measurements again; deduplication is presumably left to the Silver layer.
    bronze_df.write.format("delta").mode("append").saveAsTable(BRONZE_TABLE)

    # Row count of the whole table (every run so far), not just this batch.
    total_in_table = spark.table(BRONZE_TABLE).count()
    new_rows = len(all_records)

    print(f"✅ Successfully wrote {new_rows} new records to Bronze.")
    print(f"   Total records now in Bronze table: {total_in_table}")

✅ Successfully wrote 7215 new records to Bronze.
   Total records now in Bronze table: 7215


In [0]:
# ============================================================
# INGESTION SUMMARY REPORT
# ============================================================

# Split the per-city log by ingestion status.
success, no_data, no_locs = (
    [row for row in ingestion_log if row["status"] == wanted]
    for wanted in ("SUCCESS", "NO_DATA", "NO_LOCATIONS")
)

print("\n".join([
    "=" * 55,
    "  🥉 BRONZE INGESTION — SUMMARY REPORT",
    "=" * 55,
    f"  ✅ Successful cities : {len(success)}",
    f"  ⚠️  No data cities   : {len(no_data)}",
    f"  ❌ No location cities: {len(no_locs)}",
    f"  📦 Total records     : {len(all_records)}",
    "=" * 55,
]))

# List the problem cities, if any, under their own heading.
for heading, entries in (
    ("\n  Cities with no locations found:", no_locs),
    ("\n  Cities found but returned no measurements:", no_data),
):
    if entries:
        print(heading)
        print("\n".join(f"   - {row['city']}" for row in entries))

print(f"\n▶️  Next Step: Open and run  02_silver_transformation")

  🥉 BRONZE INGESTION — SUMMARY REPORT
  ✅ Successful cities : 20
  ⚠️  No data cities   : 0
  ❌ No location cities: 0
  📦 Total records     : 7215

▶️  Next Step: Open and run  02_silver_transformation


In [0]:
# ============================================================
# PREVIEW BRONZE DATA
# Sanity check — see what the raw data looks like
# ============================================================

# Sanity-check preview of what landed in the Bronze table. Both queries exclude
# rows whose city is 'TestCity' — presumably placeholder rows written by
# 00_setup; confirm, and drop the filter once the table no longer has them.
print("📊 Sample records from Bronze table:\n")

# 10 rows with no ORDER BY: which rows appear (and from which ingestion run)
# is not deterministic.
spark.sql(f"""
    SELECT
        city,
        country,
        pollutant,
        value,
        unit,
        location_name,
        measured_at,
        ingested_at
    FROM {BRONZE_TABLE}
    WHERE city != 'TestCity'
    LIMIT 10
""").show(truncate=False)

print("\n📊 Record count by city:\n")

# Rows and distinct pollutants per city across every run appended to the
# table, largest first; .show(30) displays up to 30 cities.
spark.sql(f"""
    SELECT
        city,
        COUNT(*) as record_count,
        COUNT(DISTINCT pollutant) as pollutants_found
    FROM {BRONZE_TABLE}
    WHERE city != 'TestCity'
    GROUP BY city
    ORDER BY record_count DESC
""").show(30, truncate=False)

📊 Sample records from Bronze table:

+--------+-------+---------+-----+-----+-------------+-------------------+--------------------------+
|city    |country|pollutant|value|unit |location_name|measured_at        |ingested_at               |
+--------+-------+---------+-----+-----+-------------+-------------------+--------------------------+
|Shanghai|CN     |pm10     |40.0 |µg/m³|普陀         |2018-10-17 19:00:00|2026-02-19 19:15:02.232659|
|Shanghai|CN     |pm10     |31.0 |µg/m³|普陀         |2018-10-17 21:00:00|2026-02-19 19:15:02.23268 |
|Shanghai|CN     |pm10     |36.0 |µg/m³|普陀         |2018-10-17 22:00:00|2026-02-19 19:15:02.232699|
|Shanghai|CN     |pm10     |51.0 |µg/m³|普陀         |2018-10-17 23:00:00|2026-02-19 19:15:02.232719|
|Shanghai|CN     |pm10     |47.0 |µg/m³|普陀         |2018-10-18 00:00:00|2026-02-19 19:15:02.232738|
|Shanghai|CN     |pm10     |26.0 |µg/m³|普陀         |2018-10-18 01:00:00|2026-02-19 19:15:02.232758|
|Shanghai|CN     |pm10     |30.0 |µg/m³|普陀         |2018-