In [1]:
import pandas as pd
import numpy as np

import geopandas as gpd
from shapely.geometry import Point
import rtree
import pickle

import matplotlib.pyplot as plt

# Let long and wide frames render without truncation (up to 500 rows / columns).
for _display_option in ('display.max_rows', 'display.max_columns'):
    pd.set_option(_display_option, 500)

In [17]:
import random

filename = "data/2017_Green_Taxi_Trip_Data.csv"
s = 1000 #desired sample size
SAMPLE_SEED = 42 #fixed seed so Restart & Run All draws the same rows every time

# Count the data rows inside a context manager so the file handle is closed
# immediately instead of being left open for the garbage collector.
with open(filename) as csv_file:
    n = sum(1 for _ in csv_file) - 1 #number of records in file (excludes header)

# A dedicated seeded generator keeps the sample reproducible without touching
# the global `random` state.
sampler = random.Random(SAMPLE_SEED)

# Row 0 is the header, so only rows 1..n are candidates for skipping.
# max(..., 0) keeps every row when the file holds fewer than s records;
# random.sample would raise ValueError on a negative sample size.
skip = sorted(sampler.sample(range(1, n + 1), max(n - s, 0)))
taxi_df = pd.read_csv(filename, skiprows=skip)

In [18]:
# Inspect column dtypes and non-null counts of the sampled frame.
taxi_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 19 columns):
VendorID                 1000 non-null int64
lpep_pickup_datetime     1000 non-null object
lpep_dropoff_datetime    1000 non-null object
store_and_fwd_flag       1000 non-null object
RatecodeID               1000 non-null int64
PULocationID             1000 non-null int64
DOLocationID             1000 non-null int64
passenger_count          1000 non-null int64
trip_distance            1000 non-null float64
fare_amount              1000 non-null float64
extra                    1000 non-null float64
mta_tax                  1000 non-null float64
tip_amount               1000 non-null float64
tolls_amount             1000 non-null float64
ehail_fee                0 non-null float64
improvement_surcharge    1000 non-null float64
total_amount             1000 non-null float64
payment_type             1000 non-null int64
trip_type                1000 non-null int64
dtypes: float64(9), i

In [19]:
# Preview the first rows of the sample as loaded from the CSV.
taxi_df.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type
0,2,01/01/2017 12:28:33 AM,01/01/2017 12:40:27 AM,N,1,40,97,1,1.77,9.5,0.5,0.5,2.7,0.0,,0.3,13.5,1,1
1,1,01/01/2017 12:49:03 AM,01/01/2017 01:14:47 AM,N,1,256,26,1,9.6,29.5,0.5,0.5,0.0,0.0,,0.3,30.8,1,1
2,2,01/01/2017 02:22:29 AM,01/01/2017 02:47:58 AM,N,1,256,123,1,10.86,32.0,0.5,0.5,0.0,0.0,,0.3,33.3,2,1
3,1,01/01/2017 04:00:58 AM,01/01/2017 04:10:24 AM,N,1,97,25,1,1.5,8.5,0.5,0.5,2.45,0.0,,0.3,12.25,1,1
4,2,01/01/2017 09:10:16 AM,01/01/2017 09:25:23 AM,N,1,188,33,1,4.37,16.0,0.0,0.5,3.36,0.0,,0.3,22.11,1,1


In [20]:
# Parse pickup timestamps. The CSV stores them as 'MM/DD/YYYY hh:mm:ss AM/PM';
# an explicit format avoids slow per-value inference and rules out any
# day-first / month-first ambiguity.
taxi_df['lpep_pickup_datetime'] = pd.to_datetime(
    taxi_df['lpep_pickup_datetime'], format='%m/%d/%Y %I:%M:%S %p'
)

In [21]:
# Parse dropoff timestamps. The CSV stores them as 'MM/DD/YYYY hh:mm:ss AM/PM';
# an explicit format avoids slow per-value inference and rules out any
# day-first / month-first ambiguity.
taxi_df['lpep_dropoff_datetime'] = pd.to_datetime(
    taxi_df['lpep_dropoff_datetime'], format='%m/%d/%Y %I:%M:%S %p'
)

In [22]:
# Calendar date (time component dropped) of each pickup.
taxi_df['date'] = taxi_df['lpep_pickup_datetime'].dt.date

In [23]:
# Trip length in whole minutes: elapsed seconds between pickup and dropoff,
# floor-divided by 60.
taxi_df['taxi_duration'] = (
    (taxi_df['lpep_dropoff_datetime'] - taxi_df['lpep_pickup_datetime'])
    .dt.total_seconds()
    .floordiv(60)
)

In [24]:
# taxi_df = taxi_df.rename(columns={'DOLocationID' : 'taxi_zone'})

In [25]:
# Day-of-week name of each pickup ('Monday' ... 'Sunday').
# Series.dt.weekday_name was deprecated in pandas 0.23 and removed in 1.0;
# dt.day_name() is its replacement and yields the same English names.
taxi_df['DOW'] = taxi_df['lpep_pickup_datetime'].dt.day_name()

In [26]:
# Hour of the day in which each pickup happened.
taxi_df['TOD'] = taxi_df['lpep_pickup_datetime'].dt.hour

In [33]:
# Count trips per (day of week, hour, pickup location ID). The count is taken
# on VendorID, so the resulting column carries that name after reset_index().
data = (
    taxi_df
    .groupby(['DOW', 'TOD', 'PULocationID'])['VendorID']
    .count()
    .reset_index()
)

In [34]:
# The counted VendorID column actually holds the number of pickups; name it so.
data = data.rename(columns=dict(VendorID='pickups'))

In [36]:
# Preview the aggregated pickup counts.
data.head()

Unnamed: 0,DOW,TOD,PULocationID,pickups
0,Friday,0,49,1
1,Friday,0,74,1
2,Friday,0,83,1
3,Friday,0,95,1
4,Friday,0,181,1


In [27]:
# Preview the sample again; the displayed frame now carries the derived
# date, taxi_duration, DOW and TOD columns.
taxi_df.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,date,taxi_duration,DOW,TOD
0,2,2017-01-01 00:28:33,2017-01-01 00:40:27,N,1,40,97,1,1.77,9.5,0.5,0.5,2.7,0.0,,0.3,13.5,1,1,2017-01-01,11.0,Sunday,0
1,1,2017-01-01 00:49:03,2017-01-01 01:14:47,N,1,256,26,1,9.6,29.5,0.5,0.5,0.0,0.0,,0.3,30.8,1,1,2017-01-01,25.0,Sunday,0
2,2,2017-01-01 02:22:29,2017-01-01 02:47:58,N,1,256,123,1,10.86,32.0,0.5,0.5,0.0,0.0,,0.3,33.3,2,1,2017-01-01,25.0,Sunday,2
3,1,2017-01-01 04:00:58,2017-01-01 04:10:24,N,1,97,25,1,1.5,8.5,0.5,0.5,2.45,0.0,,0.3,12.25,1,1,2017-01-01,9.0,Sunday,4
4,2,2017-01-01 09:10:16,2017-01-01 09:25:23,N,1,188,33,1,4.37,16.0,0.0,0.5,3.36,0.0,,0.3,22.11,1,1,2017-01-01,15.0,Sunday,9


In [None]:
import time
from datetime import date
import math

def date_extractor(date_str,b,minutes_per_bin):
    """Derive binned time-of-day and day-of-week features from a timestamp string.

    Parameters
    ----------
    date_str : str
        Timestamp of the form "YYYY-MM-DD HH:MM:SS" (date and time separated
        by whitespace).
    b : object
        Not used by this function; kept so existing callers that pass it
        keep working.
    minutes_per_bin : int
        Width of one time-of-day bin, in minutes (e.g. 15).

    Returns
    -------
    tuple
        On success, a 12-tuple:
        (year, month, day, time_cat, time_num, time_cos, time_sin,
         day_cat, day_num, day_cos, day_sin, weekend)

        * time_cat  - start of the bin as a zero-padded "HH:MM" string
        * time_num  - centre of the bin as a fraction of the day
        * time_cos / time_sin - cyclic encoding of time_num
        * day_cat   - English weekday name
        * day_num   - position within the week as a fraction (Monday = 0)
        * day_cos / day_sin - cyclic encoding of day_num
        * weekend   - 1 for Saturday/Sunday, else 0

        When date_str does not split into exactly a date part and an
        "H:M:S" time part, the one-element tuple (None,) is returned.

    Raises
    ------
    ValueError
        If a time or date component is not an integer, or the date is not a
        valid calendar date.
    """
    # Split the timestamp into [date, time].
    parts = date_str.split()

    #safety check
    if len(parts) != 2:
        return tuple([None,])

    date_part, time_part = parts

    # TIME (worked example: 16:56:20 with 15 minutes per bin)
    # list of hour, min, sec (e.g. [16, 56, 20])
    time_list = [int(t) for t in time_part.split(':')]

    #safety check
    if len(time_list) != 3:
        return tuple([None,])

    # minutes elapsed since midnight (e.g. 1016)
    num_minutes = time_list[0] * 60 + time_list[1]

    # Start of the bin. Floor division is required here: true division would
    # turn these into floats under Python 3 and break both the "HH:MM" label
    # and the bin arithmetic.
    time_bin = num_minutes // minutes_per_bin    # index of the bin, e.g. 67
    hour_bin = num_minutes // 60                 # e.g. 16
    min_bin = (time_bin * minutes_per_bin) % 60  # e.g. 45

    # Zero-padded "HH:MM" label for the bin start (e.g. "16:45", "05:00").
    time_cat = "{:02d}:{:02d}".format(hour_bin, min_bin)

    # Centre of the time bin as a fraction of the day (e.g. 0.703125)
    time_num = (hour_bin*60 + min_bin + minutes_per_bin / 2.0)/(60*24)

    time_cos = math.cos(time_num * 2 * math.pi)
    time_sin = math.sin(time_num * 2 * math.pi)

    # DATE
    # Parse year, month, day
    year_str, month_str, day_str = date_part.split('-')[:3]
    d_obj = date(int(year_str), int(month_str), int(day_str))

    # date.weekday() numbers the days Monday = 0 ... Sunday = 6.
    weekday_names = ("Monday", "Tuesday", "Wednesday", "Thursday",
                     "Friday", "Saturday", "Sunday")
    day_of_week = d_obj.weekday()
    day_cat = weekday_names[day_of_week]
    day_num = (day_of_week + time_num)/7.0
    day_cos = math.cos(day_num * 2 * math.pi)
    day_sin = math.sin(day_num * 2 * math.pi)

    # Saturday (5) and Sunday (6) count as the weekend.
    weekend = 1 if day_of_week in (5, 6) else 0

    return (d_obj.year, d_obj.month, d_obj.day,
            time_cat, time_num, time_cos, time_sin,
            day_cat, day_num, day_cos, day_sin, weekend)

In [None]:
def data_cleaner(zipped_row):
    """Turn one raw trip record into a (date features..., geohash) tuple.

    Parameters
    ----------
    zipped_row : tuple
        (row, g, b, minutes_per_bin) where
        * row is an indexable record with at least 7 fields,
        * g is passed to geohash.encode as its third argument,
        * b and minutes_per_bin are forwarded to date_extractor.

    Returns
    -------
    tuple or None
        The tuple produced by date_extractor with the geohash appended as the
        last element. None is returned when the row has fewer than 7 fields,
        the timestamp cannot be parsed, a coordinate is not numeric, or the
        coordinates fall outside the accepted bounding box.
    """
    row = zipped_row[0]
    g = zipped_row[1]
    b = zipped_row[2]
    minutes_per_bin = zipped_row[3]

    # Field positions as the code reads them: pickup datetime, latitude, longitude.
    # NOTE(review): the previous comment listed these as "datetime, longitude,
    # latitude", which contradicted the reads below -- confirm against the
    # input schema.
    datetime_idx, latitude_idx, longitude_idx = 1, 6, 5

    #safety check: make sure row has enough features
    if len(row) < 7:
        return None

    # Extract the date/time features. date_extractor signals a malformed
    # timestamp with (None,); reject the row instead of emitting a bogus
    # (None, geohash) tuple that would pass a downstream "!= None" filter.
    clean_date = date_extractor(row[datetime_idx], b, minutes_per_bin)
    if clean_date == (None,):
        return None

    # Non-numeric coordinates are treated like any other invalid row.
    try:
        latitude = float(row[latitude_idx])
        longitude = float(row[longitude_idx])
    except (TypeError, ValueError):
        return None

    #safety check: make sure latitude and longitude are valid
    if not (40.5 < latitude < 41.1 and -74.1 < longitude < -73.6):
        return None

    # NOTE(review): `geohash` is not imported in the cells visible here --
    # confirm it is imported before this function is called.
    location = geohash.encode(latitude, longitude, g)

    return tuple(clean_date) + (location,)

In [None]:
# NOTE(review): taxi_df is built with pd.read_csv earlier in this notebook, but
# reduceByKey is not a pandas DataFrame method (it belongs to the Spark RDD API),
# and g, b and minutes_per_bin are not defined in any visible cell -- confirm the
# intended input and settings before running this cell.
gclean_rdd = (
    taxi_df
    .map(lambda row: (row, g, b, minutes_per_bin))  # bundle each record with the shared settings
    .map(data_cleaner)                              # feature tuple, or None for rejected rows
    .filter(lambda row: row != None)                # drop the rejected rows
    .map(lambda row: (row, 1))                      # pair every feature tuple with a count of 1
    .reduceByKey(lambda left, right: left + right)  # presumably sums the 1s per identical tuple
    .map(lambda row: (row, 'g'))                    # tag each (tuple, count) pair with 'g'
)