# Part 1 - Common Analysis | AQI Data Pull

# Air Quality Index

In the previous notebook we have obtained the wildfire related data for the city of Bismarck, North Dakota. After analysing the data and understanding the data, there was a mechanism devised to estimate an index to measure the smoke for the city assigned. By referring to several resources, an arbitarary formula was devised to calculate the index. However, one very important aspect to check the validity of the estimate is to compare it to the ground reality. Hence, we are pulling Air quality index data from the API to check against our calculated estimates

## Brief Background

The US EPA was only created in 1973, and did not really begin installing air quality monitoring stations until the early 1980s. Further, of 3000+ counties in the US, the EPA has vetted monitoring stations in only 2000 of them. This means that US EPA AQI measures for any one city will need to be some kind of estimate based on monitoring stations that are nearby. 

This example illustrates how to request data from the US Environmental Protection Agency (EPA) Air Quality Service (AQS) API. This is a historical API and does not provide real-time air quality data. The [documentation](https://aqs.epa.gov/aqsweb/documents/data_api.html) for the API provides definitions of the different call parameter and examples of the various calls that can be made to the API.

This notebook works systematically through example calls, requesting an API key, using 'list' to get various IDs and parameter values, and using 'daily summary' to get summary data that meets specific condistions.

The API helps resolve this by providing calls to search for monitoring stations and data using either station ids, or a county designation or a geographic bounding box. This example code provides examples of the county based and bounding box based API calls. Some [additional information on the Air Quality System can be found in the EPA FAQ](https://www.epa.gov/outdoor-air-quality-data/frequent-questions-about-airdata) on the system.

The end goal of this example is to get to some values that we might use for the Air Quality Index or AQI. You might see this reported on the news, most often around smog, but more frequently with regard to smoke. The AQI index is meant to tell us something about how healthy or clean the air is on any day. The AQI is actually a somewhat complext measure. When I started this example I looked up [how to calculate the AQI](https://www.airnow.gov/sites/default/files/2020-05/aqi-technical-assistance-document-sept2018.pdf) so that I would know roughly what goes into that value.


## License
This code base was developed on the basis of the examples which were developed by Dr. David W. McDonald for use in DATA 512, a course in the UW MS Data Science degree program. This code is provided under the [Creative Commons](https://creativecommons.org) [CC-BY license](https://creativecommons.org/licenses/by/4.0/). Revision 1.1 - September 5, 2023


## Setup

We first set the working dependencies and constants that are required to communicate with the API. Sample code has been obtained from [this notebook](https://drive.google.com/file/d/1sGKvcFdd492IJYL1No0IQfVSI8OF1vgo/view?usp=drive_link)

The setup contains the following steps
1. Import all relevant packages
2. Define all the relevant constants that will be used throughout the script.


In [1]:
# import all the packages required

# These are standard python modules
import json, time
# The 'requests' module is a distribution module for making web requests.
import requests

# import supporting packages
import pandas as pd
from datetime import datetime, timedelta

In [2]:
# defining all the constants required to make an API request

# This is the root of all AQS API URLs
API_REQUEST_URL = 'https://aqs.epa.gov/data/api'

#These are 'actions' we can ask the API to take or requests that we can make of the API

# Sign-up request - generally only performed once - unless you lose your key
API_ACTION_SIGNUP = '/signup?email={email}'

# List actions provide information on API parameter values that are required by some other actions/requests
API_ACTION_LIST_CLASSES = '/list/classes?email={email}&key={key}'
API_ACTION_LIST_PARAMS = '/list/parametersByClass?email={email}&key={key}&pc={pclass}'
API_ACTION_LIST_SITES = '/list/sitesByCounty?email={email}&key={key}&state={state}&county={county}'

# Monitor actions are requests for monitoring stations that meet specific criteria
API_ACTION_MONITORS_COUNTY = '/monitors/byCounty?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&state={state}&county={county}'
API_ACTION_MONITORS_BOX = '/monitors/byBox?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'


# Summary actions are requests for summary data. These are for daily summaries
API_ACTION_DAILY_SUMMARY_COUNTY = '/dailyData/byCounty?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&state={state}&county={county}'
API_ACTION_DAILY_SUMMARY_BOX = '/dailyData/byBox?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'

# It is always nice to be respectful of a free data resource.
# We're going to observe a 100 requests per minute limit - which is fairly nice
API_LATENCY_ASSUMED = 0.002       # Assuming roughly 2ms latency on the API and network
API_THROTTLE_WAIT = (1.0/100.0)-API_LATENCY_ASSUMED

'''This is a template that covers most of the parameters for the actions we might take, from the set of actions 
above. In the examples below, most of the time parameters can either be supplied as individual values to a
function - or they can be set in a copy of the template and passed in with the template.'''

AQS_REQUEST_TEMPLATE = {
    "email":      "",     
    "key":        "",      
    "state":      "",     # the two digit state FIPS # as a string
    "county":     "",     # the three digit county FIPS # as a string
    "begin_date": "",     # the start of a time window in YYYYMMDD format
    "end_date":   "",     # the end of a time window in YYYYMMDD format, begin_date and end_date must be in the same year
    "minlat":    0.0,
    "maxlat":    0.0,
    "minlon":    0.0,
    "maxlon":    0.0,
    "param":     "",     # a list of comma separated 5 digit codes, max 5 codes requested
    "pclass":    ""      # parameter class is only used by the List calls
}



## Step 0 - Signing Up

Before you can use the API you need to request a key. You will use an email address to make the request. The EPA then sends a confirmation email link and a 'key' that you use for all other requests.

You only need to sign-up once, unless you want to invalidate your current key (by getting a new key) or you lose your key.

In [3]:
'''This implements the sign-up request. The parameters are standardized so that this function definition matches
all of the others. However, the easiest way to call this is to simply call this function with your preferred email address.'''

def request_signup(email_address = None,
                   endpoint_url = API_REQUEST_URL, 
                   endpoint_action = API_ACTION_SIGNUP, 
                   request_template = AQS_REQUEST_TEMPLATE,
                   headers = None):
    
    """
    Sends a signup request to an API with the provided parameters.

    Args:
        email_address (str, optional): The email address for the signup request.
        endpoint_url (str, optional): The base URL of the API.
        endpoint_action (str, optional): The specific action (endpoint) for the signup request.
        request_template (dict, optional): A template containing parameters for the request.
        headers (dict, optional): Additional headers to include in the request.

    Returns:
        dict: The JSON response from the API.

    Raises:
        Exception: If no email address is provided.
    """
    
    
    # Make sure we have a string - if you don't have access to this email addres, things might go badly for you
    if email_address:
        request_template['email'] = email_address        
    if not request_template['email']: 
        raise Exception("Must supply an email address to call 'request_signup()'")
    
    # Compose the signup url - create a request URL by combining the endpoint_url with the parameters for the request
    request_url = endpoint_url+endpoint_action.format(**request_template)
        
    # make the request
    try:
        # Wait first, to make sure we don't exceed a rate limit in the situation where an exception occurs
        # during the request processing - throttling is always a good practice with a free data source
        if API_THROTTLE_WAIT > 0.0:
            time.sleep(API_THROTTLE_WAIT)
        response = requests.get(request_url, headers=headers)
        json_response = response.json()
    except Exception as e:
        print(e)
        json_response = None
    return json_response




In [4]:
# implementation 

# ''' A SIGNUP request is only to be done once, to request a key. A key is sent to that email address and needs to be confirmed with a click through This code should probably be commented out after you've made your key request to make sure you don't accidentally make a new sign-up request '''

#print("Requesting SIGNUP ...")
#response = request_signup("dwmc@uw.edu")
#print(json.dumps(response,indent=4))
#

## Step 1 - Making a formal Request

The next thing is to get information about the different types of air quality monitoring (sensors) and the different places where we might find air quality stations. The monitoring system is complex and changes all the time. The EPA implementation allows an API user to find changes to monitoring sites and sensors by making requests - maybe monthly, or daily. This API approach is probably better than having the EPA publish documentation that may be out of date as soon as it hits a web page. The one problem here is that some of the responses rely on jargon or terms-of-art. That is, one needs to know a bit about the way atmospheric sciece works to understand some of the terms. ... Good thing we can use the web to search for terms we don't know!

In [5]:
# constants 

USERNAME = "shweta97@uw.edu"
APIKEY = "aquacat77"


In [6]:
'''This implements the list request. There are several versions of the list request that only require email and key.
This code sets the default action/requests to list the groups or parameter class descriptors. Having those descriptors 
allows one to request the individual (proprietary) 5 digit codes for individual air quality measures by using the
param request. Some code in later cells will illustrate those requests. '''

def request_list_info(email_address=None, key=None,
                      endpoint_url=API_REQUEST_URL,
                      endpoint_action=API_ACTION_LIST_CLASSES,
                      request_template=AQS_REQUEST_TEMPLATE,
                      headers=None):
    """
    Sends a request to retrieve information about a list of classes from an API.

    Args:
        email_address (str, optional): The email address for the request.
        key (str, optional): The key for the request.
        endpoint_url (str, optional): The base URL of the API.
        endpoint_action (str, optional): The specific action (endpoint) for the request.
        request_template (dict, optional): A template containing parameters for the request.
        headers (dict, optional): Additional headers to include in the request.

    Returns:
        dict: The JSON response from the API.

    Raises:
        Exception: If no email address or key is provided.
    """

    # Make sure we have email and key - at least
    # This prioritizes the info from the call parameters - not what's already in the template
    if email_address:
        request_template['email'] = email_address
    if key:
        request_template['key'] = key

    # For the basic request, we need an email address and a key
    if not request_template['email']:
        raise Exception("Must supply an email address to call 'request_list_info()'")
    if not request_template['key']:
        raise Exception("Must supply a key to call 'request_list_info()'")

    # Compose the request URL - create a request URL by combining the endpoint_url with the parameters for the request
    request_url = endpoint_url + endpoint_action.format(**request_template)

    # Make the request
    try:
        # Wait first, to make sure we don't exceed a rate limit in the situation where an exception occurs
        # during the request processing - throttling is always a good practice with a free data source
        if API_THROTTLE_WAIT > 0.0:
            time.sleep(API_THROTTLE_WAIT)
        response = requests.get(request_url, headers=headers)
        json_response = response.json()
    except Exception as e:
        print(e)
        json_response = None

    return json_response


We now check a sample of the response from the API to judge the structure of the API. We will be using request_list_info function.

In [7]:
'''The default should get us a list of the various groups or classes of sensors. These classes are user defined names for clustors of
sensors that might be part of a package or default air quality sensing station. We need a class name to start getting down to the
a sensor ID. Each sensor type has an ID number. We'll eventually need those ID numbers to be able to request values that come from
that specific sensor.'''

# static user authentication details
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY

# response from the API
response = request_list_info(request_template=request_data)

# printed the corresponding responses
if response["Header"][0]['status'] == "Success":
    print(json.dumps(response['Data'],indent=4))
else:
    print(json.dumps(response,indent=4))


[
    {
        "code": "AIRNOW MAPS",
        "value_represented": "The parameters represented on AirNow maps (88101, 88502, and 44201)"
    },
    {
        "code": "ALL",
        "value_represented": "Select all Parameters Available"
    },
    {
        "code": "AQI POLLUTANTS",
        "value_represented": "Pollutants that have an AQI Defined"
    },
    {
        "code": "CORE_HAPS",
        "value_represented": "Urban Air Toxic Pollutants"
    },
    {
        "code": "CRITERIA",
        "value_represented": "Criteria Pollutants"
    },
    {
        "code": "CSN DART",
        "value_represented": "List of CSN speciation parameters to populate the STI DART tool"
    },
    {
        "code": "FORECAST",
        "value_represented": "Parameters routinely extracted by AirNow (STI)"
    },
    {
        "code": "HAPS",
        "value_represented": "Hazardous Air Pollutants"
    },
    {
        "code": "IMPROVE CARBON",
        "value_represented": "IMPROVE Carbon Parameters"
    }

We're interested in getting to something that might be the Air Quality Index (AQI). You see this reported on the news - often around smog values, but also when there is smoke in the sky. The AQI is a complex measure of different gasses and of the particles in the air (dust, dirt, ash ...).

From the list produced by our 'list/Classes' request above, it looks like there is a class of sensors called "AQI POLLUTANTS". Let's try to get a list of those specific sensors and see what we can get from those.

In [9]:
'''Once we have a list of the classes or groups of possible sensors, we can find the sensor IDs that make up that class (group)
The one that looks to be associated with the Air Quality Index is "AQI POLLUTANTS"
We'll use that to make another list request.'''

AQI_PARAM_CLASS = "AQI POLLUTANTS"


In [10]:
# Structure a request to get the sensor IDs associated with the AQI

# static user authentication details
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY

# here we specify that we want this 'pclass' or parameter classs
request_data['pclass'] = AQI_PARAM_CLASS  

# response from the API
response = request_list_info(request_template=request_data)

# printed the corresponding responses
if response["Header"][0]['status'] == "Success":
    print(json.dumps(response['Data'],indent=4))
else:
    print(json.dumps(response,indent=4))


[
    {
        "code": "AIRNOW MAPS",
        "value_represented": "The parameters represented on AirNow maps (88101, 88502, and 44201)"
    },
    {
        "code": "ALL",
        "value_represented": "Select all Parameters Available"
    },
    {
        "code": "AQI POLLUTANTS",
        "value_represented": "Pollutants that have an AQI Defined"
    },
    {
        "code": "CORE_HAPS",
        "value_represented": "Urban Air Toxic Pollutants"
    },
    {
        "code": "CRITERIA",
        "value_represented": "Criteria Pollutants"
    },
    {
        "code": "CSN DART",
        "value_represented": "List of CSN speciation parameters to populate the STI DART tool"
    },
    {
        "code": "FORECAST",
        "value_represented": "Parameters routinely extracted by AirNow (STI)"
    },
    {
        "code": "HAPS",
        "value_represented": "Hazardous Air Pollutants"
    },
    {
        "code": "IMPROVE CARBON",
        "value_represented": "IMPROVE Carbon Parameters"
    }

We should now have (above) a response containing a set of sensor ID numbers. The list should include the sensor numbers as well as a description or name for each sensor.

The EPA AQS API has limits on some call parameters. Specifically, when we request data for sensors we can only specify a maximum of 5 different sensor values to return. This means we cannot get all of the Air Quality Index parameters in one request for data. We have to break it up.

What I did below was to break the request into two logical groups, the AQI sensors that sample gasses and the AQI sensors that sample particles in the air.

In [11]:
'''Given the set of sensor codes, now we can create a parameter list or 'param' value as defined by the AQS API spec.
It turns out that we want all of these measures for AQI, but we need to have two different param constants to get
all seven of the code types. We can only have a max of 5 sensors/values request per param.'''

#   Gaseous AQI pollutants CO, SO2, NO2, and O2
AQI_PARAMS_GASEOUS = "42101,42401,42602,44201"

#   Particulate AQI pollutants PM10, PM2.5, and Acceptable PM2.5
AQI_PARAMS_PARTICULATES = "81102,88101,88502"

Air quality monitoring stations are located all over the US at different locations. We will need some sample locations to experiment with different locations to see what kinds of values come back from different sensor requests.

This list includes the FIPS number for the state and county as a 5 digit string. This format, the 5 digit string, is a 'old' format that is still widely used. There are new codes that may eventually be adopted for the US government information systems. But FIPS is currently what the AQS uses, so that's what is in the list as the constant.

We will be exploring with the assigned city

In [12]:
# We'll use this city location in the examples below.

CITY_LOCATIONS = {
    'bismarck' :        {'city'   : 'Bismarck',
                       'county' : 'Burleigh',
                       'state'  : 'North Dakota',
                       'fips'   : '38015',
                       'latlon' : [46.825905, -100.778275] }, 
}

Given our CITY_LOCATIONS constant we can now find which monitoring locations are nearby. One option is to use the county to define the area we're interest in. You can get the EPA to list their monitoring stations by county. You can also get a set of monitoring stations by using a bounding box of latitude, longitude points. For this example, we'll use the county approach. There is a bounding box example later in this notebook.

In [13]:
# This list request should give us a list of all the monitoring stations in the county specified by the given city selected from the CITY_LOCATIONS dictionary

# static user authentication details
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY

# city related information
request_data['state'] = CITY_LOCATIONS['bismarck']['fips'][:2]   # the first two digits (characters) of FIPS is the state code
request_data['county'] = CITY_LOCATIONS['bismarck']['fips'][2:]  # the last three digits (characters) of FIPS is the county code

# response from the API
response = request_list_info(request_template=request_data, endpoint_action=API_ACTION_LIST_SITES)

# printed the corresponding responses
if response["Header"][0]['status'] == "Success":
    print(json.dumps(response['Data'],indent=4))
else:
    print(json.dumps(response,indent=4))


[
    {
        "code": "0001",
        "value_represented": null
    },
    {
        "code": "0002",
        "value_represented": null
    },
    {
        "code": "0003",
        "value_represented": "BISMARCK RESIDENTIAL"
    },
    {
        "code": "1001",
        "value_represented": null
    },
    {
        "code": "1002",
        "value_represented": null
    },
    {
        "code": "1003",
        "value_represented": null
    }
]


The above response gives us a list of monitoring stations. Each monitoring station has a unique "code" which is a string number, and, sometimes, a description. The description seems to be something about where the monitoring station is located. Now that we have extracted all the data for the city and the codes. We will now try to summarize the data so that it can be used for analysis.

## Step 2 - Response Summary 

The function below is designed to encapsulate requests to the EPA AQS API. When calling the function one should create/copy a parameter template, then initialize that template with values that won't change with each call. Then on each call simply pass in the parameters that need to change, like date ranges.

Another function below provides an example of extracting values and restructuring the response to make it a little more usable.

In [8]:
'''This implements the daily summary request. Daily summary provides a daily summary value for each sensor being requested
from the start date to the end date.Like the two other functions, this can be called with a mixture of a defined parameter dictionary,
or with function parameters. If function parameters are provided, those take precedence over any parameters from the request template.'''


def request_daily_summary(email_address=None, key=None, param=None,
                          begin_date=None, end_date=None, fips=None,
                          endpoint_url=API_REQUEST_URL,
                          endpoint_action=API_ACTION_DAILY_SUMMARY_COUNTY,
                          request_template=AQS_REQUEST_TEMPLATE,
                          headers=None):
    """
    Sends a request to retrieve daily summary information for a specific county from an API.

    Args:
        email_address (str, optional): The email address for the request.
        key (str, optional): The key for the request.
        param (str, optional): The parameter value for the request.
        begin_date (str, optional): The start date for the request.
        end_date (str, optional): The end date for the request.
        fips (str, optional): The FIPS code for the county (5-digit code).

        endpoint_url (str, optional): The base URL of the API.
        endpoint_action (str, optional): The specific action (endpoint) for the request.
        request_template (dict, optional): A template containing parameters for the request.
        headers (dict, optional): Additional headers to include in the request.

    Returns:
        dict: The JSON response from the API.

    Raises:
        Exception: If required parameters are missing.
    """

    # Prioritize the info from the call parameters - not what's already in the template
    if email_address:
        request_template['email'] = email_address
    if key:
        request_template['key'] = key
    if param:
        request_template['param'] = param
    if begin_date:
        request_template['begin_date'] = begin_date
    if end_date:
        request_template['end_date'] = end_date
    if fips and len(fips) == 5:
        request_template['state'] = fips[:2]
        request_template['county'] = fips[2:]

    # Make sure there are values that allow us to make a call - these are always required
    if not request_template['email']:
        raise Exception("Must supply an email address to call 'request_daily_summary()'")
    if not request_template['key']:
        raise Exception("Must supply a key to call 'request_daily_summary()'")
    if not request_template['param']:
        raise Exception("Must supply param values to call 'request_daily_summary()'")
    if not request_template['begin_date']:
        raise Exception("Must supply a begin_date to call 'request_daily_summary()'")
    if not request_template['end_date']:
        raise Exception("Must supply an end_date to call 'request_daily_summary()'")

    # Note we're not validating FIPS fields because not all of the daily summary actions require the FIPS numbers

    # Compose the request URL - create a request URL by combining the endpoint_url with the parameters for the request
    request_url = endpoint_url + endpoint_action.format(**request_template)

    # Make the request
    try:
        # Wait first, to make sure we don't exceed a rate limit in the situation where an exception occurs
        # during the request processing - throttling is always a good practice with a free data source
        if API_THROTTLE_WAIT > 0.0:
            time.sleep(API_THROTTLE_WAIT)
        response = requests.get(request_url, headers=headers)
        json_response = response.json()
    except Exception as e:
        print(e)
        json_response = None

    return json_response


Now we request the response for the city. The form of the daily summary response is a bit verbose with lots of repeated values. What we'll do is create a data structure that relies on a hierarchical context to summarize the data.

The response show that not every monitoring site produces values. As well, it looks like the monitoring sites only produce values for particulates and not for gaseous pollutants.

The next function takes the response and a set of fields that should be extracted for their data values. The code assumes those fields are available. If there are missing values something could certainly go wrong. The function creates a summary for each monitoring site.

In [20]:
# static user authentication details
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY
request_data['param'] = AQI_PARAMS_GASEOUS
request_data['state'] = CITY_LOCATIONS['bismarck']['fips'][:2]
request_data['county'] = CITY_LOCATIONS['bismarck']['fips'][2:]

# request daily summary data for the month of June in 2002
gaseous_aqi = request_daily_summary(request_template=request_data, begin_date="20020601", end_date="20020701")
print("Response for the gaseous pollutants ...")
if gaseous_aqi["Header"][0]['status'] == "Success":
    print(json.dumps(gaseous_aqi['Data'],indent=4))
elif gaseous_aqi["Header"][0]['status'].startswith("No data "):
    print("Looks like the response generated no data. You might take a closer look at your request and the response data.")
else:
    print(json.dumps(gaseous_aqi,indent=4))

Response for the gaseous pollutants ...
Looks like the response generated no data. You might take a closer look at your request and the response data.


In [19]:
# request information for particulate matter
request_data['param'] = AQI_PARAMS_PARTICULATES
# request daily summary data for the month of June in 2020
particulate_aqi = request_daily_summary(request_template=request_data, begin_date="20000101", end_date="20000201")
print("Response for the particulate pollutants ...")

if particulate_aqi["Header"][0]['status'] == "Success":
    print(json.dumps(particulate_aqi['Data'],indent=4))
elif particulate_aqi["Header"][0]['status'].startswith("No data "):
    print("Looks like the response generated no data. You might take a closer look at your request and the response data.")
else:
    print(json.dumps(particulate_aqi,indent=4))

Response for the particulate pollutants ...
[
    {
        "state_code": "38",
        "county_code": "015",
        "site_number": "0003",
        "parameter_code": "88101",
        "poc": 1,
        "latitude": 46.825425,
        "longitude": -100.76821,
        "datum": "NAD83",
        "parameter": "PM2.5 - Local Conditions",
        "sample_duration_code": "7",
        "sample_duration": "24 HOUR",
        "pollutant_standard": "PM25 24-hour 2006",
        "date_local": "2000-01-01",
        "units_of_measure": "Micrograms/cubic meter (LC)",
        "event_type": "No Events",
        "observation_count": 1,
        "observation_percent": 100.0,
        "validity_indicator": "Y",
        "arithmetic_mean": 7.2,
        "first_max_value": 7.2,
        "first_max_hour": 0,
        "aqi": 30,
        "method_code": "118",
        "method": "R & P Model 2025 PM2.5 Sequential w/WINS - GRAVIMETRIC",
        "local_site_name": "BISMARCK RESIDENTIAL",
        "site_address": "1810 N 16TH 

The form of the daily summary response is a bit verbose with lots of repeated values. What we'll do is create a data structure that relies on a hierarchical context to summarize the data.

The responses show that not every monitoring site produces values. As well, it looks like the monitoring sites only produce values for particulates and not for gaseous pollutants.

The next function takes the response and a set of fields that should be extracted for their data values. The code assumes those fields are available. If there are missing values something could certainly go wrong. The function creates a summary for each monitoring site.

In [15]:
# This is a list of field names - data - that will be extracted from each record
EXTRACTION_FIELDS = ['sample_duration','observation_count','arithmetic_mean','aqi']

In [16]:
def extract_summary_from_response(r=None, fields=EXTRACTION_FIELDS):
    """
    Extracts and structures summary records from a given response.

    Parameters:
        r (dict): The response object containing data to be extracted.
        fields (list): List of fields to be extracted for each record.

    Returns:
        dict: A structured summary of the extracted data organized by site, parameter, and date.
              The structure includes site details, pollutant types, parameter information,
              units of measure, methods, and data for each date.
    """
    # Initialize the result dictionary to store the structured summary
    result = dict()
    # Extract the data from the response object
    data = r["Data"]
    
    # Iterate through each record in the data
    for record in data:
        # Extract relevant information from the record
        site = record['site_number']
        param = record['parameter_code']
        # Convert the date to YYYYMMDD format
        date = record['date_local'].replace('-', '')

        # Check if the site is not already in the result dictionary
        if site not in result:
            # If not, create a new entry for the site
            result[site] = dict()
            result[site]['local_site_name'] = record['local_site_name']
            result[site]['site_address'] = record['site_address']
            result[site]['state'] = record['state']
            result[site]['county'] = record['county']
            result[site]['city'] = record['city']
            result[site]['pollutant_type'] = dict()

        # Check if the parameter is not already associated with the site
        if param not in result[site]['pollutant_type']:
            # If not, create a new entry for the parameter
            result[site]['pollutant_type'][param] = dict()
            result[site]['pollutant_type'][param]['parameter_name'] = record['parameter']
            result[site]['pollutant_type'][param]['units_of_measure'] = record['units_of_measure']
            result[site]['pollutant_type'][param]['method'] = record['method']
            result[site]['pollutant_type'][param]['data'] = dict()

        # Check if the date is not already associated with the parameter
        if date not in result[site]['pollutant_type'][param]['data']:
            # If not, create a new entry for the date
            result[site]['pollutant_type'][param]['data'][date] = list()

        # Extract the specified fields for the current record
        extract = dict()
        for k in fields:
            # Check if the field is present in the record
            if str(k) in record:
                # If present, add the field to the extraction dictionary
                extract[str(k)] = record[k]
            else:
                # If not present, set the value to None
                extract[str(k)] = None

        # Add the extraction to the list for the current day
        result[site]['pollutant_type'][param]['data'][date].append(extract)

    # Return the final result
    return result

In [17]:
# gives the summary of the gaseous extractions
extract_gaseous = extract_summary_from_response(gaseous_aqi)
print("Summary of gaseous extraction ...")
print(json.dumps(extract_gaseous,indent=4))

Summary of gaseous extraction ...
{}


In [21]:
# gives the summary of the particulate extractions
extract_particulate = extract_summary_from_response(particulate_aqi)
print("Summary of particulate extraction ...")
print(json.dumps(extract_particulate,indent=4))

Summary of particulate extraction ...
{
    "0003": {
        "local_site_name": "BISMARCK RESIDENTIAL",
        "site_address": "1810 N 16TH STREET",
        "state": "North Dakota",
        "county": "Burleigh",
        "city": "Bismarck",
        "pollutant_type": {
            "88101": {
                "parameter_name": "PM2.5 - Local Conditions",
                "units_of_measure": "Micrograms/cubic meter (LC)",
                "method": "R & P Model 2025 PM2.5 Sequential w/WINS - GRAVIMETRIC",
                "data": {
                    "20000101": [
                        {
                            "sample_duration": "24 HOUR",
                            "observation_count": 1,
                            "arithmetic_mean": 7.2,
                            "aqi": 30
                        },
                        {
                            "sample_duration": "24 HOUR",
                            "observation_count": 1,
                            "arithmetic_mean"

Since we didn't get data relevant to our request period, we are going to test out the Bounding box method.

## Step 3 - Making Request by bounding box

There are some places that don't have monitoring stations. In the EPA FAQ that covers the AQS system, they note that their monitoring covers 2000 of the 3000+ US counties.

The AQS API has a mechanism of requesting data and monitoring stations using a geographic bounding box. The above examples just demonstrated the use of the AQS API for making requests by counties. The examples below illustrate the use of bounding boxes. The example below makes requests to identify monitoring stations within the bounding box. Once you knew you have monitoring stations, then the bounding box could be used in the daily summary requests to get AQS data.

In [22]:
''' These are rough estimates for creating bounding boxes based on a city location
You can find these rough estimates on the USGS website:
https://www.usgs.gov/faqs/how-much-distance-does-a-degree-minute-and-second-cover-your-maps'''

LAT_25MILES = 25.0 * (1.0/69.0)    # This is about 25 miles of latitude in decimal degrees
LON_25MILES = 25.0 * (1.0/54.6)    # This is about 25 miles of longitude in decimal degrees

'''Compute a rough estimates for a bounding box around a given place
The bounding box is scaled in 50 mile increments. That is the bounding box will have sides that
are rough multiples of 50 miles, with the center of the box around the indicated place.
The scale parameter determines the scale (size) of the bounding box'''

def bounding_latlon(place=None,scale=1.0):
    minlat = place['latlon'][0] - float(scale) * LAT_25MILES
    maxlat = place['latlon'][0] + float(scale) * LAT_25MILES
    minlon = place['latlon'][1] - float(scale) * LON_25MILES
    maxlon = place['latlon'][1] + float(scale) * LON_25MILES
    return [minlat,maxlat,minlon,maxlon]



In [23]:
'''This implements the monitors request. This requests monitoring stations. This can be done by state, county, or bounding box. 

Like the two other functions, this can be called with a mixture of a defined parameter dictionary, or with function
parameters. If function parameters are provided, those take precedence over any parameters from the request template.'''

def request_monitors(email_address=None, key=None, param=None,
                     begin_date=None, end_date=None, fips=None,
                     endpoint_url=API_REQUEST_URL,
                     endpoint_action=API_ACTION_MONITORS_COUNTY,
                     request_template=AQS_REQUEST_TEMPLATE,
                     headers=None):

    """
    Requests monitor data based on specified parameters.

    Parameters:
        email_address (str): Email address for API request.
        key (str): API key for authentication.
        param (str): Parameter values for the request.
        begin_date (str): Start date for the data request.
        end_date (str): End date for the data request.
        fips (str): FIPS code for county-level requests.
        endpoint_url (str): Base URL for API requests.
        endpoint_action (str): API action for monitoring county data.
        request_template (dict): Template for constructing the API request.
        headers (dict): Additional headers for the API request.

    Returns:
        dict: JSON response containing monitor data or None if the request fails.
    """

    # Prioritize information from the call parameters over what's in the template
    if email_address:
        request_template['email'] = email_address
    if key:
        request_template['key'] = key
    if param:
        request_template['param'] = param
    if begin_date:
        request_template['begin_date'] = begin_date
    if end_date:
        request_template['end_date'] = end_date
    if fips and len(fips) == 5:
        request_template['state'] = fips[:2]
        request_template['county'] = fips[2:]

    # Ensure there are values that allow making a call - these are always required
    if not request_template['email']:
        raise Exception("Must supply an email address to call 'request_monitors()'")
    if not request_template['key']:
        raise Exception("Must supply a key to call 'request_monitors()'")
    if not request_template['param']:
        raise Exception("Must supply param values to call 'request_monitors()'")
    if not request_template['begin_date']:
        raise Exception("Must supply a begin_date to call 'request_monitors()'")
    if not request_template['end_date']:
        raise Exception("Must supply an end_date to call 'request_monitors()'")
    # Note: FIPS fields are not validated because not all monitor actions require FIPS numbers

    # Compose the request URL
    request_url = endpoint_url + endpoint_action.format(**request_template)

    # Make the request
    try:
        # Wait first to avoid exceeding rate limits in case of an exception during request processing
        # Throttling is good practice, especially with free data sources
        if API_THROTTLE_WAIT > 0.0:
            time.sleep(API_THROTTLE_WAIT)
        response = requests.get(request_url, headers=headers)
        json_response = response.json()
    except Exception as e:
        print(e)
        json_response = None

    return json_response

In [25]:
# request information 
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY
request_data['param'] = AQI_PARAMS_GASEOUS     # remember we have both gaseous and particulates
# 
#   We got the monitoring stations for Bend OR above (Deschutes county) - let's work with that one again
request_data['state'] = CITY_LOCATIONS['bismarck']['fips'][:2]
request_data['county'] = CITY_LOCATIONS['bismarck']['fips'][2:]
#
# the first example uses the default - request monitors by county, we'll just use a recent date for now
response = request_monitors(request_template=request_data, begin_date="20020401", end_date="20020501")
#
# the response should be similar to the 'list' request above - but in this case we should only get monitors that
# monitor the AQI_PARAMS_PARTICULATES set of params.
#
if response["Header"][0]['status'] == "Success":
    print(json.dumps(response['Data'],indent=4))
else:
    print(json.dumps(response,indent=4))


{
    "Header": [
        {
            "status": "No data matched your selection",
            "request_time": "2023-11-15T01:41:01-05:00",
            "url": "https://aqs.epa.gov/data/api/monitors/byCounty?email=shweta97@uw.edu&key=aquacat77&param=42101,42401,42602,44201&bdate=20020401&edate=20020501&state=38&county=015",
            "rows": 0
        }
    ],
    "Data": []
}


Their unique site IDs are given in the 'site_number' field of the dictionary. The response also includes their lat,lon positions should we need that. Comparing the site numbers to the response from the list request above - all of the sites here are there. So, things seem to be heading in the right direction.

## Step 4: AQI Data Pull Implementation

Now that we have a bounding box method, we will apply it to all the time frames to get the corresponding responses. We need responses from 1963 to 2023. 

**Note** - Since Gaseous extractions data is very sporadic and not continuous, we will be considering just the particulate contribution to the AQI

In [29]:
# request information for particulate matter
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY

# bounding box implementation
bbox = bounding_latlon(CITY_LOCATIONS['bismarck'], scale=10.0)
request_data['minlat'], request_data['maxlat'], request_data['minlon'], request_data['maxlon'] = bbox

In [30]:
def get_daily_estimate(start_date, end_date):
    """
    Retrieves daily estimates for particulate air quality index (AQI) parameters.

    Parameters:
        start_date (str): Start date for the monthly estimate.
        end_date (str): End date for the monthly estimate.

    Returns:
        dict: Structured summary of monthly estimates for particulate AQI parameters.
              Returns None if no data is received for the specified date range.
    """
    
    # Set the parameter for particulate AQI
    request_data['param'] = AQI_PARAMS_PARTICULATES
    
    # Request daily summary data for particulate AQI within the specified date range
    particulate_aqi = request_daily_summary(request_template=request_data, begin_date=start_date, end_date=end_date, endpoint_action=API_ACTION_DAILY_SUMMARY_BOX)
    
    # Check if data is received
    if particulate_aqi is None:
        print("No data received for the specified date range.")
        return None
    
    # Extract and structure the summary data for particulate AQI
    extract_particulate = extract_summary_from_response(particulate_aqi)
    
    return extract_particulate


In [31]:
def aggregate_aqi(sensor_data, pollutant_id):
    """
    Aggregates Air Quality Index (AQI) data for a specified pollutant across multiple sensors.

    Parameters:
        sensor_data (dict): Dictionary containing sensor data.
        pollutant_id (str): ID of the pollutant for which AQI data is to be aggregated.

    Returns:
        pd.DataFrame: Aggregated monthly AQI data for the specified pollutant.
    """

    # Initialize an empty DataFrame to store aggregated data
    aggregated_data = pd.DataFrame()

    # Loop through each sensor in the data
    for sensor_id, sensor_info in sensor_data.items():
        # Check if the specified pollutant_id exists for the current sensor
        if pollutant_id in sensor_info['pollutant_type']:
            # Get pollutant information for the specified pollutant_id
            pollutant_info = sensor_info['pollutant_type'][pollutant_id]

            # Create a DataFrame from the pollutant data
            # Convert nested JSON to DataFrame
            data = []
            for date, records in pollutant_info['data'].items():
                for record in records:
                    record['date'] = date
                    data.append(record)
            pollutant_df = pd.DataFrame(data)
            
            # Convert the index to a datetime object
            pollutant_df.index = pd.to_datetime(pollutant_df.date, format='%Y%m%d')
            pollutant_df = pollutant_df[['aqi']]

            # Resample data to monthly frequency and calculate the mean
            monthly_aggregated = pollutant_df.resample('M').mean()

    return monthly_aggregated


In [32]:
def get_first_and_last_days(year):
    """
    Returns lists of the first and last days of each month in the specified year.

    Parameters:
        year (int): The year for which to retrieve the first and last days.

    Returns:
        list, list: Lists of strings representing the first and last days of each month.
    """

    first_days = []
    last_days = []

    for month in range(1, 13):
        # Get the first day of the month
        first_day = datetime(year, month, 1)

        # Calculate the last day of the month
        if month == 12:
            last_day = datetime(year + 1, 1, 1) - timedelta(days=1)
        else:
            last_day = datetime(year, month + 1, 1) - timedelta(days=1)

        first_days.append(str(first_day))
        last_days.append(str(last_day))
        
    # Extract only the date part and convert it to the specified format (YYYYMMDD)
    first_days = [i.split(' ')[0].replace('-', '') for i in first_days]
    last_days = [i.split(' ')[0].replace('-', '') for i in last_days]

    return first_days, last_days


In [35]:
# Initialize an empty DataFrame to store the final results
final_df = pd.DataFrame()

# Loop through each year in the specified range (1963, 2023)
for i in range(1963, 2024):
    # Get the first and last days of each month for the current year
    first_days, last_days = get_first_and_last_days(i)  
    
    # Loop through each month (0 to 11)
    for j in range(0, 12):
        # Get daily estimates for the specified date range
        sensor_data = get_daily_estimate(first_days[j], last_days[j])

        try:
            # Loop through each pollutant ID and aggregate AQI data
            for pollutant_id in ['81102', '88101', '88502']:
                if pollutant_id == '81102':
                    # For the first pollutant, directly aggregate AQI data
                    merged_df = aggregate_aqi(sensor_data, pollutant_id).reset_index()
                    merged_df['pollutant_id'] = pollutant_id
                    final_df = pd.concat([final_df, merged_df], ignore_index=True, sort=False)
                else:
                    # For subsequent pollutants, merge with the existing DataFrame and calculate average
                    monthly_aggregated_data = aggregate_aqi(sensor_data, pollutant_id)
                    merged_df = pd.merge(merged_df, monthly_aggregated_data, on='date')
                    merged_df['aqi'] = (merged_df['aqi_x'] + merged_df['aqi_y']) / 2
                    merged_df = merged_df[['date', 'aqi']]
                    merged_df['pollutant_id'] = pollutant_id
                    final_df = pd.concat([final_df, merged_df], ignore_index=True, sort=False)
            
            # Aggregate final results by taking the mean of AQI for each date
            final_df = pd.DataFrame(final_df.groupby('date')['aqi'].mean()).reset_index()    
            print('Data processed for the month', (j + 1), 'of the year', i)    
        except:
            print('Data not processed for the month', (j + 1), 'of the year', i)
            continue


Data not processed for the month 1 of the year 1998
Data not processed for the month 2 of the year 1998
Data not processed for the month 3 of the year 1998
Data not processed for the month 4 of the year 1998
Data not processed for the month 5 of the year 1998
Data not processed for the month 6 of the year 1998
Data not processed for the month 7 of the year 1998
Data not processed for the month 8 of the year 1998
Data not processed for the month 9 of the year 1998
Data not processed for the month 10 of the year 1998
Data not processed for the month 11 of the year 1998
Data not processed for the month 12 of the year 1998
Data processed for the month 1 of the year 1999
Data processed for the month 2 of the year 1999
Data processed for the month 3 of the year 1999
Data processed for the month 4 of the year 1999
Data processed for the month 5 of the year 1999
Data processed for the month 6 of the year 1999
Data processed for the month 7 of the year 1999
Data processed for the month 8 of the

Data processed for the month 2 of the year 2012
Data processed for the month 3 of the year 2012
Data processed for the month 4 of the year 2012
Data processed for the month 5 of the year 2012
Data processed for the month 6 of the year 2012
Data processed for the month 7 of the year 2012
Data processed for the month 8 of the year 2012
Data processed for the month 9 of the year 2012
Data processed for the month 10 of the year 2012
Data processed for the month 11 of the year 2012
Data processed for the month 12 of the year 2012
Data processed for the month 1 of the year 2013
Data processed for the month 2 of the year 2013
Data processed for the month 3 of the year 2013
Data processed for the month 4 of the year 2013
Data processed for the month 5 of the year 2013
Data processed for the month 6 of the year 2013
Data processed for the month 7 of the year 2013
Data processed for the month 8 of the year 2013
Data processed for the month 9 of the year 2013
Data processed for the month 10 of th

In [36]:
# Final AQI Estimates from 1963 to 2024
final_df

Unnamed: 0,date,aqi,pollutant_id
0,1998-01-31,5.000000,
1,1998-02-28,6.666667,
2,1998-03-31,6.800000,
3,1998-04-30,7.600000,
4,1998-05-31,12.600000,
...,...,...,...
311,2023-07-31,41.599846,88101
312,2023-08-31,31.193548,81102
313,2023-08-31,40.996774,88101
314,2023-09-30,37.066667,81102


In [38]:
final_df.to_csv('AQI_DataPull_1963_2023_monthly.csv')