In [2]:
import os
import csv
import requests
from datetime import datetime, timedelta
from dateutil import parser
from calendar import monthrange

import pandas as pd
import numpy as np
from tqdm import tqdm
from openaq import OpenAQ

In [3]:
from constants import FEDERAL_CITIES
from constants import FEDERAL_LOCATION_IDS

FEDERAL_CITIES = list(FEDERAL_LOCATION_IDS.keys())

API_KEY = "be9dde17e764a66a5352413ec62ab925e5b89a58d8fdc4711fc5ad460cdbd29c"

Parse pollutant data from 2005 to 2023

In [22]:
# Load station-city mapping (preferred stations per city-month)
df_station_map = pd.read_csv('../../data/raw/federal/metadata/station_cities.csv')
df_station_map = df_station_map.rename(columns={'Unnamed: 0': 'month'})
df_station_map['month'] = pd.to_datetime(df_station_map['month'])
df_station_map = df_station_map.set_index('month')

federal_cities = df_station_map.columns.tolist()

# Create full hourly date range from 2005-01-01 to 2025-12-31 23:00
date_range = pd.date_range(start="2005-01-01", end="2025-12-31 23:00:00", freq="h")
columns = ['pm25', 'no2', 'o3', 'pm25_3h_avg', 'no2_3h_avg', 'o3_3h_avg',
           'aqhi_raw', 'aqhi_plus_raw', 'aqhi', 'aqhi_plus']

df_empty = pd.DataFrame(index=date_range, columns=columns, dtype=float)

output_dir = '../../data/processed/federal/hourly/'
os.makedirs(output_dir, exist_ok=True)

# Initialize each city file
for city in tqdm(federal_cities):
    filename = f"{output_dir}{city}.csv"
    df_empty.to_csv(filename)

100%|██████████| 42/42 [00:30<00:00,  1.36it/s]


In [23]:
pollutants = {'NO2': 'no2', 'O3': 'o3', 'PM25': 'pm25'}
years = range(2005, 2024)  # 2005–2023

for pollutant, colname in pollutants.items():
    print(f"\n=== Processing {pollutant} → {colname} ===")

    for year in tqdm(years):
        file_path = f'../../data/raw/federal/{pollutant}/{pollutant}_{year}.csv'
        if not os.path.exists(file_path):
            continue

        # Read raw pollutant-year file
        df = pd.read_csv(file_path, skiprows=7)

        # Extract hourly columns
        hour_cols = [c for c in df.columns if c.startswith('H')]

        # Melt to long format
        melted = df.melt(
            id_vars=['Date//Date', 'NAPS ID//Identifiant SNPA'],
            value_vars=hour_cols,
            var_name='hour',
            value_name=colname
        )

        # Parse datetime
        melted['Date//Date'] = pd.to_datetime(melted['Date//Date'])
        melted['hour'] = melted['hour'].str.extract(r'H(\d+)').astype(int)
        melted['datetime'] = (
            melted['Date//Date'] +
            pd.to_timedelta(melted['hour'], unit='h') +
            pd.Timedelta(hours=1)
        )
        melted = melted.drop(columns=['hour', 'Date//Date'])

        # Filter invalids
        melted[colname] = pd.to_numeric(melted[colname], errors='coerce')
        melted.loc[melted[colname].isin([-999, 9999]), colname] = np.nan

        # Add month for join with station map
        melted['month'] = melted['datetime'].dt.to_period("M").dt.to_timestamp()
        melted['station_id'] = melted['NAPS ID//Identifiant SNPA'].astype(int)

        # Now we need: (station_id, month) → city
        # Reshape df_station_map for a merge
        df_long = df_station_map.stack().reset_index()
        df_long.columns = ['month', 'city', 'station_id']
        df_long['station_id'] = df_long['station_id'].dropna().astype(int)

        # Merge pollutant values with city mapping
        merged = melted.merge(df_long, on=['station_id', 'month'], how='inner')

        # Now merged has datetime, pollutant col, city
        # Group by city, then write once
        for city, sub in merged.groupby('city'):
            city_file = f"{output_dir}{city}.csv"
            df_city = pd.read_csv(city_file, index_col=0, parse_dates=True)

            df_city.loc[sub['datetime'], colname] = sub[colname].values

            df_city.to_csv(city_file)



=== Processing NO2 → no2 ===


100%|██████████| 19/19 [09:50<00:00, 31.06s/it]



=== Processing O3 → o3 ===


100%|██████████| 19/19 [11:19<00:00, 35.76s/it]



=== Processing PM25 → pm25 ===


100%|██████████| 19/19 [12:18<00:00, 38.87s/it]


Use OpenAQ to query for pollutant data over 2024 and 2025

In [30]:
client = OpenAQ(api_key=API_KEY)

sensor_records = []

for city, location_id in tqdm(FEDERAL_LOCATION_IDS.items()):
    if location_id is None:
        # placeholder for cities with no known location_id
        sensor_records.append([city, '', '', '', '', ''])
        continue

    sensors = client.locations.sensors(location_id).results

    for sensor in sensors:
        param = sensor.parameter['name']
        if param in ['pm25', 'o3', 'no2']:
            sensor_records.append([
                city,
                location_id,
                sensor.id,
                param,
                sensor.datetime_first['local'],
                sensor.datetime_last['local'],
            ])

df_sensors = pd.DataFrame.from_records(
    sensor_records,
    columns=['city', 'location_id', 'sensor_id', 'pollutant', 'date_first', 'date_last']
)
df_sensors.to_csv('../../data/raw/federal/metadata/sensors.csv', index=False)

100%|██████████| 42/42 [00:05<00:00,  7.95it/s]


In [63]:
def fetch_monthly_measurements(sensor_id, year, month):
    """Fetch all measurements for a given sensor, year, and month."""
    start_date = f"{year}-{month:02d}-01"
    last_day = monthrange(year, month)[1]
    end_date = f"{year}-{month:02d}-{last_day}T23:59:59"

    response = client.measurements.list(
        sensors_id=sensor_id,
        limit=1000,
        page=1,
        datetime_from=start_date,
        datetime_to=end_date,
    )

    results = response.results
    if not results:
        return []

    return results

# Process measurements and update CSVs
for city, location_id, sensor_id, pollutant, date_start, date_end in tqdm(sensor_records[100:]):
    if not sensor_id:
        continue

    measurements = []
    for year in [2024, 2025]:
        for month in range(1, 13):
            if year == 2025 and month >= 10:
                continue
            monthly_results = fetch_monthly_measurements(sensor_id, year, month)
            # print(f'Fetched {year}-{month} with {len(monthly_results)} entries')
            measurements.extend(monthly_results)

    # Reorganize into a DataFrame
    records = []
    for item in measurements:
        dt = parser.isoparse(item.period.datetime_to.local).replace(tzinfo=None)

        # Convert values (µg/m³ for PM2.5; ppm → ppb for O3 and NO2)
        val = item.value if pollutant == 'pm25' else item.value * 1000

        records.append((dt, val))

    if not records:
        continue

    df_new = pd.DataFrame(records, columns=['datetime', pollutant])
    df_new = df_new.set_index('datetime').sort_index()

    # Deduplicate: keep the last value per timestamp (you can also use 'mean')
    df_new = df_new.groupby(df_new.index).last()

    # Load the existing city CSV
    city_file = f"../../data/processed/federal/hourly/{city}.csv"
    df_city = pd.read_csv(city_file, index_col=0, parse_dates=True)

    # Clear old values for 2024–2025
    mask = (df_city.index >= "2024-01-01") & (df_city.index < "2026-01-01")
    df_city.loc[mask, pollutant] = np.nan

    # Align and overwrite values explicitly instead of df.update and save
    df_city.loc[df_new.index, pollutant] = df_new[pollutant]
    df_city.to_csv(city_file)
    # print(f"Updated {city} for {pollutant} (2024–2025)")

100%|██████████| 4/4 [00:40<00:00, 10.01s/it]


Compute rolling averages

In [67]:
POLLUTANTS = ['pm25', 'no2', 'o3']

for city in tqdm(FEDERAL_CITIES):
    city_path = f'../../data/processed/federal/hourly/{city}.csv'

    try:
        df = pd.read_csv(city_path, index_col=0, parse_dates=True)
    except Exception as e:
        print(f"Error reading {city_path}: {e}")
        continue

    for pollutant in POLLUTANTS:
        avg_col = f"{pollutant}_3h_avg"
        df[avg_col] = (
            df[pollutant]
            .rolling(window=3, min_periods=2)
            .mean()
            .round(1)  
        )

    # Save updated DataFrame
    df.to_csv(city_path)

100%|██████████| 42/42 [00:49<00:00,  1.17s/it]


Compute AQHI

In [68]:
def compute_mAQI(no2, o3):
    """Compute Ontario mAQI from 1-hour NO2 (ppb) and O3 (ppb)."""
    
    def sub_index_no2(val):
        if pd.isna(val):
            return np.nan
        if 0 <= val <= 110:
            return 0.02264 * val + 1.000
        elif 111 <= val <= 200:
            return 0.03360 * val - 0.2291
        elif 201 <= val <= 524:
            return 0.01235 * val + 4.017
        else:  # > 524
            return 0.01810 * val + 1.000

    def sub_index_o3(val):
        if pd.isna(val):
            return np.nan
        if 0 <= val <= 50:
            return 0.04980 * val + 1.000
        elif 51 <= val <= 80:
            return 0.1031 * val - 1.758
        elif 81 <= val <= 149:
            return 0.05868 * val + 1.747
        else:  # > 149
            return 0.05868 * val + 1.747

    no2_idx = no2.apply(sub_index_no2)
    o3_idx = o3.apply(sub_index_o3)

    return pd.concat([no2_idx, o3_idx], axis=1).max(axis=1)  # keep decimals for AQHI+ comparison

def compute_aqhi(pm25_3h, no2_3h, o3_3h):
    """Compute federal AQHI from 3-hour averages."""
    aqhi = (
        1000 * (
            (np.exp(0.000871 * no2_3h) - 1) +
            (np.exp(0.000537 * o3_3h) - 1) +
            (np.exp(0.000487 * pm25_3h) - 1)
        )
    ) / 10.4
    return aqhi

for city in tqdm(FEDERAL_CITIES):
    city_path = f'../../data/processed/federal/hourly/{city}.csv'

    try:
        df = pd.read_csv(city_path, index_col=0, parse_dates=True)
    except Exception as e:
        print(f"Error reading {city_path}: {e}")
        continue

    # --- Compute mAQI from 1-hour O3 & NO2 ---
    mAQI = compute_mAQI(df['no2'], df['o3'])

    # --- Compute federal AQHI from 3-hour averages ---
    aqhi_raw = compute_aqhi(df['pm25_3h_avg'], df['no2_3h_avg'], df['o3_3h_avg'])

    # Round AQHI for reporting (per guidelines)
    aqhi_report = aqhi_raw.round().astype('Int64')
    
    # Cap AQHI values for public display (11 == "10+")
    aqhi_report_final = aqhi_report.clip(upper=11)

    # --- Compute AQHI Plus ---
    aqhi_plus = aqhi_report.copy()
    
    # Step 1: Apply mAQI substitution
    mask_maqi = (mAQI > 6) & ((mAQI > aqhi_raw) | aqhi_raw.isna())
    aqhi_plus[mask_maqi] = mAQI[mask_maqi].round().astype('Int64')
    
    # Step 2: Apply PM2.5 trigger (April 2024 change)
    # pm25 here is the 1-hour value in µg/m³
    sub_pm25 = np.ceil(df['pm25'] / 10)
    
    mask_pm25 = sub_pm25 > aqhi_plus
    aqhi_plus[mask_pm25] = sub_pm25[mask_pm25].astype('Int64')

    # Cap AQHI Plus values for public display (11 == "10+")
    aqhi_plus_final = aqhi_plus.clip(upper=11)

    # --- Store results ---
    df['aqhi_raw'] = aqhi_report
    df['aqhi'] = aqhi_report_final
    df['aqhi_plus_raw'] = aqhi_plus
    df['aqhi_plus'] = aqhi_plus_final

    # Save updated file
    df.to_csv(city_path)

100%|██████████| 42/42 [00:53<00:00,  1.27s/it]


Compute daily AQHI values

In [4]:
daily_dates = pd.date_range(start="2005-01-01", end="2025-12-31", freq="D")

# Metrics to compute for each case
metrics = ['4pm', 'min', 'max', 'mean', 'med']

# Define cases: (col, raw_col, output_dir)
cases = {
    'aqhi': ('aqhi', 'aqhi_raw', '../../data/processed/federal/daily-aqhi'),
    'aqhi_plus': ('aqhi_plus', 'aqhi_plus_raw', '../../data/processed/federal/daily-aqhi-plus'),
    'pm25': ('pm25', 'pm25', '../../data/processed/federal/daily-pm25'),
}

for case_name, (col, raw_col, out_dir) in cases.items():
    os.makedirs(out_dir, exist_ok=True)

    print(f"Processing {case_name}")
    for city in tqdm(FEDERAL_CITIES):
        hourly_path = f'../../data/processed/federal/hourly/{city}.csv'
        daily_path = f'{out_dir}/{city}.csv'

        if not os.path.exists(hourly_path):
            print(f"No hourly file for {city}, skipping.")
            continue

        try:
            df_hourly = pd.read_csv(hourly_path, index_col=0, parse_dates=True)
        except Exception as e:
            print(f"Error reading {hourly_path}: {e}")
            continue

        if col not in df_hourly.columns:
            print(f"No {col} column for {city}, skipping.")
            continue

        # Empty daily DataFrame
        df_daily = pd.DataFrame(
            index=daily_dates,
            columns=[f"{m}_{col}" for m in metrics],
            dtype=float
        )

        # Group by day
        grouped = df_hourly.groupby(df_hourly.index.date)

        for date, group in grouped:
            date = pd.Timestamp(date)

            # 4pm value
            four_pm_val = group.loc[group.index.hour == 16, col]
            df_daily.at[date, f'4pm_{col}'] = four_pm_val.iloc[0] if not four_pm_val.empty else np.nan

            # Other metrics
            vals = group[col].dropna()
            vals_raw = group[raw_col].dropna()
            if not vals.empty:
                df_daily.at[date, f'min_{col}'] = vals.min()
                df_daily.at[date, f'max_{col}'] = vals.max()
                df_daily.at[date, f'mean_{col}'] = np.clip(vals_raw.mean().round(1), None, 11) if col != 'pm25' else vals_raw.mean().round(1)
                df_daily.at[date, f'med_{col}'] = vals.median()

        df_daily.to_csv(daily_path)

Processing aqhi


100%|██████████| 42/42 [03:24<00:00,  4.87s/it]


Processing aqhi_plus


100%|██████████| 42/42 [03:11<00:00,  4.56s/it]


Processing pm25


100%|██████████| 42/42 [04:13<00:00,  6.04s/it]


Compute the coverage of the data - percent of each pollutant in each month

In [71]:
# Required columns
REQUIRED_COLS = [
    'pm25', 'no2', 'o3',
    'pm25_3h_avg', 'no2_3h_avg', 'o3_3h_avg',
    'aqhi', 'aqhi_plus', 'aqhi_raw', 'aqhi_plus_raw'
]

def compute_coverage_for_city(city_name):
    """Compute monthly and yearly coverage for a single city"""
    # Load processed city data
    path = f'../../data/processed/federal/hourly/{city_name}.csv'
    df = pd.read_csv(path, index_col=0, parse_dates=True)

    # Ensure all required columns exist
    for col in REQUIRED_COLS:
        if col not in df.columns:
            df[col] = np.nan

    # Build full month and year index
    month_index = pd.date_range(start='2005-01-01', end='2025-12-31', freq='MS')
    coverage_monthly = pd.DataFrame(index=month_index, columns=REQUIRED_COLS)

    # Compute monthly coverage
    for month_start in month_index:
        month_end = month_start + pd.offsets.MonthEnd(1)
        month_hours = pd.date_range(start=month_start, end=month_end, freq='h')
        expected_count = len(month_hours)

        for col in REQUIRED_COLS:
            if month_start < df.index.min() or month_start > df.index.max():
                coverage_monthly.loc[month_start, col] = 0.00
            else:
                mask = (df.index >= month_start) & (df.index <= month_end)
                observed_count = df.loc[mask, col].notna().sum()
                coverage_monthly.loc[month_start, col] = (observed_count / expected_count) * 100

    # Round monthly coverage
    coverage_monthly = coverage_monthly.astype(float).round(2)

    # Compute yearly coverage by resampling and round
    coverage_yearly = coverage_monthly.resample("YS").mean().round(2)
    coverage_yearly.index = coverage_yearly.index.year

    return coverage_monthly, coverage_yearly

def process_all_cities():
    """Compute and save coverage for all cities, and aggregate into overall monthly/yearly coverage"""
    monthly_all_observed = {}
    monthly_all_expected = {}
    yearly_all_observed = {}
    yearly_all_expected = {}

    # Ensure output directories exist
    os.makedirs("../../data/processed/federal/metadata/coverage/monthly", exist_ok=True)
    os.makedirs("../../data/processed/federal/metadata/coverage/yearly", exist_ok=True)

    # Process each city
    for city in tqdm(FEDERAL_CITIES):
        monthly, yearly = compute_coverage_for_city(city)

        # Save per-city coverage
        monthly.to_csv(f'../../data/processed/federal/metadata/coverage/monthly/{city}.csv')
        yearly.to_csv(f'../../data/processed/federal/metadata/coverage/yearly/{city}.csv')

        # For overall stats, we need observed+expected counts, not percentages
        for freq, cov_df, obs_dict, exp_dict in [
            ("monthly", monthly, monthly_all_observed, monthly_all_expected),
            ("yearly", yearly, yearly_all_observed, yearly_all_expected),
        ]:
            for idx, row in cov_df.iterrows():
                if freq == "monthly":
                    start = pd.Timestamp(idx)
                    end = start + pd.offsets.MonthEnd(1)
                    expected_count = len(pd.date_range(start, end, freq='h'))
                else:  # yearly
                    start = pd.Timestamp(str(idx))
                    end = start + pd.offsets.YearEnd(0)
                    expected_count = len(pd.date_range(start, end, freq='h'))

                if idx not in obs_dict:
                    obs_dict[idx] = {col: 0 for col in REQUIRED_COLS}
                    exp_dict[idx] = {col: 0 for col in REQUIRED_COLS}

                for col in REQUIRED_COLS:
                    # observed counts (percentage * expected_count / 100)
                    observed_count = (row[col] / 100.0) * expected_count
                    obs_dict[idx][col] += observed_count
                    exp_dict[idx][col] += expected_count

    # Build overall monthly coverage
    monthly_all = pd.DataFrame(index=sorted(monthly_all_observed.keys()), columns=REQUIRED_COLS)
    for idx in tqdm(monthly_all.index, total=len(monthly_all)):
        for col in REQUIRED_COLS:
            monthly_all.loc[idx, col] = round(
                (monthly_all_observed[idx][col] / monthly_all_expected[idx][col]) * 100, 2
            )
    monthly_all.to_csv("../../data/processed/federal/metadata/coverage/monthly/all.csv")

    # Build overall yearly coverage
    yearly_all = pd.DataFrame(index=sorted(yearly_all_observed.keys()), columns=REQUIRED_COLS)
    for idx in tqdm(yearly_all.index, total=len(yearly_all)):
        for col in REQUIRED_COLS:
            yearly_all.loc[idx, col] = round(
                (yearly_all_observed[idx][col] / yearly_all_expected[idx][col]) * 100, 2
            )
    yearly_all.to_csv("../../data/processed/federal/metadata/coverage/yearly/all.csv")

    return monthly_all, yearly_all

monthly_all, yearly_all = process_all_cities()

100%|██████████| 42/42 [02:21<00:00,  3.36s/it]
100%|██████████| 252/252 [00:00<00:00, 1676.22it/s]
100%|██████████| 21/21 [00:00<00:00, 1730.36it/s]
