# US EPA Air Quality System API Data Acquisition

This notebook shows how to access air quality data from the US Environmental Protection Agency (EPA) Air Quality System (AQS) API. It walks through several techniques for locating air quality monitoring stations near Muskogee, Oklahoma, which is a prerequisite for retrieving data that is relevant to the city.

Several approaches to identifying a suitable station are tested along the way. The notebook ultimately settles on requesting daily summaries for Muskogee County one year at a time and averaging the reported Air Quality Index (AQI) values into a yearly estimate. This gives a consistent series for studying air quality trends in the Muskogee, Oklahoma area.

## License
These code snippets were developed by Dr. David W. McDonald for use in DATA 512, a course in the UW MS Data Science degree program. This code is provided under the [Creative Commons](https://creativecommons.org) [CC-BY license](https://creativecommons.org/licenses/by/4.0/). Revision 1.1 - September 5, 2023

In [1]:
# ----------------------- importing necessary libraries ---------------------- #

import pandas as pd
import json, time
import requests
import csv

In [2]:
# ---------------------------- defining constants ---------------------------- #

API_REQUEST_URL = 'https://aqs.epa.gov/data/api'

API_ACTION_SIGNUP = '/signup?email={email}'

# List actions provide information on API parameter values that are required by some other actions/requests
API_ACTION_LIST_CLASSES = '/list/classes?email={email}&key={key}'
API_ACTION_LIST_PARAMS = '/list/parametersByClass?email={email}&key={key}&pc={pclass}'
API_ACTION_LIST_SITES = '/list/sitesByCounty?email={email}&key={key}&state={state}&county={county}'

# Monitor actions are requests for monitoring stations that meet specific criteria
API_ACTION_MONITORS_COUNTY = '/monitors/byCounty?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&state={state}&county={county}'
API_ACTION_MONITORS_BOX = '/monitors/byBox?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'

# Summary actions are requests for summary data. These are for daily summaries
API_ACTION_DAILY_SUMMARY_COUNTY = '/dailyData/byCounty?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&state={state}&county={county}'
API_ACTION_DAILY_SUMMARY_BOX = '/dailyData/byBox?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'

# It is always nice to be respectful of a free data resource.
# We're going to observe a 100 requests per minute limit - which is fairly nice
API_LATENCY_ASSUMED = 0.002       # Assuming roughly 2ms latency on the API and network
API_THROTTLE_WAIT = (60.0/100.0)-API_LATENCY_ASSUMED   # seconds to wait per request: 60 seconds / 100 requests, less latency

# This is a template that covers most of the parameters for the actions we might take, from the set of actions
# above. In the examples below, most of the time parameters can either be supplied as individual values to a
# function - or they can be set in a copy of the template and passed in with the template.

AQS_REQUEST_TEMPLATE = {
    "email":      "",
    "key":        "",
    "state":      "",     # the two digit state FIPS # as a string
    "county":     "",     # the three digit county FIPS # as a string
    "begin_date": "",     # the start of a time window in YYYYMMDD format
    "end_date":   "",     # the end of a time window in YYYYMMDD format, begin_date and end_date must be in the same year
    "minlat":    0.0,
    "maxlat":    0.0,
    "minlon":    0.0,
    "maxlon":    0.0,
    "param":     "",     # a list of comma separated 5 digit codes, max 5 codes requested
    "pclass":    ""      # parameter class is only used by the List calls
}
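
The action strings above are `str.format` templates, and the request functions fill them by unpacking the whole template dictionary. The sketch below shows that step in isolation. The email and key are placeholder values, not real credentials, and only the fields named in the action string are used.

```python
# Sketch: how an action template and the request template combine into a URL.
# The email and key below are placeholders, not real credentials.
API_REQUEST_URL = 'https://aqs.epa.gov/data/api'
API_ACTION_LIST_PARAMS = '/list/parametersByClass?email={email}&key={key}&pc={pclass}'

template = {
    "email": "user@example.com",
    "key": "examplekey",
    "state": "40",          # not named in this action string, so format() ignores it
    "pclass": "CRITERIA",
}

# str.format(**mapping) only pulls the fields named in the string
request_url = API_REQUEST_URL + API_ACTION_LIST_PARAMS.format(**template)
print(request_url)
# https://aqs.epa.gov/data/api/list/parametersByClass?email=user@example.com&key=examplekey&pc=CRITERIA
```

Values are inserted verbatim at this step, so no URL encoding is applied by the formatting itself.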

The code below requests an API key for accessing the data. Once the key has been issued, the call is commented out so that rerunning the notebook does not send another sign-up request and generate a new key.

In [3]:
# #
# #    This implements the sign-up request. The parameters are standardized so that this function definition matches
# #    all of the others. However, the easiest way to call this is to simply call this function with your preferred
# #    email address.
# #
def request_signup(email_address = None,
                   endpoint_url = API_REQUEST_URL,
                   endpoint_action = API_ACTION_SIGNUP,
                   request_template = AQS_REQUEST_TEMPLATE,
                   headers = None):

    # Make sure we have a string - if you don't have access to this email address, things might go badly for you
    if email_address:
        request_template['email'] = email_address
    if not request_template['email']:
        raise Exception("Must supply an email address to call 'request_signup()'")

    # Compose the signup url - create a request URL by combining the endpoint_url with the parameters for the request
    request_url = endpoint_url+endpoint_action.format(**request_template)

    # make the request
    try:
        # Wait first, to make sure we don't exceed a rate limit in the situation where an exception occurs
        # during the request processing - throttling is always a good practice with a free data source
        if API_THROTTLE_WAIT > 0.0:
            time.sleep(API_THROTTLE_WAIT)
        response = requests.get(request_url, headers=headers)
        json_response = response.json()
    except Exception as e:
        print(e)
        json_response = None
    return json_response

# #
# #    A SIGNUP request is only to be done once, to request a key. A key is sent to that email address and needs to be confirmed with a click through
# #    This code should probably be commented out after you've made your key request to make sure you don't accidentally make a new sign-up request
# #
# print("Requesting SIGNUP ...")
# response = request_signup("nsaumya@uw.edu")
# print(json.dumps(response,indent=4))
# #

To ensure accurate air quality monitoring, it's crucial to understand the various types of air quality sensors and the diverse locations where air quality stations are deployed. Different sensor technologies, such as optical, chemical, and particulate sensors, provide insights into various air pollutants, including PM2.5, PM10, VOCs, CO, NO2, and more. Air quality stations are strategically placed in urban areas, industrial zones, traffic intersections, and even residential neighborhoods to capture a wide range of air quality data.

In [4]:
USERNAME = 'nsaumya@uw.edu'
APIKEY = 'bolegazelle84'
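
Credentials written directly into a notebook are easy to leak when the notebook is shared. One common alternative, sketched here, is to read them from environment variables. The variable names `AQS_EMAIL` and `AQS_KEY` and the helper `load_aqs_credentials` are hypothetical and chosen only for this example.

```python
import os

def load_aqs_credentials(env=os.environ):
    """Return (email, key) from the environment, or raise if either is missing.

    AQS_EMAIL and AQS_KEY are hypothetical variable names chosen for this sketch.
    """
    email = env.get("AQS_EMAIL", "")
    key = env.get("AQS_KEY", "")
    if not email or not key:
        raise RuntimeError("Set AQS_EMAIL and AQS_KEY before running the notebook")
    return email, key

# Example with an explicit mapping instead of the real environment
USERNAME, APIKEY = load_aqs_credentials({"AQS_EMAIL": "user@example.com", "AQS_KEY": "examplekey"})
```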

In [5]:
def request_list_info(email_address=None, key=None,
                      endpoint_url=API_REQUEST_URL,
                      endpoint_action=API_ACTION_LIST_CLASSES,
                      request_template=AQS_REQUEST_TEMPLATE,
                      headers=None):
    """
    Requests a list of information using the EPA Air Quality System (AQS) API.

    Parameters:
    email_address (str): The email address associated with the API access.
    key (str): The API key for authentication.
    endpoint_url (str): The base URL for API requests.
    endpoint_action (str): The specific API action to perform.
    request_template (dict): The request template with parameters to be filled.
    headers (dict): Optional headers for the HTTP request.

    Returns:
    dict: A dictionary containing the API response in JSON format.

    Raises:
    Exception: If email_address or key is missing in the request_template.

    """

    # make sure we have email and key - at least
    # this prioritizes the info from the call parameters - not what's already in the template
    if email_address:
        request_template['email'] = email_address
    if key:
        request_template['key'] = key

    # for the basic request, we need an email address and a key
    if not request_template['email']:
        raise Exception("Must supply an email address to call 'request_list_info()'")
    if not request_template['key']:
        raise Exception("Must supply a key to call 'request_list_info()'")

    # compose the request URL
    request_url = endpoint_url + endpoint_action.format(**request_template)

    try:
        # wait first, to make sure we don't exceed a rate limit in the situation where an exception occurs
        # during the request processing - throttling is always a good practice with a free data source
        if API_THROTTLE_WAIT > 0.0:
            time.sleep(API_THROTTLE_WAIT)
        response = requests.get(request_url, headers=headers)
        json_response = response.json()
    except Exception as e:
        print(e)
        json_response = None

    return json_response
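
Each request helper sleeps for `API_THROTTLE_WAIT` seconds before calling the API. As a rough sketch of how such a wait could be derived from a per-minute request budget, the hypothetical helper `throttle_wait` below converts the budget to seconds per request and subtracts an assumed latency.

```python
def throttle_wait(max_requests_per_minute, assumed_latency=0.002):
    """Seconds to sleep before each request to stay under a per-minute budget."""
    seconds_per_request = 60.0 / max_requests_per_minute
    # never return a negative sleep time
    return max(0.0, seconds_per_request - assumed_latency)

print(throttle_wait(100))   # about 0.598 seconds between requests
```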


In [6]:
# create a copy of the AQS_REQUEST_TEMPLATE and populate it with your email and API key
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY

# send a request to retrieve a list of information based on the populated request_data
response = request_list_info(request_template=request_data)

# check if the response status is "Success"
if response["Header"][0]['status'] == "Success":
    # print the data in a nicely formatted JSON format
    print(json.dumps(response['Data'], indent=4))
else:
    # print the entire response if it's not a success
    print(json.dumps(response, indent=4))


[
    {
        "code": "AIRNOW MAPS",
        "value_represented": "The parameters represented on AirNow maps (88101, 88502, and 44201)"
    },
    {
        "code": "ALL",
        "value_represented": "Select all Parameters Available"
    },
    {
        "code": "AQI POLLUTANTS",
        "value_represented": "Pollutants that have an AQI Defined"
    },
    {
        "code": "CORE_HAPS",
        "value_represented": "Urban Air Toxic Pollutants"
    },
    {
        "code": "CRITERIA",
        "value_represented": "Criteria Pollutants"
    },
    {
        "code": "CSN DART",
        "value_represented": "List of CSN speciation parameters to populate the STI DART tool"
    },
    {
        "code": "FORECAST",
        "value_represented": "Parameters routinely extracted by AirNow (STI)"
    },
    {
        "code": "HAPS",
        "value_represented": "Hazardous Air Pollutants"
    },
    {
        "code": "IMPROVE CARBON",
        "value_represented": "IMPROVE Carbon Parameters"
    }
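
The status check used in this notebook indexes `response["Header"][0]['status']` directly, which raises an error if the request helper returned `None` after a network failure. A small defensive wrapper could look like the sketch below. The helper name `data_or_none` is hypothetical, and it assumes the `Header`/`Data` layout seen in the responses here.

```python
def data_or_none(response):
    """Return response['Data'] when the status is 'Success', otherwise None."""
    if not isinstance(response, dict):
        return None                      # the request failed and returned None
    header = response.get("Header") or [{}]
    if header[0].get("status") != "Success":
        return None
    return response.get("Data", [])

# Synthetic responses shaped like the ones in this notebook
ok = {"Header": [{"status": "Success"}], "Data": [{"code": "CRITERIA"}]}
print(data_or_none(ok))        # [{'code': 'CRITERIA'}]
print(data_or_none(None))      # None
```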

The parameter class of interest here is "AQI POLLUTANTS", the set of pollutants that have an AQI defined. Requesting the parameters in this class returns a list of parameter codes (the sensor IDs) together with the name of the pollutant each code represents: carbon monoxide, sulfur dioxide, nitrogen dioxide, ozone, PM10, and PM2.5.

In [7]:
AQI_PARAM_CLASS = "AQI POLLUTANTS"

In [8]:
#
#   Structure a request to get the sensor IDs associated with the AQI
#
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY
request_data['pclass'] = AQI_PARAM_CLASS  # here we specify that we want this 'pclass' or parameter class

response = request_list_info(request_template=request_data, endpoint_action=API_ACTION_LIST_PARAMS)

if response["Header"][0]['status'] == "Success":
    print(json.dumps(response['Data'],indent=4))
else:
    print(json.dumps(response,indent=4))

[
    {
        "code": "42101",
        "value_represented": "Carbon monoxide"
    },
    {
        "code": "42401",
        "value_represented": "Sulfur dioxide"
    },
    {
        "code": "42602",
        "value_represented": "Nitrogen dioxide (NO2)"
    },
    {
        "code": "44201",
        "value_represented": "Ozone"
    },
    {
        "code": "81102",
        "value_represented": "PM10 Total 0-10um STP"
    },
    {
        "code": "88101",
        "value_represented": "PM2.5 - Local Conditions"
    },
    {
        "code": "88502",
        "value_represented": "Acceptable PM2.5 AQI & Speciation Mass"
    }
]


In [9]:
# gaseous AQI pollutants CO, SO2, NO2, and O3
AQI_PARAMS_GASEOUS = "42101,42401,42602,44201"

# particulate AQI pollutants PM10, PM2.5, and Acceptable PM2.5
AQI_PARAMS_PARTICULATES = "81102,88101,88502"
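
The request template notes that at most five parameter codes can be sent in one request, which is why the seven AQI codes are split into two strings. A generic way to do that split is sketched below. The helper `chunk_params` is hypothetical, and it groups purely by count rather than by gaseous versus particulate pollutants.

```python
def chunk_params(codes, max_per_request=5):
    """Split parameter codes into comma-separated strings of at most max_per_request codes."""
    return [",".join(codes[i:i + max_per_request])
            for i in range(0, len(codes), max_per_request)]

aqi_codes = ["42101", "42401", "42602", "44201", "81102", "88101", "88502"]
print(chunk_params(aqi_codes))
# ['42101,42401,42602,44201,81102', '88101,88502']
```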

In [10]:
# ----------------------------- defining the city ---------------------------- #

CITY_LOCATIONS = {
    'muskogee' :       {'city'   : 'Muskogee',
                       'county' : 'Muskogee',
                       'state'  : 'Oklahoma',
                       'fips'   : '40101',
                       'latlon' : [35.7479, -95.3697] },
}
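
The five-character `fips` value combines the two-digit state code and the three-digit county code, and the API expects them as separate `state` and `county` parameters. A minimal sketch of the split, with a hypothetical helper name `split_fips`:

```python
def split_fips(fips):
    """Split a 5-character county FIPS code into (state, county) strings."""
    if len(fips) != 5 or not fips.isdigit():
        raise ValueError(f"expected a 5-digit FIPS string, got {fips!r}")
    return fips[:2], fips[2:]

state, county = split_fips("40101")   # Muskogee County, Oklahoma
print(state, county)                  # 40 101
```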

In [11]:
# ------------------------------ nearby stations ----------------------------- #

#
#  This list request should give us a list of all the monitoring stations in the county specified by the
#  given city selected from the CITY_LOCATIONS dictionary
#
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY
request_data['state'] = CITY_LOCATIONS['muskogee']['fips'][:2]   # the first two digits (characters) of FIPS is the state code
request_data['county'] = CITY_LOCATIONS['muskogee']['fips'][2:]  # the last three digits (characters) of FIPS is the county code

response = request_list_info(request_template=request_data, endpoint_action=API_ACTION_LIST_SITES)

if response["Header"][0]['status'] == "Success":
    print(json.dumps(response['Data'],indent=4))
else:
    print(json.dumps(response,indent=4))


[
    {
        "code": "0160",
        "value_represented": "5 MILES SOUTH OF HASKELL AT OSU RESEARCH STATION"
    },
    {
        "code": "0161",
        "value_represented": null
    },
    {
        "code": "0162",
        "value_represented": null
    },
    {
        "code": "0163",
        "value_represented": null
    },
    {
        "code": "0164",
        "value_represented": null
    },
    {
        "code": "0166",
        "value_represented": null
    },
    {
        "code": "0167",
        "value_represented": "MUSKOGEE WATER TREATMENT PLANT"
    },
    {
        "code": "0168",
        "value_represented": null
    },
    {
        "code": "0169",
        "value_represented": "DOWNTOWN MUSKOGEE"
    },
    {
        "code": "0170",
        "value_represented": null
    },
    {
        "code": "9019",
        "value_represented": null
    }
]
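
The site list gives codes and names but no coordinates, so on its own it does not say which station is closest to the city. The constants cell also defines a `/monitors/byBox` action that takes `minlat`, `maxlat`, `minlon`, and `maxlon`. A rough sketch of building such a box around the city's `latlon` value follows. The helper `bounding_box` and the 50-mile half-width are assumptions made for illustration, and the conversion uses the approximation of about 69 miles per degree of latitude.

```python
import math

def bounding_box(lat, lon, half_width_miles=50.0):
    """Approximate a square box centered on (lat, lon).

    Uses ~69 miles per degree of latitude; a degree of longitude shrinks
    with cos(latitude). Good enough for a search window, not for surveying.
    """
    dlat = half_width_miles / 69.0
    dlon = half_width_miles / (69.0 * math.cos(math.radians(lat)))
    return {"minlat": lat - dlat, "maxlat": lat + dlat,
            "minlon": lon - dlon, "maxlon": lon + dlon}

box = bounding_box(35.7479, -95.3697)   # latlon from CITY_LOCATIONS['muskogee']
print({k: round(v, 4) for k, v in box.items()})
```

The resulting dictionary uses the same key names as the request template, so its values could be copied into a template before calling a box-based action.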


In [12]:
# ------------------------ requesting daily summaries ------------------------ #

def request_daily_summary(email_address = None, key = None, param=None,
                          begin_date = None, end_date = None, fips = None,
                          endpoint_url = API_REQUEST_URL,
                          endpoint_action = API_ACTION_DAILY_SUMMARY_COUNTY,
                          request_template = AQS_REQUEST_TEMPLATE,
                          headers = None):

    #  this prioritizes the info from the call parameters - not what's already in the template
    if email_address:
        request_template['email'] = email_address
    if key:
        request_template['key'] = key
    if param:
        request_template['param'] = param
    if begin_date:
        request_template['begin_date'] = begin_date
    if end_date:
        request_template['end_date'] = end_date
    if fips and len(fips)==5:
        request_template['state'] = fips[:2]
        request_template['county'] = fips[2:]

    # make sure there are values that allow us to make a call - these are always required
    if not request_template['email']:
        raise Exception("Must supply an email address to call 'request_daily_summary()'")
    if not request_template['key']:
        raise Exception("Must supply a key to call 'request_daily_summary()'")
    if not request_template['param']:
        raise Exception("Must supply param values to call 'request_daily_summary()'")
    if not request_template['begin_date']:
        raise Exception("Must supply a begin_date to call 'request_daily_summary()'")
    if not request_template['end_date']:
        raise Exception("Must supply an end_date to call 'request_daily_summary()'")
    # note we're not validating FIPS fields because not all of the daily summary actions require the FIPS numbers

    # compose the request
    request_url = endpoint_url+endpoint_action.format(**request_template)

    # make the request
    try:
        # wait first, to make sure we don't exceed a rate limit in the situation where an exception occurs
        # during the request processing - throttling is always a good practice with a free data source
        if API_THROTTLE_WAIT > 0.0:
            time.sleep(API_THROTTLE_WAIT)
        response = requests.get(request_url, headers=headers)
        json_response = response.json()
    except Exception as e:
        print(e)
        json_response = None
    return json_response
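
The request template notes that `begin_date` and `end_date` must fall in the same year, so a multi-year range has to be split into one request per calendar year. A minimal sketch of generating those windows, with a hypothetical helper name `yearly_windows`:

```python
def yearly_windows(first_year, last_year):
    """Yield (begin_date, end_date) strings in YYYYMMDD format, one pair per calendar year."""
    for year in range(first_year, last_year + 1):
        yield f"{year}0101", f"{year}1231"

print(list(yearly_windows(2020, 2022)))
# [('20200101', '20201231'), ('20210101', '20211231'), ('20220101', '20221231')]
```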

In [14]:
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY
request_data['param'] = AQI_PARAMS_GASEOUS
request_data['state'] = CITY_LOCATIONS['muskogee']['fips'][:2]
request_data['county'] = CITY_LOCATIONS['muskogee']['fips'][2:]

In [15]:
EXTRACTION_FIELDS = ['sample_duration','observation_count','arithmetic_mean','aqi']

#    The function creates a summary record
def extract_summary_from_response(r=None, fields=EXTRACTION_FIELDS):
    # the result will be structured around monitoring site, parameter, and then date
    result = dict()
    data = r["Data"]
    for record in data:

        # make sure the record is set up
        site = record['site_number']
        param = record['parameter_code']

        #date = record['date_local']    # this version keeps the response value in YYYY-MM-DD format
        date = record['date_local'].replace('-','') # this puts it in YYYYMMDD format
        if site not in result:
            result[site] = dict()
            result[site]['local_site_name'] = record['local_site_name']
            result[site]['site_address'] = record['site_address']
            result[site]['state'] = record['state']
            result[site]['county'] = record['county']
            result[site]['city'] = record['city']
            result[site]['pollutant_type'] = dict()
        if param not in result[site]['pollutant_type']:
            result[site]['pollutant_type'][param] = dict()
            result[site]['pollutant_type'][param]['parameter_name'] = record['parameter']
            result[site]['pollutant_type'][param]['units_of_measure'] = record['units_of_measure']
            result[site]['pollutant_type'][param]['method'] = record['method']
            result[site]['pollutant_type'][param]['data'] = dict()
        if date not in result[site]['pollutant_type'][param]['data']:
            result[site]['pollutant_type'][param]['data'][date] = list()

        # now extract the specified fields
        extract = dict()
        for k in fields:
            if str(k) in record:
                extract[str(k)] = record[k]
            else:
                # this makes sure we always have the requested fields, even if
                # we have a missing value for a given day/month
                extract[str(k)] = None

        # add this extraction to the list for the day
        result[site]['pollutant_type'][param]['data'][date].append(extract)

    return result
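
The result is nested as site, then `pollutant_type`, then parameter code, then `data`, then date, with a list of extracted entries per date. The condensed sketch below reproduces only that nesting using `dict.setdefault`. It is not the function above, and the records are synthetic, with made-up values and only the fields the sketch needs.

```python
# Synthetic records with only the fields this sketch needs; values are made up.
records = [
    {"site_number": "0167", "parameter_code": "88101", "date_local": "2021-07-01", "aqi": 40},
    {"site_number": "0167", "parameter_code": "88101", "date_local": "2021-07-01", "aqi": 44},
    {"site_number": "0167", "parameter_code": "81102", "date_local": "2021-07-02", "aqi": None},
]

result = {}
for rec in records:
    date = rec["date_local"].replace("-", "")          # YYYY-MM-DD -> YYYYMMDD
    site = result.setdefault(rec["site_number"], {"pollutant_type": {}})
    param = site["pollutant_type"].setdefault(rec["parameter_code"], {"data": {}})
    param["data"].setdefault(date, []).append({"aqi": rec["aqi"]})

print(result["0167"]["pollutant_type"]["88101"]["data"])
# {'20210701': [{'aqi': 40}, {'aqi': 44}]}
```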

In [16]:
from tqdm import tqdm

# Request daily summary data for each year from 1963 through 2022
average_aqi_per_year = {}
for year in tqdm(range(1963, 2023)):

    year_aqi_count = 0
    year_aqi_sum = 0
    begin_date = f"{year}0101"
    end_date = f"{year}1231"
    request_data['param'] = AQI_PARAMS_PARTICULATES
    # request daily particulate summaries for the full calendar year
    particulate_aqi = request_daily_summary(request_template=request_data, begin_date=begin_date, end_date=end_date)
    try:
        if particulate_aqi["Header"][0]['status'].startswith("No data "):
            print(f"No data for year {begin_date}-{end_date}.")
            continue
        extract_particulate = extract_summary_from_response(particulate_aqi)
        # only the first monitoring site returned for the county is used
        first_site_location = next(iter(extract_particulate.values()))
        data_for_first_site = first_site_location.get('pollutant_type', {})
        for pollutant_data in data_for_first_site.values():
            year_aqi_data = pollutant_data.get('data', {})
            # Loop through the data for each date in the year
            for date, aqi_list in year_aqi_data.items():
                for entry in aqi_list:
                    # skips missing AQI values (note: an AQI of exactly 0 is also skipped)
                    if entry['aqi']:
                        year_aqi_sum += entry['aqi']
                        year_aqi_count += 1
    except Exception as e:
        # a failed request (None response) or an unexpected payload skips this year
        print(f"Skipping year {year}: {e}")
        continue
    # Calculate the average AQI for the year
    if year_aqi_count > 0:
        average_aqi = year_aqi_sum / year_aqi_count
        average_aqi_per_year[year] = average_aqi

No data for year 19630101-19631231.
No data for year 19640101-19641231.
No data for year 19650101-19651231.
No data for year 19660101-19661231.
No data for year 19670101-19671231.
No data for year 19680101-19681231.
No data for year 19690101-19691231.
No data for year 19700101-19701231.
No data for year 19710101-19711231.
No data for year 19720101-19721231.
No data for year 19730101-19731231.
No data for year 19740101-19741231.
No data for year 19750101-19751231.
No data for year 19760101-19761231.
No data for year 19770101-19771231.
No data for year 19780101-19781231.
No data for year 19790101-19791231.
No data for year 19800101-19801231.
No data for year 19810101-19811231.
No data for year 19820101-19821231.
No data for year 19830101-19831231.
No data for year 19840101-19841231.
No data for year 19850101-19851231.
No data for year 19860101-19861231.
No data for year 19870101-19871231.
No data for year 19880101-19881231.
No data for year 20200101-20201231.
No data for year 20210101-20211231.
No data for year 20220101-20221231.

100%|██████████| 60/60 [1:15:38<00:00, 75.64s/it]
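
The loop in this notebook averages only the first site returned for the county. As a variation, and not what the notebook does, the average could instead be taken across every site and parameter in the extracted structure. The helper `mean_aqi` is hypothetical, and the sample input is synthetic but shaped like the output of `extract_summary_from_response()`.

```python
def mean_aqi(extract):
    """Mean of all non-missing AQI values across every site, parameter, and date."""
    values = [entry["aqi"]
              for site in extract.values()
              for pollutant in site.get("pollutant_type", {}).values()
              for day in pollutant.get("data", {}).values()
              for entry in day
              if entry.get("aqi") is not None]
    return sum(values) / len(values) if values else None

# Synthetic structure shaped like the output of extract_summary_from_response()
sample = {
    "0167": {"pollutant_type": {"88101": {"data": {"19890101": [{"aqi": 20}, {"aqi": 30}]}}}},
    "0169": {"pollutant_type": {"81102": {"data": {"19890102": [{"aqi": None}, {"aqi": 40}]}}}},
}
print(mean_aqi(sample))   # 30.0
```

Testing `is not None` rather than truthiness keeps an AQI of 0 in the average.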





In [19]:
df = pd.DataFrame(list(average_aqi_per_year.items()), columns=['Year', 'Avg_AQI'])
df.head()

   Year    Avg_AQI
0  1989  27.233333
1  1990  22.301887
2  1991  25.118644
3  1992  24.407407
4  1993  28.844828


In [20]:
df.to_csv("data/avg-aqi-yearly.csv", index=False)  # index=False avoids writing the row index as an extra unnamed column
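
For a two-column table like this one, the standard-library `csv` module (imported at the top of the notebook) is also enough, and it writes exactly the two columns with no index column. The sketch below writes to an in-memory buffer so it can run anywhere. The two values are copied from the first rows of the table above.

```python
import csv
import io

# Values copied from the first rows of the table above
average_aqi_per_year = {1989: 27.233333, 1990: 22.301887}

buffer = io.StringIO()          # stands in for open("data/avg-aqi-yearly.csv", "w", newline="")
writer = csv.writer(buffer, lineterminator="\n")
writer.writerow(["Year", "Avg_AQI"])
for year, avg in sorted(average_aqi_per_year.items()):
    writer.writerow([year, avg])

print(buffer.getvalue())
```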