# Pipeline Draft
Pipeline functions to ingest, clean, interpolate, and normalize air pollutant concentration data. 

In [10]:
import pandas as pd
import xarray as xr
import matplotlib.pyplot as plt
import scipy as sp
import numpy as np
import math
import os

AQ_FILE = "Measurement_summary.csv"
WX_FILE = "wx_file"
DIRI = %pwd

## Data Validation & Transformation Pipeline
##### Takes file path, returns a dictionary with normalized and interpolated data weather station 

@param file: file name, defaults to above specified FILE  
@param diri: directory with data file, defaults to above specified DIRI  
@param compare: bool for comparing against a baseline set of stats, defaults False  
@param verbose: bool for printing statistics, defaults False  
@param interp: specify type of interpolation for missing values, defaults to akima

@return station_dataframes: a dictionary with dataframes, key is TOOL_ID  

##### Outline
- Read data into CSV
- Print Basic Statistics: mean, standard deviation, and skew by each machine
- Data Characteristics: number outliers, number Nan values
- Comparison: compare against saved characteristics of training data
- Outliers: mask outliers (specified # std devs away from mean) with Nan, interpolate using an akima spline

In [2]:
# File that takes Xarray weather data as input and returns usable dataframe
# Set variables to true if they need to be included in dataframe
def xarray_conversion(dset, radiation=True, temperature=True, wind=False, pressure=False, precipitation=False):
    # Column names in NetCFD
    LATITUDE_NAME = "latitude"
    LONGITUDE_NAME = "longitude"
    TIME_NAME = "time"
    TEMP_NAME = "t2m"
    RAD_NAME = "ssrd"
    PRESSURE_NAME = "sp"
    PRECIP_NAME = "tp"
    U_WIND_NAME = "10u"
    V_WIND_NAME = "10v"

    target_lat = dset[LATITUDE_NAME].median()
    target_lon = dset[LONGITUDE_NAME].median()

    dataset_lats = dset[LATITUDE_NAME]
    dataset_lons = dset[LONGITUDE_NAME]

    lat = dset[LATITUDE_NAME][(abs(dataset_lats - target_lat)).argmin()]
    lon = dset[LONGITUDE_NAME][(abs(dataset_lons - target_lon)).argmin()]

    wx_df = pd.DataFrame()

    # Radiation reported in Joules/m^2/hour --> divide by seconds/hour to get Watts/m^2 instantaneous
    wx_df[TIME_NAME] = dset[TIME_NAME]
    if radiation:
        wx_df[RAD_NAME] =  dset[RAD_NAME].sel(latitude=lat, longitude=lon) / 3600
    if temperature:
        wx_df[TEMP_NAME] = dset[TEMP_NAME].sel(latitude=lat, longitude=lon)
    if pressure:
        wx_df[PRESSURE_NAME] = dset[PRESSURE_NAME].sel(latitude=lat, longitude=lon)
    if precipitation:
        wx_df[PRECIP_NAME] = dset[PRECIP_NAME].sel(latitude=lat, longitude=lon)
    if wind:
        u_wind = dset[U_WIND_NAME].sel(latitude=lat, longitude=lon)
        v_wind = dset[V_WIND_NAME].sel(latitude=lat, longitude=lon)
        wx_df["wind"] = np.sqrt(u_wind**2 + v_wind**2)

    return wx_df



In [3]:
def outlier_count(x, sigma=3):
    stdev = x.std()
    upper = x.mean() + sigma*stdev
    lower = x.mean() - sigma*stdev

    return len(x[(x > upper) | (x < lower)])


def null_count(x):
    return x.isnull().sum()

def zscore(x):
    return (x - x.mean())/x.std()

def iqr(x):
    return x.quantile(0.75) - x.quantile(0.25)

def outlier_mask(x, sigma=3):
    stdev = x.std()
    upper = x.mean() + sigma*stdev
    lower = x.mean() - sigma*stdev

    return x.mask(((x > upper) | (x < lower)), np.nan).copy()

In [8]:
aq_path = os.path.join("..", "raw_data", AQ_FILE)
df = pd.read_csv(aq_path, na_values=-1)
df.drop("Address", axis=1, inplace=True)
df

Unnamed: 0,Measurement date,Station code,Latitude,Longitude,SO2,NO2,O3,CO,PM10,PM2.5
0,2017-01-01 00:00,101,37.572016,127.005008,0.004,0.059,0.002,1.2,73.0,57.0
1,2017-01-01 01:00,101,37.572016,127.005008,0.004,0.058,0.002,1.2,71.0,59.0
2,2017-01-01 02:00,101,37.572016,127.005008,0.004,0.056,0.002,1.2,70.0,59.0
3,2017-01-01 03:00,101,37.572016,127.005008,0.004,0.056,0.002,1.2,70.0,58.0
4,2017-01-01 04:00,101,37.572016,127.005008,0.003,0.051,0.002,1.2,69.0,61.0
...,...,...,...,...,...,...,...,...,...,...
647506,2019-12-31 19:00,125,37.544962,127.136792,0.003,0.028,0.013,0.5,23.0,17.0
647507,2019-12-31 20:00,125,37.544962,127.136792,0.003,0.025,0.015,0.4,25.0,19.0
647508,2019-12-31 21:00,125,37.544962,127.136792,0.003,0.023,0.015,0.4,24.0,17.0
647509,2019-12-31 22:00,125,37.544962,127.136792,0.003,0.040,0.004,0.5,25.0,18.0


In [9]:
df.isna().sum()

Measurement date       0
Station code           0
Latitude               0
Longitude              0
SO2                 3976
NO2                 3834
O3                  4059
CO                  4036
PM10                3962
PM2.5               3973
dtype: int64

In [None]:
def read_validate_transform_pipeline(aq_file=AQ_FILE, wx_file=WX_FILE,
 diri=DIRI, verbose=True, interp="akima"):
    aq_file_path = os.path.join(diri, aq_file)
    wx_file_path = os.path.join(diri, wx_file)

    aq_df = pd.read_csv(aq_file_path)
    wx_xr = xr.open_dataset(wx_file_path)
    wx_df = xarray_conversion(wx_xr)

    # Replace TOOL_ID numbers with more friendly integers
    station_ids = aq_df["Station code"].unique()
    aq_df.replace(station_ids, range(len(station_ids)), inplace=True)

    # All data is from SP5 in this dataset, remove column to strealine dataframe
    aq_df.drop("Address" axis=1, errors="ignore", inplace=True)

    # Store variable statistics in a dictionary
    # Key: (Variable, Machine ID, Statistic) == number
    # Statistics = [Mean, Standard Deviation, Skew Coefficient,
    #               IQR, Outlier Count, Null Count, 
    #               Length of Dataset, Minimum, Maximum]

    variables = ['SO2', 'NO2', 'O3', 'CO', 'PM10', 'PM2.5']

    for var in variables:
        aq_df.groupby("Station code")[var].agg([np.mean, np.std,
     sp.stats.skew, iqr, outlier_count, null_count, len, np.min, np.max])
     
    if verbose:
        print(stats)

    stats.to_csv(f"Output/{file}_STATISTICS.csv")

    '''
    if compare:
        compare input statistics against statistics of baseline/training data
    
    '''
    # Calculate Z-Scores for Variable values with statistics from same machine
    df["z"] = df.groupby(["variable", "id"])["value"].transform(zscore)

    # Convert input's time to a pandas datetime
    # df["datetime"] = pd.to_datetime(df["time"])
    df["date"] = pd.to_datetime(df["time"]).dt.date

    # Transform data to daily median values
    df_daily_median = pd.DataFrame(df.groupby(["date", "variable",
     "id"])["z"].agg(np.median))

    df_daily_median = df_daily_median.reset_index(level="variable")
    df_daily_median = df_daily_median.pivot(columns="variable").reset_index()

    # Adjusting columns after pivoting
    df_daily_median = df_daily_median.transpose().reset_index(level=0, drop=True)
    df_daily_median.reset_index(inplace=True)
    df_daily_median["variable"][0] = "date"
    df_daily_median["variable"][1] = "id"

    df_daily_median = df_daily_median.transpose()
    df_daily_median.columns = df_daily_median.iloc[0]
    df_daily_median.drop("variable", axis=0, inplace=True)
    df_daily_median.drop("alias", axis=1, inplace=True)

    # Date column may have change types from transposing
    # Reset as index for akima spline
    df_daily_median["date"] = pd.to_datetime(df_daily_median["date"])
    df_daily_median = df_daily_median.set_index("date")

    # All data represent as objects after transposing
    # Reset to float values
    for var in df_daily_median.columns:
        df_daily_median[var] = pd.to_numeric(df_daily_median[var],errors = 'coerce')

    # Remove "target" variables
    target_cols = [col for col in df_daily_median.columns if 'target' in col]
    df_daily_median.drop(target_cols, axis=1, inplace=True)

    # key = id
    machine_dataframes = {}
    for machine_id in range(len(machine_ids)):
        temp_df = df_daily_median.loc[
            df_daily_median["id"] == machine_id]

        # Set all outliers to Nan values
        outlier_df = outlier_mask(temp_df)

        # Interpolate Nan values using akima spline
        machine_dataframes[machine_id] = outlier_df.interpolate(method=interp)
        # machine_dataframes[machine_id] = outlier_df

    return machine_dataframes