# Preprocessing

In [None]:
# import modules
import pandas as pd
import numpy as np
import matplotlib.colors as colors
import matplotlib.pyplot as plt
from pymongo import MongoClient
from scipy import stats
from sklearn.metrics import mean_absolute_error

# remove warnings
import warnings
warnings.filterwarnings('ignore')

# set default plot style
plt.style.use('seaborn-dark')

plt.rcParams['figure.figsize'] = 10,8
plt.rcParams['figure.frameon'] = False
plt.rcParams['legend.frameon'] = False

# get data
pm_df = pd.read_json('./data/pmsensors.json')
tf_df = pd.read_json('./data/trafficflows.json')
ti_df = pd.read_json('./data/trafficincidents.json')

print('pm_df length:', len(pm_df))
print('tf_df length:', len(tf_df))
print('ti_df length:', len(ti_df))

In [None]:
# drop nulls
pm_df = pm_df.dropna()
ti_df = ti_df.dropna()
tf_df = tf_df.dropna()

# drop MongoDB id and version
pm_df = pm_df.drop(['_id', '__v'], axis=1)
tf_df = tf_df.drop(['_id', '__v'], axis=1)
ti_df = ti_df.drop(['_id', '__v'], axis=1)

print('pm_df length:', len(pm_df))
print('tf_df length:', len(tf_df))
print('ti_df length:', len(ti_df))

In [None]:
# plot timestamps of all datapoints
plt.figure(figsize=(20,8))

plt.eventplot(pm_df['timestamp'], orientation='horizontal', lineoffsets=-0.25, linelengths=0.2, colors='r')
plt.eventplot(tf_df['timestamp'], orientation='horizontal', lineoffsets=0, linelengths=0.2, colors='g')
plt.eventplot(ti_df['timestamp'], orientation='horizontal', lineoffsets=0.25, linelengths=0.2, colors='b')

plt.legend(['pm sensor readings','traffic flow readings','traffic incident readings'])
plt.show()

In [None]:
# drop data from early sessions (connection unreliable)
pm_df = pm_df[pm_df['timestamp'] >= '2021-12-01']

In [None]:
# add session numbers to particulate data
pm_df['delta'] = pm_df['timestamp'].diff()
pm_df = pm_df.dropna() # drop first row after calling diff 

# remove deltas < 1 s (caused by unreliable data surges after a connection drop out)
pm_df = pm_df.drop(pm_df[pm_df['delta'] < pd.Timedelta(0.5, unit='s')].index)

# find large deltas > 100 s (implies start of new journey)
max_dropout = 100 # max acceptable dropout before starting new session
pm_df['large_delta'] = pm_df['delta'].apply(lambda x: x > pd.Timedelta(max_dropout, unit='s'))
pm_df['large_delta'] = pm_df['large_delta'].apply(lambda x: 1 if x else 0)

# assign session
pm_df['sessionNo'] = pm_df['large_delta'].cumsum()

In [None]:
def get_session_interval(df, session_no):
    # finds the start and end times for a given session
    session_data = df[df['sessionNo'] == session_no]
    start = session_data['timestamp'].min()
    end = session_data['timestamp'].max()
    return {
        'start': start,
        'end': end
    }

last_session = pm_df['sessionNo'].max()

# drop sessions shorter than 3 mins
for i in range(last_session + 1):
    session_interval = get_session_interval(pm_df, i)
    duration = session_interval['end'] - session_interval['start']

    if duration < pd.Timedelta(180, unit='s'):
        pm_df = pm_df.drop(pm_df[pm_df['sessionNo'] == i].index)

# reassign session numbers (maintains consistent increments)
pm_df = pm_df.drop(['sessionNo'], axis=1)
pm_df['sessionNo'] = pm_df['large_delta'].cumsum()

In [None]:
pm_df_std = pm_df[['pm1_0', 'pm2_5', 'pm10_0', 'delta', 'large_delta']]
pm_df_std = pm_df_std[pm_df_std['large_delta'] != 1]
pm_df_std = pm_df_std.drop(['large_delta'], axis=1)
pm_df_std['delta'] = pm_df_std['delta'].apply(lambda x: x.total_seconds())

z_scores = stats.zscore(pm_df_std)
abs_z_scores = np.abs(z_scores)
filtered_entries = (abs_z_scores < 3).all(axis=1)

pm_df_std = pm_df_std[filtered_entries]
pm_df_std.std()

In [None]:
pm_df_std.mean()

In [None]:
pm_df_std['delta'].hist(bins=30)

In [None]:
pm_df = pm_df.drop(['delta', 'large_delta'] ,axis=1)

In [None]:
# assign sessions to tf_df and ti_df (and remove any entries outside ranges)
session_intervals = []

last_session = pm_df['sessionNo'].max()

for i in range(last_session + 1):
    session_intervals.append(get_session_interval(pm_df, i))

def assign_session(x, intervals):
    # assign session numbers to ti_df and tf_df frames
    # set any timestamps outside session intervals to NaN (to then drop from df later)
    # reasons for timestamps outside session intervals include leaving OwnTracks open...
    # ... after completing a journey

    for (i, interval) in enumerate(intervals):
        if x >= interval['start'] and x <= interval['end']:
            # point is within a valid interval
            return i
    # point outside valid intervals
    return np.nan

tf_df['sessionNo'] = tf_df['timestamp'].apply(assign_session, args=([session_intervals]))
ti_df['sessionNo'] = ti_df['timestamp'].apply(assign_session, args=([session_intervals]))

tf_df = tf_df.dropna()
ti_df = ti_df.dropna()

# cast to ints
tf_df['sessionNo'] = tf_df['sessionNo'].astype(int)
ti_df['sessionNo'] = ti_df['sessionNo'].astype(int)

print('pm_df length:', len(pm_df))
print('tf_df length:', len(tf_df))
print('ti_df length:', len(ti_df))

In [None]:
# repeat to drop any pm_df values that fall outside collected traffic data
session_intervals = []

last_session = tf_df['sessionNo'].max()

for i in range(last_session + 1):
    session_intervals.append(get_session_interval(tf_df, i))

pm_df['inTFSession'] = pm_df['timestamp'].apply(assign_session, args=([session_intervals]))

pm_df = pm_df.dropna()
pm_df = pm_df.drop(['inTFSession'], axis=1)

In [None]:
# reassess intervals
plt.figure(figsize=(20,8))

plt.eventplot(pm_df['timestamp'], orientation='horizontal', lineoffsets=-0.25, linelengths=0.2, colors='r')
plt.eventplot(tf_df['timestamp'], orientation='horizontal', lineoffsets=0, linelengths=0.2, colors='g')
plt.eventplot(ti_df['timestamp'], orientation='horizontal', lineoffsets=0.25, linelengths=0.2, colors='b')

plt.legend(['pmsensors','trafficflows','trafficincidents'])
plt.show()

In [None]:
pm_df.head()

# Analysis

In [None]:
# find average of particulate matter values 
pm_readings = pm_df[['pm1_0', 'pm2_5', 'pm10_0']]
pm_avg = pm_readings.sum(axis=1)/3
pm_df['pmAvg'] = pm_avg

In [None]:
# add a delay factor to the traffic flow frame (equivalent to factor of reduction in speed vs free flow)
tf_df['delayFactor'] = tf_df.apply(lambda x: x['freeFlowSpeed']/x['currentSpeed'], axis=1)
print('max delay factor', tf_df['delayFactor'].max())
print('min delay factor', tf_df['delayFactor'].min())

tf_df.head()

In [None]:
# ensure values are chronological
pm_df.sort_values(by=['timestamp'], inplace=True)
tf_df.sort_values(by=['timestamp'], inplace=True)
ti_df.sort_values(by=['timestamp'], inplace=True)

# index rows by time
time_index = pd.DatetimeIndex(pm_df['timestamp'])
pm_df = pm_df.set_index(time_index)
pm_df = pm_df.drop(['timestamp'], axis=1)

time_index = pd.DatetimeIndex(tf_df['timestamp'])
tf_df = tf_df.set_index(time_index)
tf_df = tf_df.drop(['timestamp'], axis=1)

time_index = pd.DatetimeIndex(ti_df['timestamp'])
ti_df = ti_df.set_index(time_index)
ti_df = ti_df.drop(['timestamp'], axis=1)

In [None]:
# select a session for analysis
session_no = 10

pm_df_s = pm_df[pm_df['sessionNo'] == session_no]
tf_df_s = tf_df[tf_df['sessionNo'] == session_no]
ti_df_s = ti_df[ti_df['sessionNo'] == session_no]

In [None]:
# plotting traffic delay factor versus location as a scatter plot
plot_session = tf_df

max_delay_factor = tf_df['delayFactor'].max()

plt.figure(figsize=(10,10))
plt.scatter(
    plot_session['lon'],
    plot_session['lat'],
    c=plot_session['delayFactor'],
    cmap=plt.get_cmap('jet'),
    norm=colors.LogNorm(
        vmin=1,
        vmax=max_delay_factor*0.6,
        clip=True),
    alpha=0.3
    )

plt.colorbar()
plt.show()

In [None]:
# smooth particulate session data and remove anomalies
def mean_absolute_percentage_error(y_true, y_pred): # delete if not used
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

# moving_average and find_anomalies modified from smarthome_data_processing
def moving_average(series, filter_window=6, ax=None):
    # returns the filtered series, and an axes to plot if required
    rolling_mean = series.rolling(window=filter_window).mean()
    if ax:
        ax.plot(rolling_mean, label='rolling mean ({})'.format(series.name))
        ax.plot(series[filter_window:], label='original data ({})'.format(series.name))
        ax.legend(loc='upper left')
    return rolling_mean


def find_anomalies(series, filter_window=6, ax=None, scale=2.576): # 2.576 = 99% confidence interval 
    rolling_mean = moving_average(series, filter_window, ax)
    mae = mean_absolute_error(series[filter_window:], rolling_mean[filter_window:])

    deviation = np.std(series[filter_window:] - rolling_mean[filter_window:])
    lower_bond = rolling_mean - (mae + scale * deviation)
    upper_bond = rolling_mean + (mae + scale * deviation)

    anomalies = pd.Series(index=series.index, name=series.name)
    anomalies[series<lower_bond] = series[series<lower_bond]
    anomalies[series>upper_bond] = series[series>upper_bond]
    
    if ax:
        ax.plot(upper_bond, "r--", label="upper bound/lower bound")
        ax.plot(lower_bond, "r--")
        ax.plot(anomalies, "ro", markersize=10)

    return anomalies # time-indexed series, anomalous results == anomalous value, otherwise null

filter_cols = ['pm1_0', 'pm2_5', 'pm10_0', 'pmAvg']

fig, axs = plt.subplots(4, 1, figsize=(20,20))

for i, col in enumerate(filter_cols):
    moving_average(pm_df_s[col], ax=axs[i])


In [None]:
fig, axs = plt.subplots(4, 1, figsize=(20,20))

for i, col in enumerate(filter_cols):
    find_anomalies(pm_df_s[col], ax=axs[i])

In [None]:
def find_all_pm_anomalies():
    # label anomalies in entire pm dataset
    pm_df_c = pm_df.copy()
    pm_df_c.drop(pm_df_c.index, inplace=True) # empty dataframe

    for session in range(last_session + 1):
        # for each session
        session_data = pm_df[pm_df['sessionNo'] == session]

        for col in filter_cols:
            # for each col ['pm1_0', 'pm2_5', 'pm10_0', 'pmAvg']
            col_name = col + 'Anom'
            session_data[col_name] = find_anomalies(session_data[col])
        
        pm_df_c = pd.concat([pm_df_c, session_data])

    return pm_df_c

pm_df = find_all_pm_anomalies()

In [None]:
# set anomalies to nan
for col in filter_cols:
    anom_col = col + 'Anom'
    pm_df[anom_col] = pm_df[anom_col].notna()


def set_to_null(value, anom_flag):
    return np.nan if anom_flag else value

for col in filter_cols:
    anom_col = col + 'Anom'
    pm_df[col] = pm_df.apply(lambda row: set_to_null(row[col], row[anom_col]), axis = 1)

pm_df.head()

In [None]:
# farewell helper columns, we thank you
pm_df = pm_df.drop(['pm1_0Anom', 'pm2_5Anom', 'pm10_0Anom', 'pmAvgAnom'], axis=1)

In [None]:
# interpolate anomalous values
pm_df = pm_df.interpolate(method='time')

In [None]:
# confirm result with session
pm_df_s = pm_df[pm_df['sessionNo'] == session_no]

# fig, axs = plt.subplots(1, 1, figsize=(20,8))
fig = plt.figure(figsize=(20,8))
ax = plt.axes()

moving_average(pm_df_s['pm1_0'], ax=ax)

In [None]:
def smooth_pm_data():
    # smooth all data sessions
    pm_df_c = pm_df.copy()
    pm_df_c.drop(pm_df_c.index, inplace=True) # empty dataframe

    for session in range(last_session + 1):
        # for each session
        session_data = pm_df[pm_df['sessionNo'] == session]

        for col in filter_cols:
            # for each col ['pm1_0', 'pm2_5', 'pm10_0', 'pmAvg']
            col_name = col + 'Smoothed'
            session_data[col_name] = moving_average(session_data[col])
        
        pm_df_c = pd.concat([pm_df_c, session_data])

    # drop nulls (first 6 entries of each window)
    pm_df_c = pm_df_c.dropna()
    return pm_df_c

pm_df = smooth_pm_data()

In [None]:
# confirm result with session
pm_df_s = pm_df[pm_df['sessionNo'] == session_no]

plt.figure(figsize=(20,8))
plt.plot(pm_df_s['pm1_0Smoothed'])

# Correlations

In [None]:
# to assess correlations, need to first combine the datasets
tf_df = tf_df.reindex(pm_df.index, method='ffill')
tf_df.head()

In [None]:
df = pm_df.join(tf_df, rsuffix='TF')

In [None]:
df = df.drop(['sessionNoTF'], axis=1)

In [None]:
attributes = [
    'pm1_0Smoothed',
    'pm2_5Smoothed',
    'pm10_0Smoothed',
    'pmAvgSmoothed',
    'lat',
    'lon',
    'alt',
    'currentSpeed',
    'freeFlowSpeed',
    'delayFactor',
    ]

pd.plotting.scatter_matrix(df[attributes], figsize=(20,20))
plt.show()

In [None]:
# connect to MongoDB
client =  MongoClient('<<DATABASE URL>>')
db = client['SIOTData']
collection = db['processeddatas']
df.reset_index(inplace=True)
data_dict = df.to_dict("records")


# insert collection
collection.insert_many(data_dict)