In [1]:
from dask.distributed import Client
from glob import glob
import dask.dataframe as dd
import json
import numpy as np
import pandas as pd
import os
import os.path

import geopandas
from shapely.geometry import Point

In [2]:
# Input and output locations; all paths are relative to the notebook's
# working directory.
config = {
    # raw trip CSVs
    "tlc_raw_data_path": "../data/raw_data/tlc/",
    "uber_raw_data_path": "../data/raw_data/uber/",

    # shapefiles used to map coordinates to zones / census tracts
    "taxi_zones_shapefile": "../data/shapefiles/taxi_zones/taxi_zones.shp",
    "nyc_tract_shapefile": "../data/shapefiles/nyct2010_15b/nyct2010.shp",

    # destination directory for the sampled parquet output
    "parquet_output_path": "../data/processed_data/",
}

In [3]:
# Column dtypes handed to ``dd.read_csv`` and reused for casting in
# ``get_tlc_data``. IDs, coordinates, amounts and distance are read as float64;
# codes, flags, padding ("junk") columns and datetimes are left as ``object``.
_float64_columns = [
    # location / zone IDs and coordinates
    'DOlocationID', 'dropoff_latitude', 'dropoff_longitude', 'dropoff_taxizone_id',
    'locationID', 'PUlocationID', 'pickup_latitude', 'pickup_longitude', 'pickup_taxizone_id',
    # fare components and distance
    'ehail_fee', 'extra', 'fare_amount', 'improvement_surcharge', 'mta_tax',
    'tip_amount', 'tolls_amount', 'total_amount', 'trip_distance',
]
_object_columns = [
    'junk1', 'junk2',
    'passenger_count', 'payment_type', 'rate_code_id', 'store_and_fwd_flag',
    'trip_type', 'vendor_id',
    'pickup_datetime', 'dropoff_datetime',
]

dtype_list = dict.fromkeys(_float64_columns, np.float64)
dtype_list.update(dict.fromkeys(_object_columns, object))

In [4]:
# Start a Dask distributed client. With no arguments Dask sets up the cluster
# itself (presumably a LocalCluster on this machine — confirm). The handle is
# not used explicitly afterwards; presumably later dask computations (e.g. the
# to_parquet write) are scheduled through it as the default client — confirm.

client = Client()

In [5]:
def glob(x):
    """Return the paths matching the shell pattern ``x``, sorted lexicographically.

    ``glob.glob`` returns matches in filesystem order; sorting makes the order in
    which input files are read deterministic. Note that this function shadows the
    ``from glob import glob`` done in the imports cell.
    """
    import glob as glob_module
    matches = glob_module.glob(x)
    matches.sort()
    return matches

def trymakedirs(path):
    """Best-effort creation of directory ``path`` and any missing parents.

    Filesystem errors (``OSError``, e.g. the directory already exists or cannot
    be created) are ignored. Other exceptions — a bad argument type, or
    ``KeyboardInterrupt`` — now propagate instead of being swallowed by a bare
    ``except:``.
    """
    try:
        os.makedirs(path)
    except OSError:
        pass

In [6]:
def assign_taxi_zones(df, lon_var, lat_var, locid_var):
    """
    Joins DataFrame with Taxi Zones shapefile.

    A row is looked up when its ``locid_var`` is null and both ``lon_var`` and
    ``lat_var`` are present and non-zero. Looked-up rows get the ``LocationID``
    of the taxi-zone polygon the point lies within (the left spatial join leaves
    NaN when there is no match). Every other row keeps its original
    ``locid_var`` value. Coordinates are treated as EPSG:4326 lon/lat.

    Parameters
    ----------
    df : pandas.DataFrame
        One partition of the trips frame (this is applied via ``map_partitions``).
    lon_var, lat_var : str
        Names of the longitude / latitude columns.
    locid_var : str
        Name of the taxi-zone ID column to fill in.

    Returns
    -------
    pandas.Series
        Taxi-zone IDs, named ``locid_var``. If no row needs a lookup,
        ``df[locid_var]`` is returned unchanged. If the spatial join raises
        ``ValueError``, the error and its traceback are printed and
        ``df[locid_var]`` cast to float64 is returned instead.
    """

    localdf = df[[lon_var, lat_var, locid_var]].copy()
    # Missing coordinates become 0 so the mask below treats them as "no location".
    localdf[lon_var] = localdf[lon_var].fillna(value=0.)
    localdf[lat_var] = localdf[lat_var].fillna(value=0.)
    localdf['replace_locid'] = (
        localdf[locid_var].isnull()
        & (localdf[lon_var] != 0.)
        & (localdf[lat_var] != 0.)
    )

    if (np.any(localdf['replace_locid'])):
        # NOTE(review): the shapefile is re-read for every partition that needs it.
        shape_df = geopandas.read_file(config['taxi_zones_shapefile'])
        shape_df = shape_df.drop(
            ['OBJECTID', "Shape_Area", "Shape_Leng", "borough", "zone"], axis=1)
        # NOTE(review): {'init': ...} CRS dicts and sjoin(op=...) are deprecated in
        # newer pyproj/geopandas ("EPSG:4326" / predicate=...); kept unchanged so
        # the geopandas version this notebook targets keeps working.
        shape_df = shape_df.to_crs({'init': 'epsg:4326'})

        try:
            local_gdf = geopandas.GeoDataFrame(
                localdf, crs={'init': 'epsg:4326'},
                geometry=[Point(xy) for xy in
                          zip(localdf[lon_var], localdf[lat_var])])

            local_gdf = geopandas.sjoin(
                local_gdf, shape_df, how='left', op='within')

            # sjoin emits one row per matching polygon; keep the first per point.
            local_gdf = local_gdf[~local_gdf.index.duplicated(keep='first')]

            # Take the spatial-join result only where a lookup was requested and
            # keep the original ID elsewhere. Writing through ``.values`` is not
            # reliable: it may target a copy / read-only view, or an int column.
            zone_ids = local_gdf['LocationID'].where(
                local_gdf['replace_locid'], local_gdf[locid_var])

            return zone_ids.rename(locid_var)
        except ValueError as ve:
            import traceback
            print(ve)
            # ValueError has no .stacktrace(); calling it raised AttributeError
            # and prevented the fallback return below.
            traceback.print_exc()
            return df[locid_var].astype(np.float64)
    else:
        return df[locid_var]

In [7]:
def assign_nyc_tracts(df, lon_var, lat_var, locid_var):
    """
    Joins DataFrame with NYC Tracts shapefile.

    A row is looked up when its ``locid_var`` is null and both ``lon_var`` and
    ``lat_var`` are present and non-zero. Looked-up rows get the ``BoroCT2010``
    value of the census-tract polygon the point lies within (the left spatial
    join leaves NaN when there is no match). Every other row keeps its original
    ``locid_var`` value. Coordinates are treated as EPSG:4326 lon/lat.

    Parameters
    ----------
    df : pandas.DataFrame
        One partition of the trips frame (intended for ``map_partitions``).
    lon_var, lat_var : str
        Names of the longitude / latitude columns.
    locid_var : str
        Name of the tract ID column to fill in.

    Returns
    -------
    pandas.Series
        Tract IDs, named ``locid_var``. If no row needs a lookup,
        ``df[locid_var]`` is returned unchanged. If the spatial join raises
        ``ValueError``, the error and its traceback are printed and
        ``df[locid_var]`` cast to float64 is returned instead.

    NOTE(review): ``BoroCT2010`` may be a string column in the shapefile; confirm
    it matches the float64 meta declared by callers.
    """

    localdf = df[[lon_var, lat_var, locid_var]].copy()
    # Missing coordinates become 0 so the mask below treats them as "no location".
    localdf[lon_var] = localdf[lon_var].fillna(value=0.)
    localdf[lat_var] = localdf[lat_var].fillna(value=0.)
    localdf['replace_locid'] = (
        localdf[locid_var].isnull()
        & (localdf[lon_var] != 0.)
        & (localdf[lat_var] != 0.)
    )

    if (np.any(localdf['replace_locid'])):
        # NOTE(review): the shapefile is re-read for every partition that needs it.
        shape_df = geopandas.read_file(config['nyc_tract_shapefile'])
        shape_df = shape_df[["BoroCT2010", "geometry"]]
        # NOTE(review): {'init': ...} CRS dicts and sjoin(op=...) are deprecated in
        # newer pyproj/geopandas ("EPSG:4326" / predicate=...); kept unchanged so
        # the geopandas version this notebook targets keeps working.
        shape_df = shape_df.to_crs({'init': 'epsg:4326'})

        try:
            local_gdf = geopandas.GeoDataFrame(
                localdf, crs={'init': 'epsg:4326'},
                geometry=[Point(xy) for xy in
                          zip(localdf[lon_var], localdf[lat_var])])

            local_gdf = geopandas.sjoin(
                local_gdf, shape_df, how='left', op='within')

            # sjoin emits one row per matching polygon; keep the first per point.
            local_gdf = local_gdf[~local_gdf.index.duplicated(keep='first')]

            # Take the spatial-join result only where a lookup was requested and
            # keep the original ID elsewhere. Writing through ``.values`` is not
            # reliable: it may target a copy / read-only view.
            tract_ids = local_gdf['BoroCT2010'].where(
                local_gdf['replace_locid'], local_gdf[locid_var])

            return tract_ids.rename(locid_var)
        except ValueError as ve:
            import traceback
            print(ve)
            # ValueError has no .stacktrace(); calling it raised AttributeError
            # and prevented the fallback return below.
            traceback.print_exc()
            return df[locid_var].astype(np.float64)
    else:
        return df[locid_var]

In [8]:
def get_tlc_data(
    schemas,
    globs,
    trip_type
):
    """
    Read several CSV column layouts into one dask DataFrame of core trip columns.

    Parameters
    ----------
    schemas : list of str
        One comma-separated column list per layout. It is used as the header
        names; each file's own header row is replaced (``header=0``).
    globs : list of list of str
        The file paths for each layout, parallel to ``schemas``. Layouts with an
        empty path list are skipped.
    trip_type : str
        Label written to the ``trip_type`` column of every row.

    Returns
    -------
    dask.dataframe.DataFrame
        The pickup/dropoff coordinate, datetime and taxi-zone columns, cast per
        ``dtype_list``, plus ``trip_type``. Columns a layout lacks are all-NaN.

    Raises
    ------
    ValueError
        If ``schemas`` and ``globs`` differ in length, or no layout has any file.
    """
    critical_columns = [
        'pickup_latitude', 'pickup_longitude', 'pickup_datetime', 'pickup_taxizone_id',
        'dropoff_latitude', 'dropoff_longitude', 'dropoff_datetime', 'dropoff_taxizone_id',
    ]
    critical_dtypes = {column: dtype_list[column] for column in critical_columns}

    if len(schemas) != len(globs):
        raise ValueError(
            "schemas and globs must have the same length (got %d and %d)"
            % (len(schemas), len(globs)))

    frames = []
    for schema, paths in zip(schemas, globs):
        # dd.read_csv fails on an empty file list; a layout whose files are
        # absent on disk simply contributes nothing.
        if not paths:
            continue

        df = dd.read_csv(
            paths,
            names=schema.split(','),
            header=0,
            na_values=["NA"],
            dtype=dtype_list,
        )

        # Give every layout the same columns so the frames can be stacked.
        for column in critical_columns:
            if column not in df.columns:
                df[column] = np.nan

        # Identical column order and dtypes across frames keep concat's meta aligned.
        frames.append(df[critical_columns].astype(critical_dtypes))

    if not frames:
        raise ValueError("no input files found for trip_type %r" % (trip_type,))

    # dd.concat replaces DataFrame.append, which recent dask no longer provides.
    df_merged = dd.concat(frames)
    df_merged['trip_type'] = trip_type

    return df_merged


In [9]:
def get_green():
    """
    Load every green-taxi trip file as a single dask DataFrame.

    The green CSVs changed column layout several times. Each layout below is
    paired with the file-name patterns (under ``config['tlc_raw_data_path']``)
    that use it, and the pairs are handed to ``get_tlc_data``.
    """
    raw_dir = config['tlc_raw_data_path']

    def matching(*patterns):
        # Concatenate the (sorted) matches of each pattern, in the order given.
        return [
            path
            for pattern in patterns
            for path in glob(os.path.join(raw_dir, pattern))
        ]

    layouts = [
        # 2013-2014: coordinates, no improvement_surcharge, two trailing junk columns
        ("vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,total_amount,payment_type,trip_type,junk1,junk2",
         matching('green_tripdata_201[34]*.csv')),
        # 2015 H1: adds improvement_surcharge
        ("vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,junk1,junk2",
         matching('green_tripdata_2015-0[1-6].csv')),
        # 2015 H2 - 2016 H1: junk columns gone
        ("vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type",
         matching('green_tripdata_2015-0[7-9].csv',
                  'green_tripdata_2015-1[0-2].csv',
                  'green_tripdata_2016-0[1-6].csv')),
        # 2016 H2: taxi-zone IDs replace coordinates; junk columns again
        ("vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_taxizone_id,dropoff_taxizone_id,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,junk1,junk2",
         matching('green_tripdata_2016-0[7-9].csv',
                  'green_tripdata_2016-1[0-2].csv')),
        # 2017-2018: taxi-zone IDs, no junk columns
        ("vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_taxizone_id,dropoff_taxizone_id,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type",
         matching('green_tripdata_2017-*.csv',
                  'green_tripdata_2018-*.csv')),
    ]

    schemas = [schema for schema, _ in layouts]
    globs = [paths for _, paths in layouts]
    return get_tlc_data(schemas, globs, 'green')


In [10]:
def get_fhv():
    """
    Load every for-hire-vehicle (FHV) trip file as a single dask DataFrame.

    Four column layouts are recognised. Each is paired with the file-name
    patterns (under ``config['tlc_raw_data_path']``) that use it. The oldest
    layout has no dropoff columns; ``get_tlc_data`` fills those with NaN.
    """
    raw_dir = config['tlc_raw_data_path']

    def matching(*patterns):
        # Concatenate the (sorted) matches of each pattern, in the order given.
        return [
            path
            for pattern in patterns
            for path in glob(os.path.join(raw_dir, pattern))
        ]

    layouts = [
        # up to 2016: pickup only
        ("dispatching_base_num,pickup_datetime,pickup_taxizone_id",
         matching('fhv_tripdata_201[0-6]*.csv')),
        # 2017 H1: adds dropoff datetime / zone
        ("dispatching_base_num,pickup_datetime,dropoff_datetime,pickup_taxizone_id,dropoff_taxizone_id",
         matching('fhv_tripdata_2017-0[1-6].csv')),
        # 2017 H2: adds shared_ride_flag
        ("dispatching_base_num,pickup_datetime,dropoff_datetime,pickup_taxizone_id,dropoff_taxizone_id,shared_ride_flag",
         matching('fhv_tripdata_2017-0[7-9].csv',
                  'fhv_tripdata_2017-1*.csv')),
        # 2018: columns reordered, second base number
        ("pickup_datetime,dropoff_datetime,pickup_taxizone_id,dropoff_taxizone_id,shared_ride_flag,dispatching_base_num,dispatching_base_num2",
         matching('fhv_tripdata_2018*.csv')),
    ]

    schemas = [schema for schema, _ in layouts]
    globs = [paths for _, paths in layouts]
    return get_tlc_data(schemas, globs, 'fhv')


In [11]:
def get_yellow():
    """
    Load every yellow-taxi trip file as a single dask DataFrame.

    Each column layout is paired with the file-name patterns (under
    ``config['tlc_raw_data_path']``) that use it, and the pairs are handed to
    ``get_tlc_data``.

    NOTE(review): within the first layout, the 2010-2014 files are listed
    before the 2009 files, so file order is not chronological.
    """
    raw_dir = config['tlc_raw_data_path']

    def matching(*patterns):
        # Concatenate the (sorted) matches of each pattern, in the order given.
        return [
            path
            for pattern in patterns
            for path in glob(os.path.join(raw_dir, pattern))
        ]

    layouts = [
        # 2009-2014: coordinates, no improvement_surcharge
        ("vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,total_amount",
         matching('yellow_tripdata_201[0-4]*.csv',
                  'yellow_tripdata_2009*.csv')),
        # 2015 - 2016 H1: adds improvement_surcharge
        ("vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount",
         matching('yellow_tripdata_2015*.csv',
                  'yellow_tripdata_2016-0[1-6].csv')),
        # 2016 H2: taxi-zone IDs replace coordinates; two trailing junk columns
        ("vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_taxizone_id,dropoff_taxizone_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,junk1,junk2",
         matching('yellow_tripdata_2016-0[7-9].csv',
                  'yellow_tripdata_2016-1[0-2].csv')),
        # 2017-2018: taxi-zone IDs, no junk columns
        ("vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_taxizone_id,dropoff_taxizone_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount",
         matching('yellow_tripdata_2017-*.csv',
                  'yellow_tripdata_2018-*.csv')),
    ]

    schemas = [schema for schema, _ in layouts]
    globs = [paths for _, paths in layouts]
    return get_tlc_data(schemas, globs, 'yellow')


In [12]:
def get_uber():
    """
    Load the Uber trip files as a single dask DataFrame.

    The 2014 files carry pickup latitude/longitude. The 2015 files carry a
    pickup taxi-zone ID instead. Files are found under
    ``config['uber_raw_data_path']``.
    """
    raw_dir = config['uber_raw_data_path']

    # (column layout, file-name pattern) per year
    layouts = (
        ("pickup_datetime,pickup_latitude,pickup_longitude,junk1", 'uber*-???14.csv'),
        ("junk1,pickup_datetime,junk2,pickup_taxizone_id", 'uber*15.csv'),
    )

    schemas = [schema for schema, _ in layouts]
    globs = [glob(os.path.join(raw_dir, pattern)) for _, pattern in layouts]
    return get_tlc_data(schemas, globs, 'uber')

In [13]:
# Load each source, then stack them into one lazy dask DataFrame
# (row order: uber, fhv, green, yellow).

uber = get_uber()
fhv = get_fhv()
green = get_green()
yellow = get_yellow()

# dd.concat is the supported way to stack dask frames; DataFrame.append was
# deprecated and then removed from dask, following pandas.
all_trips = dd.concat([uber, fhv, green, yellow])

In [14]:
def sample(df, frac=0.02, random_state=None):
    """Draw an independent Bernoulli(``frac``) 0/1 keep-flag for each row of ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        One partition (this is applied via ``map_partitions``).
    frac : float
        Probability that a row is flagged 1.
    random_state : int or None
        Seed for this call's RNG. With ``None`` (the default), the RNG is freshly
        seeded from OS entropy on every call. Pass an int for a reproducible
        draw; the same int repeats the same pattern on every partition.

    Returns
    -------
    pandas.Series
        int64 flags named ``'sample'`` and indexed like ``df``, so the result
        matches the Series meta that ``map_partitions`` is told to expect. The
        original returned a bare numpy array.
    """
    # A per-call RNG avoids relying on the process-global numpy state inside workers.
    rng = np.random.RandomState(random_state)
    flags = rng.binomial(1, frac, size=len(df))
    return pd.Series(flags, index=df.index, name='sample', dtype=np.int64)

# Tag each row with a random 0/1 flag (probability ``frac`` = 0.02 by default)
# and keep the rows flagged 1, i.e. a ~2% row sample. The flags are integers,
# so the declared meta is int64 rather than float64.
all_trips["sample"] = all_trips.map_partitions(
    sample, meta=('sample', np.int64)
)
all_trips = all_trips[all_trips["sample"] == 1]

In [15]:
# map coordinates to taxi zones: for the pickup end and then the dropoff end,
# fill in missing zone IDs from that end's longitude/latitude.
for lon_col, lat_col, zone_col in (
    ("pickup_longitude", "pickup_latitude", "pickup_taxizone_id"),
    ("dropoff_longitude", "dropoff_latitude", "dropoff_taxizone_id"),
):
    all_trips[zone_col] = all_trips.map_partitions(
        assign_taxi_zones,
        lon_col, lat_col, zone_col,
        meta=(zone_col, np.float64),
    )

# map coordinates to nyc tracts
# all_trips['pickup_tract_id'] = all_trips['pickup_taxizone_id'].copy()
# all_trips['pickup_tract_id'] = np.nan
# all_trips['pickup_tract_id'] = all_trips.map_partitions(
#     assign_nyc_tracts,
#     "pickup_longitude", "pickup_latitude", "pickup_tract_id",
#     meta=('pickup_tract_id', np.float64)
# )
# all_trips['dropoff_tract_id'] = all_trips['pickup_taxizone_id'].copy()
# all_trips['dropoff_tract_id'] = np.nan
# all_trips['dropoff_tract_id'] = all_trips.map_partitions(
#     assign_nyc_tracts,
#     "dropoff_longitude", "dropoff_latitude", "dropoff_tract_id",
#     meta=('dropoff_tract_id', np.float64)
# )

In [16]:
# Keep only the columns that go into the output file; the raw coordinates and
# the temporary "sample" flag are dropped here.
output_columns = [
    'pickup_datetime', 'pickup_taxizone_id',
    'dropoff_datetime', 'dropoff_taxizone_id',
    'trip_type',
]
all_trips = all_trips[output_columns]

In [None]:
# Save to .parquet
# ``has_nulls`` and ``object_encoding`` are fastparquet writer options, so the
# engine is pinned. With the default engine selection dask may pick pyarrow,
# which would not accept them.
# NOTE(review): object_encoding='json' stores object columns (presumably the
# datetime strings and trip_type) JSON-encoded. Readers other than fastparquet
# may see quoted strings; confirm this is intended before switching to 'utf8'.

all_trips.to_parquet(
    os.path.join(config['parquet_output_path'], '2perc_sample.parquet'),
    engine='fastparquet',
    compression='GZIP',
    has_nulls=True,
    object_encoding='json'
)

In [None]:
# final size: ~500MB