# Vehicle Speed Aggregation: Moving Average Strategy

* save directory: `../datasets/per-vehicle-moving-average/<aggregation type>-window-<lookback window in seconds>-<datestring>`

In [1]:
import shap
import pandas as pd
import numpy as np

import time

import seaborn as sns

import matplotlib.pyplot as plt

import lightgbm as lgb
from sklearn.preprocessing import LabelEncoder

from glob import glob
import os
import optuna
import joblib
from tqdm import tqdm

import warnings
warnings.filterwarnings('ignore')

import multiprocessing as mp
from datetime import datetime

2022-05-28


In [2]:
import psutil
    
# virtual_memory().total is reported in bytes; 1 << 30 bytes make one GiB
ram_gb = psutil.virtual_memory().total / (1 << 30)
print(f"RAM: {ram_gb:.2f} GB")

RAM: 251.79 GB


## Read from source

In [3]:
# Collect the source CSV paths for one processed snapshot date
root = '../datasets/processed'
source_date = '2022-05-24'
glob_path = os.path.join(root, source_date, '*.csv')
# glob yields matches in arbitrary order; sorting keeps the processing order
# (and the progress counter below) reproducible between runs
filepaths = sorted(glob(glob_path))

# check
filepaths[0]

'../datasets/processed/2022-05-24/123200872603.csv'

In [5]:
# Load every source CSV and stack them into a single frame
df = pd.concat(list(map(pd.read_csv, filepaths)))

In [6]:
def get_periods(row, ref_index, thresh):
    """Count the entries of ``ref_index`` inside the look-back window of ``row``.

    The window ends at ``row.name`` and reaches ``thresh`` seconds back; both
    ends are inclusive, so an entry stamped exactly at ``row.name`` counts.
    """
    seconds_before_row = (row.name - ref_index).total_seconds()
    recent_enough = seconds_before_row <= thresh
    not_in_future = seconds_before_row >= 0  # zero lag keeps the row itself
    return np.sum(recent_enough & not_in_future)

def get_agg_speed(row, group, thresh, agg='mean'):
    """Return the aggregated ``instant_speed`` over the look-back window of ``row``.

    Args:
        row: Row whose ``name`` is the timestamp that closes the window.
        group: DataFrame with a datetime-like index and an ``instant_speed``
            column; rows are selected from it by timestamp.
        thresh: Window length in seconds (both window ends are inclusive).
        agg: ``'mean'`` or ``'median'``.

    Returns:
        The mean or median of ``instant_speed`` over the rows of ``group``
        stamped between ``row.name - thresh`` seconds and ``row.name``.

    Raises:
        ValueError: If ``agg`` is neither ``'mean'`` nor ``'median'``.
    """
    if agg not in ('mean', 'median'):
        # Raising a plain string is itself a TypeError, so raise a real
        # exception and do it before any work is spent on the window.
        raise ValueError(f"Enter valid agg. Got {agg!r}; expected 'mean' or 'median'.")

    timedeltas = (row.name - group.index).total_seconds()
    # >= 0 includes self (and drops rows stamped after row.name)
    mask = (timedeltas <= thresh) & (timedeltas >= 0)
    window = group[mask]['instant_speed']
    return window.mean() if agg == 'mean' else window.median()

## Parallelize

In [7]:
def mp_get_periods(chunk, ref_index, thresh):
    """Add a ``num_periods`` column to ``chunk`` and return the chunk.

    Each row's value is ``get_periods`` evaluated for that row against
    ``ref_index`` with the given ``thresh``. ``chunk`` is modified in place.
    """
    def count_for(row):
        return get_periods(row, ref_index, thresh=thresh)

    chunk['num_periods'] = chunk.apply(count_for, axis=1)
    return chunk

def mp_agg_speed(chunk, full_df, thresh, agg):
    """Add an ``agg_speed`` column to ``chunk`` and return the chunk.

    Each row's value is ``get_agg_speed`` evaluated for that row against
    ``full_df`` with the given ``thresh`` and ``agg``. ``chunk`` is modified
    in place.
    """
    def speed_for(row):
        return get_agg_speed(row, full_df, thresh=thresh, agg=agg)

    chunk['agg_speed'] = chunk.apply(speed_for, axis=1)
    return chunk

# parallelize agg function
def parallelize_get_periods(df, func, thresh):
    """Run ``func`` over row-chunks of ``df`` in a process pool and re-join them.

    Args:
        df: DataFrame to split by rows; its full index is handed to every
            worker as the reference index.
        func: Picklable callable invoked as ``func(chunk, ref_index, thresh)``
            that returns an object ``pd.concat`` accepts.
        thresh: Value forwarded unchanged to ``func``.

    Returns:
        The per-chunk results concatenated in the original row order.
    """
    ref_index = df.index
    n_rows = len(df)
    # Never create more chunks than rows, so no worker receives an empty chunk
    n_chunks = max(1, min(mp.cpu_count(), n_rows))
    # Split row positions (not the frame itself) so every chunk is a DataFrame
    df_chunks = [df.iloc[rows] for rows in np.array_split(np.arange(n_rows), n_chunks)]
    # The context manager shuts the workers down; results are collected inside
    # it because leaving the block terminates the pool.
    with mp.Pool(processes=n_chunks) as pool:
        pending = [pool.apply_async(func, args=(chunk, ref_index, thresh)) for chunk in df_chunks]
        df_results = [job.get() for job in pending]
    return pd.concat(df_results)

def parallelize_agg_speed(df, func, thresh, agg):
    """Run ``func`` over row-chunks of ``df`` in a process pool and re-join them.

    Args:
        df: DataFrame to split by rows; the whole frame is also handed to
            every worker as the lookup table.
        func: Picklable callable invoked as ``func(chunk, df, thresh, agg)``
            that returns an object ``pd.concat`` accepts.
        thresh: Value forwarded unchanged to ``func``.
        agg: Value forwarded unchanged to ``func``.

    Returns:
        The per-chunk results concatenated in the original row order.
    """
    n_rows = len(df)
    # Never create more chunks than rows, so no worker receives an empty chunk
    n_chunks = max(1, min(mp.cpu_count(), n_rows))
    # Split row positions (not the frame itself) so every chunk is a DataFrame
    df_chunks = [df.iloc[rows] for rows in np.array_split(np.arange(n_rows), n_chunks)]
    # The context manager shuts the workers down; results are collected inside
    # it because leaving the block terminates the pool.
    with mp.Pool(processes=n_chunks) as pool:
        pending = [pool.apply_async(func, args=(chunk, df, thresh, agg)) for chunk in df_chunks]
        df_results = [job.get() for job in pending]
    return pd.concat(df_results)


In [None]:
## use this for progress_apply method:
# tqdm.pandas()
date = '2022-05-27' # alternatively use now date for new runs

# date = str(datetime.date(datetime.now())) # now date
# print(date)


lookback_window = 60 # seconds, can be changed
agg = 'mean'

# Recompute speed through the "moving average" method: for every row, count the
# rows in the look-back window (num_periods) and aggregate their speed
# (agg_speed), then save one CSV per source file with both columns added.

# The output directory depends only on the run settings, so build it once
save_dir = f'../datasets/per-vehicle-moving-average/{agg}-window-{lookback_window}-{date}'
os.makedirs(save_dir, exist_ok=True)
total_files = len(filepaths)

for index, path in enumerate(filepaths):
    name = os.path.basename(path).split('.')[0]
    save_path = os.path.join(save_dir, f"{name}.csv")

    # Existing outputs are left alone, so an interrupted run can be resumed;
    # the source CSV is only read when there is work to do.
    if not os.path.exists(save_path):
        print("Running for id: ", save_path)
        group = pd.read_csv(path)

        # get datetime index, sorted chronologically
        group['datetime'] = pd.to_datetime(group['datetime'])
        group = group.sort_values(by='datetime')
        group = group.set_index('datetime')

        group = parallelize_get_periods(
            df=group,
            func=mp_get_periods,
            thresh=lookback_window)

        group = parallelize_agg_speed(
            df=group,
            func=mp_agg_speed,
            thresh=lookback_window,
            agg=agg)

        # restore 'datetime' as a regular column before writing
        group = group.reset_index()

        # save
        group.to_csv(save_path, index=False)
    # report progress against the real number of files, not a fixed 200
    print(f"Done: {index + 1}/{total_files}")

Done: 1/200
Done: 2/200
Done: 3/200
Done: 4/200
Done: 5/200
Done: 6/200
Done: 7/200
Done: 8/200
Done: 9/200
Done: 10/200
Done: 11/200
Done: 12/200
Done: 13/200
Done: 14/200
Done: 15/200
Done: 16/200
Done: 17/200
Done: 18/200
Done: 19/200
Done: 20/200
Done: 21/200
Running for id:  ../datasets/per-vehicle-moving-average/mean-window-60-2022-05-27/123200872648.csv
Done: 22/200
Running for id:  ../datasets/per-vehicle-moving-average/mean-window-60-2022-05-27/123200872650.csv
Done: 23/200
Running for id:  ../datasets/per-vehicle-moving-average/mean-window-60-2022-05-27/123200872651.csv
Done: 24/200
Running for id:  ../datasets/per-vehicle-moving-average/mean-window-60-2022-05-27/123200872654.csv
Done: 25/200
Running for id:  ../datasets/per-vehicle-moving-average/mean-window-60-2022-05-27/123200872655.csv
Done: 26/200
Running for id:  ../datasets/per-vehicle-moving-average/mean-window-60-2022-05-27/123200872656.csv
Done: 27/200
Running for id:  ../datasets/per-vehicle-moving-average/mean-win