In [1]:
import json
import os
import time

import requests

# --- Configuration -----------------------------------------------------------
# The OpenAQ key is read from the environment so the secret never lives in the
# notebook source or its saved outputs. Any key that was previously committed
# in this cell must be treated as leaked and rotated.
API_KEY = os.environ.get("OPENAQ_API_KEY")
if not API_KEY:
    raise RuntimeError(
        "OPENAQ_API_KEY is not set; export it (or load it from your secret "
        "store) before running this notebook."
    )
BASE = "https://api.openaq.org"
headers = {"X-API-Key": API_KEY}

# Search for locations around 40.7128, -74.0060 (New York City).
# NOTE(review): radius is presumably in metres (25 km) - confirm against the
# OpenAQ v3 docs.
loc_params = {
    "coordinates": "40.7128,-74.0060",
    "radius": 25000,
    "limit": 100,
    "page": 1
}

# --- Discover locations ------------------------------------------------------
# Only page 1 is requested; if the API ever returns exactly `limit` locations
# there may be further pages that this cell does not fetch.
r = requests.get(f"{BASE}/v3/locations", params=loc_params, headers=headers, timeout=60)
r.raise_for_status()
locations = r.json().get("results", [])

# --- Flatten to one record per PM2.5 sensor ----------------------------------
pm25_sensors = []
for loc in locations:
    # `or {}` / `or []` also cover keys that are present but null in the
    # payload, which a plain .get(key, default) would pass through as None.
    coords = loc.get("coordinates") or {}
    for s in loc.get("sensors") or []:
        parameter = s.get("parameter") or {}
        if parameter.get("name") == "pm25":
            pm25_sensors.append({
                "location_id": loc["id"],
                "location_name": loc.get("name"),
                "latitude": coords.get("latitude"),
                "longitude": coords.get("longitude"),
                "sensor_id": s["id"]
            })

print("Locations found:", len(locations))
print("PM2.5 sensors found:", len(pm25_sensors))

# Cap the number of sensors queried downstream; the slice simply keeps the
# first TARGET_SENSORS entries in the order the API returned them.
TARGET_SENSORS = 30
selected = pm25_sensors[:TARGET_SENSORS]
print("Selected sensors:", len(selected))


StatementMeta(, 2f44767f-3c44-4a8a-8591-a68ec92b971c, 3, Finished, Available, Finished)

Locations found: 57
PM2.5 sensors found: 51
Selected sensors: 30


In [2]:
# Date window passed to the per-sensor daily endpoint.
date_from = "2024-01-01"
date_to   = "2024-12-31"

# Accumulators: `all_rows` collects every daily record (enriched with location
# metadata); `failed` records sensors whose request did not succeed.
all_rows = []
failed = []

# Use the real number of selected sensors for progress reporting; it can be
# smaller than TARGET_SENSORS when fewer PM2.5 sensors were found.
total_sensors = len(selected)

for i, item in enumerate(selected, start=1):
    sensor_id = item["sensor_id"]
    # Only page 1 is requested, with a limit of 1000 rows per sensor.
    params_days = {"date_from": date_from, "date_to": date_to, "limit": 1000, "page": 1}

    try:
        resp = requests.get(f"{BASE}/v3/sensors/{sensor_id}/days", params=params_days, headers=headers, timeout=60)
    except requests.RequestException as exc:
        # Timeouts / connection errors are recorded like HTTP failures instead
        # of aborting the whole run and discarding rows already collected.
        failed.append({"sensor_id": sensor_id, "status": None, "error": str(exc)})
        time.sleep(1.0)
        continue

    if resp.status_code != 200:
        failed.append({"sensor_id": sensor_id, "status": resp.status_code})
        # Back off a little longer after a failed request.
        time.sleep(1.0)
        continue

    rows = resp.json().get("results", [])
    # Stamp each daily record with the sensor/location it belongs to so the
    # flattened table is self-describing.
    for row in rows:
        row["sensor_id"] = sensor_id
        row["location_id"] = item["location_id"]
        row["location_name"] = item["location_name"]
        row["latitude"] = item["latitude"]
        row["longitude"] = item["longitude"]

    all_rows.extend(rows)

    if i % 5 == 0:
        print(f"Processed {i}/{total_sensors} sensors; total rows: {len(all_rows)}")

    # Small pause between successful requests to stay polite to the API.
    time.sleep(0.4)

print("Final rows:", len(all_rows))
print("Failed count:", len(failed))


StatementMeta(, 2f44767f-3c44-4a8a-8591-a68ec92b971c, 4, Finished, Available, Finished)

Processed 5/30 sensors; total rows: 1816
Processed 10/30 sensors; total rows: 2909
Processed 15/30 sensors; total rows: 3977
Processed 20/30 sensors; total rows: 5071
Processed 25/30 sensors; total rows: 6271
Processed 30/30 sensors; total rows: 7230
Final rows: 7230
Failed count: 0


In [4]:
# Persist the raw daily records as the bronze Delta table.
# NOTE(review): `spark` is not defined in the visible cells; presumably it is
# the SparkSession injected by the notebook runtime - confirm.

# Refuse to overwrite the existing table when the fetch produced nothing
# (e.g. every request failed); an empty input yields a column-less DataFrame.
if not all_rows:
    raise ValueError(
        "all_rows is empty; refusing to overwrite bronze_openaq with no data."
    )

# One JSON document per record lets Spark infer the nested schema itself.
json_lines = [json.dumps(record) for record in all_rows]
rdd = spark.sparkContext.parallelize(json_lines)

df_bronze_full = spark.read.json(rdd)

# mode("overwrite") replaces the table contents on every run.
(
    df_bronze_full.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronze_openaq")
)

StatementMeta(, 2f44767f-3c44-4a8a-8591-a68ec92b971c, 6, Finished, Available, Finished)

root
 |-- coordinates: string (nullable = true)
 |-- coverage: struct (nullable = true)
 |    |-- datetimeFrom: struct (nullable = true)
 |    |    |-- local: string (nullable = true)
 |    |    |-- utc: string (nullable = true)
 |    |-- datetimeTo: struct (nullable = true)
 |    |    |-- local: string (nullable = true)
 |    |    |-- utc: string (nullable = true)
 |    |-- expectedCount: long (nullable = true)
 |    |-- expectedInterval: string (nullable = true)
 |    |-- observedCount: long (nullable = true)
 |    |-- observedInterval: string (nullable = true)
 |    |-- percentComplete: double (nullable = true)
 |    |-- percentCoverage: double (nullable = true)
 |-- flagInfo: struct (nullable = true)
 |    |-- hasFlags: boolean (nullable = true)
 |-- latitude: double (nullable = true)
 |-- location_id: long (nullable = true)
 |-- location_name: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- parameter: struct (nullable = true)
 |    |-- displayName: string (n