In [3]:
import os
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

In [7]:
def convert_dtype(x):
    if not x:
        return ''
    try:
        return str(x)   
    except:        
        return ''

In [9]:
filename = os.path.join('../data', 'yellow_tripdata_2021-**.csv')
schema = {'trip_distance': 'float64', 'PULocationID': 'uint16', 'DOLocationID': 'uint16', 'store_and_fwd_flag': 'str', 
          'fare_amount': 'float64', 'extra': 'float64', 'mta_tax': 'float64', 'tip_amount': 'float64', 'tolls_amount': 'float64',
          'improvement_surcharge': 'float64', 'total_amount': 'float64', 'congestion_surcharge': 'float64', 'RatecodeID': 'uint8',
          'VendorID': 'uint8', 'passenger_count': 'uint8', 'payment_type': 'uint8'}
date_fields = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']

In [7]:
df = dd.read_csv(filename, 
                 dtype={'RatecodeID': 'float64','VendorID': 'float64', 'passenger_count': 'float64', 'payment_type': 'float64'},
                 converters = {'store_and_fwd_flag': convert_dtype}, parse_dates=date_fields)

In [8]:
df.dtypes

VendorID                        float64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                    float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
dtype: object

In [9]:
df.sample(frac=0.00001).head(5)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
520719,1.0,2021-01-14 12:14:52,2021-01-14 12:16:52,1.0,0.2,1.0,N,75,75,2.0,3.5,0.0,0.5,0.0,0.0,0.3,4.3,0.0
287612,1.0,2021-01-08 17:46:55,2021-01-08 17:53:28,1.0,0.8,1.0,N,239,142,1.0,6.0,3.5,0.5,2.0,0.0,0.3,12.3,2.5
236741,2.0,2021-01-07 15:46:55,2021-01-07 15:49:24,4.0,0.82,1.0,N,229,141,1.0,4.5,0.0,0.5,1.56,0.0,0.3,9.36,2.5
255710,2.0,2021-01-07 23:52:54,2021-01-08 00:09:07,1.0,9.72,1.0,N,70,79,1.0,27.5,0.5,0.5,3.08,0.0,0.3,34.38,2.5
685273,2.0,2021-01-18 14:55:09,2021-01-18 15:02:49,1.0,1.41,1.0,N,230,233,2.0,7.0,0.0,0.5,0.0,0.0,0.3,10.3,2.5


In [10]:
pbar = ProgressBar()
pbar.register()

In [8]:
len(df)

[########################################] | 100% Completed | 29.7s


15000700

In [9]:
df.isnull().sum().compute()

[########################################] | 100% Completed | 30.2s


VendorID                 834028
tpep_pickup_datetime          0
tpep_dropoff_datetime         0
passenger_count          834028
trip_distance                 0
RatecodeID               834028
store_and_fwd_flag            0
PULocationID                  0
DOLocationID                  0
payment_type             834028
fare_amount                   0
extra                         0
mta_tax                       0
tip_amount                    0
tolls_amount                  0
improvement_surcharge         0
total_amount                  0
congestion_surcharge          0
dtype: int64

In [13]:
df = df[(df['VendorID'].isin([1, 2])) & (df['tpep_pickup_datetime'] != 0) & (df['tpep_dropoff_datetime']  != 0)
       & (df['passenger_count'] > 0) & (df['trip_distance'] > 0) & (df['RatecodeID'].isin([1,2,3,4,5,6]))         
       & (df['store_and_fwd_flag'].isin(['Y', 'N'])) & (df['PULocationID'] > 0) & (df['DOLocationID'] > 0) 
       & (df['payment_type'].isin([1,2,3,4,5,6])) & (df['total_amount'] > 0)  & df['tpep_pickup_datetime'].dt.year.isin([2021])]

In [11]:
df.describe().compute()

[########################################] | 100% Completed | 34.4s


Unnamed: 0,VendorID,passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
count,13611780.0,13611780.0,13611780.0,13611780.0,13611780.0,13611780.0,13611780.0,13611780.0,13611780.0,13611780.0,13611780.0,13611780.0,13611780.0,13611780.0,13611780.0
mean,1.705757,1.461888,2.923087,1.030562,166.1687,163.6657,1.244544,12.09175,1.013808,0.4985114,2.20196,0.265708,0.2999857,18.03634,2.317345
std,0.4557017,1.037347,57.00789,0.2488056,65.97232,70.66129,0.4470184,107.8686,1.221645,0.02732084,2.560101,1.500124,0.002073052,108.1276,0.650595
min,1.0,1.0,0.01,1.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.01,0.0
25%,1.0,1.0,1.14,1.0,132.0,114.0,1.0,6.5,0.0,0.5,0.1,0.0,0.3,11.76,2.5
50%,2.0,1.0,1.9,1.0,163.0,162.0,1.0,9.5,0.5,0.5,2.0,0.0,0.3,14.8,2.5
75%,2.0,2.0,3.37,1.0,236.0,236.0,2.0,14.5,2.5,0.5,3.06,0.0,0.3,20.76,2.5
max,2.0,9.0,131652.1,6.0,265.0,265.0,4.0,395854.4,90.06,3.85,1140.44,956.55,0.3,395854.7,2.75


In [14]:
df = df.dropna().astype(schema)

In [13]:
len(df)

[########################################] | 100% Completed | 33.4s


13611781

In [14]:
df.head(5)

[########################################] | 100% Completed |  2.2s


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5
5,1,2021-01-01 00:16:29,2021-01-01 00:24:30,1,1.6,1,N,224,68,1,8.0,3.0,0.5,2.35,0.0,0.3,14.15,2.5


In [15]:
df.dtypes

VendorID                          uint8
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                   uint8
trip_distance                   float64
RatecodeID                        uint8
store_and_fwd_flag               object
PULocationID                     uint16
DOLocationID                     uint16
payment_type                      uint8
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
dtype: object

In [16]:
df['RatecodeID'].unique().compute()

[########################################] | 100% Completed | 33.2s


0    1
1    2
2    4
3    5
4    3
5    6
Name: RatecodeID, dtype: uint8

In [17]:
df['passenger_count'].unique().compute()

[########################################] | 100% Completed | 33.4s


0    1
1    2
2    3
3    5
4    4
5    6
6    7
7    8
8    9
Name: passenger_count, dtype: uint8

In [18]:
df[(df.tpep_pickup_datetime.dt.month == 1) & (df.tpep_pickup_datetime.dt.year == 2021)].compute()

[########################################] | 100% Completed | 34.0s


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.10,1,N,142,43,2,8.0,3.0,0.5,0.00,0.00,0.3,11.80,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.20,1,N,238,151,2,3.0,0.5,0.5,0.00,0.00,0.3,4.30,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.70,1,N,132,165,1,42.0,0.5,0.5,8.65,0.00,0.3,51.95,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.00,0.3,24.36,2.5
5,1,2021-01-01 00:16:29,2021-01-01 00:24:30,1,1.60,1,N,224,68,1,8.0,3.0,0.5,2.35,0.00,0.3,14.15,2.5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
75,2,2021-01-31 23:57:54,2021-02-01 00:02:57,6,1.44,1,N,158,100,1,6.5,0.5,0.5,2.58,0.00,0.3,12.88,2.5
113,2,2021-01-31 23:53:54,2021-02-01 00:14:01,5,4.53,1,N,48,112,2,16.5,0.5,0.5,0.00,6.12,0.3,26.42,2.5
115,2,2021-01-31 23:38:51,2021-02-01 00:07:06,1,9.78,1,N,236,97,2,29.5,0.5,0.5,0.00,0.00,0.3,33.30,2.5
135,2,2021-01-31 23:59:05,2021-02-01 00:16:59,1,3.80,1,N,230,79,1,14.0,0.5,0.5,10.00,0.00,0.3,27.80,2.5


In [19]:
df.columns

Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
       'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
       'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
       'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
       'total_amount', 'congestion_surcharge'],
      dtype='object')

In [20]:
df.groupby(df['tpep_pickup_datetime'])

<dask.dataframe.groupby.DataFrameGroupBy at 0x27331fb8d30>

In [21]:
df['month'] = df['tpep_pickup_datetime'].dt.strftime("%B")

In [22]:
df.groupby(df['month'])['trip_distance'].mean().compute()

[########################################] | 100% Completed |  1min 42.1s


month
December     3.284889
February     2.657960
January      2.742332
March        2.617687
April        2.838703
May          2.947852
July         3.254022
June         3.048735
August       4.481714
November     2.010000
October      2.880000
September    1.330000
Name: trip_distance, dtype: float64

In [23]:
df[(df.tpep_pickup_datetime.dt.month == 3) & (df.tpep_pickup_datetime.dt.year == 2021)]['trip_distance'].mean().compute()

[########################################] | 100% Completed |  1min 41.4s


2.6176865091817327

In [15]:
df[(df.tpep_pickup_datetime.dt.month == 12)].compute()

[########################################] | 100% Completed | 33.3s


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
567150,2,2021-12-15 14:37:15,2021-12-15 14:44:55,1,0.78,1,N,107,234,1,6.5,0.0,0.5,2.45,0.0,0.3,12.25,2.5
567151,2,2021-12-15 14:47:19,2021-12-15 14:57:49,1,1.62,1,N,234,79,1,8.0,0.0,0.5,0.0,0.0,0.3,11.3,2.5
567152,2,2021-12-15 15:00:19,2021-12-15 15:09:54,1,1.77,1,N,79,233,1,9.0,0.0,0.5,3.08,0.0,0.3,15.38,2.5
567153,2,2021-12-15 15:13:23,2021-12-15 15:21:36,1,1.23,1,N,233,163,1,7.0,0.0,0.5,2.06,0.0,0.3,12.36,2.5
567154,2,2021-12-15 15:24:08,2021-12-15 23:10:05,1,7.8,2,N,163,95,1,52.0,0.0,0.5,13.82,0.0,0.3,69.12,2.5


In [48]:
rolling_df = df.compute()

[########################################] | 100% Completed |  3min 26.2s


In [52]:
df.groupby(by=[df.tpep_pickup_datetime.dt.year, df.tpep_pickup_datetime.dt.month])['trip_distance'].mean().compute()

[########################################] | 100% Completed |  1min 40.0s


tpep_pickup_datetime  tpep_pickup_datetime
2009                  1                       2.995000
2020                  12                      4.814375
2021                  1                       2.742312
                      2                       2.657960
2008                  12                      2.399583
2021                  3                       2.617687
                      4                       2.838704
                      5                       2.947853
                      6                       3.048735
                      7                       3.254022
                      8                       4.481714
                      9                       1.330000
                      10                      2.880000
                      11                      2.010000
                      12                      2.640000
2029                  5                       1.690000
2004                  4                       1.590000
Name: trip_distance, d

In [89]:
sss = rolling_df.groupby(by=[rolling_df.tpep_pickup_datetime.dt.year, rolling_df.tpep_pickup_datetime.dt.month])['trip_distance'].rolling(45).mean().to_frame(name = 'meanaa')

In [93]:
sss.droplevel(0).reset_index()['meanaa']

In [24]:
import calendar
months = list(calendar.month_name)[1:]
for month in enumerate(months):
    print(month)

(0, 'January')
(1, 'February')
(2, 'March')
(3, 'April')
(4, 'May')
(5, 'June')
(6, 'July')
(7, 'August')
(8, 'September')
(9, 'October')
(10, 'November')
(11, 'December')


In [26]:
type(df)

dask.dataframe.core.DataFrame

In [27]:
len(df.columns)

18

In [36]:
set(df.sample(frac=0.00001).tpep_pickup_datetime.dt.year.compute()) 

[########################################] | 100% Completed |  1min 20.9s


{2021}

In [40]:
sample_df = df.sample(frac=0.000001).compute()

[########################################] | 100% Completed |  1min 18.1s


In [42]:
set(sample_df.tpep_dropoff_datetime.dt.year)

{2021}

In [4]:
url = 'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2021-01.csv'

In [10]:
df = dd.read_csv(url, 
                 dtype={'RatecodeID': 'float64','VendorID': 'float64', 'passenger_count': 'float64', 'payment_type': 'float64'},
                 converters = {'store_and_fwd_flag': convert_dtype}, parse_dates=date_fields)

In [47]:
pip install aiohttp

Collecting aiohttp
  Downloading aiohttp-3.8.1-cp38-cp38-win_amd64.whl (555 kB)
Collecting async-timeout<5.0,>=4.0.0a3
  Downloading async_timeout-4.0.2-py3-none-any.whl (5.8 kB)
Collecting charset-normalizer<3.0,>=2.0
  Downloading charset_normalizer-2.0.12-py3-none-any.whl (39 kB)
Collecting yarl<2.0,>=1.0
  Downloading yarl-1.7.2-cp38-cp38-win_amd64.whl (122 kB)
Collecting multidict<7.0,>=4.5
  Downloading multidict-6.0.2-cp38-cp38-win_amd64.whl (28 kB)
Collecting frozenlist>=1.1.1
  Downloading frozenlist-1.3.0-cp38-cp38-win_amd64.whl (33 kB)
Collecting aiosignal>=1.1.2
  Downloading aiosignal-1.2.0-py3-none-any.whl (8.2 kB)
Installing collected packages: multidict, frozenlist, yarl, charset-normalizer, async-timeout, aiosignal, aiohttp
Successfully installed aiohttp-3.8.1 aiosignal-1.2.0 async-timeout-4.0.2 charset-normalizer-2.0.12 frozenlist-1.3.0 multidict-6.0.2 yarl-1.7.2
Note: you may need to restart the kernel to use updated packages.


The system cannot find the path specified.
