Author: Vinay Bagade(vbagade@nvidia.com)
## This notebook offers GPU baseline for RAPIDS on NYC Taxi data

Dataset at https://drive.google.com/open?id=16ICmBVFkXNwRMTU_27EkYPSSm-c_0m6g
## Introduction
The workflow is composed of three core phases:

1. ETL - Extract, Transform, Load
2. Data Conversion
3. ML - Training
4. ML - Validation

### ETL
Data is 
1. Read in from storage
2. Transformed to emphasize key features

### Data Conversion
Features are
1. Broken into (labels, data) pairs
2. Distributed across many workers
3. Converted into compressed sparse row (CSR) matrix format for XGBoost
4. The data is split between training and validation data

### Machine Learning
The CSR data is fed into a distributed training session with Dask-XGBoost

### Prediction
The Final RMSE on validation data

In [1]:
import cudf
import numpy as np
from numba import cuda
import xgboost as xgb

In [2]:
%env NCCL_P2P_DISABLE=1

env: NCCL_P2P_DISABLE=1


#### Define the paths to data and set the size of the dataset

In [3]:
DATA_TRAIN_PATH = "/home/jovyan/taxi-data"
PERCENT_TRAIN = 0.8
#end year included
start = 2013
end = 2014
#number of months to use
part_count = 5

In [4]:
import numpy as np
import math
from numba import cuda

def initialize_rmm_pool():
    from librmm_cffi import librmm_config as rmm_cfg

    rmm_cfg.use_pool_allocator = True
    rmm_cfg.initial_pool_size = 8<<30 # set to 2GiB. Default is 1/2 total GPU memory
    import cudf
    return cudf._gdf.rmm_initialize()

def initialize_rmm_no_pool():
    from librmm_cffi import librmm_config as rmm_cfg
    
    rmm_cfg.use_pool_allocator = False
    import cudf
    return cudf._gdf.rmm_initialize()

#Numba Kernel to calculate Haversine distance
@cuda.jit
def haversine_kernel(lat1, lon1, lat2, lon2, outputCol):
    iRow = cuda.grid(1)
    p = 0.017453292519943295 # Pi/180
    if iRow < outputCol.size:
        a = 0.5 - math.cos((lat2[iRow] - lat1[iRow]) * p)/2 + math.cos(lat1[iRow] * p) * \
            math.cos(lat2[iRow] * p) * (1 - math.cos((lon2[iRow] - lon1[iRow]) * p)) / 2                                 
        outputCol[iRow] = 12734 * math.asin(math.sqrt(a))
    
def haversine_distance(gdf):
    nRows = gdf.shape[0]
    blockSize = 128
    blockCount = nRows // blockSize + 1
    lat1_arr = gdf['pickup_latitude'].to_gpu_array()
    lon1_arr = gdf['pickup_longitude'].to_gpu_array()
    lat2_arr = gdf['dropoff_latitude'].to_gpu_array()
    lon2_arr = gdf['dropoff_longitude'].to_gpu_array()
                                   
    outputCol = cuda.device_array ( shape=(nRows), dtype=lat1_arr.dtype.name)
    
    haversine_kernel[(blockCount),(blockSize)](lat1_arr, lon1_arr, lat2_arr, lon2_arr, outputCol)
    gdf.add_column(name='h_distance', data = outputCol)
    return gdf

#Numba Kernel to calculate day of the week from Date
@cuda.jit
def day_of_the_week_kernel(output ,year, month, day):
    iRow = cuda.grid(1)
    if iRow < output.size:
        year[iRow] -= month[iRow] < 3
        month[iRow] = (month[iRow] + 9)%12 + 1
        output[iRow] = (year[iRow] + int(year[iRow]/4) - int(year[iRow]/100) + int(year[iRow]/400) + math.floor(2.6*month[iRow] - 0.2) + day[iRow] -1) % 7
    
def day_of_week(gdf):
    nRows = gdf.shape[0]
    blockSize = 128
    blockCount = nRows // blockSize + 1
    year_arr = gdf['year'].to_gpu_array()
    month_arr = gdf['month'].to_gpu_array()
    day_arr = gdf['day'].to_gpu_array()
    outputCol = cuda.device_array ( shape=(nRows), dtype=day_arr.dtype.name)
    
    day_of_the_week_kernel[(blockCount),(blockSize)](outputCol, year_arr, month_arr, day_arr)
    gdf.add_column(name='day_of_week', data = outputCol)
    gdf['day_of_week'] = gdf['day_of_week'].astype('float32')
    return gdf
    

In [5]:
import pandas as pd
def gpu_read_csv(file_path):
    names  = ['vendor_id','pickup_datetime','dropoff_datetime','passenger_count','trip_distance','pickup_longitude',
              'pickup_latitude','rate_code','store_and_fwd','dropoff_longitude','dropoff_latitude','payment_type',
              'fare_amount','surcharge','mta_tax','tip_amount','tolls_amount','total_amount']
    
    dtypes = ['category','date','date','int','float64','float64','float64','category','category','float64','float64',
              'category','float64','float64','float64','float64','float64','float64']

    df = cudf.read_csv(file_path, dtype=dtypes, names=names,skiprows=1)
    return df

In [6]:
def null_workaround(df, **kwargs):
    for column, data_type in df.dtypes.items():
        if str(data_type) in ['int8', 'int16', 'int32', 'int64', 'float32', 'float64']:
            df[column] = df[column].fillna(np.dtype(str(df[column].dtype)).type(-1))
    return df

def clean_data(df):
    drop_list = [
        'dropoff_datetime', 'payment_type', 'surcharge', 'mta_tax',
        'tip_amount', 'tolls_amount', 'total_amount'
    ]

    for column in drop_list:
        df.drop_column(column)
        
    df = null_workaround(df)
        
    df_fare = df.query('fare_amount > 0 and fare_amount < 500')
    del(df)
    
    df_pass = df_fare.query('passenger_count > 0 and passenger_count < 6')
    del(df_fare)
    
    df_picklong = df_pass.query('pickup_longitude > -75 and pickup_longitude < -73')
    del(df_pass)
    
    df_droplong = df_picklong.query('dropoff_longitude > -75 and dropoff_longitude < -73')
    del(df_picklong)
    
    df_picklat = df_droplong.query('pickup_latitude > 40 and pickup_latitude < 42')
    del(df_droplong)
    
    df_droplat = df_picklat.query('dropoff_latitude > 40 and dropoff_latitude < 42')
    del(df_picklat)
    
    return df_droplat
    
def add_features(df):
    df['hour'] = df['pickup_datetime'].dt.hour
    df['year'] = df['pickup_datetime'].dt.year
    df['month'] = df['pickup_datetime'].dt.month
    df['day'] = df['pickup_datetime'].dt.day
    
    df.drop_column('pickup_datetime')
    
    df = day_of_week(df)
    df['is_weekend'] = (df['day_of_week']/4).floor()
    df = haversine_distance(df)
    return df
    

def process_data(train_path):
    df = gpu_read_csv(train_path)
    df = clean_data(df)
    df = add_features(df)
    return df


def run_dask_task(func, **kwargs):
    task = func(**kwargs)
    return task

def process_year_gpu(file_path):
    ml_arrays = run_dask_task(delayed(process_data),train_path=file_path)
    return client.compute(ml_arrays,
                          optimize_graph=False,
                          fifo_timeout="0ms")
    

In [7]:
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.delayed import delayed
from dask.distributed import Client, wait
import dask
import gc


  data = yaml.load(f.read()) or {}
  defaults = yaml.load(f)


#### Create a Local Cluster on the workstation

In [8]:
import subprocess

from dask_kubernetes import KubeCluster

cluster = KubeCluster.from_yaml('config2.yaml')
cluster.scale_up(5)
client = Client(cluster)

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


In [9]:
client

0,1
Client  Scheduler: tcp://10.233.72.144:42481  Dashboard: http://10.233.72.144:39731/status,Cluster  Workers: 3  Cores: 6  Memory: 48.00 GB


#### ETL

In [13]:
%%time

month = 1
total_count = 0
year = start

gpu_dfs = []

while year <= end:
    current_part_path = DATA_TRAIN_PATH + "/yellow_tripdata_" + str(year) + "-" + str(month) + ".csv"
    print(current_part_path)
    df = process_year_gpu(current_part_path)
    gpu_dfs.append(df)
    month += 1
    total_count += 1
    if total_count == part_count:
        break
    if month > 12:
        month = 1
        year += 1
wait(gpu_dfs)    

/home/jovyan/taxi-data/yellow_tripdata_2013-1.csv
/home/jovyan/taxi-data/yellow_tripdata_2013-2.csv
/home/jovyan/taxi-data/yellow_tripdata_2013-3.csv
/home/jovyan/taxi-data/yellow_tripdata_2013-4.csv
/home/jovyan/taxi-data/yellow_tripdata_2013-5.csv
CPU times: user 1.53 s, sys: 114 ms, total: 1.64 s
Wall time: 29.2 s


#### Convert to CSR and split into test and validation data

In [14]:
%%time

tmp_map = [(gpu_df, list(client.who_has(gpu_df).values())[0]) for gpu_df in gpu_dfs]
new_map = {}
for key, value in tmp_map:
    if value not in new_map:
        new_map[value] = [key]
    else:
        new_map[value].append(key)

del(tmp_map)
gpu_dfs = []
for list_delayed in new_map.values():
    gpu_dfs.append(delayed(cudf.concat)(list_delayed))

del(new_map)
gpu_dfs = [(gpu_df[['fare_amount']], gpu_df[delayed(list)(gpu_df.columns.difference(['fare_amount']))], delayed(len)(gpu_df)) for gpu_df in gpu_dfs]
gpu_dfs = [(gpu_df[0].persist(), gpu_df[1].persist(), int(gpu_df[2].compute()*PERCENT_TRAIN)) for gpu_df in gpu_dfs]

dfs_train = [(df[0][0:df[2]], df[1][0:df[2]]) for df in gpu_dfs]
dfs_test =  [(df[0][df[2]:], df[1][df[2]:]) for df in gpu_dfs]
            
wait(dfs_train)
wait(dfs_test)

del(gpu_dfs)
            
gpu_dfs = [dask.delayed(xgb.DMatrix)(gpu_df[1], gpu_df[0]) for gpu_df in dfs_train]
gpu_dfs = [gpu_df.persist() for gpu_df in gpu_dfs]
gc.collect()
wait(gpu_dfs)
del(dfs_train)

KilledWorker: ('process_data-5ce85407-95e4-4862-a866-579d41488fe5', 'tcp://10.233.111.174:36137')

#### Train the Gradient Boosted Decision Tree with a single call to 
```python
dask_xgboost.train(client, params, data, labels, num_boost_round=dxgb_gpu_params['nround'])
```

In [14]:
%%time
import dask_xgboost as dxgb_gpu
params = {
         'learning_rate': 0.05,
          'max_depth': 8,
          'objective': 'gpu:reg:linear',
          'subsample': 0.8,
          'gamma': 1,
          'silent': True,
          'verbose_eval': True,
          'tree_method':'gpu_hist',
          'n_gpus': 1
         }
labels = None

bst = dxgb_gpu.train(client, params, gpu_dfs, labels, num_boost_round=100)

CPU times: user 1.71 s, sys: 272 ms, total: 1.99 s
Wall time: 36.8 s


#### Prediction on Validation Data

In [None]:
%%time
#make some room in memory
del gpu_dfs

#create the testing datasets for prediciotn and auc
dtest =  [xgb.DMatrix(df[1].compute()) for df in dfs_test]
y_test = [df[0].compute() for df in dfs_test]

del dfs_test

In [None]:
def rmse(predictions, targets):
    return np.sqrt(((predictions - targets) ** 2).mean())

In [None]:
avg_rmse = 0
for i in range(0,len(dtest)):
    y_pred = bst.predict(dtest[i])
    y_pred = bst.predict(dtest[i])
    y_true = y_test[i]['fare_amount'].to_array()
    avg_rmse += rmse(y_pred,y_true)

avg_rmse = avg_rmse/len(dtest)
print("avg rmse "+ str(avg_rmse))