In [1]:
from cuml.dask.common import utils as dask_utils
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import dask_cudf
import cudf
import numpy as np

from cuml.dask.ensemble import RandomForestRegressor as cumlDaskRF

In [2]:
cluster = LocalCUDACluster(threads_per_worker=1)
c = Client(cluster)

workers = c.has_what().keys()
n_workers = len(workers)

In [3]:
base_path = '/data/nyc_taxi/'

df_2014 = dask_cudf.read_csv(base_path+'2014/yellow_tripdata_2014-01.csv')

remap = {}
remap['tpep_pickup_datetime'] = 'pickup_datetime'
remap['tpep_dropoff_datetime'] = 'dropoff_datetime'
remap['ratecodeid'] = 'rate_code'

#create a list of columns & dtypes the df must have
must_haves = {
 'pickup_datetime': 'datetime64[ms]',
 'dropoff_datetime': 'datetime64[ms]',
 'passenger_count': 'int32',
 'trip_distance': 'float32',
 'pickup_longitude': 'float32',
 'pickup_latitude': 'float32',
 'rate_code': 'int32',
 'dropoff_longitude': 'float32',
 'dropoff_latitude': 'float32',
 'fare_amount': 'float32'
}

def clean(df_part, remap, must_haves):    
    # some col-names include pre-pended spaces remove & lowercase column names
    tmp = {col:col.strip().lower() for col in list(df_part.columns)}
    df_part = df_part.rename(tmp)
    
    # rename using the supplied mapping
    df_part = df_part.rename(remap)
    
    # iterate through columns in this df partition
    for col in df_part.columns:
        # drop anything not in our expected list
        if col not in must_haves:
            df_part = df_part.drop(col)
            continue
        
        
        if df_part[col].dtype == 'object' and col in ['pickup_datetime', 'dropoff_datetime']:
            df_part[col] = df_part[col].astype('datetime64[ms]')
            continue 

        # if column was read as a string, recast as float
        if df_part[col].dtype == 'object':
            df_part[col] = df_part[col].str.fillna('-1')
            df_part[col] = df_part[col].astype('float32')
        else:
            # downcast from 64bit to 32bit types
            # Tesla T4 are faster on 32bit ops
            if 'int' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('int32')
            if 'float' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('float32')
            df_part[col] = df_part[col].fillna(-1)
    
    return df_part

taxi_df = df_2014.map_partitions(clean, remap, must_haves)

query_frags = [
    'fare_amount > 0 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
    'pickup_longitude > -75 and pickup_longitude < -73',
    'dropoff_longitude > -75 and dropoff_longitude < -73',
    'pickup_latitude > 40 and pickup_latitude < 42',
    'dropoff_latitude > 40 and dropoff_latitude < 42'
]
taxi_df = taxi_df.query(' and '.join(query_frags))

import math
from math import cos, sin, asin, sqrt, pi

def haversine_distance_kernel(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude, h_distance):
    for i, (x_1, y_1, x_2, y_2) in enumerate(zip(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude)):
        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_2 = pi/180 * x_2
        y_2 = pi/180 * y_2
        
        dlon = y_2 - y_1
        dlat = x_2 - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_2) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        h_distance[i] = c * r

def day_of_the_week_kernel(day, month, year, day_of_week):
    for i, (d_1, m_1, y_1) in enumerate(zip(day, month, year)):
        if month[i] <3:
            shift = month[i]
        else:
            shift = 0
        Y = year[i] - (month[i] < 3)
        y = Y - 2000
        c = 20
        d = day[i]
        m = month[i] + shift + 1
        day_of_week[i] = (d + math.floor(m*2.6) + y + (y//4) + (c//4) -2*c)%7
        
def add_features(df):
    df['hour'] = df['pickup_datetime'].dt.hour
    df['year'] = df['pickup_datetime'].dt.year
    df['month'] = df['pickup_datetime'].dt.month
    df['day'] = df['pickup_datetime'].dt.day
    df['diff'] = df['dropoff_datetime'].astype('int32') - df['pickup_datetime'].astype('int32')
    
    df['pickup_latitude_r'] = df['pickup_latitude']//.01*.01
    df['pickup_longitude_r'] = df['pickup_longitude']//.01*.01
    df['dropoff_latitude_r'] = df['dropoff_latitude']//.01*.01
    df['dropoff_longitude_r'] = df['dropoff_longitude']//.01*.01
    
    df = df.drop('pickup_datetime')
    df = df.drop('dropoff_datetime')
    
    
    df = df.apply_rows(haversine_distance_kernel,
                   incols=['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'],
                   outcols=dict(h_distance=np.float32),
                   kwargs=dict())
    
    
    df = df.apply_rows(day_of_the_week_kernel,
                      incols=['day', 'month', 'year'],
                      outcols=dict(day_of_week=np.float32),
                      kwargs=dict())
    
    
    df['is_weekend'] = (df['day_of_week']<2)
    return df

taxi_df = taxi_df.map_partitions(add_features)

taxi_df.head().to_pandas()

Unnamed: 0,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount,hour,year,month,day,diff,pickup_latitude_r,pickup_longitude_r,dropoff_latitude_r,dropoff_longitude_r,h_distance,day_of_week,is_weekend
0,1,0.7,-73.994766,40.736828,1,-73.982224,40.731789,6.5,20,2014,1,9,426000,40.73,-74.0,40.73,-73.989998,1.196175,4.0,False
1,1,1.4,-73.982391,40.77338,1,-73.960449,40.763996,8.5,20,2014,1,9,540000,40.77,-73.989998,40.759998,-73.970001,2.122098,4.0,False
2,2,2.3,-73.988571,40.739407,1,-73.986626,40.765217,11.5,20,2014,1,9,899000,40.73,-73.989998,40.759998,-73.989998,2.874643,4.0,False
3,1,1.7,-73.960213,40.770466,1,-73.979866,40.77705,7.5,20,2014,1,9,403000,40.77,-73.970001,40.77,-73.979996,1.809662,4.0,False
4,1,0.9,-73.995369,40.717247,1,-73.984367,40.720524,6.0,20,2014,1,9,383000,40.709999,-74.0,40.719997,-73.989998,0.996204,4.0,False


In [4]:
taxi_df.dtypes

passenger_count          int32
trip_distance          float32
pickup_longitude       float32
pickup_latitude        float32
rate_code                int32
dropoff_longitude      float32
dropoff_latitude       float32
fare_amount            float32
hour                     int16
year                     int16
month                    int16
day                      int16
diff                     int32
pickup_latitude_r      float32
pickup_longitude_r     float32
dropoff_latitude_r     float32
dropoff_longitude_r    float32
h_distance             float32
day_of_week            float32
is_weekend                bool
dtype: object

In [5]:
n_partitions = n_workers

X_train = taxi_df.query('day < 25').persist()

# create a Y_train ddf with just the target variable
y_train = X_train['fare_amount'].persist()
# drop the target variable from the training ddf
X_train = X_train[X_train.columns.difference(['fare_amount'])]
# Persist to cache the data in active memory

X_train_dask, y_train_dask = \
  dask_utils.persist_across_workers(c, [X_train, y_train], workers=workers)

In [6]:
len(X_train_dask)

9847387

In [7]:
cuml_model = cumlDaskRF(max_depth=25, n_estimators=1000)
cuml_model.fit(X_train_dask, y_train_dask)

wait(cuml_model.rfs)

DoneAndNotDoneFutures(done={<Future: status: finished, type: RandomForestRegressor, key: _func_build_rf-9db3fe23cf236af9665d5159aa684aea>, <Future: status: finished, type: RandomForestRegressor, key: _func_build_rf-86bd19b9fa9eba9b74eae7081a49941c>, <Future: status: finished, type: RandomForestRegressor, key: _func_build_rf-055d0d4b6a9bc67a463683e396f7caad>, <Future: status: finished, type: RandomForestRegressor, key: _func_build_rf-ad46f63d28b469619b8c73d2d4076519>}, not_done=set())

In [8]:
def drop_empty_partitions(df):
    lengths = df.map_partitions(len).compute()
    nonempty = [length > 0 for length in lengths]
    return df.partitions[nonempty]

X_test = taxi_df.query('day >= 25').persist()

X_test = drop_empty_partitions(X_test)

Y_test = X_test[['fare_amount']]

# Drop the fare amount from X_test
X_test = X_test[X_test.columns.difference(['fare_amount'])]

# display test set size
len(X_test)

3191940

In [9]:
x_test_pandas = X_test.compute()

In [10]:
x_test_pandas = x_test_pandas.to_pandas().to_numpy()

In [11]:
len(X_train_dask.columns)

19

In [12]:
x_test_pandas.shape

(3191940, 19)

In [13]:
cuml_y_pred = cuml_model.predict(x_test_pandas)

ValueError: The number of columns/features in the training and test data should be the same 

In [14]:
x_test_pandas

array([[25, 6.0, 966000, ..., 1, 2.799999952316284, 2014],
       [25, 6.0, 527000, ..., 1, 1.2999999523162842, 2014],
       [25, 6.0, 546000, ..., 1, 1.600000023841858, 2014],
       ...,
       [27, 1.0, 283000, ..., 1, 0.8999999761581421, 2014],
       [27, 1.0, 527000, ..., 1, 1.0, 2014],
       [27, 1.0, 788000, ..., 1, 3.0, 2014]], dtype=object)