In [37]:
import sys
from openaq import OpenAQ
import time
import requests
import pandas as pd
from datetime import timedelta
from pylibs.utils import get_dates
from tqdm import tqdm
import numpy as np
from joblib import Parallel, delayed
from httpx import ReadTimeout
from collections import defaultdict

In [2]:
period_sdate = '2018010100'
period_edate = '2022123118'
cycle_interv = 6
window_length = 1
halfwindow = timedelta(hours=window_length/2)

In [3]:
cycle_dates = get_dates(period_sdate, period_edate, cycle_interv)
period_start = pd.to_datetime(period_sdate, format='%Y%m%d%H').tz_localize('UTC') - halfwindow
period_end = pd.to_datetime(period_edate, format='%Y%m%d%H').tz_localize('UTC') + halfwindow

In [4]:
openaq_api_key = '68e3a778022b500e2a88fe70a39bfb33979fcb081b2566788ef8da93af82bea8'
client = OpenAQ(api_key=openaq_api_key)

In [5]:
# Get the latest location list with PM2.5
locations = []
p = 1
while True:
    sys.stdout.write(f"\r Page: {p}")
    sys.stdout.flush()
    response = client.locations.list(parameters_id=2, limit=1000, page=p)
    locations += response.results

    if response.headers.x_ratelimit_remaining == 0:
        slp_secs = response.headers.x_ratelimit_reset + 1
        print(f"Wait {slp_secs} seconds for limit reset")
        time.sleep(slp_secs)

    if response.meta.found == '>1000':
        p += 1
    else:
        break

 Page: 16

In [6]:
def keep_available_locations(location, period_start, period_end):
    if location.datetime_first is None or location.datetime_last is None:
        return
    else:
        name = location.name
        id = location.id
        lat = location.coordinates.latitude
        lon = location.coordinates.longitude
        loc_dt_start = pd.to_datetime(location.datetime_first.utc)
        loc_dt_end = pd.to_datetime(location.datetime_last.utc)
        overlapped = (period_end > loc_dt_start) | (loc_dt_end > period_start)
        if overlapped:
            for sensor in location.sensors:
                if 'pm25' in sensor.name:
                    sensor_id = sensor.id
                else:
                    return
    return pd.DataFrame({
        'location': [name],
        'locationID': [id],
        'lat': [lat],
        'lon': [lon],
        'sensorID': [sensor_id],
    })

In [None]:
# results = Parallel(n_jobs=4)(delayed(keep_available_locations)(location, period_start, period_end) for location in tqdm(locations))

In [7]:
results = [keep_available_locations(location, period_start, period_end) for location in tqdm(locations)]
locations_df = pd.concat(results).reset_index(drop=True)
locations_df

100%|██████████| 15323/15323 [00:07<00:00, 2019.22it/s]


Unnamed: 0,location,locationID,lat,lon,sensorID
0,Beijing US Embassy,21,39.950000,116.470000,40
1,Shenyang,53,41.780000,123.420000,5079217
2,Maastricht-A2 Kasteel Hillenraadweg,99,50.859800,5.713810,161
3,Alto Hospicio,139,-20.259620,-70.088654,215
4,Alto Hospicio,140,-20.290859,-70.099554,216
...,...,...,...,...,...
2490,Topanga Charter ES,5364242,34.092280,-118.604180,13812078
2491,Westwood Charter ES,5364243,34.047660,-118.430490,13812076
2492,Reseda Colocation,5364244,34.199170,-118.532780,13812077
2493,Temporary PM2.5,5367443,35.190600,-120.457300,13814542


In [12]:
def setup_windows(dates, halfwindow):
    beg = dates - halfwindow
    end = dates + halfwindow
    return zip(beg, end)

In [16]:
def retrieve_measurements(locations_df, window_beg, window_end):
    df = pd.DataFrame()
    for row in locations_df.itertuples():
        sub_df = pd.DataFrame()
        dt_list = []
        vallist = []
        p = 1
        measurements = []
        while True:
            sys.stdout.write(f"\r Processing Index: {row.Index}, Location: {row.location}, SensorID: {row.sensorID}, page: {p}")
            sys.stdout.flush()
            for attempt in range(max_retries):
                try:
                    response = client.measurements.list(sensors_id=row.sensorID, datetime_from=window_beg, datetime_to=window_end, page=p, limit=1000)
                    measurements += response.results
                    break
                except ReadTimeout:
                    if attempt < max_retries - 1:
                        sys.stdout.write(f"\r ReadTimeout, retrying in {retry_wait} seconds... (attempt {attempt + 1}/{max_retries})")
                        sys.stdout.flush()
                        time.sleep(retry_wait)
                    else:
                        sys.stdout.write("\r Failed after maximum retries.")
                        sys.stdout.flush()
                        raise


            if response.headers.x_ratelimit_remaining == 0:
                slp_secs = measurements.headers.x_ratelimit_reset + 5
                sys.stdout.write(f"\r Wait {slp_secs} seconds for limit reset")
                sys.stdout.flush()
                time.sleep(slp_secs)

            if response.meta.found == '>1000':
                p += 1
            else:
                break

        if len(measurements) != 0:
            tmpdf = pd.DataFrame()
            for measurement in measurements:
                dt_list.append(pd.to_datetime(measurement.period.datetime_to.utc).tz_convert(None))
                vallist.append(measurement.value)

            tmpdf['dateTime'] = dt_list
            tmpdf['pm25'] = vallist
            tmpdf['location'] = row.location
            tmpdf['locationID'] = row.locationID
            tmpdf['sensorID'] = row.sensorID
            tmpdf['latitude'] = row.lat
            tmpdf['longitude'] = row.lon
            sub_df = pd.concat((sub_df, tmpdf))
        if not sub_df.empty:
            df = pd.concat((df, sub_df))

    return df

In [31]:
response = client.measurements.list(sensors_id=row[4], datetime_from=period_start, datetime_to=period_end, page=14, limit=1000)

ReadTimeout: The read operation timed out

In [32]:
def writeout_measurements_df(measurements):
    if len(measurements) != 0:
        tmpdf = pd.DataFrame()
        for measurement in measurements:
            dt_list.append(pd.to_datetime(measurement.period.datetime_to.utc).tz_convert(None))
            vallist.append(measurement.value)

        tmpdf['dateTime'] = dt_list
        tmpdf['pm25'] = vallist
        tmpdf['location'] = row[0]
        tmpdf['locationID'] = row[1]
        tmpdf['sensorID'] = row[4]
        tmpdf['latitude'] = row[2]
        tmpdf['longitude'] = row[3]
    return tmpdf

In [35]:
row = tuple(locations_df.iloc[0])
sub_df = pd.DataFrame()
dt_list = []
vallist = []
p = 1
measurements = []
while True:
    sys.stdout.write(f"\r Processing Index: 0, Location: {row[0]}, SensorID: {row[4]}, page: {p}")
    sys.stdout.flush()
    try:
        response = client.measurements.list(sensors_id=row[4], datetime_from=period_start, datetime_to=period_end, page=p, limit=1000)
        measurements += response.results
    except ReadTimeout:
        tmpdf = writeout_measurements_df(measurements)
        measurements = []
        time.sleep(60)

    if response.headers.x_ratelimit_remaining == 0:
        slp_secs = measurements.headers.x_ratelimit_reset + 5
        sys.stdout.write(f"\r Wait {slp_secs} seconds for limit reset                      ")
        sys.stdout.flush()
        time.sleep(slp_secs)

    if response.meta.found == '>1000':
        p += 1
    else:
        break

    tmpdf = writeout_measurements_df(measurements)
    sub_df = pd.concat((sub_df, tmpdf))

 Processing Index: 0, Location: Beijing US Embassy, SensorID: 40, page: 12

UnboundLocalError: cannot access local variable 'tmpdf' where it is not associated with a value

In [34]:
sub_df

In [13]:
measurements_df = retrieve_measurements(locations_df, period_start, period_end)

 Process Index: 0, Location: Beijing US Embassy

ReadTimeout: The read operation timed out

In [25]:
measurements_df

Unnamed: 0,dateTime,pm25,location,locationID,sensorID,latitude,longitude
0,2017-12-31 23:00:00,16.0,Beijing US Embassy,21,40,39.9500,116.4700
1,2018-01-01 01:00:00,11.0,Beijing US Embassy,21,40,39.9500,116.4700
2,2018-01-01 06:00:00,24.0,Beijing US Embassy,21,40,39.9500,116.4700
3,2018-01-01 07:00:00,35.0,Beijing US Embassy,21,40,39.9500,116.4700
4,2018-01-01 08:00:00,-15.0,Beijing US Embassy,21,40,39.9500,116.4700
...,...,...,...,...,...,...,...
300,2018-01-22 22:00:00,2.1,SPARTAN - Sherbrooke,1285354,6566241,45.3798,-71.9313
301,2018-01-22 23:00:00,2.0,SPARTAN - Sherbrooke,1285354,6566241,45.3798,-71.9313
302,2018-01-23 00:00:00,1.9,SPARTAN - Sherbrooke,1285354,6566241,45.3798,-71.9313
303,2018-01-23 01:00:00,2.2,SPARTAN - Sherbrooke,1285354,6566241,45.3798,-71.9313
