In [None]:
# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze Ingestion - AQI Realtime (30 min)
# MAGIC Pulls near-real-time AQI snapshots for Indian metro cities at 30-minute intervals.


In [None]:
# COMMAND ----------
import json
import time
from datetime import datetime, timezone

import requests
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType


In [None]:
# COMMAND ----------
# aqi_api_url example: https://api.waqi.info/feed
# endpoint pattern expected: {aqi_api_url}/{city}/?token={aqi_api_token}

# Notebook parameters. Each widget is declared with a default and then read
# back into an UPPER_SNAKE_CASE constant used by the cells below.
# NOTE(review): widget values are visible in the notebook UI / job config;
# consider sourcing the token from a secret scope instead — no scope or key
# name is defined in this notebook, so that change is left to the owner.
dbutils.widgets.text("aqi_api_url", "https://api.waqi.info/feed")
dbutils.widgets.text("aqi_api_token", "")
dbutils.widgets.text("cities_csv", "delhi,mumbai,bengaluru,kolkata,chennai,hyderabad,pune,ahmedabad")
dbutils.widgets.text("bronze_catalog", "main")
dbutils.widgets.text("bronze_schema", "wattrac_bronze")
dbutils.widgets.text("aqi_table", "aqi_realtime_raw")

# Trailing slashes are dropped so the URL can be joined with "/{city}/" later.
AQI_API_URL = dbutils.widgets.get("aqi_api_url").rstrip("/")
# Surrounding whitespace from a pasted token would otherwise be sent verbatim.
AQI_API_TOKEN = dbutils.widgets.get("aqi_api_token").strip()
# Comma-separated list; blank entries (e.g. from a trailing comma) are ignored.
CITIES = [c.strip() for c in dbutils.widgets.get("cities_csv").split(",") if c.strip()]
BRONZE_CATALOG = dbutils.widgets.get("bronze_catalog")
BRONZE_SCHEMA = dbutils.widgets.get("bronze_schema")
AQI_TABLE = dbutils.widgets.get("aqi_table")
# Fully qualified three-level table name: catalog.schema.table.
TARGET_TABLE = f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{AQI_TABLE}"

# Fail fast on unusable parameters instead of issuing requests that cannot
# succeed or appending an empty batch.
if not AQI_API_TOKEN:
    raise ValueError("Widget 'aqi_api_token' is empty; an API token is required.")
if not CITIES:
    raise ValueError("Widget 'cities_csv' does not contain any city names.")


In [None]:
# COMMAND ----------
def get_city_aqi(city: str, max_attempts: int = 3, backoff_seconds: float = 5.0):
    """Fetch the AQI feed for one city and return the decoded JSON payload.

    The request goes to ``{AQI_API_URL}/{city}/`` with the API token passed as
    the ``token`` query parameter. A failed attempt (transport error, non-2xx
    status, or a body that is not valid JSON) is retried with a linearly
    growing pause between attempts.

    Args:
        city: City identifier appended to the feed URL.
        max_attempts: Total number of requests to try before giving up.
        backoff_seconds: Base pause; attempt ``k`` is followed by a pause of
            ``k * backoff_seconds`` seconds.

    Returns:
        The parsed JSON body of the first successful response.

    Raises:
        ValueError: If ``max_attempts`` is less than 1, or if the final
            attempt returned a body that could not be decoded as JSON.
        requests.RequestException: The error from the final attempt when every
            attempt failed (``HTTPError`` for non-2xx responses).
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    url = f"{AQI_API_URL}/{city}/"
    params = {"token": AQI_API_TOKEN}
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            resp = requests.get(url, params=params, timeout=45)
            resp.raise_for_status()
            return resp.json()
        except (requests.RequestException, ValueError) as exc:
            # Connection errors and timeouts are retried just like bad status
            # codes; the exception is kept so the final failure can be re-raised.
            last_error = exc
        # Pause only when another attempt follows; sleeping after the final
        # failure would just delay the error.
        if attempt < max_attempts:
            time.sleep(attempt * backoff_seconds)
    raise last_error


In [None]:
# COMMAND ----------
def _to_float(value):
    """Return ``value`` as a float, or None when it is missing or non-numeric (e.g. "-")."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _parse_observation_ts(data):
    """Return the timezone-aware observation time from ``data["time"]["iso"]``, else None.

    NOTE(review): assumes the feed reports the station's observation time as an
    ISO-8601 string under ``data.time.iso`` — confirm against the API docs.
    """
    time_info = data.get("time")
    if not isinstance(time_info, dict):
        return None
    iso_text = time_info.get("iso")
    if not isinstance(iso_text, str):
        return None
    # Older Python versions of fromisoformat() reject the "Z" suffix.
    if iso_text.endswith("Z"):
        iso_text = iso_text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(iso_text)
    except ValueError:
        return None
    # A naive timestamp is ambiguous (unknown zone), so it is treated as unknown.
    return parsed if parsed.tzinfo is not None else None


def _build_row(city, payload, run_ts):
    """Flatten one API payload into a dict matching the bronze ``schema``."""
    # Any level of the payload may be missing or of an unexpected type (for
    # example ``data`` being a plain string); fall back to empty containers so
    # the row is still written with the raw payload preserved.
    data = payload.get("data") if isinstance(payload, dict) else None
    if not isinstance(data, dict):
        data = {}
    city_info = data.get("city")
    if not isinstance(city_info, dict):
        city_info = {}
    geo = city_info.get("geo")
    if not isinstance(geo, (list, tuple)):
        geo = []
    return {
        "city": city,
        "aqi": _to_float(data.get("aqi")),
        "dominant_pollutant": data.get("dominentpol"),
        "station_name": city_info.get("name"),
        "station_lat": _to_float(geo[0]) if len(geo) > 0 else None,
        "station_lon": _to_float(geo[1]) if len(geo) > 1 else None,
        # Source-reported time; None (column is nullable) when the payload
        # does not provide a usable one, rather than duplicating ingestion_ts.
        "source_observation_ts": _parse_observation_ts(data),
        "ingestion_ts": run_ts,
        "ingestion_date": run_ts.strftime("%Y-%m-%d"),
        "raw_payload": json.dumps(payload),
    }


# Bronze table layout: parsed convenience columns plus the untouched payload.
schema = StructType([
    StructField("city", StringType(), False),
    StructField("aqi", DoubleType(), True),
    StructField("dominant_pollutant", StringType(), True),
    StructField("station_name", StringType(), True),
    StructField("station_lat", DoubleType(), True),
    StructField("station_lon", DoubleType(), True),
    StructField("source_observation_ts", TimestampType(), True),
    StructField("ingestion_ts", TimestampType(), False),
    StructField("ingestion_date", StringType(), False),
    StructField("raw_payload", StringType(), False),
])

# One UTC timestamp is shared by every row of this run.
run_ts = datetime.now(timezone.utc)
rows = [_build_row(city, get_city_aqi(city), run_ts) for city in CITIES]

bronze_df = spark.createDataFrame(rows, schema=schema)


In [None]:
# COMMAND ----------
# Ensure the destination schema exists before the table is written.
create_schema_sql = f"CREATE SCHEMA IF NOT EXISTS {BRONZE_CATALOG}.{BRONZE_SCHEMA}"
spark.sql(create_schema_sql)

# Append this run's snapshot to the Delta table, partitioned by ingestion day.
delta_writer = bronze_df.write.format("delta").mode("append").partitionBy("ingestion_date")
delta_writer.saveAsTable(TARGET_TABLE)

# Show the batch that was just written and report how many rows it held.
display(bronze_df)
inserted_rows = bronze_df.count()
print(f"Inserted {inserted_rows} rows into {TARGET_TABLE}")
