# Utah Air Quality Data Scraper

This notebook scrapes historical air quality data for Utah from the EPA AQS API.

**Parameters:**
- Ozone (O3)
- Nitric Oxide (NO)
- Nitrogen Dioxide (NO2)
- Solar Radiation (SR)
- Temperature (Temp)

**Output:**
- Raw Data: `data/{site_name}_raw.csv`
- Cleaned Data: `data/{site_name}_cleaned.csv` (Rows with missing values dropped)
- Metadata: `data/{site_name}_metadata.json`

**Note:** This script pulls **Hourly Data**, which may take a significant amount of time.

In [None]:
import requests
import pandas as pd
import json
import time
import os
import re
from datetime import datetime

# Ensure data directory exists
os.makedirs('data', exist_ok=True)

# Constants
BASE_URL = "https://aqs.epa.gov/data/api"
STATE_CODE = "49"  # Utah

# Parameter Codes
PARAMS = {
    "44201": "O3",
    "42601": "NO",
    "42602": "NO2",
    "63301": "SR",
    "62101": "Temp"
}
PARAM_CODES_STR = ",".join(PARAMS.keys())

# Helper for rate limiting
def wait_for_api():
    time.sleep(1) # Be polite to the API

## 1. Authentication
Enter your EPA AQS API credentials below. If you don't have them, run the signup cell.

In [None]:
# CREDENTIALS
API_EMAIL = "test@test.com" # REPLACE WITH YOUR EMAIL
API_KEY = "test"   # REPLACE WITH YOUR KEY

def signup(email):
    url = f"{BASE_URL}/signup?email={email}"
    response = requests.get(url)
    print(response.json())

## 2. Site Discovery
Get all sites in Utah.

In [None]:
def get_sites(email, key, state_code):
    # 1. Get Counties
    print("Fetching counties...")
    counties_url = f"{BASE_URL}/list/countiesByState?email={email}&key={key}&state={state_code}"
    resp = requests.get(counties_url)
    if resp.status_code != 200:
        print("Error fetching counties:", resp.text)
        return []
    
    counties = resp.json().get('Data', [])
    all_sites = []
    
    # 2. Get Sites for each County
    print(f"Found {len(counties)} counties. Fetching sites...")
    for county in counties:
        county_code = county['code']
        # sitesByCounty
        sites_url = f"{BASE_URL}/list/sitesByCounty?email={email}&key={key}&state={state_code}&county={county_code}"
        s_resp = requests.get(sites_url)
        wait_for_api()
        
        if s_resp.status_code == 200:
            sites_data = s_resp.json().get('Data', [])
            for site in sites_data:
                # Enrich with county info for easier reference
                site['county_code'] = county_code
                site['county_name'] = county['value_represented']
                all_sites.append(site)
        else:
            print(f"Warning: Could not fetch sites for county {county_code}")

    # Deduplicate sites just in case (though list/sitesByCounty shouldn't overlap)
    # A site is uniquely identified by state-county-site_number
    unique_sites = {}
    for s in all_sites:
        uid = f"{state_code}-{s['county_code']}-{s['code']}"
        unique_sites[uid] = s
    
    return list(unique_sites.values())

# Check Monitors to pre-filter sites (Only keep sites that have ALL 5 params)
def check_site_monitors(email, key, state, county, site):
    # list/monitorsBySite
    url = f"{BASE_URL}/list/monitorsBySite?email={email}&key={key}&state={state}&county={county}&site={site}"
    resp = requests.get(url)
    wait_for_api()
    if resp.status_code != 200:
        return False, []
    
    monitors = resp.json().get('Data', [])
    found_params = set()
    for m in monitors:
        if m['parameter_code'] in PARAMS:
            found_params.add(m['parameter_code'])
            
    # Function returns True if ALL 5 are present
    # PARAMS.keys() are strings in the set
    required = set(PARAMS.keys())
    
    # STRICT CHECK: Must have ALL 5
    has_all = required.issubset(found_params)
    return has_all, list(found_params)

## 3. Data Fetching
Fetch hourly data for valid sites.

In [None]:
def fetch_hourly_data(email, key, state, county, site, bdate, edate):
    # Fetch data for all params at once
    # URL: sampleData/bySite
    url = f"{BASE_URL}/sampleData/bySite?email={email}&key={key}&param={PARAM_CODES_STR}&bdate={bdate}&edate={edate}&state={state}&county={county}&site={site}"
    resp = requests.get(url)
    wait_for_api()
    if resp.status_code != 200:
        print(f"    Error fetching {bdate}-{edate}: {resp.status_code}")
        return []
    
    return resp.json().get('Data', [])

def sanitize_filename(name):
    return re.sub(r'[^\w\-_]', '_', name)

## 4. Main Execution Loop
This loop will:
1. Get all sites.
2. Filter for sites that have ALL 5 parameters.
3. Fetch hourly data year by year.
4. Clean and Save.

In [None]:
# Date Range to search
START_YEAR = 1980
END_YEAR = datetime.now().year

# 1. Get List of Sites
all_sites_list = get_sites(API_EMAIL, API_KEY, STATE_CODE)
print(f"Total Sites in Utah: {len(all_sites_list)}")

for site_info in all_sites_list:
    site_id = site_info['code']
    county_code = site_info['county_code']
    site_name = site_info.get('value_represented', f"Site_{site_id}") # Use value_represented as name if available
    # Fallback if value_represented is empty or just a number, try local site name
    if not site_name or site_name.strip() == "":
         site_name = site_info.get('local_site_name', f"Site_{site_id}")

    safe_name = sanitize_filename(site_name)
    print(f"Processing Site: {site_name} (ID: {site_id}) in County {county_code}...")

    # 2. Check Monitors (Pre-filter)
    has_all_params, found_params = check_site_monitors(API_EMAIL, API_KEY, STATE_CODE, county_code, site_id)
    if not has_all_params:
        print(f"  Skipping. Missing parameters. Found: {[PARAMS.get(p, p) for p in found_params]}")
        continue
    
    print(f"  Site has all parameters! Fetching data...")
    
    all_records = []
    
    # 3. Fetch Data Year by Year
    for year in range(START_YEAR, END_YEAR + 1):
        bdate = f"{year}0101"
        edate = f"{year}1231"
        
        # Skip future dates
        if year == datetime.now().year:
             today_str = datetime.now().strftime("%Y%m%d")
             if bdate > today_str: break
             edate = min(edate, today_str)

        print(f"    Fetching {year}...")
        year_data = fetch_hourly_data(API_EMAIL, API_KEY, STATE_CODE, county_code, site_id, bdate, edate)
        all_records.extend(year_data)
    
    if not all_records:
        print("  No data records found.")
        continue

    # 4. Processing
    print(f"  Processing {len(all_records)} records...")
    df = pd.DataFrame(all_records)

    # Relevant columns: 'date_gmt', 'time_gmt', 'parameter_code', 'sample_measurement', 'units_of_measure'
    # We need to construct a usage Timestamp
    # Combine Date/Time
    # Note: Using GMT or Local? API returns both. Let's use Local for standard analysis if preferred, or GMT.
    # User didn't specify, but usually Local Standard Time (date_local, time_local) is best for air quality trends.
    df['datetime'] = pd.to_datetime(df['date_local'] + ' ' + df['time_local'])
    
    # Map Parameter Codes to Names
    df['param_name'] = df['parameter_code'].map(PARAMS)
    
    # Pivot
    # Index: datetime
    # Columns: param_name
    # Values: sample_measurement
    # Aggregation: mean (in case of duplicates, though sampleData shouldn't have them for same POC. 
    # If multiple POCs exist for same param, we average them? Or take the first? 
    # Averaging across POCs is safer for 'site value')
    df_pivoted = df.pivot_table(index='datetime', columns='param_name', values='sample_measurement', aggfunc='mean')

    # 5. Saving Raw
    raw_file = f"data/{safe_name}_raw.csv"
    df_pivoted.to_csv(raw_file)
    print(f"  Saved RAW: {raw_file}")

    # 6. Cleaning
    # Strict Drop: Row must have O3, NO, NO2, SR, Temp
    required_cols = ['O3', 'NO', 'NO2', 'SR', 'Temp']
    # Improve robustness: Only drop if columns exist. But we already checked `check_site_monitors` so they should be there.
    # However, if data fetch returned nothing for a specific param for that year, the column might be missing in pivot.
    # Reindex to ensure all columns exist
    df_pivoted = df_pivoted.reindex(columns=required_cols)
    
    df_cleaned = df_pivoted.dropna()
    
    cleaned_file = f"data/{safe_name}_cleaned.csv"
    df_cleaned.to_csv(cleaned_file)
    print(f"  Saved CLEANED: {cleaned_file} ({len(df_cleaned)} rows)")

    # 7. Metadata
    # Extract units
    units = {}
    for p_code, p_name in PARAMS.items():
         # Find first record with this code to get unit
         subset = df[df['parameter_code'] == p_code]
         if not subset.empty:
             units[p_name] = subset.iloc[0]['units_of_measure']
         else:
             units[p_name] = "N/A"

    metadata = {
        "site_name": site_name,
        "site_id": site_id,
        "county_code": county_code,
        "address": site_info.get('address'),
        "latitude": site_info.get('latitude'),
        "longitude": site_info.get('longitude'),
        "parameters": required_cols,
        "units": units,
        "raw_file": raw_file,
        "cleaned_file": cleaned_file,
        "notice": "Cleaned file contains only rows where ALL 5 parameters were present."
    }
    
    meta_file = f"data/{safe_name}_metadata.json"
    with open(meta_file, 'w') as f:
        json.dump(metadata, f, indent=4)
    print(f"  Saved Metadata: {meta_file}")
