In [158]:
import dask.dataframe as dd
import dask_cudf
import pandas as pd
from cuml.dask.ensemble import RandomForestClassifier
from cuml.metrics import roc_auc_score
from dask.array import from_array
from dask.distributed import Client, wait
from dask_saturn import SaturnCluster
from progress import progress
from scipy import stats
from sklearn.metrics import f1_score

In [2]:
# Parameters: model inputs and numeric constants shared by the cells below.

# Columns treated as plain numeric inputs.
numeric_feat = [
    "pickup_weekday",
    "pickup_hour",
    "work_hours",
    "pickup_minute",
    "passenger_count",
    "trip_distance",
    "trip_time",
    "trip_speed",
]

# ID-coded columns taken straight from the raw trip records.
categorical_feat = ["PULocationID", "DOLocationID", "RatecodeID"]

# Full feature list handed to the model: numeric columns first, then the ID columns.
features = [*numeric_feat, *categorical_feat]

# Small offset added to trip_time in preprocess() so the speed division never divides by zero.
EPS = 1e-7

## Initialize cluster

In [3]:
# `progress`, `Client` and `SaturnCluster` are provided by the notebook's import
# cell, so they are not re-imported here.
# progress() comes from the local `progress` module; it is called with a stage
# label (its exact effect is not shown in this notebook).
progress('rf-rapids-dask-cluster-setup')

# Request a Saturn Dask cluster with `n_workers` workers and connect a client to it.
# worker_size "g4dnxlarge" is presumably the GPU g4dn.xlarge instance type — confirm
# against the Saturn size catalogue.
n_workers = 3
cluster = SaturnCluster(
    n_workers=n_workers, scheduler_size="medium", worker_size="g4dnxlarge"
)
client = Client(cluster)

# Display the client summary (scheduler address, dashboard link, worker count).
client

[2021-02-02 00:08:46] INFO - dask-saturn | Cluster is ready
[2021-02-02 00:08:46] INFO - dask-saturn | Registering default plugins
[2021-02-02 00:08:46] INFO - dask-saturn | {'tcp://10.0.0.100:35425': {'status': 'repeat'}, 'tcp://10.0.18.10:34943': {'status': 'repeat'}, 'tcp://10.0.27.175:41401': {'status': 'repeat'}}


0,1
Client  Scheduler: tcp://d-shrey-rapids-random-forest-85ae247e8459473fbfdc641eb0e7ecb2.main-namespace:8786  Dashboard: https://d-shrey-rapids-random-forest-85ae247e8459473fbfdc641eb0e7ecb2.community.saturnenterprise.io,Cluster  Workers: 3  Cores: 12  Memory: 46.50 GB


## Create helper functions

In [102]:
def preprocess(df: dask_cudf.DataFrame, target_col: str, start_date: str = None, end_date: str = None) -> dask_cudf.DataFrame:
    """
    Turn a raw trip frame into a model-ready frame.

        * drops rows whose ``fare_amount`` is not positive
        * optionally filters rows by drop-off timestamp (``start_date`` / ``end_date``)
        * computes the target ``tip_amount / fare_amount > 0.2`` and stores it
          under ``target_col``
        * adds pickup-time, trip-duration and trip-speed features
        * keeps only ``tpep_dropoff_datetime``, the columns named in the
          module-level ``features`` list, and the target

    Casts all numeric values to 32-bit types, for efficiency and
    because some older versions of CUDA / ``cudf``, ``cuml``
    did not support 64-bit types in training data.

    Relies on the module-level ``features`` list and ``EPS`` constant.

    Args:
        df: raw trips; must contain ``fare_amount``, ``tip_amount``,
            ``trip_distance``, ``passenger_count``, ``PULocationID``,
            ``DOLocationID``, ``RatecodeID``, ``tpep_pickup_datetime`` and
            ``tpep_dropoff_datetime``
        target_col: name of the target column to create
        start_date: optional lower bound on the drop-off timestamp, as a string
        end_date: optional upper bound on the drop-off timestamp, as a string

    Returns:
        A frame with ``tpep_dropoff_datetime``, the float32 feature columns and
        the int32 target column, with a freshly reset index.
    """
    # Clean
    df = df[df.fare_amount > 0]  # avoid divide-by-zero
    # NOTE(review): both bounds are compared as *strings* against the stringified
    # timestamp, so the outcome depends on how astype('str') renders a datetime.
    # A date-only end_date such as '2020-03-31' probably excludes drop-offs later
    # on that same day — confirm this is intended.
    if start_date:
        df = df[df.tpep_dropoff_datetime.astype('str') >= start_date]
    if end_date:
        df = df[df.tpep_dropoff_datetime.astype('str') <= end_date]

    # add target
    df["tip_fraction"] = df.tip_amount / df.fare_amount
    df[target_col] = df["tip_fraction"] > 0.2  # "high tip" means more than 20% of the fare

    # add features
    df["pickup_weekday"] = df.tpep_pickup_datetime.dt.weekday
    df["pickup_hour"] = df.tpep_pickup_datetime.dt.hour
    df["pickup_minute"] = df.tpep_pickup_datetime.dt.minute
    # True for weekday codes 0-4 and pickup hours 8-18 inclusive
    # (presumably Monday-Friday, assuming weekday 0 is Monday — confirm for cudf).
    df["work_hours"] = (df.pickup_weekday >= 0) & (df.pickup_weekday <= 4) & (df.pickup_hour >= 8) & (df.pickup_hour <= 18)
    # NOTE(review): in pandas `.dt.seconds` is the seconds *component* of the
    # timedelta (less than one day), not the total duration; confirm cudf behaves
    # the same and whether multi-day trips matter here.
    df['trip_time'] = (df.tpep_dropoff_datetime - df.tpep_pickup_datetime).dt.seconds
    # EPS keeps the division defined when trip_time is 0.
    df['trip_speed'] = df.trip_distance / (df.trip_time + EPS)

    # drop unused columns
    df = df[['tpep_dropoff_datetime'] + features + [target_col]]
    # Cast features and target to float32; missing values become -1.0.
    df[features + [target_col]] = df[features + [target_col]].astype("float32").fillna(-1.0)

    # convert target to int32 for efficiency (it's just 0s and 1s)
    df[target_col] = df[target_col].astype("int32")

    return df.reset_index(drop=True)

def f1_streaming(df: "dask_cudf.DataFrame", target_col: str, pred_col: str) -> "dask_cudf.Series":
    """
    Computes the cumulative (rolling) F1 score over rows ordered by drop-off time.

    F1 = 2 * (precision * recall) / (precision + recall)

    Precision: of the rows we predicted true, how many were true?
    Recall: of all the trues, how many did we predict to be true?

    For each row, the counts cover that row and every earlier row in
    ``tpep_dropoff_datetime`` order.

    Args:
        df: dask dataframe; must contain ``tpep_dropoff_datetime``
        target_col: column name of the 0/1 target (must be in df)
        pred_col: column name of the 0/1 prediction (must be in df)

    Returns:
        dask_cudf: Series representing the cumulative F1 score, in drop-off
        order and carrying the index of the input rows. Entries are undefined
        (NaN/inf, depending on the backend) while a denominator is still zero.
    """
    df = df.sort_values(by=['tpep_dropoff_datetime'], ascending=True)

    # Use the caller-supplied prediction column rather than a hard-coded name.
    true_positives = (df[pred_col] & df[target_col]).cumsum()
    predicted_positives = df[pred_col].cumsum()
    actual_positives = df[target_col].cumsum()

    precision = true_positives / predicted_positives
    recall = true_positives / actual_positives
    return 2 * (precision * recall) / (precision + recall)

def get_daily_f1_score(partition):
    """
    Reduce a group of rows to one summary row with its F1 score.

    Intended for ``groupby('day').apply(...)``: ``partition`` holds the rows of
    one group. Reads the module-level ``target_col`` for the label column and the
    ``prediction`` column for the predicted label.

    Side effect: writes the group's F1 score into a new ``daily_f1`` column on
    ``partition``.

    Returns:
        A one-row frame with ``day``, ``rolling_f1`` and ``daily_f1``, taken from
        the row with the latest ``tpep_dropoff_datetime`` in the group.
    """
    actual = partition[target_col]
    predicted = partition['prediction']

    true_positives = (actual & predicted).sum()
    recall = true_positives / actual.sum()
    precision = true_positives / predicted.sum()

    # Same scalar on every row of the group.
    partition['daily_f1'] = 2 * (precision * recall) / (precision + recall)

    # The latest row carries the rolling F1 as of the end of the group.
    latest_first = partition.sort_values(by='tpep_dropoff_datetime', ascending=False)
    return latest_first.head(1)[['day', 'rolling_f1', 'daily_f1']]

## Load train data

In [5]:
# January 2020 yellow-taxi trips; this month is used as the training set.
train_csv = "s3://nyc-tlc/trip data/yellow_tripdata_2020-01.csv"
datetime_cols = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

taxi = dask_cudf.read_csv(
    train_csv,
    parse_dates=datetime_cols,
    storage_options={"anon": True},  # anonymous S3 access
    assume_missing=True,
)

# Report row count and in-memory size, then preview the first rows.
print(f"Num rows: {len(taxi)}, Size: {taxi.memory_usage(deep=True).sum().compute() / 1e9} GB")
taxi.head()

Num rows: 6405008, Size: 0.903424059 GB


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1.0,2020-01-01 00:28:15,2020-01-01 00:33:03,1.0,1.2,1.0,N,238.0,239.0,1.0,6.0,3.0,0.5,1.47,0.0,0.3,11.27,2.5
1,1.0,2020-01-01 00:35:39,2020-01-01 00:43:04,1.0,1.2,1.0,N,239.0,238.0,1.0,7.0,3.0,0.5,1.5,0.0,0.3,12.3,2.5
2,1.0,2020-01-01 00:47:41,2020-01-01 00:53:52,1.0,0.6,1.0,N,238.0,238.0,1.0,6.0,3.0,0.5,1.0,0.0,0.3,10.8,2.5
3,1.0,2020-01-01 00:55:23,2020-01-01 01:00:14,1.0,0.8,1.0,N,238.0,151.0,1.0,5.5,0.5,0.5,1.36,0.0,0.3,8.16,0.0
4,2.0,2020-01-01 00:01:58,2020-01-01 00:04:16,1.0,0.0,1.0,N,193.0,193.0,2.0,3.5,0.5,0.5,0.0,0.0,0.3,4.8,0.0


In [6]:
# Name of the binary label column created by preprocess(); get_daily_f1_score()
# also reads this as a global.
target_col = "high_tip"

# Build the training frame from the January trips (no date filter) and report its size.
taxi_train = preprocess(taxi, target_col)
print(f"Num rows: {len(taxi_train)}, Size: {taxi_train.memory_usage(deep=True).sum().compute() / 1e9} GB")

Num rows: 6382762, Size: 0.357434672 GB


## Train model on January 2020

In [7]:
%%time
progress('start-rf-rapids-dask-fit')

rfc = RandomForestClassifier(n_estimators=100, max_depth=10, ignore_empty_partitions=True)

rfc.fit(taxi_train[features], taxi_train[target_col])
progress('finished-rf-rapids-dask-fit')

CPU times: user 252 ms, sys: 5.38 ms, total: 258 ms
Wall time: 23.5 s


In [8]:
# Metrics on train set

# predict_proba yields one column per class; column 1 is presumably the
# probability of the positive (high-tip) class — TODO confirm.
preds = rfc.predict_proba(taxi_train[features])[1]
# Probabilities are rounded to hard 0/1 labels, and both labels and predictions
# are materialised as arrays for sklearn's f1_score.
print(f'F1: {f1_score(taxi_train[target_col].compute().to_array(), preds.round().compute().to_array())}')

F1: 0.6681650475249482


## Evaluate on February 2020

In [152]:
# February 2020 trips: the month after the training data, used as the test set.
feb_csv = "s3://nyc-tlc/trip data/yellow_tripdata_2020-02.csv"

taxi_feb = dask_cudf.read_csv(
    feb_csv,
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    storage_options={"anon": True},  # anonymous S3 access
    assume_missing=True,
)

# Same preprocessing as the training frame, no date filter.
taxi_test = preprocess(df=taxi_feb, target_col=target_col)

In [153]:
# Metric on test set

# Column 1 of predict_proba is presumably the positive (high-tip) class
# probability — TODO confirm. Rounding converts it to a hard 0/1 label.
preds = rfc.predict_proba(taxi_test[features])[1]
# Labels and predictions are materialised as arrays for sklearn's f1_score.
print(f'F1: {f1_score(taxi_test[target_col].compute().to_array(), preds.round().compute().to_array())}')

F1: 0.6658098920024954


## Simulate "live" inference on March

We score every March trip in drop-off order, as if the trips were arriving live, and then compute two metrics: the rolling (cumulative) F1 score up to each trip and the F1 score for each individual day.

In [74]:
# First, load and sort the March dataframe

taxi_march = dask_cudf.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2020-03.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    storage_options={"anon": True},
    assume_missing=True,
)

# Preprocess with the drop-off date window applied.
march_clean = preprocess(
    taxi_march,
    target_col=target_col,
    start_date='2020-03-01',
    end_date='2020-03-31',
)

# Order rows chronologically by drop-off time and renumber the index to match.
march_sorted = march_clean.sort_values(by=['tpep_dropoff_datetime'], ascending=True)
taxi_inference = march_sorted.reset_index(drop=True)

# Day of month of the drop-off; used later as the grouping key for daily scores.
taxi_inference['day'] = taxi_inference.tpep_dropoff_datetime.dt.day.to_dask_array()

In [107]:
# Save predictions as a new column, compute rolling F1 score

# Column 1 of predict_proba is presumably the positive (high-tip) class
# probability — TODO confirm.
taxi_inference['predicted_prob'] = rfc.predict_proba(taxi_inference[features])[1]
# Round the probability to a hard 0/1 prediction.
taxi_inference['prediction'] = taxi_inference['predicted_prob'].round().astype('int32')
# Cumulative F1 over trips in drop-off order.
taxi_inference['rolling_f1'] = f1_streaming(taxi_inference, target_col, 'prediction')
# One summary row per day. get_daily_f1_score needs the 'day', 'prediction',
# 'rolling_f1' and target columns to exist already, so this must come last.
daily_f1 = taxi_inference.groupby('day').apply(get_daily_f1_score, meta={'day': int, 'rolling_f1': float, 'daily_f1': float})

In [116]:
# Materialise the per-day scores, ordered by day, and render them as a LaTeX table.
daily_f1_by_day = daily_f1.sort_values(by='day')
daily_f1_pdf = daily_f1_by_day.compute().to_pandas()
daily_f1_pdf.to_latex()

'\\begin{tabular}{lrrr}\n\\toprule\n{} &  day &  rolling\\_f1 &  daily\\_f1 \\\\\n\\midrule\n178123  &    1 &    0.576629 &  0.576629 \\\\\n370840  &    2 &    0.633320 &  0.677398 \\\\\n592741  &    3 &    0.649983 &  0.675877 \\\\\n821398  &    4 &    0.659940 &  0.684125 \\\\\n1064741 &    5 &    0.675841 &  0.722298 \\\\\n1307013 &    6 &    0.682284 &  0.708181 \\\\\n58517   &    7 &    0.668002 &  0.555498 \\\\\n225439  &    8 &    0.659918 &  0.572543 \\\\\n400352  &    9 &    0.660947 &  0.670717 \\\\\n583448  &   10 &    0.661801 &  0.670428 \\\\\n765578  &   11 &    0.663678 &  0.684011 \\\\\n936075  &   12 &    0.667420 &  0.711109 \\\\\n1070221 &   13 &    0.668812 &  0.691889 \\\\\n1159620 &   14 &    0.666032 &  0.571661 \\\\\n1219523 &   15 &    0.664177 &  0.564885 \\\\\n1283501 &   16 &    0.663604 &  0.638491 \\\\\n1328995 &   17 &    0.663178 &  0.635958 \\\\\n1365063 &   18 &    0.662761 &  0.628822 \\\\\n1394730 &   19 &    0.662613 &  0.648809 \\\\\n1422146 &   20

## Evaluate model on later months

In [110]:
# Cycle through many test sets: score the trained model on each later month.

months = ['2020-03', '2020-04', '2020-05', '2020-06']
month_dfs = {}

# Read options shared by every monthly file.
csv_options = dict(
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    storage_options={"anon": True},
    assume_missing=True,
)

for month in months:

    # Load and preprocess a month only if it is not already in the dict.
    if month not in month_dfs:
        df = dask_cudf.read_csv(
            f"s3://nyc-tlc/trip data/yellow_tripdata_{month}.csv",
            **csv_options,
        )
        df = preprocess(df, target_col=target_col)
        month_dfs[month] = df.copy()

    # NOTE: this rebinds the notebook-level `taxi_test` and `preds`; after the loop
    # they refer to the last month in `months`, and the feature-comparison cell
    # further down reads `taxi_test`.
    taxi_test = month_dfs[month]
    preds = rfc.predict_proba(taxi_test[features])[1]

    print(month)
    print(f'\tF1: {f1_score(taxi_test[target_col].compute().to_array(), preds.round().compute().to_array())}')

2020-03
	F1: 0.6592796100378214
2020-04
	F1: 0.5714705472990737
2020-05
	F1: 0.5530868473460906
2020-06
	F1: 0.5967621469282887


## Inspect differences between feature values

In [160]:
# Two-sample Kolmogorov-Smirnov test per feature: training frame vs. `taxi_test`.
# NOTE(review): `taxi_test` is whatever frame was bound most recently (the
# February cell and the monthly loop both assign it) — confirm which month is
# intended before relying on these numbers.
statistics = []
p_values = []

# Materialise each frame once. Calling .compute() inside the loop would
# re-evaluate the whole lazy pipeline for every feature, since nothing in this
# notebook persists the frames.
train_features_pdf = taxi_train[features].compute().to_pandas()
test_features_pdf = taxi_test[features].compute().to_pandas()

for feature in features:
    statistic, p_value = stats.ks_2samp(train_features_pdf[feature], test_features_pdf[feature])
    statistics.append(statistic)
    p_values.append(p_value)

pickup_weekday
pickup_hour
work_hours
pickup_minute
passenger_count
trip_distance
trip_time
trip_speed
PULocationID
DOLocationID
RatecodeID


In [167]:
# Collect the per-feature KS results into one table (requires `import pandas as pd`
# in the notebook's import cell).
comparison_df = pd.DataFrame(data={'feature': features, 'statistic': statistics, 'p_value': p_values})

# Show every feature, smallest p-value first; the row count follows the feature
# list instead of a hard-coded number.
comparison_df.sort_values(by='p_value', ascending=True).head(len(features))

Unnamed: 0,feature,statistic,p_value
0,pickup_weekday,0.046196,0.0
2,work_hours,0.028587,0.0
6,trip_time,0.017205,0.0
7,trip_speed,0.035415,0.0
1,pickup_hour,0.009676,8.610133e-258
5,trip_distance,0.005312,5.266602e-78
8,PULocationID,0.004083,2.994877e-46
9,DOLocationID,0.003132,2.157559e-27
4,passenger_count,0.002947,2.634493e-24
10,RatecodeID,0.002616,3.0474809999999995e-19
