In [1]:
from dask.distributed import Client

# Start a dask.distributed client with six workers; the dask computations
# triggered later in this notebook are scheduled through it.
client = Client(n_workers=6)


In [2]:
import csv
import os
import dask
import dask.dataframe as dd
import dask.array as da
import fastparquet
import pandas as pd
import altair as alt
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime

In [3]:
# Lazily load every monthly 2010 yellow-cab CSV under data/ into a single
# dask dataframe; nothing is read until a computation is triggered.
# NOTE(review): `error_bad_lines` was deprecated in pandas 1.3 in favour of
# `on_bad_lines='skip'` -- confirm the installed pandas version before upgrading.
trip_csv_glob = os.path.join('data', 'yellow_tripdata_2010-*.csv')
column_dtypes = {'trip_distance': 'float64', 'store_and_fwd_flag': 'object'}
df = dd.read_csv(
    trip_csv_glob,
    parse_dates=['pickup_datetime', 'dropoff_datetime'],
    quoting=csv.QUOTE_NONE,
    encoding='utf-8',
    error_bad_lines=False,
    dtype=column_dtypes,
)


In [4]:
# https://towardsdatascience.com/heres-how-to-calculate-distance-between-2-geolocations-in-python-93ecab5bbba4
def haversine_distance(row):
    """Return a cleaned trip distance for one ride record.

    The recorded ``trip_distance`` is kept whenever it looks plausible
    (0.15 <= distance <= 80).  Only when it is implausible *and* all four
    pickup/dropoff coordinates are usable is it replaced by the great-circle
    (haversine) distance between the pickup and dropoff points.

    Parameters
    ----------
    row : mapping-like (e.g. a pandas row)
        Must provide ``pickup_latitude``, ``pickup_longitude``,
        ``dropoff_latitude``, ``dropoff_longitude`` and ``trip_distance``.

    Returns
    -------
    float
        * ``row['trip_distance']`` unchanged (not rounded) when any coordinate
          is missing, zero, NaN, not a float, or outside the bounding box
          39..42 latitude / -77..-70 longitude;
        * ``trip_distance`` rounded to 2 decimals when it is plausible;
        * otherwise the haversine distance rounded to 2 decimals.

    Notes
    -----
    NOTE(review): the haversine result is in kilometres (Earth radius 6371)
    but it substitutes for ``trip_distance``, whose unit is not shown here
    (presumably miles) -- confirm the units agree.
    """
    lat1 = row['pickup_latitude']
    lon1 = row['pickup_longitude']
    lat2 = row['dropoff_latitude']
    lon2 = row['dropoff_longitude']
    coords = [lat1, lon1, lat2, lon2]
    recorded = row['trip_distance']

    # None and 0 are both treated as "no coordinate recorded".
    # https://stackoverflow.com/questions/19252588/how-do-i-test-for-null-list-entry-in-python-list
    if not all(coords):
        return recorded

    if not all(isinstance(x, float) for x in coords):
        return recorded

    # NaN is truthy, is a float, and fails every < / > comparison, so it would
    # slip through the range checks below and turn the result into NaN.
    if any(np.isnan(x) for x in coords):
        return recorded

    if any(x < 39.0 or x > 42.0 for x in (lat1, lat2)):
        return recorded

    if any(x < -77.0 or x > -70.0 for x in (lon1, lon2)):
        return recorded

    # A plausible recorded distance wins; skip the trigonometry entirely.
    if not (recorded < .15 or recorded > 80):
        return np.round(recorded, 2)

    # https://towardsdatascience.com/heres-how-to-calculate-distance-between-2-geolocations-in-python-93ecab5bbba4
    r = 6371
    phi1 = np.radians(lat1)
    phi2 = np.radians(lat2)
    delta_phi = np.radians(lat2 - lat1)
    delta_lambda = np.radians(lon2 - lon1)
    a = np.sin(delta_phi / 2)**2 + np.cos(phi1) * np.cos(phi2) * np.sin(delta_lambda / 2)**2
    res = r * (2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a)))

    return np.round(res, 2)



In [5]:
def comp_dates(series, dt, half_window_days=.25):
    """Classify one timestamp relative to a boundary timestamp.

    Parameters
    ----------
    series : datetime-like scalar
        The timestamp to classify.  Despite the name, a single value is
        expected (the function is applied element-wise).
    dt : datetime-like scalar
        The boundary timestamp.
    half_window_days : float, default 0.25
        Half-width, in days, of the window around ``dt`` that counts as the
        current week.  Keep it equal to the half-width used when the rows
        were selected.

    Returns
    -------
    str
        ``'Week before'`` if ``series`` is earlier than ``dt - half_window``,
        ``'Week after'`` if it is later than ``dt + half_window``,
        otherwise ``'Current week'`` (both window edges inclusive).
    """
    half_window = timedelta(days=half_window_days)
    if series < dt - half_window:
        return 'Week before'
    if series > dt + half_window:
        return 'Week after'
    return 'Current week'
    
# https://stackoverflow.com/questions/34099684/how-to-use-groupby-transform-across-multiple-columns
def find_which_week(x):
    """Add a ``which_week`` label column to one boundary group.

    ``x`` is a DataFrame holding the rows of a single group; the boundary
    timestamp is read from the first row of its ``b_ts`` column and every
    ``pickup_datetime_1min`` value is classified against it with
    ``comp_dates``.  The frame is modified in place and also returned.
    """
    boundary_ts = x['b_ts'].iloc[0]
    # Series.apply hands each timestamp, one at a time, to comp_dates.
    week_labels = x['pickup_datetime_1min'].apply(comp_dates, args=(boundary_ts,))
    x['which_week'] = week_labels
    return x


def make_same_dates(df_gb):
    """Add ``pickup_datetime_same_time``: pickup minutes aligned on the current week.

    Rows labelled ``'Week before'`` are shifted forward by 7 days, rows
    labelled ``'Week after'`` are shifted back by 7 days, and every other row
    keeps its ``pickup_datetime_1min`` value, so the three weeks can be
    overlaid on one time axis.

    Parameters
    ----------
    df_gb : pandas.DataFrame
        Must contain ``pickup_datetime_1min`` (datetime) and ``which_week``.
        Modified in place.

    Returns
    -------
    pandas.DataFrame
        The same ``df_gb`` object with the new column added.
    """
    week = df_gb['which_week']
    # Vectorised day offset per row; this also works on an empty frame, where
    # a row-wise DataFrame.apply(axis=1) would not return a Series.
    shift_days = np.where(week == 'Week before', 7,
                          np.where(week == 'Week after', -7, 0))
    df_gb['pickup_datetime_same_time'] = (
        df_gb['pickup_datetime_1min'] + pd.to_timedelta(shift_days, unit='D')
    )
    return df_gb

In [6]:
def make_trip_distance_bins(row):
    """Map ``row['comp_trip_distance']`` onto a coarse distance bin.

    An exact zero maps to bin 0.  Otherwise the bin is the smallest upper
    edge among .25, .5, 1, 2, 4, 8, 16, 32 that is >= the distance, and
    anything that matches no edge (including NaN) falls into the overflow
    bin 33.
    """
    dist = row['comp_trip_distance']

    if dist == 0:
        return 0

    # Upper edges double from a quarter up to 32.
    for upper_edge in (.25, .5, 1, 2, 4, 8, 16, 32):
        if dist <= upper_edge:
            return upper_edge

    return 33

In [39]:
#df.isnull().sum().compute()

vendor_id                    0
pickup_datetime              0
dropoff_datetime             0
passenger_count              0
trip_distance                0
pickup_longitude             0
pickup_latitude              0
rate_code                    0
store_and_fwd_flag    88387448
dropoff_longitude          110
dropoff_latitude           110
payment_type                 0
fare_amount                  0
surcharge                    0
mta_tax                      0
tip_amount                   0
tolls_amount                 0
total_amount                 0
dtype: int64

In [6]:
#df.dtypes

vendor_id                     object
pickup_datetime       datetime64[ns]
dropoff_datetime      datetime64[ns]
passenger_count                int64
trip_distance                float64
pickup_longitude             float64
pickup_latitude              float64
rate_code                      int64
store_and_fwd_flag            object
dropoff_longitude            float64
dropoff_latitude             float64
payment_type                  object
fare_amount                  float64
surcharge                    float64
mta_tax                      float64
tip_amount                   float64
tolls_amount                 float64
total_amount                 float64
dtype: object

In [7]:
from datetime import datetime

# Wall-clock time at which this cell started running.
start_time = datetime.now()
print(f"Starting: {start_time}")



# Boundary timestamps (ISO-like strings, '%Y-%m-%dT%H:%M:%S') around which
# ride counts are compared for the week before, the same week and the week
# after.
# NOTE(review): presumably weather-event times recorded in local standard
# time, given the +1h DST shift applied below and the output file name --
# confirm the source of these values.
orig_boundary_list = ['2010-01-17T11:51:00',
                     '2010-01-24T15:05:00',
                     '2010-01-25T06:01:00',
                     '2010-01-25T16:51:00',
                      '2010-01-26T00:51:00',
                      '2010-02-23T06:51:00',
                      '2010-02-25T02:51:00',
                      '2010-03-12T07:51:00',
                      '2010-03-12T11:51:00',
                      '2010-03-14T09:26:00',
                      '2010-03-15T00:28:00',
                      '2010-03-22T05:00:00',
                      '2010-03-23T14:16:00',
                      '2010-03-25T22:51:00',
                      '2010-03-28T19:51:00',
                      '2010-04-09T00:51:00',
                      '2010-04-16T18:51:00',
                      '2010-04-24T23:51:00',
                      '2010-04-26T08:51:00',
                      '2010-04-27T10:51:00',
                      '2010-05-02T23:51:00',
                      '2010-05-03T13:51:00',
                      '2010-05-08T08:15:00',
                      '2010-05-11T22:51:00',
                      '2010-05-12T06:51:00',
                      '2010-05-14T09:32:00',
                      '2010-05-18T03:51:00',
                      '2010-05-24T12:51:00',
                      '2010-06-09T09:51:00',
                      '2010-06-10T12:51:00',
                      '2010-06-13T13:51:00',
                      '2010-06-16T23:51:00',
                      '2010-06-22T17:51:00',
                      '2010-07-13T11:13:00',
                      '2010-07-14T08:33:00',
                      '2010-07-14T22:49:00',
                      '2010-07-19T08:05:00',
                      '2010-07-23T09:51:00',
                      '2010-07-23T19:41:00',
                      '2010-07-25T13:51:00',
                      '2010-07-29T05:51:00',
                      '2010-07-29T11:51:00',
                      '2010-08-12T08:06:00',
                      '2010-08-15T13:51:00',
                      '2010-08-16T01:51:00',
                      '2010-08-16T16:51:00',
                      '2010-08-22T12:51:00',
                      '2010-08-23T03:59:00',
                      '2010-08-24T21:51:00',
                      '2010-09-12T13:51:00',
                      '2010-09-13T16:51:00',
                      '2010-09-16T15:51:00',
                      '2010-09-22T18:53:00',
                      '2010-09-27T06:21:00',
                      '2010-09-28T10:00:00',
                      '2010-09-30T03:51:00',
                      '2010-10-01T01:53:00',
                      '2010-10-01T13:53:00',
                      '2010-10-04T02:51:00',
                      '2010-10-05T22:51:00',
                      '2010-10-11T17:53:00',
                      '2010-10-14T14:53:00',
                      '2010-10-26T22:53:00',
                      '2010-10-27T13:51:00',
                      '2010-10-27T19:25:00',
                      '2010-10-27T23:51:00',
                      '2010-11-04T01:53:00',
                      '2010-11-07T12:51:00',
                      '2010-11-10T08:51:00',
                      '2010-11-15T18:36:00',
                      '2010-11-16T03:48:00',
                      '2010-11-16T20:06:00',
                      '2010-11-25T12:53:00',
                      '2010-11-26T00:13:00',
                      '2010-11-30T11:53:00',
                      '2010-12-01T01:47:00',
                      '2010-12-01T20:53:00',
                      '2010-12-12T00:53:00',
                      '2010-12-12T16:00:00',
                      '2010-12-26T09:51:00',                  
                      #'2010-3-14T05:00:00',
                      #'2010-11-07T05:00:00', 
                     ]

# Parse the boundary strings into naive datetime objects.
pot_boundary_list = [
    datetime.strptime(str_date, '%Y-%m-%dT%H:%M:%S')
    for str_date in orig_boundary_list
]

# Boundaries falling in [2010-03-14 02:00, 2010-11-07 01:00) are moved one
# hour forward; all others are kept as parsed.
dst_start = datetime(2010, 3, 14, 2, 0, 0)
dst_end = datetime(2010, 11, 7, 1, 0, 0)
one_hour = timedelta(hours=+1)
pot_boundary_list_dst = [
    (x + one_hour) if dst_start <= x < dst_end else x
    for x in pot_boundary_list
]


# Half-width, in days, of the window taken around each boundary (and around
# the same instant one week earlier / one week later).
days_plus_minus = .25

ddf_list = []

for idx, b in enumerate(pot_boundary_list_dst):

    half_window = timedelta(days=days_plus_minus)
    one_week = timedelta(days=7)

    week_before_start = b - one_week - half_window
    week_before_end = b - one_week + half_window
    current_week_start = b - half_window
    current_week_end = b + half_window
    week_after_start = b + one_week - half_window
    week_after_end = b + one_week + half_window

    # Each window is half-open: start inclusive, end exclusive.
    pickup = df['pickup_datetime']
    in_week_before = (pickup >= week_before_start) & (pickup < week_before_end)
    in_current_week = (pickup >= current_week_start) & (pickup < current_week_end)
    in_week_after = (pickup >= week_after_start) & (pickup < week_after_end)

    df_d = df[in_week_before | in_current_week | in_week_after]

    # Tag the selected rows with the boundary they belong to.
    df_d['b_id'] = idx
    df_d['b_ts'] = b

    ddf_list.append(df_d)


# Stack the per-boundary selections into one dask dataframe.
# https://sourcecodequery.com/example-method/dask.concat
df_d = dd.concat(ddf_list)

# Row-wise derived columns; `meta` declares the result of each apply as float64.
df_d['comp_trip_distance'] = df_d.apply(
    haversine_distance, axis=1, meta=(None, 'float64')
)
df_d['comp_dist_bins'] = df_d.apply(
    make_trip_distance_bins, axis=1, meta=(None, 'float64')
)

# Round each pickup time to the nearest minute for per-minute counting.
df_d['pickup_datetime_1min'] = df_d['pickup_datetime'].dt.round('1min')


# Count rides per (boundary, rounded pickup minute, distance bin) and pull
# the aggregated result into pandas.
df_3w = (
    df_d.groupby(['b_id', 'b_ts', 'pickup_datetime_1min', 'comp_dist_bins'])['vendor_id']
    .count()
    .reset_index()
    .compute()
)
df_3w.columns = ['b_id', 'b_ts', 'pickup_datetime_1min', 'dist_bin', 'rides_per_minute']

# group_keys=False keeps the original row index, so 'b_id' stays a plain
# column instead of also becoming an index level (which would make the
# groupbys below ambiguous on recent pandas).
df_3w = df_3w.groupby('b_id', group_keys=False).apply(find_which_week)

# Drop the first and last minute of every (boundary, week) window: rounding
# to the nearest minute leaves those two edge minutes under-counted.  There
# is one row per (minute, distance bin), so every row at the edge minutes is
# removed, not just a single row at each end.
minute_by_week = df_3w.groupby(['b_id', 'which_week'])['pickup_datetime_1min']
is_edge_minute = (
    df_3w['pickup_datetime_1min'].eq(minute_by_week.transform('min'))
    | df_3w['pickup_datetime_1min'].eq(minute_by_week.transform('max'))
)
df_3w = df_3w[~is_edge_minute]

# 'dist_bin' is a tie-breaker so that the row order (and therefore
# 'sequence') is deterministic within a minute.
df_3w = df_3w.sort_values(by=['b_id', 'pickup_datetime_1min', 'dist_bin']).reset_index(drop=True)

# 1-based running row number within each (boundary, week) window.
df_3w['sequence'] = df_3w.groupby(['b_id', 'which_week']).cumcount() + 1

# make_same_dates only looks at each row's own columns, so it can be applied
# to the whole frame at once; no per-boundary groupby is needed.
df_3w = make_same_dates(df_3w)

print(datetime.now())

Starting: 2021-05-12 16:44:27.673278
2021-05-12 20:52:10.932664


In [None]:
# Display the per-minute ride counts built in the previous cell.
df_3w

In [8]:
# Write the per-boundary, per-minute ride counts to CSV without the row index.
df_3w.to_csv('data/3wboundaries-weather-dst-adj-bins.csv', index=False )