In [1]:
from pathlib import Path

import dask.dataframe as dd
import dask.distributed
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from dask.distributed import Client
from shapely.geometry import Point

# Importing data

In [2]:
# Load the fare table (payment type, fare, surcharge, tip, tolls, totals) from CSV.
# Column dtypes are left to pandas' inference here and tightened in later cells.
clean_fare = pd.read_csv("data/trip_fare_4.csv")

In [3]:
# Load the trip table (timestamps, passenger count, distance, coordinates) from CSV.
# NOTE(review): the stray `exec(code_obj, ...)` line in this cell's output looks
# like the tail of a pandas warning raised while parsing - confirm. Dtypes are
# set explicitly in the cells that follow.
clean_data = pd.read_csv("data/trip_data_4.csv")

  exec(code_obj, self.user_global_ns, self.user_ns)


The files were too big for pandas to automatically assign the best data types, so we will need to do that manually.

# Initial data inspection and cleaning

## Checking the variable names and dtypes

In [4]:
# Inspect the raw column labels (the output shows a leading space on most of them).
clean_fare.columns

Index(['medallion', ' hack_license', ' vendor_id', ' pickup_datetime',
       ' payment_type', ' fare_amount', ' surcharge', ' mta_tax',
       ' tip_amount', ' tolls_amount', ' total_amount'],
      dtype='object')

Visual inspection of the column names shows there is unnecessary whitespace.

In [5]:
# Drop every space character from the column labels of both tables
clean_fare.columns = [label.replace(" ", "") for label in clean_fare.columns]
clean_data.columns = [label.replace(" ", "") for label in clean_data.columns]

In [6]:
# Store the repeated string identifiers as categoricals to cut memory use.
# Values noted by the author when inspecting the columns:
#   vendor_id    -> only 2 types, "CMT" and "VTF"
#   payment_type -> 'CRD', 'CSH', 'UNK', 'NOC', 'DIS'
for column in ["medallion", "hack_license", "vendor_id", "payment_type"]:
    clean_fare[column] = clean_fare[column].astype("category")

# Parse the pickup timestamps. pd.to_datetime is used instead of
# .astype("datetime64") because pandas >= 2.0 refuses casts to a unit-less
# datetime64 dtype.
clean_fare["pickup_datetime"] = pd.to_datetime(clean_fare["pickup_datetime"])

In [7]:
# Store the repeated string / code columns as categoricals to cut memory use.
# Values noted by the author when inspecting the columns:
#   vendor_id          -> only 2 types, "CMT" and "VTF"
#   rate_code          -> 0-9 and 65, 77, 206, 208, 210
#   store_and_fwd_flag -> Y, N, NA
for column in ["medallion", "hack_license", "vendor_id",
               "rate_code", "store_and_fwd_flag"]:
    clean_data[column] = clean_data[column].astype("category")

# Parse the trip timestamps. pd.to_datetime is used instead of
# .astype("datetime64") because pandas >= 2.0 refuses casts to a unit-less
# datetime64 dtype.
for column in ["pickup_datetime", "dropoff_datetime"]:
    clean_data[column] = pd.to_datetime(clean_data[column])

In [8]:
def downcaster(df):
    """
    Downcasts every numerical column to the lowest memory usage datatype
    that can still hold its values.

    Integer columns of any width are passed through
    ``pd.to_numeric(..., downcast="integer")`` and float columns of any width
    through ``pd.to_numeric(..., downcast="float")``. Columns of any other
    dtype (strings, categoricals, datetimes, booleans, ...) are left untouched.

    param df: pandas.core.frame.DataFrame, modified in place

    returns: the same dataframe object with its numerical columns downcasted
    """
    for column in df.columns:
        column_dtype = df[column].dtype
        # Use the dtype predicates rather than comparing against "int"/"float":
        # those strings only match the platform default 64-bit dtypes, so
        # int32/int16/float32 columns would silently be skipped.
        if pd.api.types.is_integer_dtype(column_dtype):
            downcast_type = "integer"
        elif pd.api.types.is_float_dtype(column_dtype):
            downcast_type = "float"
        else:
            continue
        # The column is already numeric, so no error handling mode is needed.
        df[column] = pd.to_numeric(df[column], downcast=downcast_type)
    return df

In [9]:
# Shrink the numeric columns of both tables so they take less memory
clean_data, clean_fare = map(downcaster, (clean_data, clean_fare))

Saved a few GB of memory!

## Missing Values

In [10]:
# Count the missing values in each column of the fare table
clean_fare.isna().sum()

medallion          0
hack_license       0
vendor_id          0
pickup_datetime    0
payment_type       0
fare_amount        0
surcharge          0
mta_tax            0
tip_amount         0
tolls_amount       0
total_amount       0
dtype: int64

In [11]:
# Count the missing values in each column of the trip table
clean_data.isna().sum()

medallion                   0
hack_license                0
vendor_id                   0
rate_code                   0
store_and_fwd_flag    7518657
pickup_datetime             0
dropoff_datetime            0
passenger_count             0
trip_time_in_secs           0
trip_distance               0
pickup_longitude            0
pickup_latitude             0
dropoff_longitude         146
dropoff_latitude          146
dtype: int64

The only missing values are in the dropoff coordinates and in `store_and_fwd_flag`. The flag relates to trips where the fare system was down and the taxi driver had to store the fare and upload it later; we assume NA means the system was functioning.

## Summary Statistics

In [12]:
# Summary statistics of the numeric fare columns, formatted to two decimals.
# The frame is restricted to numeric dtypes first: on pandas >= 2.0 describe()
# also summarises datetime columns, and the float format below is not
# meaningful for timestamps.
(clean_fare
 .select_dtypes(include="number")
 .describe()
 .apply(lambda s: s.apply('{0:.2f}'.format)))

Unnamed: 0,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
count,15100468.0,15100468.0,15100468.0,15100468.0,15100468.0,15100468.0
mean,12.27,0.33,0.5,1.35,0.24,14.69
std,9.96,0.37,0.03,2.13,1.19,11.94
min,2.5,0.0,0.0,0.0,0.0,2.5
25%,6.5,0.0,0.5,0.0,0.0,8.0
50%,9.5,0.0,0.5,1.0,0.0,11.0
75%,14.0,0.5,0.5,2.0,0.0,16.5
max,500.0,15.0,0.5,200.0,20.0,628.1


In [13]:
# Summary statistics of the numeric trip columns, formatted to two decimals.
# The frame is restricted to numeric dtypes first: on pandas >= 2.0 describe()
# also summarises datetime columns, and the float format below is not
# meaningful for timestamps.
(clean_data
 .select_dtypes(include="number")
 .describe()
 .apply(lambda s: s.apply('{0:.2f}'.format)))

Unnamed: 0,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
count,15100468.0,15100468.0,15100468.0,15100468.0,15100468.0,15100322.0,15100322.0
mean,1.71,746.61,2.86,-72.73,40.07,-72.69,40.05
std,1.39,550.44,3.34,9.73,6.96,9.86,6.98
min,0.0,0.0,0.0,-2323.42,-3481.14,-2771.29,-3547.9
25%,1.0,360.0,1.04,-73.99,40.74,-73.99,40.73
50%,1.0,600.0,1.78,-73.98,40.75,-73.98,40.75
75%,2.0,960.0,3.2,-73.97,40.77,-73.96,40.77
max,9.0,10800.0,100.0,2228.72,3210.39,2228.75,3577.13


**Passenger count**
0 passengers? Error or not a trip? Paid to wait?

**Trip time**
min is 0, so... no trip?

**Trip distance**
min is 0, no trip?

**Coords**
Need to investigate why the min and max coordinates are so large: standard coordinates are between -90 and 90 for latitude and between -180 and 180 for longitude. Is this a different coordinate system, or do we need to remove rows?

In [14]:
# Count the rows whose pickup / dropoff coordinates lie outside the valid
# ranges (longitude within [-180, 180], latitude within [-90, 90]).
# Missing coordinates compare as False and are therefore not counted.
pickup_longitude_bounds = (clean_data["pickup_longitude"].lt(-180) |
                           clean_data["pickup_longitude"].gt(180))
pickup_latitude_bounds = (clean_data["pickup_latitude"].lt(-90) |
                          clean_data["pickup_latitude"].gt(90))
dropoff_longitude_bounds = (clean_data["dropoff_longitude"].lt(-180) |
                            clean_data["dropoff_longitude"].gt(180))
dropoff_latitude_bounds = (clean_data["dropoff_latitude"].lt(-90) |
                           clean_data["dropoff_latitude"].gt(90))

# Summing the boolean masks gives the row counts without building the
# filtered frames.
(int((pickup_longitude_bounds | pickup_latitude_bounds).sum()),
 int((dropoff_longitude_bounds | dropoff_latitude_bounds).sum()))

(91, 90)

In [15]:
# Count the trips recorded with zero passengers
no_passengers = clean_data["passenger_count"].eq(0)
int(no_passengers.sum())

229

In [16]:
# Count the trips recorded with a duration of zero seconds
no_time = clean_data["trip_time_in_secs"].eq(0)
int(no_time.sum())

38065

In [17]:
# Count the trips recorded with a distance of zero
no_distance = clean_data["trip_distance"].eq(0)
int(no_distance.sum())

112267

## Saving for later use

In [18]:
# Saving dataframes as a python object to use in another notebook.
# At this point both frames carry the cleaned column names and the
# category / datetime / downcasted dtypes assigned above; no rows have been
# removed yet.
clean_fare.to_pickle('data/clean_fare.pickle')
clean_data.to_pickle('data/clean_data.pickle')

## Labeling the pickup and dropoff zones

In [19]:
# Create a dask.distributed Client with default arguments. Per the author,
# this provides a dashboard to view the progress of the dask computations.
client = Client()

In [20]:
def assign_zone(df, longitude, latitude, location_id):
    """Map each (longitude, latitude) pair in ``df`` to a taxi zone id.

    Used in conjunction with dask's ``map_partitions()``: it receives one
    pandas partition and returns exactly one value per input row. The zone
    polygons are read from ``data/shapefile/taxi_zones.shp`` on every call and
    reprojected to EPSG:4326 so they can be compared with the coordinates.

    params:
    df: pandas DataFrame containing the two coordinate columns
    longitude: name of the column holding the longitudes
    latitude: name of the column holding the latitudes
    location_id: name to give to the returned Series

    returns:
    pandas Series named ``location_id``, indexed like ``df``, holding the
    ``LocationID`` of the zone each point lies within as float64. Points that
    lie within no zone get NaN.
    """
    # Build the points on a fresh positional index so that rows can be
    # de-duplicated safely after the join, whatever index ``df`` carries.
    points = gpd.GeoDataFrame(
        geometry=gpd.points_from_xy(df[longitude], df[latitude]),
        crs=4326,
    )

    zones = gpd.read_file("data/shapefile/taxi_zones.shp")
    zones = zones[['LocationID', 'geometry']].to_crs(crs=4326)

    joined = gpd.sjoin(points, zones, how='left', predicate='within')

    # A point that lies within more than one polygon produces one joined row
    # per match; keep the first so the output has one value per input row.
    joined = joined[~joined.index.duplicated(keep='first')]

    # Unmatched points force a float column; cast unconditionally so every
    # partition returns the same dtype.
    zone_ids = joined['LocationID'].reindex(points.index).astype('float64')
    zone_ids.index = df.index

    return zone_ids.rename(location_id)

In [21]:
# Build the LocationID -> (zone, borough) lookup table used to attach zone
# names to the trips. Only attribute columns are kept (the geometry is
# dropped), so no reprojection is needed here.
taxi_zones = (
    gpd.read_file("data/shapefile/taxi_zones.shp")
    .loc[:, ["zone", "borough", "LocationID"]]
    .astype({"zone": "category",
             "borough": "category",
             "LocationID": "int16"})
    # The lookup is merged many-to-one onto the trips, so it must hold a
    # single row per LocationID; a repeated id would duplicate trips.
    # NOTE(review): if ids repeat with different zone names, the first wins.
    .drop_duplicates(subset="LocationID")
    .reset_index(drop=True)
)

In [22]:
# Keep only the trips whose pickup and dropoff both fall inside the bounding
# box (min lon, min lat, max lon, max lat) = (-74.26, 40.47, -71.8, 41.3),
# which the author uses as the Manhattan area. Bounds are inclusive and rows
# with missing coordinates are dropped.
pickup_longitude_bounds = clean_data["pickup_longitude"].between(-74.26, -71.8)
pickup_latitude_bounds = clean_data["pickup_latitude"].between(40.47, 41.3)

dropoff_longitude_bounds = clean_data["dropoff_longitude"].between(-74.26, -71.8)
dropoff_latitude_bounds = clean_data["dropoff_latitude"].between(40.47, 41.3)

inside_box = (dropoff_longitude_bounds &
              dropoff_latitude_bounds &
              pickup_longitude_bounds &
              pickup_latitude_bounds)

# Hand the filtered frame to dask, split into 16 partitions
clean_data = dd.from_pandas(clean_data[inside_box], npartitions=16)

In [23]:
# map_partitions() is a dask dataframe method that applies a function to each
# of the smaller pandas dataframes the dask dataframe is partitioned into.
# It is lazy, which means it never loads or shows data until it is needed for
# computation, so memory is only used as it is needed. Great for doing
# computationally intense operations on large datasets.
#
# assign_zone() returns NaN for coordinates that fall within no zone, so the
# result is a float column; `meta` declares float64 accordingly (an integer
# dtype cannot hold the NaNs).

clean_data['pickup_id'] = clean_data.map_partitions(
    assign_zone,
    longitude="pickup_longitude",
    latitude="pickup_latitude",
    location_id="pickup_id",
    meta=('pickup_id', np.float64))

clean_data['dropoff_id'] = clean_data.map_partitions(
    assign_zone,
    longitude="dropoff_longitude",
    latitude="dropoff_latitude",
    location_id="dropoff_id",
    meta=('dropoff_id', np.float64))

In [24]:
# For some reason using dask to write the map_partitions results to parquet
# (a file format used by apache) and then reading it back from a file to a
# pandas dataframe is (much) faster than transforming a dask dataframe to a
# pandas dataframe. This could be due to:
# 1. The fact I'm not a dask expert and I'm doing something wrong.
# 2. It's easier to compute in memory and write to disk than to compute in
#    memory and store in memory. If I had to do this more often I'd do some
#    reading to see if there was something I could do to speed up
#    transforming dask to pandas.

TRIPS_PARQUET = Path('data/trips.parquet')

# Set to True to force the zone labelling to be recomputed even though a
# cached result already exists on disk.
compute_dask = False

# Compute when asked to, or when there is no cached result yet, so that the
# notebook also runs from a fresh checkout.
# NOTE(review): a partially written output left by an interrupted run also
# counts as "existing"; delete it or set compute_dask = True in that case.
if compute_dask or not TRIPS_PARQUET.exists():
    clean_data.to_parquet(str(TRIPS_PARQUET))

clean_data = pd.read_parquet(TRIPS_PARQUET)

In [25]:
# assign_zone() outputs the zone ids as floats. To lower the RAM usage and get
# rid of the decimal, the NaNs are first replaced by a negative sentinel (all
# the zone ids are positive) and the columns are converted to int16; rows
# carrying the sentinel at either end of the trip are then dropped. Given more
# time I'd investigate why NaNs were output, but it's simpler to ignore them
# given the amount of data left over.
for zone_column in ("pickup_id", "dropoff_id"):
    clean_data[zone_column] = (clean_data[zone_column]
                               .fillna(-1)
                               .astype("int16"))

has_both_zones = clean_data["pickup_id"].gt(0) & clean_data["dropoff_id"].gt(0)
clean_data = clean_data[has_both_zones]

## Merge the data and fare datasets

In [26]:
# Attach the zone and borough names of both trip endpoints by left-merging the
# taxi_zones lookup once per endpoint, with its columns renamed to match.
endpoint_columns = [
    ("pickup_id", "pickup_zone", "pickup_borough"),
    ("dropoff_id", "dropoff_zone", "dropoff_borough"),
]

for id_column, zone_column, borough_column in endpoint_columns:
    endpoint_lookup = taxi_zones.rename(columns={"LocationID": id_column,
                                                 "zone": zone_column,
                                                 "borough": borough_column})
    clean_data = clean_data.merge(endpoint_lookup, on=id_column, how="left")

# Make sure the attached name columns are stored as categoricals
for _, zone_column, borough_column in endpoint_columns:
    for label_column in (zone_column, borough_column):
        clean_data[label_column] = clean_data[label_column].astype("category")

In [27]:
# Label every trip as "<pickup zone> to <dropoff zone>" and store the label
# as a categorical
pickup_zone_text = clean_data["pickup_zone"].astype("string")
dropoff_zone_text = clean_data["dropoff_zone"].astype("string")
clean_data["trip"] = (pickup_zone_text
                      .str.cat(dropoff_zone_text, sep=" to ")
                      .astype("category"))

In [30]:
# Join the trip and fare tables on every column name they have in common
join_variables = set(clean_data.columns) & set(clean_fare.columns)
merged_data = clean_data.merge(right=clean_fare, on=list(join_variables))

In [33]:
# Preview the first rows of the merged trip + fare table
merged_data.head()

Unnamed: 0,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,...,dropoff_zone,dropoff_borough,trip,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
0,91F6EB84975BBC867E32CB113C7C2CD5,AD8751110E6292079EB10EB9481FE1A6,CMT,1,N,2013-04-04 18:47:45,2013-04-04 19:00:25,1,759,2.5,...,Upper West Side South,Manhattan,Lenox Hill West to Upper West Side South,CRD,11.0,1.0,0.5,2.5,0.0,15.0
1,C1B9DA774DC2BBC6DE27CE994E7F44A0,E1B595FD55E4C82C1E213EB17438107A,CMT,1,N,2013-04-04 17:59:50,2013-04-04 18:21:48,1,1318,3.6,...,TriBeCa/Civic Center,Manhattan,Midtown Center to TriBeCa/Civic Center,CRD,16.5,1.0,0.5,3.6,0.0,21.6
2,9BA84250355AB3FC031C9252D395BF8A,16BB0D96A0DCC853AEC7F55C8D6C71E0,CMT,1,N,2013-04-04 18:12:01,2013-04-04 18:25:24,1,799,1.9,...,Upper East Side North,Manhattan,Midtown North to Upper East Side North,CRD,10.0,1.0,0.5,3.45,0.0,14.95
3,205A696DF62AD03C88DA8C5EC5248639,579C41EA5EC846F8B641A42F9EE3E855,CMT,1,N,2013-04-04 20:12:57,2013-04-04 20:29:55,1,1017,3.6,...,Lenox Hill West,Manhattan,West Chelsea/Hudson Yards to Lenox Hill West,CRD,15.0,0.5,0.5,3.2,0.0,19.200001
4,EE75E5927D00739AC342810C336A825E,1B4E92431F9DA4D49874EC76E769E874,CMT,1,N,2013-04-05 02:48:11,2013-04-05 02:51:21,2,189,0.7,...,Penn Station/Madison Sq West,Manhattan,Garment District to Penn Station/Madison Sq West,CRD,4.5,0.5,0.5,1.1,0.0,6.6


In [35]:
# Persist the merged trip + fare table as a pickle file
merged_data.to_pickle('data/merged_data.pickle')