## Data Preparation

This notebook contains the data preparation steps tested in the EDA to transform the data to a usable time series

In [1]:
import pandas as pd
import numpy as np
from scipy.stats import norm, expon, lognorm, rayleigh
from scipy.stats import trim_mean
import warnings
warnings.filterwarnings('ignore')

In [2]:
#Reported data from Smell Pithsburgh habitants
reports_data = pd.read_csv("../Data/dataset/v2.1/smell_raw.csv")

def windowed_timeseries(data, window_size = 5):
    window_data = []
    end = data['EpochTime'].max()
    start = data['EpochTime'].min()
    
    while start < end:
        window_data.append(create_window_row(start))
        start += window_size * 3600
    
    return pd.DataFrame(window_data)

def create_window_row(epoch_start: int, agg_size = 24):
    epoch_finish = epoch_start + agg_size * 3600
    
    dict = {
        "EpochStart" : epoch_start,
        "EpochEnd"  : epoch_finish,
    }

    return dict

ts_data = windowed_timeseries(reports_data)
ts_data

Unnamed: 0,EpochStart,EpochEnd
0,1477935134,1478021534
1,1477953134,1478039534
2,1477971134,1478057534
3,1477989134,1478075534
4,1478007134,1478093534
...,...,...
10706,1670643134,1670729534
10707,1670661134,1670747534
10708,1670679134,1670765534
10709,1670697134,1670783534


In [3]:
""" List of classes that create the features """

class IntensityMetric():
    def __init__(self, reports_data):
        self.data = reports_data
        self.sample = np.array([])
    
    def get_intensity(self, epoch_start, epoch_end):
        filtered = self.data.loc[(self.data['EpochTime'] >= epoch_start) & (self.data['EpochTime'] < epoch_end)]['smell_value']
        intensity_metric = filtered.sum()
        self.sample = np.append(self.sample, intensity_metric)
        return intensity_metric
    
    def create_distr(self):
        self.mean = np.mean(self.sample[~np.isnan(self.sample)])
        self.std =  np.std(self.sample[~np.isnan(self.sample)])
    
    def standarise(self, value):
        return (value - self.mean)/self.std


class DispersionMetric():
    def __init__(self, reports_data):
        self.data = reports_data
        self.__sample = np.array([])
    
    def get_dispersion(self, epoch_start, epoch_end):
        filtered = self.data.loc[(self.data['EpochTime'] >= epoch_start) & (self.data['EpochTime'] < epoch_end)][['skewed_latitude', 'skewed_longitude']]
        dispersion_metric = np.mean((filtered['skewed_latitude'].std(), filtered['skewed_longitude'].std()))
        self.__sample = np.append(self.__sample, dispersion_metric)
        return dispersion_metric
    
    def create_distr(self):
        self.mean = np.mean(self.__sample[~np.isnan(self.__sample)])
        self.std =  np.std(self.__sample[~np.isnan(self.__sample)])

    def standarise(self, value):
        return (value - self.mean)/self.std


class WindDataLoader:
    def __init__(self):
        wind_avalon = pd.read_csv('../Data/dataset/v2.1/esdr_raw/Feed_1_Avalon_ACHD.csv')
        wind_lawrence = pd.read_csv('../Data/dataset/v2.1/esdr_raw/Feed_26_Lawrenceville_ACHD.csv')
        wind_liberty = pd.read_csv('../Data/dataset/v2.1/esdr_raw/Feed_28_Liberty_ACHD.csv')
        wind_flagplaza = pd.read_csv('../Data/dataset/v2.1/esdr_raw/Feed_43_and_Feed_11067_Parkway_East_ACHD.csv')
        wind_braddock = pd.read_csv('../Data/dataset/v2.1/esdr_raw/Feed_3_North_Braddock_ACHD.csv')

        wind_avalon['SONICWD_DEG_SIN'] = np.sin(np.deg2rad(wind_avalon['3.feed_1.SONICWD_DEG'].apply(lambda angle: self.mathAngle2MeteoAngle(angle))))
        wind_lawrence['SONICWD_DEG_SIN'] = np.sin(np.deg2rad(wind_lawrence['3.feed_26.SONICWD_DEG'].apply(lambda angle: self.mathAngle2MeteoAngle(angle))))
        wind_liberty['SONICWD_DEG_SIN'] = np.sin(np.deg2rad(wind_liberty['3.feed_28.SONICWD_DEG'].apply(lambda angle: self.mathAngle2MeteoAngle(angle))))
        wind_flagplaza['SONICWD_DEG_SIN'] = np.sin(np.deg2rad(wind_flagplaza['3.feed_11067.SONICWD_DEG..3.feed_43.SONICWD_DEG'].apply(lambda angle: self.mathAngle2MeteoAngle(angle))))
        wind_braddock['SONICWD_DEG_SIN'] = np.sin(np.deg2rad(wind_braddock['3.feed_3.SONICWD_DEG'].apply(lambda angle: self.mathAngle2MeteoAngle(angle))))

        wind_avalon['SONICWD_DEG_COS'] = np.cos(np.deg2rad(wind_avalon['3.feed_1.SONICWD_DEG'].apply(lambda angle: self.mathAngle2MeteoAngle(angle))))
        wind_lawrence['SONICWD_DEG_COS'] = np.cos(np.deg2rad(wind_lawrence['3.feed_26.SONICWD_DEG'].apply(lambda angle: self.mathAngle2MeteoAngle(angle))))
        wind_liberty['SONICWD_DEG_COS'] = np.cos(np.deg2rad(wind_liberty['3.feed_28.SONICWD_DEG'].apply(lambda angle: self.mathAngle2MeteoAngle(angle)))) 
        wind_flagplaza['SONICWD_DEG_COS'] = np.cos(np.deg2rad(wind_flagplaza['3.feed_11067.SONICWD_DEG..3.feed_43.SONICWD_DEG'].apply(lambda angle: self.mathAngle2MeteoAngle(angle))))
        wind_braddock['SONICWD_DEG_COS'] = np.cos(np.deg2rad(wind_braddock['3.feed_3.SONICWD_DEG'].apply(lambda angle: self.mathAngle2MeteoAngle(angle))))

        wind_avalon = wind_avalon.rename(columns={'3.feed_1.SONICWS_MPH': 'SONICWS_MPH', '3.feed_1.SIGTHETA_DEG':'SIGTHETA_DEG'})\
            [['EpochTime', 'SONICWS_MPH','SONICWD_DEG_SIN', 'SONICWD_DEG_COS', 'SIGTHETA_DEG']]
        wind_lawrence = wind_lawrence.rename(columns={'3.feed_26.SONICWS_MPH': 'SONICWS_MPH', '3.feed_26.SIGTHETA_DEG':'SIGTHETA_DEG'})\
            [['EpochTime', 'SONICWS_MPH','SONICWD_DEG_SIN', 'SONICWD_DEG_COS', 'SIGTHETA_DEG']]
        wind_liberty = wind_liberty.rename(columns={'3.feed_28.SONICWS_MPH': 'SONICWS_MPH', '3.feed_28.SIGTHETA_DEG':'SIGTHETA_DEG'})\
            [['EpochTime', 'SONICWS_MPH','SONICWD_DEG_SIN', 'SONICWD_DEG_COS', 'SIGTHETA_DEG']]
        wind_flagplaza = wind_flagplaza.rename(columns={'3.feed_11067.SONICWS_MPH..3.feed_43.SONICWS_MPH': 'SONICWS_MPH', '3.feed_11067.SIGTHETA_DEG..3.feed_43.SIGTHETA_DEG':'SIGTHETA_DEG'})\
            [['EpochTime', 'SONICWS_MPH','SONICWD_DEG_SIN', 'SONICWD_DEG_COS', 'SIGTHETA_DEG']]
        wind_braddock = wind_braddock.rename(columns={'3.feed_3.SONICWS_MPH': 'SONICWS_MPH', '3.feed_3.SIGTHETA_DEG':'SIGTHETA_DEG'})\
            [['EpochTime', 'SONICWS_MPH','SONICWD_DEG_SIN', 'SONICWD_DEG_COS', 'SIGTHETA_DEG']]

        self.wind_data = pd.concat([wind_avalon, wind_braddock, wind_flagplaza, wind_lawrence, wind_liberty])
    
    def mathAngle2MeteoAngle(self, angle):
        meteo_angle = 270 - angle
        return meteo_angle if meteo_angle >= 0 else meteo_angle + 365


class WindMetric:
    def __init__(self, reports_data, wind_data):
        self.data = wind_data
        untreated_reports_data = reports_data.groupby('EpochTime', as_index=False)[['smell_value']].agg(lambda x: x.astype(int).count())
        self.data['num_reports'] = self.data['EpochTime'].apply(lambda epoch: \
                                    untreated_reports_data.loc[(untreated_reports_data['EpochTime'] >= epoch) &\
                                                                (untreated_reports_data['EpochTime'] < epoch + 3600)]['smell_value'].sum())
        self.speed_sample = np.array([])
        self.std_direction_sample = np.array([])
        #sigma = np.sqrt(np.mean(self.data['SONICWS_MPH'] **2) / 2)
        #self.__windspeed_distr = rayleigh(scale=sigma)

    def wind_max_speed(self, date_init, date_end):
        """ We'll take the average wind speed during the windiest 5 hours to see if it's been a 'windy' day """
        wind_speeds = self.data.loc[(self.data['EpochTime'] >= date_init) & (self.data['EpochTime'] < date_end)].groupby('EpochTime').agg({"SONICWS_MPH": 'mean'})['SONICWS_MPH']
        mean_wind_speed = np.mean(wind_speeds.to_numpy())
        self.speed_sample = np.append(self.speed_sample, mean_wind_speed)
        return mean_wind_speed
    
    def __weighted_average_direction(self, wind_directions, sin):
        """ Get weighted average wind direction given a period of time """
        column = "SONICWD_DEG_SIN" if sin else "SONICWD_DEG_COS"
        weights = wind_directions['num_reports'].to_numpy()
        weights = np.array([1]*len(weights)) if np.sum(weights) == 0 else weights
        directions = wind_directions[column].to_numpy()
        return np.average(directions, weights=weights)

    def wind_day_direction_cos(self, date_init, date_end):
        """ We'll take the avg wind direction during the period of time there were more reports that day """
        wind_directions = self.data.loc[(self.data['EpochTime'] >= date_init) & (self.data['EpochTime'] < date_end)].groupby('EpochTime').\
            agg({"num_reports": 'mean', 'SONICWD_DEG_COS': 'mean'})
        return self.__weighted_average_direction(wind_directions, False)
    
    def wind_day_direction_sin(self, date_init, date_end):
        """ We'll take the avg wind direction during the period of time there were more reports that day """
        wind_directions = self.data.loc[(self.data['EpochTime'] >= date_init) & (self.data['EpochTime'] < date_end)].groupby('EpochTime').\
            agg({"num_reports": 'mean', 'SONICWD_DEG_SIN': 'mean'})
        return self.__weighted_average_direction(wind_directions, True)
    
    def wind_day_direction_std(self, date_init, date_end):
        """ Average standard deviation of sin and cos std"""
        wind_std_directions = self.data.loc[(self.data['EpochTime'] >= date_init) & (self.data['EpochTime'] < date_end)].groupby('EpochTime').agg({"SIGTHETA_DEG": 'mean'})['SIGTHETA_DEG']
        mean_wind_std_directions = np.mean(wind_std_directions)
        self.std_direction_sample = np.append(self.std_direction_sample, mean_wind_std_directions)
        return mean_wind_std_directions
    
    def create_distr(self):
        self.mean_speed = np.mean(self.speed_sample[~np.isnan(self.speed_sample)])
        self.std_speed = np.std(self.speed_sample[~np.isnan(self.speed_sample)])
        
        self.mean_direction_std = np.mean(self.std_direction_sample[~np.isnan(self.std_direction_sample)])
        self.std_direction_std = np.std(self.std_direction_sample[~np.isnan(self.std_direction_sample)])
    
    def standarise_speed(self, value):
        return (value - self.mean_speed)/self.std_speed
    
    def standarise_direction_std(self, value):
        return (value - self.mean_direction_std)/self.std_direction_std

class SO2Loader:
    def __init__(self):
        so2_liberty = pd.read_csv('../Data/dataset/v2.1/esdr_raw/Feed_28_Liberty_ACHD.csv')[['EpochTime', '3.feed_28.SO2_PPM']]
        so2_avalon  = pd.read_csv('../Data/dataset/v2.1/esdr_raw/Feed_1_Avalon_ACHD.csv')[['EpochTime', '3.feed_1.SO2_PPM']]
        #so2_lawrenceville = pd.read_csv('../../dataset/v2.1/esdr_raw/Feed_27_Lawrenceville_2_ACHD.csv')
        so2_northbraddock = pd.read_csv('../Data/dataset/v2.1/esdr_raw/Feed_3_North_Braddock_ACHD.csv')[['EpochTime', '3.feed_3.SO2_PPM']]

        so2_liberty = so2_liberty.rename(columns={'3.feed_28.SO2_PPM': 'SO2_PPM'})
        so2_avalon  = so2_avalon.rename(columns={'3.feed_1.SO2_PPM': 'SO2_PPM'})
        so2_northbraddock = so2_northbraddock.rename(columns={'3.feed_3.SO2_PPM': 'SO2_PPM'})

        self.data = pd.concat([so2_liberty, so2_avalon, so2_northbraddock])
        #We transform S02 to PPK to make all values not decimal
        self.data['SO2_PPM'] = self.data['SO2_PPM'] * 1000
        #We filter negative values (errors)
        self.data = self.data[self.data['SO2_PPM'] >= 0]

class H2SLoader:
    def __init__(self):
        h2s_liberty = pd.read_csv('../Data/dataset/v2.1/esdr_raw/Feed_28_Liberty_ACHD.csv')[['EpochTime', '3.feed_28.H2S_PPM']]
        h2s_avalon  = pd.read_csv('../Data/dataset/v2.1/esdr_raw/Feed_1_Avalon_ACHD.csv')[['EpochTime', '3.feed_1.H2S_PPM']]
        
        h2s_liberty = h2s_liberty.rename(columns={'3.feed_28.H2S_PPM': 'H2S_PPM'})
        h2s_avalon  = h2s_avalon.rename(columns={'3.feed_1.H2S_PPM': 'H2S_PPM'})
        
        self.data = pd.concat([h2s_liberty, h2s_avalon])
        #We transform S02 to PPK to make all values not decimal
        self.data['SO2_PPM'] = self.data['H2S_PPM'] * 1000 * 1000
        #We filter negative values (errors)
        self.data = self.data[self.data['H2S_PPM'] >= 0]
        self.name = 'H2S_PPM'


class SensorData:
    def __init__(self, so2_data, which_particle) -> None:
        self.data = so2_data
        self.item = which_particle
        self.avg_sample = np.array([])
        self.top_sample = np.array([])

    def get_avg_sensor_data(self, date_init, date_end):
        avg_sensor = self.data.loc[(self.data['EpochTime'] >= date_init) & (self.data['EpochTime'] < date_end)].groupby('EpochTime').agg({self.item: 'mean'})[self.item]
        avg_sensor_mean = avg_sensor.mean()
        self.avg_sample = np.append(self.avg_sample, avg_sensor_mean)
        return avg_sensor_mean
    
    def get_avg_top_sensor_data(self, date_init, date_end, top=2):
        avg_sensor = self.data.loc[(self.data['EpochTime'] >= date_init) & (self.data['EpochTime'] < date_end)].groupby('EpochTime').agg({self.item: 'mean'})[self.item]
        avg_sensor_top = np.mean(np.sort(avg_sensor.to_numpy())[::-1][:top])
        self.top_sample = np.append(self.top_sample, avg_sensor_top)
        return avg_sensor_top

    def create_distr(self):
        self.mean_avg = np.mean(self.avg_sample[~np.isnan(self.avg_sample)])
        self.std_avg = np.std(self.avg_sample[~np.isnan(self.avg_sample)])
        
        self.mean_top = np.mean(self.top_sample[~np.isnan(self.top_sample)])
        self.std_top = np.std(self.top_sample[~np.isnan(self.top_sample)])

    def standarise_avg(self, value):
        return (value - self.mean_avg)/self.std_avg
    
    def standarise_top(self, value):
        return (value - self.mean_top)/self.std_top

         

In [4]:
class SmellPGHTimeSeries():
    def __init__(self, reports_data) -> None:
        self.reports_data = reports_data
        self.data = self.__windowed_time_series(self.reports_data)
        self.data_std = self.__windowed_time_series(self.reports_data)
        #initialise constructors that are in charge of creating the distributions based on the samples
        self.__initialise_constructors()
    
    def __initialise_constructors(self):
        self.__intensity = IntensityMetric(reports_data)
        self.__dispersion = DispersionMetric(reports_data)
        self.__wind = WindMetric(reports_data, WindDataLoader().wind_data)
        self.__so2 = SensorData(SO2Loader().data, "SO2_PPM")
        self.__h2s = SensorData(H2SLoader().data, "H2S_PPM")

    
    def __windowed_time_series(self, data, window_size=5):
        window_data = []
        end = data['EpochTime'].max()
        start = data['EpochTime'].min()
        
        while start < end:
            window_data.append(self.__create_window_row(start))
            start += window_size * 3600
    
        return pd.DataFrame(window_data)
    
    def __create_window_row(self, epoch_start: int, agg_size = 24):
        epoch_finish = epoch_start + agg_size * 3600
        dict = {
            "EpochStart" : epoch_start,
            "EpochEnd"  : epoch_finish,
        }
        return dict

    def build_time_series(self):
        #Create features
        self.__set_reports_intensity()
        self.__set_reports_dispersion()
        self.__set_wind_speed()
        self.__set_wind_direction()
        self.__set_so2_average()
        self.__set_h2s_average()
        #Impute missing data
        self.__impute_missing_data()
        #Set means for standarisation
        self.set_mean_std()
        return self.data
    
    def __set_reports_intensity(self):
        """ -- """
        self.data['report_intensity'] = self.data.apply(lambda range: self.__intensity.get_intensity(range.EpochStart, range.EpochEnd), axis=1)

    def __set_reports_dispersion(self):
        self.data['reports_dispersion'] = self.data.apply(lambda range: self.__dispersion.get_dispersion(range.EpochStart, range.EpochEnd), axis=1)

    def __set_wind_speed(self):
        self.data['wind_speed'] = self.data.apply(lambda range: self.__wind.wind_max_speed(range.EpochStart, range.EpochEnd), axis=1)
    
    def __set_wind_direction(self):
        self.data['wind_dir_sin'] = self.data.apply(lambda range: self.__wind.wind_day_direction_sin(range.EpochStart, range.EpochEnd), axis=1)
        self.data['wind_dir_cos'] = self.data.apply(lambda range: self.__wind.wind_day_direction_cos(range.EpochStart, range.EpochEnd), axis=1)
        self.data['wind_std_dir'] = self.data.apply(lambda range: self.__wind.wind_day_direction_std(range.EpochStart, range.EpochEnd), axis=1)
    
    def __set_so2_average(self):
        self.data['so2_avg'] = self.data.apply(lambda range: self.__so2.get_avg_sensor_data(range.EpochStart, range.EpochEnd), axis=1)
        self.data['so2_top'] = self.data.apply(lambda range: self.__so2.get_avg_top_sensor_data(range.EpochStart, range.EpochEnd), axis=1)

    def __set_h2s_average(self):
        self.data['h2s_avg'] = self.data.apply(lambda range: self.__h2s.get_avg_sensor_data(range.EpochStart, range.EpochEnd), axis=1)
        self.data['h2s_top'] = self.data.apply(lambda range: self.__h2s.get_avg_top_sensor_data(range.EpochStart, range.EpochEnd), axis=1)

    def __impute_missing_data(self):
        features = ['report_intensity', 'reports_dispersion', 'wind_speed', 'wind_dir_cos',\
                     'wind_dir_sin', 'wind_std_dir', 'so2_avg', 'so2_top', 'h2s_avg', 'h2s_top']
        
        for feature in features:
            self.data[feature] = self.data[feature].interpolate()

    def set_mean_std(self):
        self.__intensity.create_distr()
        self.__dispersion.create_distr()
        self.__h2s.create_distr()
        self.__so2.create_distr()
        self.__wind.create_distr()
    
    def z_normalise_time_series(self):
        self.data_std['report_intensity'] = self.data['report_intensity'].apply(lambda value: self.__intensity.standarise(value)) 
        self.data_std['reports_dispersion'] = self.data['reports_dispersion'].apply(lambda value: self.__dispersion.standarise(value))
        self.data_std['wind_speed'] = self.data['wind_speed'].apply(lambda value: self.__wind.standarise_speed(value))
        self.data_std['wind_dir_sin'] = self.data['wind_dir_sin']
        self.data_std['wind_dir_cos'] = self.data['wind_dir_cos']
        self.data_std['wind_std_dir'] = self.data['wind_std_dir'].apply(lambda value: self.__wind.standarise_direction_std(value))
        self.data_std['so2_avg'] = self.data['so2_avg'].apply(lambda value: self.__so2.standarise_avg(value))
        self.data_std['so2_top'] = self.data['so2_top'].apply(lambda value: self.__so2.standarise_top(value))
        self.data_std['h2s_avg'] = self.data['h2s_avg'].apply(lambda value: self.__h2s.standarise_avg(value))
        self.data_std['h2s_top'] = self.data['h2s_top'].apply(lambda value: self.__h2s.standarise_top(value))
        return self.data_std

thesis_data = SmellPGHTimeSeries(reports_data)
ts_data = thesis_data.build_time_series()
ts_zdata = thesis_data.z_normalise_time_series()


In [5]:
ts_zdata

Unnamed: 0,EpochStart,EpochEnd,report_intensity,reports_dispersion,wind_speed,wind_dir_sin,wind_dir_cos,wind_std_dir,so2_avg,so2_top,h2s_avg,h2s_top
0,1477935134,1478021534,-0.157437,-1.578301,-1.252416,0.280321,-0.222816,1.024987,-0.361539,0.000462,-0.559566,-0.410486
1,1477953134,1478039534,-0.151564,-1.645146,-1.032301,0.363363,-0.156433,0.510890,-0.122213,0.087651,-0.447187,-0.379545
2,1477971134,1478057534,-0.086965,-2.020338,-0.951771,0.351290,-0.164777,0.305617,0.766715,1.003133,0.088776,0.208332
3,1477989134,1478075534,-0.116328,-2.001268,-0.884215,0.350520,-0.171512,0.278099,1.832288,1.133916,0.858141,0.486800
4,1478007134,1478093534,1.316603,-2.296753,-0.722708,0.562901,-0.265060,-0.260124,2.544570,1.199308,1.299013,0.486800
...,...,...,...,...,...,...,...,...,...,...,...,...
10706,1670643134,1670729534,-0.357108,0.215788,-0.952442,0.284016,-0.745978,0.359609,-0.783210,-0.773339,-0.680589,-0.719895
10707,1670661134,1670747534,-0.357108,0.093599,-0.986443,0.339001,-0.750742,0.309269,-0.817400,-0.871426,-0.697879,-0.781777
10708,1670679134,1670765534,-0.409962,0.108148,-0.938349,0.427353,-0.751731,0.048699,-0.817400,-0.871426,-0.697879,-0.781777
10709,1670697134,1670783534,-0.509797,-0.236565,-0.958929,0.460793,-0.768472,-0.420013,-0.817400,-0.871426,-0.697879,-0.781777


In [6]:
ts_zdata.to_csv('../Data/Clean Data/ts_zdata.csv', index=False)