## Preparing $R_v$
The following functions read out the number of observations on a given edge in the road network and the respective speeds for every 15 minute time interval in the time windows 6:00-9:59 and 14:00-19:59.

In [2]:
# Imports, options
import multiprocessing as mp
import numpy as np
import os
import osmnx as ox
import pandas as pd

from config import rpath
os.chdir(rpath)

In [2]:
# Get Waypoint Files
path = "data/Berlin_2017/data/waypoints"
files = []
for file in os.listdir(path):
    if file.endswith("edges.csv"):
        files.append(os.path.join(path,file))
files.sort()
keepcols = ['TripID', 'WaypointSequence', 'CaptureDate','RawSpeed', 'edge_1', 'edge_2', 'edge_dist']

Note that the current state of the below functions reads the ".._small.csv" files and does not create them from scratch

In [3]:
def read_filter_index(file):
    '''This function reads a waypoint file, gets the CET timestamp and filters the data frame 
       to only include files from the peak hours and those who could be sensibly matched with 
       a road from the street network. It then adds an "edge_id" and a "time_id" to the data frame,
       which are unique edge identifiers and 15-minute time interval identifiers within the
       defined time windows, respectively.
    '''
    # Read file
    gdf = pd.read_csv(file)[keepcols] # note to self: dtypes are being read correctly
    
    # Convert CaptureDate to TimeStamp and convert to CET
    gdf["Timestamp_CET"] = pd.to_datetime(gdf["CaptureDate"], format='%Y-%m-%dT%H:%M:%S') 
    gdf["Timestamp_CET"] = pd.DatetimeIndex(gdf.Timestamp_CET).tz_convert('Europe/Berlin').to_series().tolist() 
    gdf = gdf.drop(columns="CaptureDate")
    
    # Get the edge distances to eventually create a histogram or a quantile overview
    edge_dist_list = gdf.edge_dist
    
    # Filter dataframe to only include peak hours during weekdays
    gdf = gdf[gdf.Timestamp_CET.dt.weekday < 5] # 5 = Saturday, 6 = Sunday
    gdf = gdf[((gdf.Timestamp_CET.dt.hour >= 6) & (gdf.Timestamp_CET.dt.hour < 10)) |
              ((gdf.Timestamp_CET.dt.hour >= 14) & (gdf.Timestamp_CET.dt.hour < 20))]
    
    
    # Drop observations that couldn't be matched to a street segment very well
    gdf = gdf[gdf["edge_dist"]<=50] 
    gdf = gdf.drop(columns="edge_dist")
    
    # Create new unique edge ID column
    gdf["edge_id"] = gdf[["edge_1","edge_2"]].astype(str).agg('-'.join, axis=1)
    
    # Add time identifier
    # Create time index for 15 minute blocks
    conditions = [
        ((gdf.Timestamp_CET.dt.hour == 6) & (gdf.Timestamp_CET.dt.minute < 15)),
        ((gdf.Timestamp_CET.dt.hour == 6) & (gdf.Timestamp_CET.dt.minute >= 15) & (gdf.Timestamp_CET.dt.minute < 30)),
        ((gdf.Timestamp_CET.dt.hour == 6) & (gdf.Timestamp_CET.dt.minute >= 30) & (gdf.Timestamp_CET.dt.minute < 45)),
        ((gdf.Timestamp_CET.dt.hour == 6) & (gdf.Timestamp_CET.dt.minute >= 45)),
        ((gdf.Timestamp_CET.dt.hour == 7) & (gdf.Timestamp_CET.dt.minute < 15)),
        ((gdf.Timestamp_CET.dt.hour == 7) & (gdf.Timestamp_CET.dt.minute >= 15) & (gdf.Timestamp_CET.dt.minute < 30)),
        ((gdf.Timestamp_CET.dt.hour == 7) & (gdf.Timestamp_CET.dt.minute >= 30) & (gdf.Timestamp_CET.dt.minute < 45)),
        ((gdf.Timestamp_CET.dt.hour == 7) & (gdf.Timestamp_CET.dt.minute >= 45)),
        ((gdf.Timestamp_CET.dt.hour == 8) & (gdf.Timestamp_CET.dt.minute < 15)),
        ((gdf.Timestamp_CET.dt.hour == 8) & (gdf.Timestamp_CET.dt.minute >= 15) & (gdf.Timestamp_CET.dt.minute < 30)),
        ((gdf.Timestamp_CET.dt.hour == 8) & (gdf.Timestamp_CET.dt.minute >= 30) & (gdf.Timestamp_CET.dt.minute < 45)),
        ((gdf.Timestamp_CET.dt.hour == 8) & (gdf.Timestamp_CET.dt.minute >= 45)),
        ((gdf.Timestamp_CET.dt.hour == 9) & (gdf.Timestamp_CET.dt.minute < 15)),
        ((gdf.Timestamp_CET.dt.hour == 9) & (gdf.Timestamp_CET.dt.minute >= 15) & (gdf.Timestamp_CET.dt.minute < 30)),
        ((gdf.Timestamp_CET.dt.hour == 9) & (gdf.Timestamp_CET.dt.minute >= 30) & (gdf.Timestamp_CET.dt.minute < 45)),
        ((gdf.Timestamp_CET.dt.hour == 9) & (gdf.Timestamp_CET.dt.minute >= 45)),
        ((gdf.Timestamp_CET.dt.hour == 14) & (gdf.Timestamp_CET.dt.minute < 15)),
        ((gdf.Timestamp_CET.dt.hour == 14) & (gdf.Timestamp_CET.dt.minute >= 15) & (gdf.Timestamp_CET.dt.minute < 30)),
        ((gdf.Timestamp_CET.dt.hour == 14) & (gdf.Timestamp_CET.dt.minute >= 30) & (gdf.Timestamp_CET.dt.minute < 45)),
        ((gdf.Timestamp_CET.dt.hour == 14) & (gdf.Timestamp_CET.dt.minute >= 45)),
        ((gdf.Timestamp_CET.dt.hour == 15) & (gdf.Timestamp_CET.dt.minute < 15)),
        ((gdf.Timestamp_CET.dt.hour == 15) & (gdf.Timestamp_CET.dt.minute >= 15) & (gdf.Timestamp_CET.dt.minute < 30)),
        ((gdf.Timestamp_CET.dt.hour == 15) & (gdf.Timestamp_CET.dt.minute >= 30) & (gdf.Timestamp_CET.dt.minute < 45)),
        ((gdf.Timestamp_CET.dt.hour == 15) & (gdf.Timestamp_CET.dt.minute >= 45)),
        ((gdf.Timestamp_CET.dt.hour == 16) & (gdf.Timestamp_CET.dt.minute < 15)),
        ((gdf.Timestamp_CET.dt.hour == 16) & (gdf.Timestamp_CET.dt.minute >= 15) & (gdf.Timestamp_CET.dt.minute < 30)),
        ((gdf.Timestamp_CET.dt.hour == 16) & (gdf.Timestamp_CET.dt.minute >= 30) & (gdf.Timestamp_CET.dt.minute < 45)),
        ((gdf.Timestamp_CET.dt.hour == 16) & (gdf.Timestamp_CET.dt.minute >= 45)),
        ((gdf.Timestamp_CET.dt.hour == 17) & (gdf.Timestamp_CET.dt.minute < 15)),
        ((gdf.Timestamp_CET.dt.hour == 17) & (gdf.Timestamp_CET.dt.minute >= 15) & (gdf.Timestamp_CET.dt.minute < 30)),
        ((gdf.Timestamp_CET.dt.hour == 17) & (gdf.Timestamp_CET.dt.minute >= 30) & (gdf.Timestamp_CET.dt.minute < 45)),
        ((gdf.Timestamp_CET.dt.hour == 17) & (gdf.Timestamp_CET.dt.minute >= 45)),
        ((gdf.Timestamp_CET.dt.hour == 18) & (gdf.Timestamp_CET.dt.minute < 15)),
        ((gdf.Timestamp_CET.dt.hour == 18) & (gdf.Timestamp_CET.dt.minute >= 15) & (gdf.Timestamp_CET.dt.minute < 30)),
        ((gdf.Timestamp_CET.dt.hour == 18) & (gdf.Timestamp_CET.dt.minute >= 30) & (gdf.Timestamp_CET.dt.minute < 45)),
        ((gdf.Timestamp_CET.dt.hour == 18) & (gdf.Timestamp_CET.dt.minute >= 45)),
        ((gdf.Timestamp_CET.dt.hour == 19) & (gdf.Timestamp_CET.dt.minute < 15)),
        ((gdf.Timestamp_CET.dt.hour == 19) & (gdf.Timestamp_CET.dt.minute >= 15) & (gdf.Timestamp_CET.dt.minute < 30)),
        ((gdf.Timestamp_CET.dt.hour == 19) & (gdf.Timestamp_CET.dt.minute >= 30) & (gdf.Timestamp_CET.dt.minute < 45)),
        ((gdf.Timestamp_CET.dt.hour == 19) & (gdf.Timestamp_CET.dt.minute >= 45)),
    ]
    values = list(range(len(conditions))) # Corresponding condition values
    gdf['time_id'] = np.select(conditions, values) 
    
    return (gdf, edge_dist_list)

def get_Rv(file, verbose=True, use_files=True):
    '''Reads a file, applies filtering (see read_filter_index), saves the file, and
       computes Rv data frame for each segment and time id. Additionally returns a 
       list of edge_distances.
    '''
    pathfiles = [path + "/" + f for f in os.listdir(path)]
    
    if use_files:
        if file[:-4] + "_small.csv" in pathfiles:
            gdf = pd.read_csv(file[:-4] + "_small.csv")
            edge_dist_list = []
        else:
            print("Reading new file...")
            gdf, edge_dist_list = read_filter_index(file)
    else:
        gdf, edge_dist_list = read_filter_index(file)
        
    if verbose:
        print("A file was processed")
    
    # Save
    if not file[:-4] + "_small.csv" in os.listdir(path):
        outfile = file[:-4] + "_small.csv"
        gdf.to_csv(outfile, index=False)
        
    # Drop unnecessary columns
    gdf = gdf.drop(columns=["edge_1", "edge_2"])
    
    # Return Rv df
    return gdf[['edge_id', 'time_id', 'RawSpeed']].groupby(['edge_id', 'time_id']).agg(['count', 'sum']), edge_dist_list

def get_and_process_Rv(files, verbose=True):
    ''' Gets Rv for every file, processes it to get a clean Rv table for every edge and quarter of an hour.
        Prints out quantiles of edge distances and returns clean Rv data frame.
    '''
    # Start the multiprocessing pool and get Rv
    pool = mp.Pool(processes=min(len(files), 7))
    out = pool.map(get_Rv, files)
    pool.close()
    
    if verbose:
        print("Pool closed")

    Rv_list = [x[0] for x in out]
    edge_dist_list_list = [x[1] for x in out]
    
    del out # for space
    
    if verbose:
        print("Begin concatenating")
        
    Rv = pd.concat(Rv_list).groupby(['edge_id', 'time_id']).agg(['sum'])
    
    del Rv_list # for space
    
    # Remove entries that have n = 0 (only nan Speeds)
    Rv = Rv[Rv.xs(('RawSpeed','count', 'sum'), axis=1) != 0]
    
    # Remove obs with n < median n (20) on selected edge_id
    Rv = Rv[Rv.xs(('RawSpeed','count', 'sum'), axis=1) >= np.median(Rv.xs(("RawSpeed", "count", "sum"), axis=1))]
    print("The median count of observations on an edge is", np.median(Rv.xs(("RawSpeed", "count", "sum"), axis=1)))
    # Add meanSpeed column
    Rv.loc[:,"meanSpeed"] = Rv.xs(('RawSpeed','sum', 'sum'), axis=1)/Rv.xs(('RawSpeed','count', 'sum'), axis=1)
    
    if verbose:
        print("Begin merging lists")
        
    edge_dists = [item for sublist in edge_dist_list_list for item in sublist]
    edge_dists = np.array(edge_dists)
    print("# all waypoints: " + str(len(edge_dists)))
    print("# waypoints with dist >50: " + str(len(edge_dists[edge_dists<=50])))
    
    return Rv

In [None]:
Rv = get_and_process_Rv(files)

A file was processed
A file was processed
A file was processed
A file was processedA file was processed

A file was processed
A file was processed


Now, we have the mean speeds for each road segment and time. We save this for further usage.

In [11]:
Rv.to_csv("data/Rv.csv")