# Random forest classification

## Dask + RAPIDS GPU cluster with Snowflake

<table>
    <tr>
        <td>
            <img src="https://docs.dask.org/en/latest/_images/dask_horizontal.svg" width="300">
        </td>
        <td>
            <img src="https://rapids.ai/assets/images/RAPIDS-logo-purple.svg" width="300">
        </td>
        <td>
            <img src="https://upload.wikimedia.org/wikipedia/commons/thumb/f/ff/Snowflake_Logo.svg/1280px-Snowflake_Logo.svg.png" width="300">
        </td>
    </tr>
</table>

In [1]:
import os

MODEL_PATH = 'models'
os.makedirs(MODEL_PATH, exist_ok=True)  # no-op if the directory already exists

numeric_feat = [
    'pickup_weekday', 
    'pickup_weekofyear', 
    'pickup_hour', 
    'pickup_week_hour', 
    'pickup_minute', 
    'passenger_count',
]
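# NOTE: despite the list name, these coordinates are continuous values;
# the commented-out taxi zone IDs are the categorical alternatives.
categorical_feat = [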
    "pickup_longitude", 
    "pickup_latitude", 
    "dropoff_longitude", 
    "dropoff_latitude",
    #'pickup_taxizone_id', 
    #'dropoff_taxizone_id',
]
features = numeric_feat + categorical_feat
y_col = 'high_tip'
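
The target column `high_tip` is named here but never derived in this notebook. As a minimal sketch of how such a binary label might be built, the example below assumes a tip counts as "high" when it exceeds 20% of the fare. The `TIP_THRESHOLD` value, the zero-fare guard, and the toy data are all assumptions for illustration, not the notebook's actual definition.

```python
import pandas as pd

TIP_THRESHOLD = 0.2  # assumption: "high" means a tip above 20% of the fare

# Toy trips (hypothetical values, not taken from the dataset)
trips = pd.DataFrame({
    "fare_amount": [10.0, 20.0, 8.0, 0.0],
    "tip_amount": [3.0, 2.0, 1.0, 1.0],
})

# Mask zero or negative fares so the division yields NaN instead of inf
valid_fare = trips["fare_amount"].where(trips["fare_amount"] > 0)
tip_fraction = trips["tip_amount"] / valid_fare

# NaN compares as False, so trips without a valid fare get label 0
trips["high_tip"] = (tip_fraction > TIP_THRESHOLD).astype("int32")
print(trips)
```

The same column expressions should carry over to a Dask DataFrame, since Dask mirrors the pandas API for elementwise arithmetic and comparisons.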

# Initialize Dask GPU cluster

In [2]:
import os
import time
import datetime
import warnings
import pandas as pd

import dask.dataframe as dd
from dask.distributed import Client, progress, wait
from dask import persist, delayed

#import cudf
#import dask_cudf as cudd

warnings.simplefilter("ignore")

In [3]:
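n_workers = 2  # minimum number of workers to wait for later (the cluster below reports 4)
cluster = ""   # unused placeholder
# With no address, Client() connects to the scheduler set in Dask's configuration
# ("scheduler-address") if there is one; otherwise it starts a local cluster.
client = Client()
client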

Client:
Connection method: Direct
Dashboard: http://scheduler:8787/status

Scheduler:
Comm: tcp://172.18.0.2:8786
Dashboard: http://172.18.0.2:8787/status
Workers: 4
Total threads: 32
Total memory: 251.25 GiB
Started: Just now

Workers:
Comm,Dashboard,Nanny,Local directory,Total threads,Memory,CPU usage,Memory usage,Spilled bytes,Read bytes,Write bytes,Last seen
tcp://172.18.0.3:45173,http://172.18.0.3:46389/status,tcp://172.18.0.3:38505,/tmp/dask-worker-space/worker-60yci29i,8,62.81 GiB,2.0%,129.18 MiB,0 B,285.37547290008723 B,1.47 kiB,Just now
tcp://172.18.0.4:34061,http://172.18.0.4:40271/status,tcp://172.18.0.4:43289,/tmp/dask-worker-space/worker-vk2qlnzu,8,62.81 GiB,0.0%,129.25 MiB,0 B,286.2327478690673 B,1.47 kiB,Just now
tcp://172.18.0.5:43605,http://172.18.0.5:34685/status,tcp://172.18.0.5:41387,/tmp/dask-worker-space/worker-es_rzz5f,8,62.81 GiB,2.0%,130.50 MiB,0 B,285.7732629208283 B,1.47 kiB,Just now
tcp://172.18.0.6:35193,http://172.18.0.6:33045/status,tcp://172.18.0.6:39109,/tmp/dask-worker-space/worker-f7vxytkb,8,62.81 GiB,2.0%,130.61 MiB,0 B,286.1159032634831 B,1.47 kiB,Just now


Open the dashboard (linked above) and watch it while you execute commands; you'll see which tasks are running across the cluster. A couple of other dashboard pages, for GPU memory and GPU utilization, are worth viewing but are not listed on the navbar, so we build direct links to them below.

In [4]:
from IPython.display import display, HTML

gpu_links = f'''
<b>GPU Dashboard links</b>
<ul>
<li><a href="{client.dashboard_link}/individual-gpu-memory" target="_blank">GPU memory</a></li>
<li><a href="{client.dashboard_link}/individual-gpu-utilization" target="_blank">GPU utilization</a></li>
</ul>
'''
display(HTML(gpu_links))

If you created your cluster here in this notebook, it might take a few minutes for all your nodes to become available. You can run the cell below to block until at least `n_workers` workers are ready.

>**Pro tip**: Create and/or start your cluster from the "Dask" page in Saturn if you want to get a head start!

In [5]:
client.wait_for_workers(n_workers=n_workers)

# Load data and feature engineering

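Load the 2015 yellow taxi trip data for this exercise. Note that this version of the notebook reads the data with plain Dask (`dd.read_parquet`) rather than `pd.read_csv`; the `cudf` and `dask_cudf` imports above are commented out, so the DataFrame lives in host memory, not on the GPU.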

In [6]:

nyc_datatype = {'VendorID': 'string',
                'passenger_count': 'int32',
                'trip_distance': 'float32',
                'pickup_longitude': 'float32',
                'pickup_latitude': 'float32',
                'RateCodeID': 'string',
                'store_and_fwd_flag': 'string',
                'dropoff_longitude': 'float32',
                'dropoff_latitude': 'float32',
                'payment_type': 'string',
                'fare_amount': 'float32',
                'extra': 'float32',
                'mta_tax': 'float32',
                'tip_amount': 'float32',
                'tolls_amount': 'float32',
                'improvement_surcharge': 'float32',
                'total_amount': 'float32'}

In [7]:
df = dd.read_parquet("/home/cloud/dataset/nyc-taxi/yellow_tripdata_2015.parquet")

df = df.astype(nyc_datatype)
df = df.persist()
progress(df)


In [8]:
%time wait(df)
%time print(df.passenger_count.sum().compute())

CPU times: user 7.96 ms, sys: 0 ns, total: 7.96 ms
Wall time: 7.52 ms
245566747
CPU times: user 23.5 ms, sys: 9.5 ms, total: 33 ms
Wall time: 309 ms


In [9]:
df

Dask DataFrame Structure (npartitions=353):
Column,dtype
VendorID,string
tpep_pickup_datetime,datetime64[ns]
tpep_dropoff_datetime,datetime64[ns]
passenger_count,int32
trip_distance,float32
pickup_longitude,float32
pickup_latitude,float32
RateCodeID,string
store_and_fwd_flag,string
dropoff_longitude,float32
dropoff_latitude,float32
payment_type,string
fare_amount,float32
extra,float32
mta_tax,float32
tip_amount,float32
tolls_amount,float32
improvement_surcharge,float32
total_amount,float32


In [18]:
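cols = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
        'passenger_count', 'trip_distance', 'RateCodeID', 'store_and_fwd_flag',
        'pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
        'dropoff_latitude', 'payment_type', 'fare_amount', 'extra',
        'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
        'total_amount']

taxi = df[cols]

# Time-based features derived from the pickup timestamp
taxi['pickup_weekday'] = taxi.tpep_pickup_datetime.dt.weekday  # Monday=0 ... Sunday=6
# NOTE: Series.dt.week is deprecated since pandas 1.1 and removed in pandas 2.0;
# newer versions use dt.isocalendar().week instead.
taxi['pickup_weekofyear'] = taxi.tpep_pickup_datetime.dt.week
taxi['pickup_hour'] = taxi.tpep_pickup_datetime.dt.hour
taxi['pickup_minute'] = taxi.tpep_pickup_datetime.dt.minute
# Hour of the week: 0 (Monday 00:00) through 167 (Sunday 23:00)
taxi['pickup_week_hour'] = (taxi.pickup_weekday * 24) + taxi.pickup_hour
# Encode the store-and-forward flag as 1.0 ('Y') or 0.0 (anything else)
taxi['store_and_fwd_flag'] = (taxi.store_and_fwd_flag == 'Y').astype(float)
#taxi = taxi.fillna(-1)

X = taxi[features].astype('float32')
# NOTE: this cell uses total_amount (a regression target), not y_col ('high_tip')
y = taxi['total_amount']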

CPU times: user 317 ms, sys: 23.9 ms, total: 341 ms
Wall time: 38.1 s
CPU times: user 28.4 ms, sys: 0 ns, total: 28.4 ms
Wall time: 200 ms


146112989

CPU times: user 393 ms, sys: 48.9 ms, total: 442 ms
Wall time: 52.1 s
CPU times: user 121 ms, sys: 9.24 ms, total: 130 ms
Wall time: 25.1 s


146112989

pickup_weekday       float32
pickup_weekofyear    float32
pickup_hour          float32
pickup_week_hour     float32
pickup_minute        float32
passenger_count      float32
pickup_longitude     float32
pickup_latitude      float32
dropoff_longitude    float32
dropoff_latitude     float32
dtype: object
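
The time features built in the feature-engineering cell can be checked by hand on a single timestamp. The sketch below reproduces them with the standard library only; the `pickup_features` helper and the sample timestamp are illustrative assumptions, not part of the notebook.

```python
from datetime import datetime


def pickup_features(ts):
    """Derive the notebook's time features from one pickup timestamp."""
    weekday = ts.weekday()            # Monday=0 ... Sunday=6, like Series.dt.weekday
    weekofyear = ts.isocalendar()[1]  # ISO 8601 week number
    return {
        "pickup_weekday": weekday,
        "pickup_weekofyear": weekofyear,
        "pickup_hour": ts.hour,
        "pickup_minute": ts.minute,
        # Hour of the week: 24 hours per elapsed weekday plus the hour of day
        "pickup_week_hour": weekday * 24 + ts.hour,
    }


# A Saturday morning pickup (hypothetical example timestamp)
feats = pickup_features(datetime(2015, 10, 17, 9, 33))
print(feats)
```

For this timestamp the weekday is 5 (Saturday), so `pickup_week_hour` is 5 * 24 + 9 = 129.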

Dask evaluates computations [lazily](https://tutorial.dask.org/01x_lazy.html), so we call `persist` to trigger data loading and feature processing and to keep the results in cluster memory (GPU memory when the data is backed by `dask_cudf`).

In [None]:
X = X.fillna(-1)
y = y.fillna(-1)

X, y = persist(X, y)

%time _ = wait([X, y])
%time len(X)

In [None]:
X.dtypes


In [24]:
taxi_train = X
print(f'Num rows: {len(taxi_train)}, Size: {taxi_train.memory_usage(deep=True).compute().sum() / 1e6} MB')
taxi_train.groupby('pickup_weekday')['pickup_weekday'].count().compute()

Num rows: 146112989, Size: 7013.423472 MB


pickup_weekday
0.0    18667030
1.0    20129034
2.0    21076056
3.0    21865569
4.0    21972102
5.0    22608347
6.0    19794851
Name: pickup_weekday, dtype: int64

In [23]:
from cuml.dask.ensemble import RandomForestRegressor

rf = RandomForestRegressor(n_estimators=100, max_depth=10, seed=42)
_ = rf.fit(X, y)

ModuleNotFoundError: No module named 'cuml'

In [22]:
#taxi_train = taxi[features + [y_col]]
#taxi_train[features] = taxi_train[features].astype("float32").fillna(-1)
#taxi_train[y_col] = taxi_train[y_col].astype("int32").fillna(-1)

In [19]:
taxi_train = taxi_train.persist()
_ = wait(taxi_train)

In [20]:
print(f'Num rows: {len(taxi_train)}, Size: {taxi_train.memory_usage(deep=True).compute().sum() / 1e6} MB')

Num rows: 300698204, Size: 10825.135344 MB


In [21]:
taxi_train.groupby('high_tip')['high_tip'].count().compute()

high_tip
1    151325359
0    149372845
Name: high_tip, dtype: int64
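
The counts above imply that the two classes are almost perfectly balanced, which matters when reading the metrics later: a model that always predicts the majority class would already be right about half the time. A short worked check using the printed counts:

```python
# Class counts copied from the groupby output above
counts = {1: 151_325_359, 0: 149_372_845}

total = sum(counts.values())
positive_share = counts[1] / total
# Accuracy of a trivial model that always predicts the majority class
majority_baseline = max(counts.values()) / total

print(f"rows: {total}")
print(f"share of high_tip == 1: {positive_share:.4f}")
print(f"majority-class baseline accuracy: {majority_baseline:.4f}")
```

The total agrees with the row count reported by `len(taxi_train)`.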

In [22]:
taxi_train.head()

Unnamed: 0,pickup_weekday,pickup_weekofyear,pickup_hour,pickup_week_hour,pickup_minute,passenger_count,pickup_taxizone_id,dropoff_taxizone_id,high_tip
0,5.0,42.0,0.0,120.0,32.0,1.0,113.0,230.0,0
1,5.0,42.0,9.0,129.0,33.0,2.0,238.0,239.0,0
2,5.0,42.0,9.0,129.0,45.0,2.0,239.0,163.0,0
3,5.0,42.0,7.0,127.0,48.0,1.0,158.0,231.0,1
4,5.0,42.0,8.0,128.0,7.0,1.0,209.0,232.0,1


# Train model

In [23]:
from cuml.dask.ensemble import RandomForestClassifier
rfc = RandomForestClassifier(n_estimators=100, max_depth=10, seed=42)

In [24]:
%%time
_ = rfc.fit(taxi_train[features], taxi_train[y_col])

CPU times: user 1.72 s, sys: 253 ms, total: 1.97 s
Wall time: 7.33 s


## Calculate metrics on test set

Use a different time period for the test set (here, dates from 2020-01-01 to 2020-03-01).

In [29]:
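# NOTE: get_dates, load, conn_info and query are not defined in this notebook,
# and cudd requires the commented-out `import dask_cudf as cudd` above.
test_dates = get_dates('2020-01-01', '2020-03-01')
taxi_test = cudd.from_delayed([load(conn_info, query, day) for day in test_dates])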

In [30]:
taxi_test = taxi_test[features + [y_col]]
taxi_test[features] = taxi_test[features].astype("float32").fillna(-1)
taxi_test[y_col] = taxi_test[y_col].astype("int32").fillna(-1)

In [31]:
taxi_test = taxi_test.persist()
_ = wait(taxi_test)

<br>

Convert the labels and predictions to single-GPU objects using `compute()`, because the Dask+RAPIDS implementation doesn't yet have `roc_auc_score`.

In [32]:
from cuml.metrics import roc_auc_score

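# Column 1 holds the predicted probability of the positive class (high_tip == 1)
preds = rfc.predict_proba(taxi_test[features])[1]
# compute() gathers the distributed labels and predictions before scoring
roc_auc_score(taxi_test[y_col].compute(), preds.compute())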

0.5315331220626831
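
For intuition about the score above: ROC AUC is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, so 0.5 corresponds to random ranking and a value near 0.53 is only slightly better than chance. The following is a minimal pure-Python sketch of that definition on toy data. It is not the cuML implementation, and the pairwise loop is only practical for small inputs.

```python
def roc_auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as 0.5."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Toy labels and scores (hypothetical, not model output)
toy_labels = [0, 0, 1, 1]
toy_scores = [0.1, 0.4, 0.35, 0.8]
auc = roc_auc(toy_labels, toy_scores)
print(auc)  # 0.75: three of the four positive/negative pairs are ordered correctly
```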