# Process downloaded PurpleAir files

Processes hourly raw data files from PurpleAir (downloaded using the collect_PurpleAir_rawdata_thingspeak.ipynb notebook). Performs QA/QC and applies EPA smoke correction.

### Load python packages

In [1]:
import numpy as np
import pandas as pd
from pandas.tseries.offsets import DateOffset
import matplotlib.pyplot as plt
import datetime
from pathlib import Path                   # System agnostic paths

import requests # for url requests
from statistics import median # for median of array
import json # for json reading/writing
import time # for epoch timestamp
import csv # for writing csv files
import glob # for listing files

### Create reusable functions

In [2]:
def remove_frozen_sensor(dataframe):
    
    '''
    Removes data where one sensor consistently reads zero.
    Currently adds NaN any time there is a zero in one of the sensors.
    
    dataframe: pandas dataframe
    
    returns: pandas dataframe QA
    '''
    
    # Load the PM2.5 values
    PM25_Ah = dataframe['PM2.5 (CF=1)'].copy()
    PM25_Bh = dataframe['PM2.5_b (CF=1)'].copy()
    
    #check ATM vs CF labeling
    PM25_A_atm = dataframe['PM2.5 (ATM)'].copy()
    PM25_B_atm = dataframe['PM2.5_b (ATM)'].copy()
    
    # Turn zero values into NaNs
    idx1 = dataframe[PM25_Ah == 0].index
    dataframe.loc[idx1] = np.nan
    idx2 = dataframe[PM25_A_atm == 0].index
    dataframe.loc[idx2] = np.nan
    idx3 = dataframe[PM25_Bh == 0 ].index
    dataframe.loc[idx3, dataframe.columns != 'date' ] = np.nan
    idx4 = dataframe[PM25_B_atm == 0 ].index
    dataframe.loc[idx4, dataframe.columns != 'date' ] = np.nan

    return dataframe

In [3]:
def correct_smoke_PA(PM25_Ah,PM25_Bh,humidity):
    
    '''
    Cleans (QA/QC) and applies EPA smoke correction to PurpleAir PM2.5 data.
    
    
    '''
    
    #clean data
    avgAB = (PM25_Ah+PM25_Bh)/2
    diffAB = abs(PM25_Ah-PM25_Bh)
    rel_diffAB = diffAB/avgAB
    # Keep the values where diffAB < 5 μg m-3 or 
    # relative diff < 0.7 μg m-3
    # i.e. both have to be large to be removed
    # Create average of two measurement values
    avgAB = avgAB.where((diffAB < 5) | (rel_diffAB < 0.7))
    #smoke correction
    PM2_5corr = 0.534*(avgAB)- 0.0844*humidity + 5.604
    
    return PM2_5corr

In [4]:
def convert_AQI(PM_data):   
    
    '''
    '''
    
    final_aqi = PM_data.copy()
    # mask changes everything that does not satisfy the condition
    # for example the first line below leaves everything where 
    # final data is greater than 12, but replaces 12 or less with 1.
    final_aqi = final_aqi.where(final_aqi>12.1 ,1)
    final_aqi = final_aqi.where((final_aqi<12.1) | (final_aqi> 35.5),2)
    final_aqi = final_aqi.where((final_aqi<35.5) | (final_aqi> 55.5),3)
    final_aqi = final_aqi.where((final_aqi<55.5) | (final_aqi> 150.5),4)
    final_aqi = final_aqi.where((final_aqi<150.5) | (final_aqi> 250),5)
    final_aqi = final_aqi.where(final_aqi<250.0,6)
    # fill in all the NaN values again
    final_aqi = final_aqi.where(PM_data.notna() , np.nan)


    return final_aqi


In [5]:
def write_processed(file,PA_data,check_label):  
    
    '''
    
    '''
    
    station_meta = file.split('_')
    ID = station_meta[-4]
    name = station_meta[-3]
    lat = station_meta[-2]
    lon = station_meta[-1].split('.csv')[0]
    print(name, lat, lon)
    if any(check_label == 'switched'):
        print('Switched CF and ATM')
        csv_name = 'PurpleAir_processed_{}_{}_{}_{}_switched.csv'.format(ID, name, lat, lon)
    else:
        print('correctly labeled')
        csv_name = 'PurpleAir_processed_{}_{}_{}_{}.csv'.format(ID, name, lat, lon)
    print(csv_name)
    # Round for writing out
    PA_data = PA_data.round(decimals=4)
    PA_data.to_csv('example_data/processed_hourly_sensor_data_2020/{}'.format(csv_name), index=False,  na_rep='NaN')

### Post-process PM2.5

Load csv files that were downloaded from PurpleAir, check identity of CF and ATM channels, clean data and average channels, correct for smoke using RH.

In [6]:
files = glob.glob("example_data/raw_hourly_sensor_data_2020/*.csv")

#for file in files[2:3]:
for file in files:
    print(file)
    load_data = pd.read_csv(file, na_values='NaN', keep_default_na=False)

    # Remove duplicate dates if present
    # Currently keeps first entry - need more sophisticated to check for NaN
    # ignore_index=True re-indexes the resulting array
    PA_load = load_data.drop_duplicates(subset='date (UTC)', keep='first', inplace=False, ignore_index=True)
    
    # Process
    # 1. Zero -> NaN
    PA_data = remove_frozen_sensor(PA_load)
    
    # 2. Check correct identity
    
    #check ATM vs CF labeling
    # https://rdrr.io/github/MazamaScience/AirSensor/f/documents/PurpleAir_CF%3DATM_vs_CF%3D1.md
    # PurpleAir mistakenly labeled the lower readings CF=1 and the higher set of numbers CF=ATM.
    # All three readings for PM1, PM2.5 and PM10 were swapped. This bug was present from day one
    # and did not change during the firmware updates all the way up to version 4.11.
    PM25_A_cf = PA_data['PM2.5 (CF=1)'].copy()
    PM25_B_cf = PA_data['PM2.5_b (CF=1)'].copy()
    PM25_A_atm = PA_data['PM2.5 (ATM)'].copy()
    PM25_B_atm = PA_data['PM2.5_b (ATM)'].copy()
    
    # want to satisfy the definitions
    #     PM2.5CF_A_h > 40 (μg m-3)
    # AND
    #     PM2.5CF_A_h > PM2.5atm_A_h
    
    check_label = PM25_A_cf.where((PM25_A_cf.isna()) | (PM25_A_cf > 40), 'lt 40')
        
    for i in range(len(check_label)):
        if check_label[i] == 'lt 40':
            continue
        else:
            if PM25_A_cf[i] <= PM25_A_atm[i]:
                #print('CF',PM25_A_cf[i])
                #print('ATM',PM25_A_atm[i])
                check_label[i] = 'keep'
            elif PM25_A_cf[i] > PM25_A_atm[i]:
                check_label[i] = 'switched'
        #print(PA_data['date (UTC)'][i], check_label[i])
        
    if any(check_label == 'switched'):
        print('Need to switch CF and ATM')
        PA_data['PM2.5 (CF=1)'] = PM25_A_atm
        PA_data['PM2.5_b (CF=1)'] = PM25_B_atm
        PA_data['PM2.5 (ATM)'] = PM25_A_cf
        PA_data['PM2.5_b (ATM)'] = PM25_B_cf
    else:
        print('correctly labeled')

    PA_data['check_label'] = check_label
    PA_data.index = PA_data['date (UTC)']
    
    # 3. Correct for smoke measurements
    PM25_Ah = PA_data['PM2.5 (CF=1)'].copy()
    PM25_Bh = PA_data['PM2.5_b (CF=1)'].copy()
    
    PA_data['PM2.5_corrected'] = correct_smoke_PA(PM25_Ah,PM25_Bh,PA_data['Humidity'])
    test = correct_smoke_PA(PM25_Ah,PM25_Bh,PA_data['Humidity'])
    
    # 4. Calculate AQI
    PA_data['AQI'] = convert_AQI(PA_data['PM2.5_corrected'])
    
    #5 Write processed data files
    write_processed(file,PA_data,check_label)
    
    
#PA_data

example_data/raw_hourly_sensor_data_2020/PurpleAir_3848_MARINCC_38.259796_-122.63819.csv
correctly labeled
MARINCC 38.259796 -122.63819
correctly labeled
PurpleAir_processed_3848_MARINCC_38.259796_-122.63819.csv
example_data/raw_hourly_sensor_data_2020/PurpleAir_3631_MeadowCt_38.407654_-122.83102.csv
correctly labeled
MeadowCt 38.407654 -122.83102
correctly labeled
PurpleAir_processed_3631_MeadowCt_38.407654_-122.83102.csv
example_data/raw_hourly_sensor_data_2020/PurpleAir_3247_Imola-2_38.28229_-122.25759.csv
correctly labeled
Imola-2 38.28229 -122.25759
correctly labeled
PurpleAir_processed_3247_Imola-2_38.28229_-122.25759.csv
example_data/raw_hourly_sensor_data_2020/PurpleAir_3318_VillageEastDrive_38.247944_-122.60063.csv
correctly labeled
VillageEastDrive 38.247944 -122.60063
correctly labeled
PurpleAir_processed_3318_VillageEastDrive_38.247944_-122.60063.csv
example_data/raw_hourly_sensor_data_2020/PurpleAir_3184_Imola6_38.3051_-122.288994.csv
correctly labeled
Imola6 38.3051 -122.

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_single_column(loc, value, pi)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_single_column(loc, value, pi)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_single_column(loc, value, pi)
A value is trying to be set on a copy of a slice from a DataFrame.
Try us