In [1]:
# Install pandasql (imported below as `psql`) from the anaconda channel into the kernel's conda environment
%conda install -c anaconda pandasql 

Collecting package metadata (current_repodata.json): done
Solving environment: / 
The environment is inconsistent, please check the package plan carefully
The following packages are causing the inconsistency:

  - conda-forge/noarch::aioitertools==0.7.1=pyhd8ed1ab_0
  - conda-forge/linux-64::importlib-metadata==3.7.0=py36h5fab9bb_0
  - conda-forge/noarch::typing-extensions==3.7.4.3=0
  - conda-forge/noarch::black==20.8b1=py_1
  - conda-forge/noarch::helpdev==0.7.1=pyhd8ed1ab_0
  - conda-forge/noarch::imageio==2.9.0=py_0
  - conda-forge/noarch::importlib_metadata==3.7.0=hd8ed1ab_0
  - conda-forge/linux-64::yarl==1.6.3=py36h8f6f2f9_1
  - conda-forge/noarch::flake8==3.8.4=py_0
  - conda-forge/noarch::jsonschema==3.2.0=py_2
  - conda-forge/linux-64::matplotlib-base==3.3.4=py36hd391965_0
  - conda-forge/linux-64::path==15.1.2=py36h5fab9bb_0
  - conda-forge/linux-64::pluggy==0.13.1=py36h5fab9bb_4
  - conda-forge/noarch::qdarkstyle==2.8.1=pyhd8ed1ab_2
  - conda-forge/linux-64::anyio==2.1.0=py

In [2]:
# Install openpyxl, the engine that read_utah passes to pd.read_excel for .xlsx files
%conda install -c anaconda openpyxl

Collecting package metadata (current_repodata.json): done
Solving environment: failed with initial frozen solve. Retrying with flexible solve.
Collecting package metadata (repodata.json): done
Solving environment: done


  current version: 4.8.4
  latest version: 4.11.0

Please update conda by running

    $ conda update -n base -c defaults conda



## Package Plan ##

  environment location: /home/ec2-user/anaconda3/envs/python3

  added / updated specs:
    - openpyxl


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    astroid-2.5                |   py36h5fab9bb_1         297 KB  conda-forge
    babel-2.9.0                |     pyhd3deb0d_0         6.2 MB  conda-forge
    bleach-3.3.0               |     pyh44b312d_0         111 KB  conda-forge
    bokeh-2.2.3                |   py36h5fab9bb_0         7.0 MB  conda-forge
    colorama-0.4.4             |     pyh9f0ad1d_0          18 KB  c

In [3]:
import datetime
import glob
import os
import time
from datetime import timedelta

import numpy as np
import pandas as pd
import pandasql as psql

# Functions to Read in Raw Data and Impute with Linear Interpolation

In [4]:
def full_time(df, interval="15min", min_date=None, max_date=None):
    """Build the complete, gap-free grid of timestamps for a sensor.

    Args:
        df: DataFrame with a "timestamp" column; only consulted for whichever
            bound is not supplied explicitly.
        interval: Frequency string handed to ``pd.date_range``
            (typically "15min" or "5min" depending on the data).
        min_date: First timestamp of the grid; when falsy, the smallest
            timestamp in ``df`` is used.
        max_date: Upper bound of the grid; when falsy, the largest timestamp
            in ``df`` is used.

    Returns:
        DataFrame with the columns "timestamp", "date", "day_of_week" and
        "day_of_year", one row per grid point.
    """
    # Fall back to the observed extremes for any bound the caller left out
    start_date = min_date if min_date else df["timestamp"].min()
    end_date = max_date if max_date else df["timestamp"].max()

    # One row per expected reading between the two bounds
    grid = pd.DataFrame(
        {"timestamp": pd.date_range(start=start_date, end=end_date, freq=interval)}
    )

    # The calendar date is the first 10 characters ("YYYY-MM-DD") of the timestamp text
    day = pd.to_datetime(grid["timestamp"].astype("string").str[:10])
    grid["date"] = day
    grid["day_of_week"] = day.dt.dayofweek
    grid["day_of_year"] = day.dt.dayofyear

    return grid

In [5]:
def read_portland(fname, highway_number, min_date=None, max_date=None):
    """Read a Portland/Vancouver highway file and impute the best detector's data.

    The file holds readings for every detector on one highway. Only the detector
    with the most non-null volume readings is kept. Its readings are placed on a
    full 15-minute timestamp grid, gaps are flagged and then filled by linear
    interpolation.

    Args:
        fname: Path to the highway csv. The columns "detector_id", "starttime",
            "volume" and "speed" are read.
        highway_number: Highway label used as the prefix of the site name.
        min_date: First timestamp of the grid (defaults to the earliest reading).
        max_date: Last timestamp of the grid (defaults to the latest reading).

    Returns:
        Tuple ``(df, best_det)``: the imputed DataFrame and the id of the
        detector that was selected.
    """
    raw = pd.read_csv(fname)

    # Count non-null volume readings per detector, sort ascending and take the
    # last row, i.e. the detector with the highest count
    best_det = raw.groupby(["detector_id"])["volume"].count().reset_index()\
        .sort_values(by="volume").iloc[-1, 0]

    # Keep only the rows of that detector
    df_sub = raw.query("detector_id == @best_det").copy()

    # Site name is "<highway>/<detector>". Every row of df_sub belongs to the same
    # detector, so the first value is the name for the whole frame. It is captured
    # here, before the merge, because grid rows without a reading have no site name.
    df_sub["site_name"] = f"{highway_number}/" + df_sub["detector_id"].astype("str")
    site = df_sub["site_name"].iloc[0]

    # "starttime" is sliced as text: characters 0-9 are the date, 11-18 the time
    df_sub["date"] = pd.to_datetime(df_sub["starttime"].astype("str").str[:10])
    df_sub["time_period_start"] = df_sub["starttime"].astype("str").str[11:19]

    # Stamp each reading with the end of its interval (start + 14 minutes),
    # the same convention as the Highways England data
    df_sub["time_period_ending"] = (pd.to_datetime(df_sub["time_period_start"]) + datetime.timedelta(minutes=14)).dt.time.astype("str")
    df_sub["timestamp"] = pd.to_datetime(df_sub["date"].astype("str") + " " + df_sub["time_period_ending"])

    # Rename to the shared column names and keep only the needed columns
    df_sub = df_sub.rename(columns={"volume": "total_volume", "speed": "avg_mph"})
    df_sub = df_sub[["site_name", "timestamp", "avg_mph", "total_volume"]]

    dates = full_time(df_sub, interval="15min", min_date=min_date, max_date=max_date)

    # Left-join the readings onto the full grid; missing intervals become null rows
    df = dates.merge(df_sub, how="left", on="timestamp")
    df["site_name"] = df["site_name"].fillna(site)

    # Use pandasql to impute the 'interval_of_day' field
    interval_of_day_impute = """
    SELECT site_name,
           day_of_week,
           date(date) AS date,
           day_of_year,
           timestamp,
           DENSE_RANK() OVER (PARTITION BY DATE ORDER BY timestamp) - 1 AS interval_of_day,
           avg_mph,
           total_volume
    FROM df
    """
    # Pass only the table the query reads instead of every local variable
    df = psql.sqldf(interval_of_day_impute, {"df": df})

    # Flag missing readings before they are filled in
    df["missing_speed"] = df["avg_mph"].isnull()
    df["missing_volume"] = df["total_volume"].isnull()

    # Use linear interpolation to fill in nulls
    df["avg_mph"] = df["avg_mph"].interpolate()
    df["total_volume"] = df["total_volume"].interpolate()

    df = df.drop_duplicates()

    return df, best_det

In [6]:
def read_highways_england(fname, min_date=None, max_date=None):
    """Read a Highways England sensor csv and impute missing 15-minute readings.

    The readings are placed on a full 15-minute timestamp grid, gaps are
    flagged and then filled by linear interpolation.

    Args:
        fname: Path to the csv. The columns "Site Name", "Report Date",
            "Time Period Ending", "Time Interval", "Avg mph" and "Total Volume"
            are read.
        min_date: First timestamp of the grid (defaults to the earliest reading).
        max_date: Last timestamp of the grid (defaults to the latest reading).

    Returns:
        DataFrame with the columns site_name, day_of_week, date, day_of_year,
        timestamp, interval_of_day, avg_mph, total_volume, missing_speed and
        missing_volume.
    """
    raw = pd.read_csv(fname)

    # .copy() so the column assignments below act on an independent frame
    # rather than on a slice of ``raw`` (avoids SettingWithCopyWarning)
    df = raw[["Site Name", "Report Date", "Time Period Ending", "Time Interval", "Avg mph", "Total Volume"]].copy()

    # Re-format date field and cast to string
    df["Date"] = pd.to_datetime(df["Report Date"], format='%d/%m/%Y 00:00:00').astype("string")

    # Time of day at which the interval ends, as text
    df["Time Period Ending"] = df["Time Period Ending"].astype("string")

    # Create a true timestamp which includes both the date and the time of day
    df["Timestamp"] = pd.to_datetime(df["Date"] + " " + df["Time Period Ending"])

    # Subset columns and rename to snake_case to make columns easier to work with
    df = df[["Site Name", "Timestamp", "Time Interval", "Avg mph", "Total Volume"]]\
        .rename(columns={"Site Name": "site_name",
                         "Timestamp": "timestamp",
                         "Time Interval": "interval_of_day",
                         "Avg mph": "avg_mph",
                         "Total Volume": "total_volume"})

    # Capture the site name before the merge: grid rows without a reading have a
    # null site name, so the first value after the merge can be NaN
    known_sites = df["site_name"].dropna()
    site = known_sites.iloc[0] if not known_sites.empty else None

    # Compute dates for left join
    dates = full_time(df, interval="15min", min_date=min_date, max_date=max_date)

    # Left-join the readings onto the full grid; missing intervals become null rows
    df = dates.merge(df, how="left", on="timestamp")
    if site is not None:
        df["site_name"] = df["site_name"].fillna(site)

    # Use pandasql to impute the 'interval_of_day' field
    interval_of_day_impute = """
    SELECT site_name,
           day_of_week,
           date(date) AS date,
           day_of_year,
           timestamp,
           DENSE_RANK() OVER (PARTITION BY DATE ORDER BY timestamp) - 1 AS interval_of_day,
           avg_mph,
           total_volume
    FROM df
    """
    # Pass only the table the query reads instead of every local variable
    df = psql.sqldf(interval_of_day_impute, {"df": df})

    # Flag missing readings before they are filled in
    df["missing_speed"] = df["avg_mph"].isnull()
    df["missing_volume"] = df["total_volume"].isnull()

    # Use linear interpolation to fill in nulls
    df["avg_mph"] = df["avg_mph"].interpolate()
    df["total_volume"] = df["total_volume"].interpolate()

    return df

In [7]:
def read_utah(folder_path, highway_number, detector_id, min_date=None, max_date=None):
    """Read the 5-minute data of one Utah PeMS sensor and impute missing readings.

    All .xlsx files in ``folder_path`` are combined, the readings are placed on a
    full 5-minute timestamp grid, gaps are flagged and then filled by linear
    interpolation. The result stays at 5-minute resolution; aggregation to
    15-minute intervals is not performed here.

    Args:
        folder_path: Directory holding the sensor's .xlsx exports. The columns
            "5 Minutes", "Flow (Veh/5 Minutes)" and "Speed (mph)" are read.
        highway_number: Highway label used as the prefix of the site name.
        detector_id: Detector label used as the suffix of the site name.
        min_date: First timestamp of the grid (defaults to the earliest reading).
        max_date: Last timestamp of the grid (defaults to the latest reading).

    Returns:
        DataFrame with the columns site_name, day_of_week, date, day_of_year,
        timestamp, interval_of_day, avg_mph, total_volume, missing_speed and
        missing_volume.

    Raises:
        FileNotFoundError: If ``folder_path`` contains no .xlsx files.
    """
    # Grab all excel files in the provided path
    flist = glob.glob(f"{folder_path}/*.xlsx")
    if not flist:
        raise FileNotFoundError(f"No .xlsx files found in {folder_path}")

    # Read every file, then concatenate once (DataFrame.append is removed in pandas 2.0)
    df = pd.concat(
        [pd.read_excel(fname, engine="openpyxl") for fname in flist]
    ).drop_duplicates()

    # "5 Minutes" is sliced as text: characters 0-9 are the date, 11-18 the time
    df["date"] = df["5 Minutes"].astype("str").str[:10]
    df["time_period_start"] = df["5 Minutes"].astype("str").str[11:19]

    # Stamp each reading with the end of its interval (start + 4 minutes)
    df["time_period_ending"] = (pd.to_datetime(df["time_period_start"]) + datetime.timedelta(minutes=4)).dt.time.astype("str")
    df["timestamp"] = pd.to_datetime(df["date"] + " " + df["time_period_ending"]).round("min")
    df["total_volume"] = df["Flow (Veh/5 Minutes)"]
    df["avg_mph"] = df["Speed (mph)"]

    site_name = f"{highway_number}/{detector_id}"
    df["site_name"] = site_name

    df = df[["site_name", "timestamp", "avg_mph", "total_volume", "time_period_start"]].copy()

    # Only readings stamped within calendar year 2019 are kept
    df = df.query("timestamp >= '2019-01-01 00:00:00'").query("timestamp < '2020-01-01 00:00:00'").copy()

    date_df = full_time(df, interval="5min", min_date=min_date, max_date=max_date)

    # Left-join the readings onto the full grid; missing intervals become null rows
    full_df = date_df.merge(df, how="left", on="timestamp")

    # Grid rows without a reading have no site name after the merge; fill them
    # with the known name rather than reading it back from the merged column
    full_df["site_name"] = full_df["site_name"].fillna(site_name)

    interval_of_day = """
    SELECT  site_name,
            day_of_week,
            date(date) AS date,
            day_of_year,
            timestamp,
            DENSE_RANK() OVER (PARTITION BY DATE ORDER BY timestamp) - 1 AS interval_of_day,
            avg_mph,
            total_volume 
    FROM full_df
    """
    # Pass only the table the query reads instead of every local variable
    full_df = psql.sqldf(interval_of_day, {"full_df": full_df})

    # Flag missing readings before they are filled in
    full_df["missing_speed"] = full_df["avg_mph"].isnull()
    full_df["missing_volume"] = full_df["total_volume"].isnull()

    # Use linear interpolation to fill in nulls. This must run on the merged
    # frame: the pre-merge frame has no gap rows and is not returned.
    full_df["avg_mph"] = full_df["avg_mph"].interpolate()
    full_df["total_volume"] = full_df["total_volume"].interpolate()

    full_df = full_df[["site_name",
                       "day_of_week",
                       "date",
                       "day_of_year",
                       "timestamp",
                       "interval_of_day",
                       "avg_mph",
                       "total_volume",
                       "missing_speed",
                       "missing_volume"]].copy()

    return full_df

# Read in Raw Data, Impute, and Write to New Directory

In [14]:
# Bounds of the 2019 timestamp grid, passed to the readers as min_date / max_date.
# read_utah and read_portland stamp each reading with the end of its interval
# (start + 4 minutes for 5-minute data, start + 14 minutes for 15-minute data),
# so the first grid point of the year is 00:04 or 00:14 respectively.
min_date_5 = "2019-01-01 00:04:00"
min_date_15 = "2019-01-01 00:14:00"
max_date = "2019-12-31 23:59:00"

In [15]:
# Running count of processed inputs; each loop below adds one per file or directory
c = 0

In [16]:
# Impute every raw Highways England file and write the result to the intermediate directory
out_dir = "Data/Intermediate/Highways_England"
os.makedirs(out_dir, exist_ok=True)  # to_csv does not create missing directories

for fname in glob.glob("Data/Raw/Highways_England/*.csv"):
    print("Reading {}".format(fname))

    # File name without directory or extension; os.path handles the platform's separator
    stem = os.path.basename(fname).split(".")[0]
    fname_new = "{}/{}_Intermediate.csv".format(out_dir, stem)

    df = read_highways_england(fname, min_date_15, max_date)

    df.to_csv(fname_new, index=False)

    c += 1

Reading Data/Raw/Highways_England/M18-7578A_Northbound_2019.csv
Reading Data/Raw/Highways_England/M25-4490B_Counterclockwise_2019.csv
Reading Data/Raw/Highways_England/M6-5441A_Northbound_2019.csv
Reading Data/Raw/Highways_England/M11-6747A_Northbound_2019.csv
Reading Data/Raw/Highways_England/A14-1107A_Eastbound_2019.csv
Reading Data/Raw/Highways_England/A64-9251-1_Westbound_2019.csv
Reading Data/Raw/Highways_England/A11-6312-2_Northbound_2019.csv
Reading Data/Raw/Highways_England/M5-8291A_Southbound_2019.csv
Reading Data/Raw/Highways_England/M20-6552A2_Eastbound_2019.csv
Reading Data/Raw/Highways_England/M60-9327B_Counterclockwise_2019.csv
Reading Data/Raw/Highways_England/M60-9374A_Clockwise_2019.csv
Reading Data/Raw/Highways_England/M25-4565A_Clockwise_2019.csv
Reading Data/Raw/Highways_England/A5-7572-1-Northbound_2019.csv
Reading Data/Raw/Highways_England/M1-2633A_Northbound_2019.csv
Reading Data/Raw/Highways_England/A1M-9842B_Southbound_2019.csv
Reading Data/Raw/Highways_England

In [17]:
# Metadata mapping each Portland highway id to its name and direction
portland_highway_numbers = pd.read_csv("Data/Metadata/portland_highway_metadata.csv")
portland_highway_numbers['highwayid'] = portland_highway_numbers['highwayid'].astype("str")

out_dir = "Data/Intermediate/Portland"
os.makedirs(out_dir, exist_ok=True)  # to_csv does not create missing directories

for fname in glob.glob("Data/Raw/Portland/*.csv"):
    print("Reading {}".format(fname))

    # The id sits between "highway" and the next "_" in the file name; working on
    # the base name keeps directory components out of the match
    highway_id = os.path.basename(fname).split("highway")[-1].split("_")[0]

    # Look the highway up once and fail with a clear message if it is unknown
    highway_meta = portland_highway_numbers.query("highwayid==@highway_id")
    if highway_meta.empty:
        raise KeyError("No metadata found for highway id {} (file {})".format(highway_id, fname))
    highway_name = highway_meta["highwayname"].iloc[0].replace("-", "")
    direction = highway_meta["direction"].iloc[0].capitalize()

    df, det = read_portland(fname, highway_name, min_date=min_date_15, max_date=max_date)

    fname_new = "{}/{}-{}_{}bound_2019_Intermediate.csv".format(out_dir, highway_name, det, direction)

    df.to_csv(fname_new, index=False)

    c += 1

Reading Data/Raw/Portland/all_sensors_highway51_portland.csv
Reading Data/Raw/Portland/all_sensors_highway7_portland.csv
Reading Data/Raw/Portland/all_sensors_highway52_portland.csv
Reading Data/Raw/Portland/all_sensors_highway4_portland.csv
Reading Data/Raw/Portland/all_sensors_highway12_portland.csv
Reading Data/Raw/Portland/all_sensors_highway1_portland.csv
Reading Data/Raw/Portland/all_sensors_highway6_portland.csv
Reading Data/Raw/Portland/all_sensors_highway10_portland.csv
Reading Data/Raw/Portland/all_sensors_highway2_portland.csv
Reading Data/Raw/Portland/all_sensors_highway621_portland.csv
Reading Data/Raw/Portland/all_sensors_highway50_portland.csv
Reading Data/Raw/Portland/all_sensors_highway9_portland.csv
Reading Data/Raw/Portland/all_sensors_highway53_portland.csv
Reading Data/Raw/Portland/all_sensors_highway620_portland.csv
Reading Data/Raw/Portland/all_sensors_highway11_portland.csv
Reading Data/Raw/Portland/all_sensors_highway618_portland.csv
Reading Data/Raw/Portland/a

In [18]:
# Impute every Utah sensor directory and write the result to the intermediate directory
out_dir = "Data/Intermediate/Utah"
os.makedirs(out_dir, exist_ok=True)  # to_csv does not create missing directories

for path in glob.glob("Data/Raw/Utah/*"):
    # The pattern also matches stray files; only sensor directories are processed
    if not os.path.isdir(path):
        continue

    print("Reading files in {} directory".format(path))

    # Directory names have the form <highway>_<detector>_<direction>
    highway_number, detector_id, direction = os.path.basename(path).split("_")[:3]

    df = read_utah(path, highway_number, detector_id, min_date=min_date_5, max_date=max_date)

    fname_new = "{}/{}-{}_{}_2019_Intermediate.csv".format(out_dir, highway_number, detector_id, direction)

    df.to_csv(fname_new, index=False)

    c += 1

Reading files in Data/Raw/Utah/US189_260_Westbound directory
Reading files in Data/Raw/Utah/I15_3103178_Southbound directory
Reading files in Data/Raw/Utah/I215_134_Counterclockwise directory
Reading files in Data/Raw/Utah/I80_667_Westbound directory
Reading files in Data/Raw/Utah/US189_470_Eastbound directory
Reading files in Data/Raw/Utah/LegacyParkway_810_Northbound directory
Reading files in Data/Raw/Utah/US6_3103115_Westbound directory
Reading files in Data/Raw/Utah/US40_635_Eastbound directory
Reading files in Data/Raw/Utah/I84_482_Westbound directory
Reading files in Data/Raw/Utah/I70_3103401_Eastbound directory
Reading files in Data/Raw/Utah/I70_3103400_Westbound directory
Reading files in Data/Raw/Utah/US89_483_Northbound directory
Reading files in Data/Raw/Utah/I80_600_Eastbound directory
Reading files in Data/Raw/Utah/I215_31_Clockwise directory
Reading files in Data/Raw/Utah/US40_634_Westbound directory
Reading files in Data/Raw/Utah/I84_451_Eastbound directory
Reading file

In [19]:
# Total number of files and directories processed by the three loops above
c

76