In [15]:
import requests
import pandas as pd
import time

# --- CONFIG ---
API_KEY = "9d8ad33c3bfde1431f1748da6878dc8c082b017fab31638caa50190fac77ce8a"
JERSEY_CITY_LAT, JERSEY_CITY_LON =13.0843,80.2705
TARGET_PARAMETERS = ['co', 'no2', 'so2', 'o3', 'pm25', 'pm10']
START_DATE = "2022-01-01"
END_DATE = "2022-12-31"

In [16]:
def get_jersey_city_locations(radius=25000):
    base_url = "https://api.openaq.org/v3/locations"
    params = {
        "coordinates": f"{JERSEY_CITY_LAT},{JERSEY_CITY_LON}",
        "radius": radius,
        "limit": 500
    }
    headers = {"X-API-Key": API_KEY}

    try:
        response = requests.get(base_url, params=params, headers=headers)
        if response.status_code != 200:
            print(f"‚ùå Failed: {response.status_code}")
            return pd.DataFrame()

        data = response.json().get('results', [])
        if not data:
            print("No locations found.")
            return pd.DataFrame()

        df = pd.json_normalize(data)
        df.rename(columns={
            'id': 'location_id',
            'name': 'location_name',
            'coordinates.latitude': 'latitude',
            'coordinates.longitude': 'longitude',
        }, inplace=True)

        print(f" Found {len(df)} monitoring stations.")
        return df

    except Exception as e:
        print(f" Error: {e}")
        return pd.DataFrame()


jersey_city_locations = get_jersey_city_locations()
display(jersey_city_locations.head())


 Found 11 monitoring stations.


Unnamed: 0,location_id,location_name,locality,timezone,isMobile,isMonitor,instruments,sensors,licenses,bounds,...,owner.id,owner.name,provider.id,provider.name,latitude,longitude,datetimeFirst.utc,datetimeFirst.local,datetimeLast.utc,datetimeLast.local
0,378,Alandur Bus Depot,,Asia/Kolkata,False,True,"[{'id': 2, 'name': 'Government Monitor'}]","[{'id': 661, 'name': 'co ¬µg/m¬≥', 'parameter': ...",,"[80.19151667, 12.99711389, 80.19151667, 12.997...",...,4,Unknown Governmental Organization,168,CPCB,12.997114,80.191517,2016-03-22T00:45:00Z,2016-03-22T06:15:00+05:30,2018-02-22T03:45:00Z,2018-02-22T09:15:00+05:30
1,2461,US Diplomatic Post: Chennai,,Asia/Kolkata,False,True,"[{'id': 2, 'name': 'Government Monitor'}]","[{'id': 4725, 'name': 'pm25 ¬µg/m¬≥', 'parameter...",,"[80.251932, 13.052371, 80.251932, 13.052371]",...,4,Unknown Governmental Organization,245,StateAir Chennai,13.052371,80.251932,2016-01-30T00:30:00Z,2016-01-30T06:00:00+05:30,2016-11-09T16:30:00Z,2016-11-09T22:00:00+05:30
2,2549,IIT,,Asia/Kolkata,False,True,"[{'id': 2, 'name': 'Government Monitor'}]","[{'id': 5141, 'name': 'co ¬µg/m¬≥', 'parameter':...",,"[80.23744722, 12.992513890000001, 80.23744722,...",...,4,Unknown Governmental Organization,168,CPCB,12.992514,80.237447,2016-03-22T00:45:00Z,2016-03-22T06:15:00+05:30,2018-02-22T03:45:00Z,2018-02-22T09:15:00+05:30
3,2586,"Manali, Chennai - CPCB",,Asia/Kolkata,False,True,"[{'id': 2, 'name': 'Government Monitor'}, {'id...","[{'id': 5339, 'name': 'co ¬µg/m¬≥', 'parameter':...",,"[80.26285, 13.164544, 80.26285, 13.164544]",...,8517,Central Pollution Control Board,168,CPCB,13.164544,80.26285,2016-03-21T10:00:00Z,2016-03-21T15:30:00+05:30,2025-11-07T10:00:00Z,2025-11-07T15:30:00+05:30
4,5655,"Velachery Res. Area, Chennai - CPCB",,Asia/Kolkata,False,True,"[{'id': 2, 'name': 'Government Monitor'}, {'id...","[{'id': 12235526, 'name': 'co ppb', 'parameter...",,"[80.2398125, 13.0052189, 80.2398125, 13.0052189]",...,8517,Central Pollution Control Board,168,CPCB,13.005219,80.239812,2018-03-09T05:30:00Z,2018-03-09T11:00:00+05:30,2025-11-07T10:15:00Z,2025-11-07T15:45:00+05:30


In [17]:
def get_sensor_data(sensor_id, start_date, end_date):
    print(f" Fetching data for sensor {sensor_id} from {start_date} to {end_date}...")

    base_url = f"https://api.openaq.org/v3/sensors/{sensor_id}/measurements"
    headers = {"X-API-Key": API_KEY}
    params = {
        "date_from": start_date,
        "date_to": end_date,
        "limit": 1000
    }

    all_results = []
    page = 1

    try:
        while True:
            params["page"] = page
            r = requests.get(base_url, headers=headers, params=params)
            if r.status_code != 200:
                print(f" Error {r.status_code}: {r.text}")
                break

            data = r.json()
            results = data.get("results", [])
            if not results:
                break

            all_results.extend(results)
            if page >= data.get("meta", {}).get("pages", 1):
                break

            page += 1
            time.sleep(0.5)

    except Exception as e:
        print(f" Exception: {e}")

    if not all_results:
        print(f" No data found for sensor {sensor_id}")
        return pd.DataFrame()

    # Flatten relevant fields manually
    extracted = []
    for rec in all_results:
        extracted.append({
            "datetimeUtc": rec.get("period", {}).get("datetimeFrom", {}).get("utc"),
            "datetimeLocal": rec.get("period", {}).get("datetimeFrom", {}).get("local"),
            "parameter": rec.get("parameter", {}).get("name"),
            "value": rec.get("value"),
            "unit": rec.get("parameter", {}).get("units")
        })

    df = pd.DataFrame(extracted)

    # Convert datetime fields safely
    if "datetimeUtc" in df.columns:
        df["datetimeUtc"] = pd.to_datetime(df["datetimeUtc"], errors="coerce")
    if "datetimeLocal" in df.columns:
        df["datetimeLocal"] = pd.to_datetime(df["datetimeLocal"], errors="coerce")

    # Drop empty rows
    df = df.dropna(subset=["value"], how="all")

    print(f" Retrieved {len(df)} records for sensor {sensor_id}")
    return df[["datetimeUtc", "datetimeLocal", "parameter", "value", "unit"]]

In [12]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] += ":/usr/lib/jvm/java-11-openjdk-amd64/bin"


In [18]:


from pyspark.sql import SparkSession
import pandas as pd
import time


spark = SparkSession.builder.appName("OpenAQ_Distributed_Fetch").master("local[4]").getOrCreate()

def collect_targeted_sensor_data_spark(locations_df, start_date, end_date, target_params):


    target_sensors = []
    for _, loc in locations_df.iterrows():
        sensors = loc.get('sensors', [])
        if not sensors:
            continue
        for s in sensors:
            param = s.get('parameter', {}).get('name', '')
            if param in target_params:
                target_sensors.append({
                    'id': s['id'],
                    'location_id': loc.get('location_id'),
                    'location_name': loc.get('location_name'),
                    'parameter': param
                })
    print(f" Found {len(target_sensors)} sensors for target pollutants.")


    rdd = spark.sparkContext.parallelize(target_sensors, numSlices=8)

    def fetch_sensor(sensor):
        """Runs on worker nodes ‚Äî fetches data for one sensor."""
        try:
            from time import sleep
            import pandas as pd
            import requests

            base_url = f"https://api.openaq.org/v3/sensors/{sensor['id']}/measurements"
            headers = {"X-API-Key": API_KEY}
            params = {"date_from": start_date, "date_to": end_date, "limit": 1000}

            all_results = []
            page = 1
            while True:
                params["page"] = page
                r = requests.get(base_url, headers=headers, params=params)
                if r.status_code != 200:
                    break
                data = r.json()
                results = data.get("results", [])
                if not results:
                    break
                all_results.extend(results)
                if page >= data.get("meta", {}).get("pages", 1):
                    break
                page += 1
                sleep(0.3)

            if not all_results:
                return []

            extracted = []
            for rec in all_results:
                extracted.append({
                    "datetimeUtc": rec.get("period", {}).get("datetimeFrom", {}).get("utc"),
                    "datetimeLocal": rec.get("period", {}).get("datetimeFrom", {}).get("local"),
                    "parameter": sensor["parameter"],
                    "value": rec.get("value"),
                    "unit": rec.get("parameter", {}).get("units"),
                    "location_id": sensor["location_id"],
                    "location_name": sensor["location_name"]
                })
            return extracted

        except Exception as e:
            print(f" Error fetching {sensor['id']}: {e}")
            return []

    results = rdd.flatMap(fetch_sensor).collect()


    if results:
        df = pd.DataFrame(results)
        print(f" Retrieved total {len(df)} records from {len(target_sensors)} sensors.")
    else:
        print(" No data retrieved.")
        df = pd.DataFrame()

    return df

sensor_data_df = collect_targeted_sensor_data_spark(
    jersey_city_locations,
    START_DATE,
    END_DATE,
    TARGET_PARAMETERS
)


sensor_data_dict = {"combined": sensor_data_df}


 Found 86 sensors for target pollutants.
 Retrieved total 75103 records from 86 sensors.


In [19]:


from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, udf, to_timestamp, year, month, dayofmonth, hour, dayofweek, when, lit
)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType
import json

spark = SparkSession.builder.getOrCreate()

if 'sensor_data_dict' in locals() and sensor_data_dict:
    print("\nMerging all sensor data into one Spark DataFrame...")
    all_data = None
    for df in sensor_data_dict.values():
        sdf = spark.createDataFrame(df)
        all_data = sdf if all_data is None else all_data.unionByName(sdf, allowMissingColumns=True)
else:
    print(" No sensor data found to clean.")
    all_data = None

if all_data:
    print("\nCleaning and organizing data for analysis...")
    print(f"Initial record count: {all_data.count()}")

    extract_utc_udf = udf(lambda x: json.loads(x).get("utc") if x and isinstance(x, str) and "utc" in x else None, StringType())
    if "period_datetimeFrom" in all_data.columns:
        all_data = all_data.withColumn("extracted_utc", extract_utc_udf(col("period_datetimeFrom")))
        all_data = all_data.withColumn("datetimeUtc", to_timestamp(col("extracted_utc")))
        print(" Extracted datetimeUtc from JSON column.")
    else:
        all_data = all_data.withColumn("datetimeUtc", lit(None).cast("timestamp"))
        print(" period_datetimeFrom column not found!")

    all_data = (
        all_data
        .withColumn("year", year(col("datetimeUtc")))
        .withColumn("month", month(col("datetimeUtc")))
        .withColumn("day", dayofmonth(col("datetimeUtc")))
        .withColumn("hour", hour(col("datetimeUtc")))
        .withColumn("dayofweek", dayofweek(col("datetimeUtc")))
    )

    extract_lat_udf = udf(
        lambda x: (json.loads(x).get("latitude") if isinstance(x, str) and "latitude" in x else None),
        DoubleType()
    )
    extract_lon_udf = udf(
        lambda x: (json.loads(x).get("longitude") if isinstance(x, str) and "longitude" in x else None),
        DoubleType()
    )

    if "coordinates" in all_data.columns:
        all_data = all_data.withColumn("latitude", extract_lat_udf(col("coordinates")))
        all_data = all_data.withColumn("longitude", extract_lon_udf(col("coordinates")))
        print("üìç Extracted latitude and longitude from coordinates.")
    else:
        all_data = all_data.withColumn("latitude", lit(None).cast("double"))
        all_data = all_data.withColumn("longitude", lit(None).cast("double"))
        print(" No coordinate column found ‚Äî created empty lat/lon.")

    # --- Convert value to numeric ---
    all_data = all_data.withColumn("value", col("value").cast("double"))

    unit_map = {
        'pm25': 'Œºg/m¬≥', 'pm10': 'Œºg/m¬≥',
        'no2': 'ppm', 'so2': 'ppm', 'co': 'ppm', 'o3': 'ppm'
    }
    mapping_expr = when(col("unit").isNull(),
                        when(col("parameter") == "pm25", lit("Œºg/m¬≥"))
                        .when(col("parameter") == "pm10", lit("Œºg/m¬≥"))
                        .when(col("parameter") == "no2", lit("ppm"))
                        .when(col("parameter") == "so2", lit("ppm"))
                        .when(col("parameter") == "co", lit("ppm"))
                        .when(col("parameter") == "o3", lit("ppm"))
                        .otherwise(col("unit"))).otherwise(col("unit"))
    all_data = all_data.withColumn("unit", mapping_expr)

    print("\nDetecting outliers...")
    from pyspark.sql import Window
    from pyspark.sql.functions import percentile_approx

    window_param = Window.partitionBy("parameter")
    q1 = percentile_approx(col("value"), 0.25).over(window_param)
    q3 = percentile_approx(col("value"), 0.75).over(window_param)
    iqr = q3 - q1

    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr

    all_data = all_data.withColumn(
        "is_outlier",
        when((col("value") < lower_bound) | (col("value") > upper_bound), lit(True)).otherwise(lit(False))
    )

    # --- Final Columns ---
    final_cols = [
        "parameter", "value", "unit", "datetimeUtc",
        "location_id", "location_name", "latitude", "longitude",
        "year", "month", "day", "hour", "dayofweek", "is_outlier"
    ]
    available_cols = [c for c in final_cols if c in all_data.columns]
    all_data = all_data.select(*available_cols)

    print("\n Data cleaning complete.")
    print(f"Final record count: {all_data.count()}")

    # Convert back to pandas for visualization later
    all_data_pd = all_data.toPandas()
    display(all_data_pd.head())

else:
    print("\n No data available for cleaning and preprocessing.")



Merging all sensor data into one Spark DataFrame...

Cleaning and organizing data for analysis...
Initial record count: 75103
 period_datetimeFrom column not found!
 No coordinate column found ‚Äî created empty lat/lon.

Detecting outliers...

 Data cleaning complete.
Final record count: 75103


Unnamed: 0,parameter,value,unit,datetimeUtc,location_id,location_name,latitude,longitude,year,month,day,hour,dayofweek,is_outlier
0,co,4250.0,¬µg/m¬≥,NaT,378,Alandur Bus Depot,,,,,,,,True
1,co,32360.0,¬µg/m¬≥,NaT,378,Alandur Bus Depot,,,,,,,,True
2,co,5100.0,¬µg/m¬≥,NaT,378,Alandur Bus Depot,,,,,,,,True
3,co,38970.0,¬µg/m¬≥,NaT,378,Alandur Bus Depot,,,,,,,,True
4,co,36330.0,¬µg/m¬≥,NaT,378,Alandur Bus Depot,,,,,,,,True
