# Preprocessing Data!

We load the following data sets:

• NYC infrastructure over all boroughs

• NYC collisions over many years

• citibike use data by month

• weather data over many years

We then build a dataloader so we can load these files in a batched manner. Each batch is a dataframe corresponding to a given month. The dataframe contains all the collisions (identified by day and road segment) and 100x as many non-collisions randomly sampled. Based on the day and road segment, we merge in weather and infrastructure data.

In [1]:
# Our abundant libraries :)
import pandas as pd
import geopandas as gpd
import networkx as nx
import numpy as np
import numpy.linalg as linalg
import matplotlib.pyplot as plt
import pickle
import momepy
import requests 
import zipfile
import os.path
from torch.utils.data import DataLoader, Dataset

We have a bunch of helper functions to help with the preprocessing process.

There are two problems that I do not want to put in the energy to fix:

• There are warnings when joining geopandas dataframes based on distance. I tried to fix them by making sure the geometry columns are all in the same format. However, this only made the code stop working. In particular, no locations survive the distance joins. Since it appears to work fine with the warnings, I'm not too worried.

• Citibike does this *amazing* thing where they change the names of the columns and, if they're feeling mischevious, remove some of the columns all together between months. In particular, sometimes they provide `tripduration` and other times they provide `started at` and `ended at` (or renamed versions of those columns). This makes checking whether a bike has been on a trip too long a problem. Since I think there are very few trips that are too long, I ignored this problem.

In [27]:
def preprocess_infrastructure(filename, filename_lpinv='not a file', restrict_bicycle=False, show_plot=False):
    # Load data
    infrastructure = gpd.read_file(filename)

    # All in Manhattan
    assert infrastructure['borocode'].unique().item() == '1'

    # Restrict to segments with in bicycle network
    # if restrict_bicycle:
    #     infrastructure = infrastructure[infrastructure['BIKE_LANE'].notna()] 
    ## Commenting this out for now so we can work with all segments

    graph = momepy.gdf_to_nx(infrastructure, approach="primal")

    for subgraph in nx.connected_components(graph):
        if len(subgraph) > 1000:
            graph = graph.subgraph(subgraph)

    _, infrastructure = momepy.nx_to_gdf(graph)

    graph = momepy.gdf_to_nx(infrastructure, approach="primal")
    _, infrastructure = momepy.nx_to_gdf(graph)

    if show_plot:
        positions = {n: [n[0], n[1]] for n in list(graph.nodes)}
        f, ax = plt.subplots(1, 1, figsize=(6, 10), sharex=True, sharey=True)
        nx.draw(graph, positions, ax=ax, node_size=5)
    
    if not os.path.isfile(filename_lpinv):
        nodes = list(graph.nodes())
        Laplacian = nx.laplacian_matrix(graph, nodelist=nodes)
        lpinv = linalg.pinv(Laplacian.todense()) # 4 min to run :(
        np.save(filename_lpinv, lpinv)

    Lpinv = np.matrix(np.load(filename_lpinv))
    nodes = list(graph.nodes())

    # Add ID column
    infrastructure['segment_ID'] = infrastructure.index

    return infrastructure, nodes, Lpinv

def preprocess_collisions(filename, filter, infrastructure):
    collisions = pd.read_csv(filename, low_memory=False).dropna(subset=['LATITUDE', 'LONGITUDE', 'CRASH DATE'])

    # Restrict to collisions involving bicycles
    markers = ['bike', 'bicyc', 'e - b', 'e-bik', 'e-unicycle', 'bk']
    mask = collisions['VEHICLE TYPE CODE 1'].str.contains('bike') # placeholder
    for i in [1,2,3,4,5]:
        for marker in markers:
            mask = mask | collisions[f'VEHICLE TYPE CODE {i}'].str.contains(marker, case=False)
    collisions = collisions.loc[mask]

    # Restrict to filter
    collisions = collisions[collisions.LONGITUDE != 0] # remove 0,0 coordinates
    collisions = gpd.GeoDataFrame(collisions, geometry=gpd.points_from_xy(collisions.LONGITUDE, collisions.LATITUDE))

    collisions = collisions.sjoin(filter)
    if 'index_right' in collisions.columns:
        collisions.drop(columns=['index_right'], inplace=True)

    # Add ID column
    collisions['collision_ID'] = collisions.index 

    
    # Connect collisions to infrastructure
    collisions = collisions.sjoin_nearest(infrastructure, max_distance=0.0001, how='inner')
    if 'index_right' in collisions.columns:
        collisions.drop(columns=['index_right'], inplace=True)
    if 'index_left' in collisions.columns:
        collisions.drop(columns=['index_left'], inplace=True)

    collisions.drop_duplicates(subset=['collision_ID'], keep='first', inplace=True)

    ## Let's start in 2016 since previous years of citibike have far fewer rides
    start_date = pd.Timestamp('2016-01-01')
    collisions['date'] = pd.to_datetime(collisions['CRASH DATE'])
    collisions = collisions.loc[collisions['date'] >= start_date]
    collisions['month'] = pd.DatetimeIndex(collisions['date']).month
    collisions['year'] = pd.DatetimeIndex(collisions['date']).year

    return collisions

def preprocess_filter(filename):
    filter = gpd.read_file(filename)
    filter = filter[filter['boro_name'] == 'Manhattan']
    return filter

def handle_bad_columns(citibike):
    # Inconsistent column names between different months
    citibike_different = {'start station longitude' : ['start_lng', 'Start Station Longitude'],
                          'start station latitude' : ['start_lat', 'Start Station Latitude'],
                          'end station longitude' : ['end_lng', 'End Station Longitude'],
                          'end station latitude' : ['end_lat', 'End Station Latitude'],
                          'starttime' : ['started_at', 'Start Time'],
                          'start station id' : ['start_station_id', 'Start Station ID'],
                          'end station id' : ['end_station_id', 'End Station ID'],
                          'stoptime': ['ended_at', 'Stop Time'],
                          }
    for variable_type in citibike_different:
        found = variable_type in citibike.columns
        for variable_alt in citibike_different[variable_type]:
            if variable_alt in citibike.columns:
                found = True
                citibike.rename(columns={variable_alt: variable_type}, inplace=True)
    if not found:
        print(citibike.columns)
    return citibike

def preprocess_citibike(year, month, filter):
    filename_citibike = f'data_unwrangled/citibike/{year}{month}-citibike-tripdata.csv'
    citibike = pd.read_csv(filename_citibike).dropna()
    citibike = handle_bad_columns(citibike)

    

    citibike = gpd.GeoDataFrame(citibike)
    # citibike['ride_ID'] = citibike.index # Removing this because not all months have it, and it's not actually a required variable

    for type in ['start', 'end']:
        citibike[f'{type}_geom'] = gpd.points_from_xy(citibike[f'{type} station longitude'], citibike[f'{type} station latitude'])
        citibike.set_geometry(f'{type}_geom', inplace=True)
        citibike = citibike.sjoin(filter)
        if 'index_right' in citibike.columns:
            citibike.drop(columns=['index_right'], inplace=True)
        citibike.drop_duplicates(subset=['ride_ID'], keep='first', inplace=True)
        

    citibike['starttime'] = pd.to_datetime(citibike['starttime'])
    citibike['starttime_rounded'] = citibike['starttime'].dt.floor('d')
    
    citibike['duration'] = ((pd.to_datetime(citibike['stoptime'])-pd.to_datetime(citibike['starttime'])).dt.total_seconds())/60 # Create a duration variable so we can filter erroneous trips
    citibike = citibike[citibike['duration']<24*60] # Eliminate any trip that takes more than 24 hours, since the bicycles are considered lost/stolen after 24 hours

    return citibike

def unique_stations(citibike, infrastructure, nodes):

    # Combine both start and end stations
    stations = {'start': {}, 'end': {}}
    for type in ['start', 'end']:
        renaming = {f'{type} station id': 'station_id', f'{type}_geom': 'geometry'}
        stations[type] = citibike.drop_duplicates(subset=[f'{type} station id'], keep='first').rename(columns=renaming)
        stations[type] = stations[type][renaming.values()]
    stations = pd.concat([stations['start'], stations['end']])
    
    # Remove duplicates
    stations.drop_duplicates(subset=['station_id'], keep='first', inplace=True)

    # Find nearby segments
    stations = stations.set_geometry('geometry')
    stations = stations.sjoin_nearest(infrastructure, max_distance=0.01, how='left')
    if 'index_right' in stations.columns:
        stations.drop(columns=['index_right'], inplace=True)
    stations.drop_duplicates(subset=[f'station_id'], keep='first', inplace=True)

    # Get corresponding node in graph
    node_points = np.array(nodes)
    node_geometry = gpd.GeoDataFrame(nodes, geometry=gpd.points_from_xy(node_points[:,0], node_points[:,1]))
    stations = stations.sjoin_nearest(node_geometry, max_distance=0.01, how='left').rename(columns={'index_right': 'node_index'})
    stations = stations[stations['node_index'].notna()]
    stations['node_index'] = stations['node_index'].astype(int)

    return stations

# edges correspond to road segments
# nodes correspond to intersections

def calculate_flow(start, end, station_to_node, nodes, Lpinv, infrastructure, saved):
    assert start != end
    start = station_to_node[start]
    end = station_to_node[end]
    key = (start, end) if start < end else (end, start)
    if key not in saved:
        Lpinv_vector = Lpinv[nodes.index(start)] - Lpinv[nodes.index(end)]
        resistance = Lpinv_vector[0, nodes.index(start)] - Lpinv_vector[0, nodes.index(end)]
        if resistance != 0:
            voltages = (Lpinv_vector/resistance).round(5)[0]

            def letitflow(node_start, node_end):
                return (voltages[node_start]-voltages[node_end]) ** 2

            vectorized = np.vectorize(lambda x, y : letitflow(x, y))

            saved[key] = vectorized(infrastructure.node_start, infrastructure.node_end)
        else: 
            saved[key] = np.zeros(len(infrastructure), dtype=np.float64)
    
    return saved[key], saved

def flow_on_month(citibike, infrastructure, station_to_node, nodes, Lpinv, filename_saved):
    days = citibike.starttime_rounded.unique()

    if os.path.exists(filename_saved):
        with open(filename_saved, 'rb') as pickle_file:
            saved = pickle.load(pickle_file)
    else:
        saved = {}

    month = {}
    for day in days:
        citibike_day = citibike[citibike['starttime_rounded']==day][['start station id', 'end station id']]
        grouped = citibike_day.groupby(['start station id', 'end station id']).size().reset_index(name='count')

        grouped = grouped[grouped['start station id'] != grouped['end station id']]

        total = np.zeros(len(infrastructure['segment_ID']), dtype=np.float64)
        for (start, end, count) in zip(grouped['start station id'], grouped['end station id'], grouped['count']):
            if start in station_to_node and end in station_to_node: # some strange station locations
                current, saved = calculate_flow(start, end, station_to_node, nodes, Lpinv, infrastructure, saved)
                total += count * current
        # Ensure each segment has at least some flow
        if np.sum(total != 0) > 0:
            minimum = total[total != 0].min()
            total[total == 0] = minimum
        day = pd.to_datetime(day)
        month[day] = total    
    with open(filename_saved, 'wb') as pickle_file:
        pickle.dump(saved, pickle_file)
        
    return month
    
# Wrap downloading
def download_zip(url, filename):
    if not os.path.isfile(filename):
        print('Downloading...')
        r = requests.get(url)
        with open(filename, 'wb') as f:
            f.write(r.content)
        print('Downloaded')
    else:
        print('Already downloaded')

def unzip(filename_zipped, directory, filename_unzipped):
    if not os.path.isfile(filename_unzipped):
        print('Unzipping...')
        with zipfile.ZipFile(filename_zipped, 'r') as zip_ref:
            zip_ref.extractall(directory)
        print('Unzipped')
    else:
        print('Already unzipped')

def download_citibike(year, month):
    if int(year) <= 2016:
        url = f"https://s3.amazonaws.com/tripdata/{year}{month}-citibike-tripdata.zip"
    else:
        url = f"https://s3.amazonaws.com/tripdata/{year}{month}-citibike-tripdata.csv.zip"
    save_path_zip = f"data_unwrangled/citibike/{year}{month}-citibike-tripdata.zip"
    save_path = f"data_unwrangled/citibike/{year}{month}-citibike-tripdata.csv"
    directory = 'data_unwrangled/citibike/'
    download_zip(url, save_path_zip)
    unzip(save_path_zip, directory, save_path)

def delete_citibike(year, month):
    save_path_zip = f"data_unwrangled/citibike/{year}{month}-citibike-tripdata.zip"
    save_path = f"data_unwrangled/citibike/{year}{month}-citibike-tripdata.csv"
    os.remove(save_path)
    os.remove(save_path_zip)

def get_monthly_flow(dataset, year, month):
    download_citibike(year, month)
    citibike = preprocess_citibike(year=year, month=month, filter=dataset.filter)
    stations = unique_stations(citibike, dataset.infrastructure, dataset.nodes)
    station_to_node = {station_id : dataset.nodes[index] for (station_id, index) in zip(stations.station_id, stations.node_index)}
    return flow_on_month(citibike, dataset.infrastructure, station_to_node, dataset.nodes, dataset.Lpinv, dataset.filenames['saved'])

def get_full_month(dataset, monthly_flow, year, month):
    # Select positives in month and year
    positives = dataset.collisions.loc[(dataset.collisions['year'] == int(year)) & (dataset.collisions['month'] == int(month))][['date', 'segment_ID']]
    positives['label'] = 1

    # Select random sample of negatives in month and year
    segments = dataset.infrastructure['segment_ID'].unique()
    dates = list(monthly_flow.keys())
    num_negative = dataset.ratio * len(positives) # Number of negatives to sample
    negatives = pd.DataFrame({'date': np.random.choice(dates, num_negative),
                              'segment_ID': np.random.choice(segments, num_negative),
                              'label': 0})

    observations = pd.concat([positives, negatives])
    observations.date = pd.to_datetime(observations.date)

    def get_flow(date, segment):
        return monthly_flow[pd.to_datetime(date)][segment]

    get_flow_vectorized = np.vectorize(get_flow)

    observations['flow'] = get_flow_vectorized(observations.date, observations.segment_ID)

    result = observations.merge(dataset.weather, on='date')
    result = result.merge(dataset.infrastructure, on='segment_ID')
    return result

The following code is formatted in a pytorch dataloader which is incredibly convenient. We load the persistent data once (infrastructure, collisions, weather). Then we load the months in batches. Processing all the citibike rides is very computationally intensive so this lets us only load the monthly rides as we need them.

In [4]:
filenames = {
    'infrastructure': 'data_unwrangled/centerline/20221017_Centerline-clipped.shp',
    'collisions': 'data_unwrangled/Motor_Vehicle_Collisions_-_Crashes.csv',
    'boundaries': 'data_unwrangled/2010 Neighborhood Tabulation Areas (NTAs).geojson',
    'weather': 'data_unwrangled/weather.csv',
    'saved': 'cache/flows.pickle',
    'lpinv': 'cache/lpinv.npy'
}

class BicycleDataset(Dataset):
    def __init__(self, filenames: dict, ratio: int):
        self.ratio = ratio
        self.filenames = filenames
        self.filter = preprocess_filter(filenames['boundaries'])
        self.infrastructure, self.nodes, self.Lpinv = preprocess_infrastructure(filenames['infrastructure'], filenames['lpinv'])
        self.collisions = preprocess_collisions(filenames['collisions'], self.filter, self.infrastructure)
        self.weather = pd.read_csv(filenames['weather'])
        self.weather['date'] = pd.to_datetime(self.weather.DATE)
        years = ['2016', '2017', '2018', '2019', '2020', '2021']
        months = ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']
        self.year_months = [(year, month) for year in years for month in months] + [('2022', month) for month in months[:5]]
    
    def __len__(self):
        return len(self.year_months)
    
    def __getitem__(self, idx):
        year, month = self.year_months[idx]
        monthly_flow = get_monthly_flow(self, year, month)
        return get_full_month(self, monthly_flow, year, month)

In [9]:
dataset = BicycleDataset(filenames, 100)
dataloader = DataLoader(dataset, batch_size=1, shuffle=True)


  gdf_network[length] = gdf_network.geometry.length

  gdf_network[length] = gdf_network.geometry.length
Use `to_crs()` to reproject one of the input geometries to match the CRS of the other.

Left CRS: None
Right CRS: EPSG:4326

  return geopandas.sjoin(left_df=self, right_df=df, *args, **kwargs)
Use `to_crs()` to reproject one of the input geometries to match the CRS of the other.

Left CRS: None
Right CRS: GEOGCS["WGS84(DD)",DATUM["WGS84",SPHEROID["WGS84", ...

  return geopandas.sjoin_nearest(

  self.weather = pd.read_csv(filenames['weather'])


In [26]:
dataset[10]

Downloading...
Downloaded
Unzipping...
Unzipped


Use `to_crs()` to reproject one of the input geometries to match the CRS of the other.

Left CRS: None
Right CRS: EPSG:4326

  return geopandas.sjoin(left_df=self, right_df=df, *args, **kwargs)
Use `to_crs()` to reproject one of the input geometries to match the CRS of the other.

Left CRS: None
Right CRS: EPSG:4326

  return geopandas.sjoin(left_df=self, right_df=df, *args, **kwargs)
Use `to_crs()` to reproject one of the input geometries to match the CRS of the other.

Left CRS: None
Right CRS: GEOGCS["WGS84(DD)",DATUM["WGS84",SPHEROID["WGS84", ...

  return geopandas.sjoin_nearest(



Unnamed: 0,date,label,flow,STATION,DATE,REPORT_TYPE,SOURCE,AWND,BackupDirection,BackupDistance,...,st_width,status,to_lvl_co,trafdir,Shape_Le_1,geometry,mm_len,node_start,node_end,segment_ID
