# Collect AQI Data

The following code is taken, mostly as-is, from the work of Dr. McDonald for the DATA 512 course at the University of Washington.

In [None]:
import json, time
import requests
import pandas as pd
import logging
from tqdm import tqdm

In [None]:
# Send pipeline messages to a shared log file one directory above the notebook.
# Append mode keeps the messages written by earlier runs.
logging.basicConfig(
    level=logging.INFO,
    filename="../pipeline.log",
    filemode='a',
    format='%(asctime)s %(message)s',
)

# Root logger handle used by the cells below.
logger = logging.getLogger()

In [None]:
# Base URL of the EPA AQS data API; every action path below is appended to it.
API_REQUEST_URL = 'https://aqs.epa.gov/data/api'
# NOTE: unlike the other actions, the signup address is hard-coded rather than templated.
API_ACTION_SIGNUP = '/signup?email=vaibhav1@uw.edu'
# Action path templates. The {placeholders} are filled with str.format() from a
# dictionary shaped like AQS_REQUEST_TEMPLATE (see below).
API_ACTION_LIST_CLASSES = '/list/classes?email={email}&key={key}'
API_ACTION_LIST_PARAMS = '/list/parametersByClass?email={email}&key={key}&pc={pclass}'
API_ACTION_LIST_SITES = '/list/sitesByCounty?email={email}&key={key}&state={state}&county={county}'
API_ACTION_MONITORS_COUNTY = '/monitors/byCounty?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&state={state}&county={county}'
API_ACTION_MONITORS_BOX = '/monitors/byBox?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'
API_ACTION_DAILY_SUMMARY_COUNTY = '/dailyData/byCounty?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&state={state}&county={county}'
API_ACTION_DAILY_SUMMARY_BOX = '/dailyData/byBox?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'
API_LATENCY_ASSUMED = 0.002       # Assuming roughly 2ms latency on the API and network
# Seconds to sleep before each request: 1/100 s minus the assumed latency (0.008 s),
# which keeps the request rate at or below roughly 100 requests per second.
API_THROTTLE_WAIT = (1.0/100.0)-API_LATENCY_ASSUMED
# Default values for every placeholder used by the action templates above.
# Copy this dictionary and fill in the fields a given request needs.
AQS_REQUEST_TEMPLATE = {
    "email":      "",     # account email, sent as the `email` query parameter
    "key":        "",     # API key, sent as the `key` query parameter
    "state":      "",     # the two digit state FIPS # as a string
    "county":     "",     # the three digit county FIPS # as a string
    "begin_date": "",     # the start of a time window in YYYYMMDD format
    "end_date":   "",     # the end of a time window in YYYYMMDD format, begin_date and end_date must be in the same year
    # Bounding box, only referenced by the byBox action templates
    "minlat":    0.0,
    "maxlat":    0.0,
    "minlon":    0.0,
    "maxlon":    0.0,
    "param":     "",     # a list of comma separated 5 digit codes, max 5 codes requested
    "pclass":    ""      # parameter class is only used by the List calls
}

In [None]:
import os

# Credentials for the AQS API.
#
# NOTE(security): the values used to be hard-coded only. They can now be supplied
# through the AQS_EMAIL / AQS_KEY environment variables so they do not have to
# live in the notebook. The literals remain as a fallback purely so existing runs
# keep working; the key should be rotated and the fallback removed.
USERNAME = os.environ.get("AQS_EMAIL") or "vaibhav1@uw.edu"
APIKEY = os.environ.get("AQS_KEY") or 'bluehare23'

In [None]:
# Name of the parameter class that groups the AQI pollutants.
# NOTE(review): not referenced by the visible code; presumably the value for the
# `pclass` field when listing parameters by class - confirm.
AQI_PARAM_CLASS = "AQI POLLUTANTS"

In [None]:
#   Gaseous AQI pollutants CO, SO2, NO2, and O3 (ozone)
#   Codes are listed in the same order as the pollutants above.
AQI_PARAMS_GASEOUS = "42101,42401,42602,44201"
#   Particulate AQI pollutants PM10, PM2.5, and Acceptable PM2.5
AQI_PARAMS_PARTICULATES = "81102,88101,88502"

Air quality monitoring stations are located all over the US. We will need some sample locations to experiment with, to see what kinds of values come back from different sensor requests.

This list includes the [FIPS](https://www.census.gov/library/reference/code-lists/ansi.html) number for the state and county as a 5-digit string. This format, the 5-digit string, is an 'old' format that is still widely used. There are new codes that may eventually be adopted for US government information systems. But FIPS is currently what the AQS uses, so that's what is in the list as the constant.

Just one example city to explore.

In [None]:
#
#   Sample location used in the examples below, keyed by a short city name.
#   'fips' is the 5 digit state+county code (2 digit state, 3 digit county) and
#   'latlon' is a [latitude, longitude] pair.
#
CITY_LOCATIONS = {
    'prescott': {
        'city': 'Prescott Valley',
        'county': 'Yavapai',
        'state': 'Arizona',
        'fips': '04025',
        'latlon': [34.561389, -112.54],
    },
}

In [None]:
def request_daily_summary(email_address = None, key = None, param=None,
                          begin_date = None, end_date = None, fips = None,
                          endpoint_url = API_REQUEST_URL, 
                          endpoint_action = API_ACTION_DAILY_SUMMARY_COUNTY, 
                          request_template = AQS_REQUEST_TEMPLATE,
                          headers = None,
                          timeout = 120):
    """Send one GET request to the AQS API and return the decoded JSON.

    The URL is `endpoint_url` + `endpoint_action`, with the placeholders of
    `endpoint_action` filled from `request_template`. Explicit call arguments
    take priority over values already present in the template.

    Args:
        email_address: Overrides the template's 'email' when truthy.
        key: Overrides the template's 'key' when truthy.
        param: Overrides the template's 'param' when truthy.
        begin_date: Overrides the template's 'begin_date' when truthy.
        end_date: Overrides the template's 'end_date' when truthy.
        fips: 5 character state+county code. When given with length 5 it is
            split into 'state' (first 2 chars) and 'county' (last 3 chars);
            any other value is ignored.
        endpoint_url: Base URL of the API.
        endpoint_action: Action path template with str.format placeholders.
        request_template: Mapping providing a value for every placeholder.
            It is copied, never modified.
        headers: Optional HTTP headers passed to requests.get.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the notebook indefinitely.

    Returns:
        The decoded JSON response, or None when the request failed or the
        body was not valid JSON (the error is printed).

    Raises:
        KeyError: If `endpoint_action` uses a placeholder that is missing
            from `request_template`.
    """
    # Work on a copy: the default template is a shared module-level dict, and
    # writing into it (or into the caller's dict) would leak values between calls.
    request_values = dict(request_template)

    #  This prioritizes the info from the call parameters - not what's already in the template
    overrides = {
        'email': email_address,
        'key': key,
        'param': param,
        'begin_date': begin_date,
        'end_date': end_date,
    }
    for field, value in overrides.items():
        if value:
            request_values[field] = value
    if fips and len(fips) == 5:
        request_values['state'] = fips[:2]
        request_values['county'] = fips[2:]

    # compose the request
    request_url = endpoint_url + endpoint_action.format(**request_values)

    # Wait first, to make sure we don't exceed a rate limit in the situation where an exception occurs
    # during the request processing - throttling is always a good practice with a free data source
    if API_THROTTLE_WAIT > 0.0:
        time.sleep(API_THROTTLE_WAIT)

    # make the request
    try:
        response = requests.get(request_url, headers=headers, timeout=timeout)
        json_response = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # Network failures, timeouts and undecodable bodies are reported and
        # signalled to the caller with None; callers must check for it.
        print(e)
        json_response = None
    return json_response

In [None]:
# Build the request for the gaseous AQI pollutants in the sample county.
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY
request_data['param'] = AQI_PARAMS_GASEOUS
request_data['state'] = CITY_LOCATIONS['prescott']['fips'][:2]
request_data['county'] = CITY_LOCATIONS['prescott']['fips'][2:]

# Stop once this many consecutive years came back without usable data.
MAX_EMPTY_YEARS = 10

list_gaseous_aqi = []   # one DataFrame per year that returned data
counter = 0             # consecutive years without usable data
# Loop through years in reverse order (from 2023 to 1963), we expect to have recent data.
# A descending range (rather than reversed(...)) has a length, so tqdm can show the total.
for year in tqdm(range(2023, 1962, -1)):
    logger.info(f"Collecting Gaseous Data for year: {year}")
    # Check if there are 10 consecutive years of no gaseous data
    if counter == MAX_EMPTY_YEARS:
        logger.info(f'Got {MAX_EMPTY_YEARS} consecutive years of no gaseous data, stopping requests')
        break

    # Request daily summary data (one calendar year per request)
    gaseous_aqi = request_daily_summary(request_template=request_data, begin_date=f"{year}0101", end_date=f"{year}1231")
    # Pause for 5 seconds to avoid excessive API requests
    time.sleep(5)

    # request_daily_summary returns None when the request itself failed
    if gaseous_aqi is None:
        logger.warning(f"Request failed for {year}, no response received")
        counter += 1
        continue

    # Treat a missing 'Data' field the same as an empty one instead of crashing
    if not gaseous_aqi.get('Data'):
        logger.info(f"Got no data for {year}, status = {gaseous_aqi.get('Header')}")
        counter += 1
        continue

    # Append the data to the list and reset counter since data was obtained
    list_gaseous_aqi.append(pd.DataFrame(gaseous_aqi['Data']))
    counter = 0

    # Save data every 10 years incase the API breaks down or gives a new error
    if year % 10 == 0:
        pd.concat(list_gaseous_aqi).to_csv('gaseous_api_data_save.csv')

# Concatenate all dataframes in the list and save to a CSV file.
# pd.concat raises on an empty list, so the no-data case is handled explicitly.
if list_gaseous_aqi:
    df_gaseous_aqi = pd.concat(list_gaseous_aqi)
    df_gaseous_aqi.to_csv('gaseous_aqi.csv')
else:
    df_gaseous_aqi = pd.DataFrame()
    logger.warning('No gaseous data was collected, gaseous_aqi.csv was not written')

In [None]:
# Reuse the request built for the gaseous pollutants, switching only the parameter codes.
request_data['param'] = AQI_PARAMS_PARTICULATES

# Stop once this many consecutive years came back without usable data.
MAX_EMPTY_YEARS = 10

list_particle_aqi = []  # one DataFrame per year that returned data
counter = 0             # consecutive years without usable data
# Loop through years in reverse order (from 2023 to 1963).
# A descending range (rather than reversed(...)) has a length, so tqdm can show the total.
for year in tqdm(range(2023, 1962, -1)):
    logger.info(f"Collecting Particle Data for year: {year}")

    # Check if there are 10 consecutive years of no particle data
    if counter == MAX_EMPTY_YEARS:
        logger.info(f'Got {MAX_EMPTY_YEARS} consecutive years of no particle data, stopping requests')
        break

    # NOTE(review): this call targets the monitors endpoint, whereas the gaseous
    # collection uses the default daily summary endpoint. Confirm whether
    # API_ACTION_MONITORS_COUNTY is intended here or was left over from an example.
    particulate_aqi = request_daily_summary(request_template=request_data, begin_date=f"{year}0101", end_date=f"{year}1231", endpoint_action=API_ACTION_MONITORS_COUNTY)
    # Pause for 5 seconds to avoid excessive API requests
    time.sleep(5)

    # request_daily_summary returns None when the request itself failed
    if particulate_aqi is None:
        logger.warning(f"Request failed for {year}, no response received")
        counter += 1
        continue

    # Treat a missing 'Data' field the same as an empty one instead of crashing
    if not particulate_aqi.get('Data'):
        logger.info(f"Got no data for {year}, status = {particulate_aqi.get('Header')}")
        counter += 1
        continue

    # Append the data to the list and reset counter since data was obtained
    list_particle_aqi.append(pd.DataFrame(particulate_aqi['Data']))
    counter = 0

    # Save data every 10 years. The checkpoint must concatenate the list of
    # per-year frames; df_particle_aqi does not exist until after the loop.
    if year % 10 == 0:
        pd.concat(list_particle_aqi).to_csv('particle_api_data_save.csv')

# Concatenate all dataframes in the list and save to a CSV file.
# With nothing collected there is no frame to write, so define an empty one
# for later cells and skip the file instead of failing on an undefined name.
if list_particle_aqi:
    df_particle_aqi = pd.concat(list_particle_aqi)
    df_particle_aqi.to_csv('particle_aqi.csv')
else:
    df_particle_aqi = pd.DataFrame()
    logger.warning('No particle data was collected, particle_aqi.csv was not written')