In [22]:
import numpy as np
import pandas as pd
import xgboost as xgb
import numba, socket
import cudf
import dask, dask_cudf
from dask_cuda import LocalCUDACluster
from dask.delayed import delayed
#import dask.dataframe as dd
from dask.distributed import Client, wait, LocalCluster
from dask_ml.model_selection import train_test_split
from dask.utils import parse_bytes
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

cluster = LocalCUDACluster(
    rmm_pool_size=parse_bytes("48GB") # Each GPU has 16GB of memory
)
client = Client(cluster)
client.restart()

dask.config.set({'distributed.scheduler.work-stealing': False})

Perhaps you already have a cluster running?
Hosting the HTTP server on port 46071 instead
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.preloading - INFO - Import preload module: dask_cuda.initialize


<dask.config.set at 0x7f39246b3310>

In [5]:
df = dask_cudf.read_csv('../data/train.csv')

In [3]:
df['key'] = df['key'].astype('datetime64[ns]')
df['fare_amount'] = df ['fare_amount'].astype('float32')
df['pickup_datetime'] = df['pickup_datetime'].astype('datetime64[ns]')
df['pickup_longitude'] = df ['pickup_longitude'].astype('float32')
df['pickup_latitude'] = df ['pickup_latitude'].astype('float32')
df['dropoff_longitude'] = df ['dropoff_longitude'].astype('float32')
df['dropoff_latitude'] = df ['dropoff_latitude'].astype('float32')
df['passenger_count'] = df ['passenger_count'].astype('uint8')

In [4]:
# apply a list of filter conditions to throw out records with missing or outlier values
query_frags = [
    'fare_amount >= 2.5 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
    'pickup_longitude > -75 and pickup_longitude < -73',
    'dropoff_longitude > -75 and dropoff_longitude < -73',
    'pickup_latitude > 40 and pickup_latitude < 42',
    'dropoff_latitude > 40 and dropoff_latitude < 42'
]
df = df.query(' and '.join(query_frags))

In [2]:
import math
from math import cos, sin, asin, sqrt, pi
        
def jfk_distance(dropoff_latitude, dropoff_longitude, jfk_distance):
    for i, (x_1, y_1) in enumerate(zip(dropoff_latitude, dropoff_longitude)):
        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_jfk = pi/180 * 40.6413
        y_jfk = pi/180 * -73.7781
        
        dlon = y_jfk - y_1
        dlat = x_jfk - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_jfk) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        jfk_distance[i] = c * r
        
def lga_distance(dropoff_latitude, dropoff_longitude, lga_distance):
    for i, (x_1, y_1) in enumerate(zip(dropoff_latitude, dropoff_longitude)):
        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_lga = pi/180 * 40.7769
        y_lga = pi/180 * -73.8740
        
        dlon = y_lga - y_1
        dlat = x_lga - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_lga) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        lga_distance[i] = c * r
        
def ewr_distance(dropoff_latitude, dropoff_longitude, ewr_distance):
    for i, (x_1, y_1) in enumerate(zip(dropoff_latitude, dropoff_longitude)):
        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_ewr = pi/180 * 40.6895
        y_ewr = pi/180 * -74.1745
        
        dlon = y_ewr - y_1
        dlat = x_ewr - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_ewr) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        ewr_distance[i] = c * r
        
def tsq_distance(dropoff_latitude, dropoff_longitude, tsq_distance):
    for i, (x_1, y_1) in enumerate(zip(dropoff_latitude, dropoff_longitude)):
        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_tsq = pi/180 * 40.7580
        y_tsq = pi/180 * -73.9855
        
        dlon = y_tsq - y_1
        dlat = x_tsq - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_tsq) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        tsq_distance[i] = c * r
        
def met_distance(dropoff_latitude, dropoff_longitude, met_distance):
    for i, (x_1, y_1) in enumerate(zip(dropoff_latitude, dropoff_longitude)):
        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_met = pi/180 * 40.7794
        y_met = pi/180 * -73.9632
        
        dlon = y_met - y_1
        dlat = x_met - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_met) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        met_distance[i] = c * r
        
def wtc_distance(dropoff_latitude, dropoff_longitude, wtc_distance):
    for i, (x_1, y_1) in enumerate(zip(dropoff_latitude, dropoff_longitude)):
        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_wtc = pi/180 * 40.7126
        y_wtc = pi/180 * -74.0099
        
        dlon = y_wtc - y_1
        dlat = x_wtc - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_wtc) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        wtc_distance[i] = c * r
        
def add_features(df):
    df['hour'] = df['pickup_datetime'].dt.hour
    df['year'] = df['pickup_datetime'].dt.year
    df['month'] = df['pickup_datetime'].dt.month
    df['day'] = df['pickup_datetime'].dt.day
    df['weekday'] = df['pickup_datetime'].dt.weekday
    
    df = df.apply_rows(jfk_distance, incols=['dropoff_latitude', 'dropoff_longitude'],
                       outcols=dict(jfk_distance=np.float32), kwargs=dict())
    
    df = df.apply_rows(lga_distance, incols=['dropoff_latitude', 'dropoff_longitude'],
                       outcols=dict(lga_distance=np.float32), kwargs=dict())
        
    df = df.apply_rows(ewr_distance, incols=['dropoff_latitude', 'dropoff_longitude'],
                       outcols=dict(ewr_distance=np.float32), kwargs=dict())
            
    df = df.apply_rows(tsq_distance, incols=['dropoff_latitude', 'dropoff_longitude'],
                       outcols=dict(tsq_distance=np.float32), kwargs=dict())
    
    df = df.apply_rows(met_distance, incols=['dropoff_latitude', 'dropoff_longitude'],
                       outcols=dict(met_distance=np.float32), kwargs=dict())
    
    df = df.apply_rows(wtc_distance, incols=['dropoff_latitude', 'dropoff_longitude'],
                       outcols=dict(wtc_distance=np.float32), kwargs=dict())
    
    df = df.drop(['pickup_datetime','key'], axis=1)
    
    return df

In [6]:
# now add the features
parts = [dask.delayed(add_features)(part) for part in df.to_delayed()]
df = dask_cudf.from_delayed(parts)

In [6]:
y = df['fare_amount']
X = df.drop(['fare_amount'], axis=1)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=17)



In [10]:
X_train.compute().shape

MemoryError: std::bad_alloc: CUDA error at: /opt/conda/envs/rapids/conda-bld/xgboost_1633530136205/_build_env/include/rmm/mr/device/cuda_memory_resource.hpp:70: cudaErrorMemoryAllocation out of memory

In [8]:
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
dvalid = xgb.dask.DaskDMatrix(client, X_test, y_test)
watchlist = [(dtrain, 'train'), (dvalid, 'valid')]

"""params = {
    'learning_rate': 0.1,
    'max_depth': 11,
    'objective': 'reg:squarederror',
    'subsample': 0.7,
    'colsample_bytree': 0.7,
    'min_child_weight': 1,
    'booster' : 'gbtree',
    'eval_metric': ["auc", "rmse"],
    'tree_method':'gpu_hist',
}"""

params = {
    'booster' : 'gbtree',
    'eval_metric': "rmse",
    'tree_method':'gpu_hist',
    'objective': 'reg:squarederror',
    'min_child_weight': 1,
    'colsample_bytree': 0.7,
    'learning_rate': 0.05,
}

trained_model = xgb.dask.train(client, params, dtrain, num_boost_round=2000, evals=watchlist, early_stopping_rounds=100, verbose_eval=100)

[07:37:02] task [xgboost.dask]:tcp://10.1.0.7:37991 got new rank 0
[07:37:02] task [xgboost.dask]:tcp://10.1.0.7:42403 got new rank 1
[07:37:02] task [xgboost.dask]:tcp://10.1.0.7:41469 got new rank 2
[07:37:02] task [xgboost.dask]:tcp://10.1.0.7:34739 got new rank 3


[0]	train-rmse:13.89133	valid-rmse:13.89348
[100]	train-rmse:4.39542	valid-rmse:4.40677
[200]	train-rmse:4.07346	valid-rmse:4.08719
[300]	train-rmse:3.96014	valid-rmse:3.97544
[400]	train-rmse:3.87219	valid-rmse:3.88953
[500]	train-rmse:3.83406	valid-rmse:3.85290
[600]	train-rmse:3.80271	valid-rmse:3.82383
[700]	train-rmse:3.77443	valid-rmse:3.79734
[800]	train-rmse:3.75229	valid-rmse:3.77688
[900]	train-rmse:3.73625	valid-rmse:3.76208
[1000]	train-rmse:3.72282	valid-rmse:3.75018
[1100]	train-rmse:3.71178	valid-rmse:3.74076
[1200]	train-rmse:3.70009	valid-rmse:3.73049
[1300]	train-rmse:3.68963	valid-rmse:3.72191
[1400]	train-rmse:3.67977	valid-rmse:3.71356
[1500]	train-rmse:3.67164	valid-rmse:3.70721
[1600]	train-rmse:3.66494	valid-rmse:3.70172
[1700]	train-rmse:3.65871	valid-rmse:3.69691
[1800]	train-rmse:3.65123	valid-rmse:3.69079
[1900]	train-rmse:3.64559	valid-rmse:3.68653
[1999]	train-rmse:3.63957	valid-rmse:3.68186


In [9]:
trained_model['booster'].save_model("xg_gpu.model")

In [10]:
type(trained_model['booster'])

xgboost.core.Booster

In [3]:
model = xgb.Booster()
model.load_model("/home/nvidiatest/mlops_blog/saved_models/xgboost.model")
model

<xgboost.core.Booster at 0x7f3a3bfc5e50>

In [4]:
# prepare test set
test = dask_cudf.read_csv('/home/nvidiatest/mlops_blog/data/test.csv')
test.head()

Unnamed: 0,key,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,2014-07-19 01:36:28.0000002,18.5,2014-07-19 01:36:28 UTC,-74.008837,40.741626,-74.008837,40.741626,4
1,2014-12-20 15:26:00.00000056,7.0,2014-12-20 15:26:00 UTC,-73.988993,40.758457,-74.002622,40.760712,3
2,2011-05-05 20:36:59.0000006,8.5,2011-05-05 20:36:59 UTC,-73.989198,40.747731,-73.960218,40.770334,1
3,2013-12-17 18:06:00.00000035,10.0,2013-12-17 18:06:00 UTC,-73.958547,40.78126,-73.982792,40.772272,1
4,2014-09-18 01:16:00.00000044,14.0,2014-09-18 01:16:00 UTC,-73.990967,40.730125,-73.984707,40.778985,1


In [5]:
test['key'] = test['key'].astype('datetime64[ns]')
test['pickup_datetime'] = test['pickup_datetime'].astype('datetime64[ns]')
test['pickup_longitude'] = test ['pickup_longitude'].astype('float32')
test['pickup_latitude'] = test ['pickup_latitude'].astype('float32')
test['dropoff_longitude'] = test ['dropoff_longitude'].astype('float32')
test['dropoff_latitude'] = test ['dropoff_latitude'].astype('float32')
test['passenger_count'] = test ['passenger_count'].astype('uint8')

# now add the features
tparts = [dask.delayed(add_features)(part) for part in test.to_delayed()]
test = dask_cudf.from_delayed(tparts)

# inspect the result
test.head().to_pandas()

  ('add_features-f155f44a-a913-47b3-9847-c717cf2d444 ... 'from_delayed')
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good


Unnamed: 0,fare_amount,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,hour,year,month,day,weekday,jfk_distance,lga_distance,ewr_distance,tsq_distance,met_distance,wtc_distance
0,18.5,-74.008835,40.741627,-74.008835,40.741627,4,1,2014,7,19,5,22.425308,12.01478,15.11772,2.679279,5.693376,3.228874
1,7.0,-73.988991,40.758457,-74.002625,40.760712,3,15,2014,12,20,5,23.120253,10.980475,16.506964,1.473502,3.916732,5.384778
2,8.5,-73.989197,40.74773,-73.96022,40.770332,1,20,2011,5,5,3,21.012537,7.297031,20.16925,2.53244,1.039028,7.663438
3,10.0,-73.958549,40.781261,-73.982796,40.77227,1,18,2013,12,17,1,22.57865,9.17573,18.591284,1.603035,1.830626,7.016958
4,14.0,-73.990967,40.730125,-73.984711,40.778984,1,1,2014,9,18,3,23.187555,9.324939,18.834074,2.334269,1.811799,7.680531


In [10]:
# make predictions
actual = test['fare_amount']
test = test.drop(['fare_amount'], axis=1)
dtest = xgb.dask.DaskDMatrix(client, test)
prediction = xgb.dask.predict(client, model, dtest)
s = prediction.compute()

In [11]:
s = np.array(s)
s

array([ 8.641043 ,  7.687096 ,  9.970564 , ..., 11.894879 , 27.644188 ,
        7.9569135], dtype=float32)

In [16]:
def eval_metrics(actual, pred):
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
    rmse = np.sqrt(mean_squared_error(actual, pred))
    mae = mean_absolute_error(actual, pred)
    r2 = r2_score(actual, pred)
    return rmse, mae, r2

In [19]:
print(eval_metrics(y, s))

AttributeError: 'numpy.ndarray' object has no attribute 'to_array'

In [23]:
#testing

model = xgb.Booster()
model.load_model("/home/nvidiatest/mlops_blog/saved_models/xgboost.model")

test = dask_cudf.read_csv('/home/nvidiatest/mlops_blog/data/test.csv')

test['key'] = test['key'].astype('datetime64[ns]')
test['pickup_datetime'] = test['pickup_datetime'].astype('datetime64[ns]')
test['pickup_longitude'] = test ['pickup_longitude'].astype('float32')
test['pickup_latitude'] = test ['pickup_latitude'].astype('float32')
test['dropoff_longitude'] = test ['dropoff_longitude'].astype('float32')
test['dropoff_latitude'] = test ['dropoff_latitude'].astype('float32')
test['passenger_count'] = test ['passenger_count'].astype('uint8')

# now add the features
tparts = [dask.delayed(add_features)(part) for part in test.to_delayed()]
test = dask_cudf.from_delayed(tparts)

# make predictions
actual = test['fare_amount']
test = test.drop(['fare_amount'], axis=1)
dtest = xgb.dask.DaskDMatrix(client, test)
prediction = xgb.dask.predict(client, model, dtest)
pred = prediction.compute()

def eval_metrics(actual, pred):
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
    rmse = np.sqrt(mean_squared_error(actual, pred))
    mae = mean_absolute_error(actual, pred)
    r2 = r2_score(actual, pred)
    return rmse, mae, r2

In [34]:
actual = actual.compute()

AttributeError: 'numpy.ndarray' object has no attribute 'compute'

In [25]:
pred

array([ 8.641043 ,  7.687096 ,  9.970564 , ..., 11.894879 , 27.644188 ,
        7.9569135], dtype=float32)

In [33]:
print(eval_metrics(actual, pred))

(4.379878335998163, 1.719961001996815, 0.8003471070059355)
