# Start Dask Cluster

In [1]:
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import os

In [2]:
os.system(f"rm -rf ./dask-worker-space")

0

In [3]:
cluster = LocalCUDACluster(silence_logs=50)
client = Client(cluster)
client

2022-11-20 15:26:25,010 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2022-11-20 15:26:25,010 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize


0,1
Connection method: Cluster object,Cluster type: dask_cuda.LocalCUDACluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 1
Total threads: 1,Total memory: 125.65 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:39331,Workers: 1
Dashboard: http://127.0.0.1:8787/status,Total threads: 1
Started: Just now,Total memory: 125.65 GiB

0,1
Comm: tcp://127.0.0.1:38917,Total threads: 1
Dashboard: http://127.0.0.1:40319/status,Memory: 125.65 GiB
Nanny: tcp://127.0.0.1:36827,
Local directory: /tmp/dask-worker-space/worker-9fqnyt0s,Local directory: /tmp/dask-worker-space/worker-9fqnyt0s
GPU: NVIDIA GeForce RTX 3090,GPU memory: 24.00 GiB


# Import other libraries

In [4]:
import dask_cudf as pd
from cuml.dask.linear_model import LinearRegression
from cuml.dask.cluster import KMeans, DBSCAN
from cuml.dask.ensemble import RandomForestClassifier
from dask_ml.xgboost import XGBClassifier
from xgboost.dask import DaskDMatrix
import xgboost as xgb
from cuml.dask.decomposition import TruncatedSVD, PCA
import cupy as np

# Input filename
Adjust the filename to one of the files available in the sample_data folder.  
Files are availble publicly for download

In [5]:
in_filename = 'sample_data_20m.csv'

# Define variables

In [6]:
label_col_name = 'is_counterfeit' # the name of the column that contain the labels (0 for negative, 1 for positive)

predictor_col_names = ['Size of U.S. market', # the names of the columns that will be used to predict the label
                       'Price per unit', 
                       'RX/OTC', 
                       'Indication',
                       'Drug Class',
                       'Shortage',
                       'Twitter Mentions'
                      ] 

cetegoric_predictor_col_names = ['RX/OTC', # list of predictor columns that are not values; will use a label encoder on these
                                 'Indication',
                                 'Drug Class', 
                                 'Shortage'
                                ]

# models perform better when values are between 0 and 1. 
# thus, we use a scaling value to attempt to do this
scaling_dict = {'Size of U.S. market': 1/100000000000, # each column in this dictionary will be multiplied by the values here
                'Price per unit': 1/10000,
                'Twitter Mentions': 1/200000
               }

val_set_percent = 0.1 # percent of data to be reserved for validation
test_set_percent = 0.1 # percent of data to be reserved for testing

chunksize = "100MB"
blocksize = "100MB"

random_seed = 42 # to make results reproducible

In [7]:
out_filename = 'write_speed_test_gpu.csv'
in_filename = './sample_data/' + in_filename
out_filename = './output_data/' + out_filename

# Define class for temp file names

In [8]:
class TempFileNameSwitcher:
    def __init__(self):
        self.temp_names = ['./output_data/temp_1.csv', './output_data/temp_2.csv']
        self.cur_iter = 0
        self.name = self.temp_names[self.cur_iter]
        for a_name in self.temp_names:
            try:
                os.system(f"rm -rf {a_name}")
            except:
                pass
            try:
                os.system(f"rm -f {a_name}")
            except:
                pass
        
    def switch(self):
        self.cur_iter += 1
        name_i = self.cur_iter % len(self.temp_names)
        self.name = self.temp_names[name_i]

In [9]:
temp_file_name_switcher = TempFileNameSwitcher()

# ETL

## Single CSV file

### read

In [10]:
df = pd.read_csv(in_filename, chunksize=chunksize)

In [11]:
%%time
_ = df.compute()

CPU times: user 1 s, sys: 1.07 s, total: 2.07 s
Wall time: 6.39 s


### write

In [12]:
os.system(f"rm -rf {out_filename}")
os.system(f"rm -f {out_filename}")

0

In [13]:
%%time
df.to_csv(out_filename, index=False, single_file=True)

CPU times: user 101 ms, sys: 5.13 ms, total: 106 ms
Wall time: 3.4 s


['/home/jcosme/proj/GPU_vs_CPU_v2/output_data/write_speed_test_gpu.csv']

## Multiple CSV files

In [14]:
os.system(f"rm -rf {out_filename}")
os.system(f"rm -f {out_filename}")

0

### write

In [15]:
%%time
df.to_csv(out_filename, index=False)

CPU times: user 98.7 ms, sys: 15.9 ms, total: 115 ms
Wall time: 3.41 s


['/home/jcosme/proj/GPU_vs_CPU_v2/output_data/write_speed_test_gpu.csv/00.part',
 '/home/jcosme/proj/GPU_vs_CPU_v2/output_data/write_speed_test_gpu.csv/01.part',
 '/home/jcosme/proj/GPU_vs_CPU_v2/output_data/write_speed_test_gpu.csv/02.part',
 '/home/jcosme/proj/GPU_vs_CPU_v2/output_data/write_speed_test_gpu.csv/03.part',
 '/home/jcosme/proj/GPU_vs_CPU_v2/output_data/write_speed_test_gpu.csv/04.part',
 '/home/jcosme/proj/GPU_vs_CPU_v2/output_data/write_speed_test_gpu.csv/05.part',
 '/home/jcosme/proj/GPU_vs_CPU_v2/output_data/write_speed_test_gpu.csv/06.part',
 '/home/jcosme/proj/GPU_vs_CPU_v2/output_data/write_speed_test_gpu.csv/07.part',
 '/home/jcosme/proj/GPU_vs_CPU_v2/output_data/write_speed_test_gpu.csv/08.part',
 '/home/jcosme/proj/GPU_vs_CPU_v2/output_data/write_speed_test_gpu.csv/09.part',
 '/home/jcosme/proj/GPU_vs_CPU_v2/output_data/write_speed_test_gpu.csv/10.part',
 '/home/jcosme/proj/GPU_vs_CPU_v2/output_data/write_speed_test_gpu.csv/11.part',
 '/home/jcosme/proj/GPU_vs_C

### read

In [16]:
df = pd.read_csv(f"{out_filename}/*", chunksize=chunksize)

In [17]:
%%time
_ = df.compute()

CPU times: user 979 ms, sys: 975 ms, total: 1.95 s
Wall time: 5.19 s


## Common DataFrame Operations

In [18]:
df = pd.read_csv(in_filename, chunksize=chunksize)

### describe the dataframe

In [19]:
%%time
df.describe().compute()

CPU times: user 374 ms, sys: 19.4 ms, total: 393 ms
Wall time: 2.32 s


Unnamed: 0,Price per unit,SNOMED,Size of U.S. market,Twitter Mentions,is_counterfeit
count,20000000.0,20000000.0,20000000.0,20000000.0,20000000.0
mean,207.3848,258498300.0,2319031000.0,9099.047,0.38
std,307.297,275806600.0,3181265000.0,5702.964,0.4853865
min,0.1,73211010.0,10021.0,0.0,0.0
25%,2.54,80659010.0,49987310.0,6250.0,0.0
50%,67.48,119292000.0,1742948000.0,10619.0,1.0
75%,690.4,373621000.0,7259270000.0,16882.0,1.0
max,1000.0,840358000.0,9999998000.0,20000.0,1.0


### Set Index
for each categorical variable, set the DataFrame index to that variable
  
**note:** dask dataframes have a 'set_index' method, but it is slower than applying the 'et_index' method to each partition (so we will use the latter method).

In [20]:
def set_cat_index(df, a_cat):
    return df.set_index(a_cat)

In [21]:
%%time
for a_cat in cetegoric_predictor_col_names:
    print(a_cat)
    _ = df.map_partitions(set_cat_index, a_cat).compute()

RX/OTC
Indication
Drug Class
Shortage
CPU times: user 3.71 s, sys: 3.92 s, total: 7.63 s
Wall time: 20.8 s


### Concat multiple DataFrames
concatenate 3 copies of the DF

In [22]:
%%time
_ = pd.concat([df] * 3, ignore_index=True).compute()

CPU times: user 2.76 s, sys: 2.81 s, total: 5.57 s
Wall time: 13.4 s


### Groupby function
for each value column, we calculate the mean by grouping by each categorical variable

In [23]:
non_cetegoric_predictor_col_names = list(set(predictor_col_names).difference(set(cetegoric_predictor_col_names)))
non_cetegoric_predictor_col_names

['Twitter Mentions', 'Size of U.S. market', 'Price per unit']

In [24]:
%%time
for a_cat in cetegoric_predictor_col_names:
    print(a_cat)
    for a_col in non_cetegoric_predictor_col_names:
        print(f"\t{a_col}")
        _ = df.groupby(a_cat)[a_col].mean().compute()

RX/OTC
	Twitter Mentions
	Size of U.S. market
	Price per unit
Indication
	Twitter Mentions
	Size of U.S. market
	Price per unit
Drug Class
	Twitter Mentions
	Size of U.S. market
	Price per unit
Shortage
	Twitter Mentions
	Size of U.S. market
	Price per unit
CPU times: user 877 ms, sys: 90 ms, total: 967 ms
Wall time: 11.5 s


## Preprocess data

In [25]:
df_input = df.copy()

### fit label encoder
dask_cuda allows us to convert a column into a 'category' type, and we can use the .cat.categories method of the column (Series) to get the categories.  
As such, we do not need to fit a label encoder.

In [26]:
%%time
cats_dict = {}
for col in df_input.columns:
    if col in cetegoric_predictor_col_names:
        print(col)
        df_input[col] = df_input[col].astype('category').cat.as_known()
        cats_dict[col] = df_input.head()[col].cat.categories

RX/OTC
Indication
Shortage
Drug Class
CPU times: user 120 ms, sys: 594 µs, total: 121 ms
Wall time: 528 ms


### encode categoric columns
We can use the build in .cat.codes method of a 'category'-type column (series) to encode the values

In [27]:
def get_cat_codes(df, cols):
    for col in cols:
        df[col] = df[col].cat.codes
    return df
df_input[cetegoric_predictor_col_names] = df_input[cetegoric_predictor_col_names].map_partitions(get_cat_codes, cetegoric_predictor_col_names, meta=get_cat_codes(df_input[cetegoric_predictor_col_names].head(), cetegoric_predictor_col_names))

In [28]:
%%time
_ = df_input.compute()

CPU times: user 593 ms, sys: 458 ms, total: 1.05 s
Wall time: 5.81 s


we save a temporary file, and reload the data from this file, so we can isolate the computation time of the next transformation (scale value variable columns)

In [29]:
%%time
old_name = temp_file_name_switcher.name
temp_file_name_switcher.switch()
_ = df_input.to_csv(temp_file_name_switcher.name, index=False)
df_input = pd.read_csv(f"{temp_file_name_switcher.name}/*")
os.system(f"rm -rf {old_name}")

CPU times: user 179 ms, sys: 19.7 ms, total: 199 ms
Wall time: 4.79 s


0

### scale value variable columns
Next we apply the scaling by the variable 'scaling_dict' defined at the top of the notebook

In [30]:
%%time
for col in scaling_dict.keys():
    df_input[col] = df_input[col] * scaling_dict[col]
_ = df_input.compute()

CPU times: user 708 ms, sys: 562 ms, total: 1.27 s
Wall time: 3.53 s


we save a temporary file, and reload the data from this file, so we can isolate the computation time of the next transformation (train/val/test split).  

In [31]:
%%time
old_name = temp_file_name_switcher.name
temp_file_name_switcher.switch()
_ = df_input.to_csv(temp_file_name_switcher.name, index=False)
df_input = pd.read_csv(f"{temp_file_name_switcher.name}/*")
os.system(f"rm -rf {old_name}")

CPU times: user 110 ms, sys: 28.7 ms, total: 139 ms
Wall time: 2.36 s


0

## train/val/test split

In [32]:
not_train_prct = val_set_percent + test_set_percent
train_prct = 1. - not_train_prct

In [33]:
df_train, df_val, df_test = df_input.random_split([train_prct, val_set_percent, test_set_percent], random_state=random_seed)

In [34]:
%%time
_ = df_train.compute()
_ = df_val.compute()
_ = df_test.compute()

CPU times: user 1.04 s, sys: 759 ms, total: 1.8 s
Wall time: 6.54 s


In [35]:
X_train = df_train[predictor_col_names].copy().astype(float)
y_train = df_train[label_col_name].copy().astype(float)

X_val = df_val[predictor_col_names].copy().astype(float)
y_val = df_val[label_col_name].copy().astype(float)

X_test = df_test[predictor_col_names].copy().astype(float)
y_test = df_test[label_col_name].copy().astype(float)

we save temporary files for each split, and reload them from this file, so we can isolate the computation time of the next operations (dimensionality reduction & model fitting).

In [36]:
os.system(f"rm -rf ./output_data/X_train")
os.system(f"rm -rf ./output_data/y_train")

os.system(f"rm -rf ./output_data/X_val")
os.system(f"rm -rf ./output_data/y_val")

os.system(f"rm -rf ./output_data/X_test")
os.system(f"rm -rf ./output_data/y_test")

0

In [37]:
%%time
_ = X_train.to_csv('./output_data/X_train', index=False)
_ = y_train.to_frame().to_csv('./output_data/y_train', index=False)

_ = X_val.to_csv('./output_data/X_val', index=False)
_ = y_val.to_frame().to_csv('./output_data/y_val', index=False)

_ = X_test.to_csv('./output_data/X_test', index=False)
_ = y_test.to_frame().to_csv('./output_data/y_test', index=False)

CPU times: user 709 ms, sys: 67.8 ms, total: 777 ms
Wall time: 8.19 s


In [38]:
%%time
X_train = pd.read_csv('./output_data/X_train/*')
y_train = pd.read_csv('./output_data/y_train/*')
y_train = y_train[y_train.columns[0]]

X_val = pd.read_csv('./output_data/X_val/*')
y_val = pd.read_csv('./output_data/y_val/*')
y_val = y_val[y_val.columns[0]]

X_test = pd.read_csv('./output_data/X_test/*')
y_test = pd.read_csv('./output_data/y_test/*')
y_test = y_test[y_test.columns[0]]

CPU times: user 21.5 ms, sys: 1.58 ms, total: 23.1 ms
Wall time: 21.9 ms


In [39]:
del df, df_input

# Dimensionality Reduction (2D)

## PCA

In [40]:
%%time
pca = PCA(client=client, n_components=2, random_state=random_seed)
X_train_pca = pca.fit_transform(X_train)

CPU times: user 190 ms, sys: 15 ms, total: 205 ms
Wall time: 2.84 s


In [41]:
del pca, X_train_pca

## TruncatedSVD

In [42]:
%%time
trunSVD = TruncatedSVD(client=client, n_components=2, random_state=random_seed)
X_train_trunSVD = trunSVD.fit_transform(X_train)

CPU times: user 237 ms, sys: 19.8 ms, total: 257 ms
Wall time: 2.08 s


In [43]:
del trunSVD, X_train_trunSVD

# Models

## OLS Regression

In [44]:
%%time
reg = LinearRegression(client=client)
reg.fit(X_train, y_train)

CPU times: user 183 ms, sys: 4.5 ms, total: 188 ms
Wall time: 726 ms


<cuml.dask.linear_model.linear_regression.LinearRegression at 0x7f1a5c2b8460>

In [45]:
del reg

## K-Means

In [46]:
n_clusters = y_train.unique().compute().shape[0]
n_clusters

2

In [47]:
%%time
kmeans = KMeans(n_clusters=n_clusters, random_state=random_seed, client=client)
kmeans.fit(X_train)

CPU times: user 56.7 ms, sys: 14.6 ms, total: 71.3 ms
Wall time: 1.05 s


<cuml.dask.cluster.kmeans.KMeans at 0x7f1a4bae6a00>

In [48]:
del kmeans

## Gradient Boosting

In [49]:
%%time
dtrain = DaskDMatrix(client, X_train, y_train)

CPU times: user 54.1 ms, sys: 9.64 ms, total: 63.8 ms
Wall time: 380 ms


In [50]:
%%time
xgb_model = xgb.dask.train(
        client,
    {"tree_method": "gpu_hist", "random_state": random_seed},
    dtrain,
    
)


  client.wait_for_workers(n_workers)
[15:28:15] task [xgboost.dask-2]:tcp://127.0.0.1:38917 got new rank 0


CPU times: user 27.8 ms, sys: 6.85 ms, total: 34.7 ms
Wall time: 811 ms


In [51]:
del xgb

In [52]:
client.shutdown()

In [53]:
del client