In [28]:
# Master switch for the notebook: True selects the GPU code paths
# (dask_cuda, cuml, dask_cudf); False selects the CPU ones
# (LocalCluster, scikit-learn, dask_ml).
USE_GPU = True

In [29]:
import os
import time

from dask.distributed import Client, wait, get_worker

from dask_snowflake import read_snowflake

if USE_GPU:
    from dask_cuda import LocalCUDACluster
    from cuml.metrics import mean_squared_error
    from cuml.dask.preprocessing import LabelEncoder
    from cuml import ForestInference
    import dask_cudf

else:
    from dask.distributed import LocalCluster
    from sklearn.metrics import mean_squared_error
    from dask_ml.preprocessing import LabelEncoder

### Step 1: Start up a Dask Cluster

We can start up a Dask cluster using the `LocalCluster` or `LocalCUDACluster` API (for GPUs) to enable distributed compute using Dask. Alternatively, the cluster could be set up [manually](https://docs.rapids.ai/api/dask-cuda/nightly/quickstart.html#dask-cuda-worker) outside of this script and we can connect to it directly. (This approach is more useful when setting up multi-node clusters.)

In [30]:
# Threads handed to every Dask worker process.
TPW = 8

# Resolve the cluster class lazily: a conditional expression only evaluates
# the selected branch, so the class that was not imported is never looked up.
cluster_cls = LocalCUDACluster if USE_GPU else LocalCluster
cluster = cluster_cls(threads_per_worker=TPW)
client = Client(cluster)

# Number of workers currently registered with the scheduler; later cells use
# it to size partitions.
num_workers = len(client.scheduler_info()["workers"])

Perhaps you already have a cluster running?
Hosting the HTTP server on port 42311 instead
2022-04-28 17:23:57,918 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-04-28 17:23:57,919 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize


In [31]:
# Print the client's one-line summary (scheduler address, processes, threads, memory).
print(client)

<Client: 'tcp://127.0.0.1:33243' processes=2 threads=16, memory=0.98 TiB>


In [45]:
# Display the scheduler's view of the cluster, including per-worker details.
client.scheduler_info()

0,1
Comm: tcp://127.0.0.1:33243,Workers: 2
Dashboard: http://127.0.0.1:42311/status,Total threads: 16
Started: 3 minutes ago,Total memory: 0.98 TiB

0,1
Comm: tcp://127.0.0.1:44577,Total threads: 8
Dashboard: http://127.0.0.1:37933/status,Memory: 503.53 GiB
Nanny: tcp://127.0.0.1:42067,
Local directory: /rapids/notebooks/snowflake/dask-worker-space/worker-oefxzlt6,Local directory: /rapids/notebooks/snowflake/dask-worker-space/worker-oefxzlt6
GPU: NVIDIA RTX A6000,GPU memory: 47.99 GiB
Tasks executing: 0,Tasks in memory: 4
Tasks ready: 0,Tasks in flight: 0
CPU usage: 0.0%,Last seen: Just now
Memory usage: 3.06 GiB,Spilled bytes: 0 B
Read bytes: 71.16 kiB,Write bytes: 28.88 kiB

0,1
Comm: tcp://127.0.0.1:37013,Total threads: 8
Dashboard: http://127.0.0.1:35289/status,Memory: 503.53 GiB
Nanny: tcp://127.0.0.1:35977,
Local directory: /rapids/notebooks/snowflake/dask-worker-space/worker-_ur4glp9,Local directory: /rapids/notebooks/snowflake/dask-worker-space/worker-_ur4glp9
GPU: NVIDIA RTX A6000,GPU memory: 47.99 GiB
Tasks executing: 0,Tasks in memory: 4
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.0%,Last seen: Just now
Memory usage: 3.11 GiB,Spilled bytes: 0 B
Read bytes: 71.04 kiB,Write bytes: 28.83 kiB


### Step 2: Use `dask_snowflake` to authenticate and execute a query on the Snowflake engine. 

`dask_snowflake` returns a Dask dataframe, which is lazy by nature: it does not read the data from the cloud until an operation requires the data to be computed.


Here we have a simple query that reads a few tables from the Snowflake sample database and joins them on a few keys.

In [32]:
from dotenv import load_dotenv

# Load variables from a .env file into the process environment so that the
# os.environ lookups below can find them.
# NOTE(review): load_dotenv() returns a bool rather than a config mapping, and
# `config` is not read again in the visible cells -- confirm it is needed.
config = load_dotenv()

# Snowflake connection settings. Credentials are read from the environment
# rather than hard-coded; a missing variable raises KeyError right here.
DB_CREDS = {
    "user": os.environ["SNOW_USER"],
    "password": os.environ["SNOW_PASSWORD"],
    "account": os.environ["SNOW_ACCOUNT"],
    "database": "SAMPLE_DATA",
    "warehouse": os.environ["SNOW_WAREHOUSE"]
}

# Join STORE_SALES to STORE and DATE_DIM (comma joins filtered in WHERE) and
# cap the result at 1,000,000 rows.
# NOTE(review): LIMIT is used without ORDER BY, so which rows come back is
# presumably not guaranteed to be the same on every run -- confirm if the
# recorded row counts need to be reproducible.
QUERY = """
SELECT 
    ss.SS_ITEM_SK, ss.SS_STORE_SK, s.S_STATE,
    d.D_DATE, ss.SS_QUANTITY, ss.SS_EXT_WHOLESALE_COST, 
    ss.SS_EXT_LIST_PRICE, ss.SS_NET_PROFIT  
FROM 
    SAMPLE_DATA.TPCDS_SF100TCL.STORE_SALES ss, 
    SAMPLE_DATA.TPCDS_SF100TCL.STORE s,
    SAMPLE_DATA.TPCDS_SF100TCL.DATE_DIM d
WHERE 
    ss.SS_STORE_SK = s.S_STORE_SK and
    ss.SS_SOLD_DATE_SK = d.D_DATE_SK
LIMIT 1000000
"""

#### Use dask_snowflake to read from snowflake into a dask dataframe

In [33]:
# Build the Dask dataframe over the query result; per the notebook narrative
# this is lazy, so no rows are pulled from Snowflake at this point.
ddf = read_snowflake(query=QUERY, connection_kwargs=DB_CREDS)

#### Convert cpu backed dask dataframes to gpu dask_cudf dataframes

In [34]:
if USE_GPU:
    # Convert the CPU-backed Dask dataframe to a dask_cudf one, then rebalance
    # it to two partitions per worker.
    partitions_wanted = num_workers * 2
    ddf = dask_cudf.from_dask_dataframe(ddf).repartition(npartitions=partitions_wanted)

### Step 3: Preprocessing

We can now proceed to clean up our data and generate features that can be used for training an XGBoost model.

In [35]:
# Discard rows that contain missing values, then list the column dtypes.
ddf = ddf.dropna()
print(ddf.dtypes)

SS_ITEM_SK                int64
SS_STORE_SK               int64
S_STATE                  object
D_DATE                   object
SS_QUANTITY               int64
SS_EXT_WHOLESALE_COST     int64
SS_EXT_LIST_PRICE         int64
SS_NET_PROFIT             int64
dtype: object


Inspecting the dtypes shows that the date column is treated as an object (most likely a string).
We can easily cast that to a datetime dtype that'll help enable more datetime operations downstream.

In [36]:
# Cast the object-typed date column to second-resolution datetimes so the
# `.dt` accessor can be used in later cells.
ddf["D_DATE"] = ddf["D_DATE"].astype("datetime64[s]")

The dataframe contains store sales made across the years 1998 and 1999.
Let's take all the sales made in 1998 and try to train a model that'll predict the `net_profit` given certain features like `quantity`, `wholesale_cost` etc. 

The cell below triggers a computation since we `persist` the data from 1998 on our dask workers. This step pulls the result set from snowflake onto the workers.

In [37]:
# Keep only the sales dated 1998 and pin the result in worker memory.
sold_in_1998 = ddf["D_DATE"].dt.year == 1998
ddf_1998 = ddf[sold_in_1998].persist()
wait(ddf_1998)  # block until the persisted partitions are ready
print(len(ddf_1998))

241073


We can now use a `LabelEncoder` to encode the categorical variable `S_STATE` which denotes the state in which the sale was made.

In [38]:
# Fit the encoder on the 1998 states and attach the resulting codes as a new
# column. `s_state_le` is kept so the same mapping can be reused at inference.
s_state_le = LabelEncoder()
encoded = s_state_le.fit_transform(ddf_1998["S_STATE"])
ddf_1998 = ddf_1998.assign(S_STATE_VAL=encoded)

We can also generate a couple more features, such as the day of the week or the month in which the sale was made, using the `dt` accessor for datetime objects.

In [39]:
# Derive calendar features from the sale date.
sale_date = ddf_1998["D_DATE"]
ddf_1998 = ddf_1998.assign(
    DAY_OF_WEEK=sale_date.dt.dayofweek,
    MONTH=sale_date.dt.month,
)

In [40]:
# Materialise the engineered columns on the workers before training.
ddf_1998 = ddf_1998.persist()
wait(ddf_1998)
# The encoded series now lives in ddf_1998 as S_STATE_VAL; drop the extra name.
del encoded

### Step 4: Train an XGBoost Model

For large scale ML problems it is common to use a `Grid Search` approach to train models over different hyperparameters and choose which one performs the best for the problem.

In [41]:
import itertools
import math

# Inspired by the solution provided here
# https://stackoverflow.com/questions/69300562/how-to-define-the-grid-for-using-grid-search-from-scratch-in-python


def generate_grid(params_grid):
    """
    Expand a mapping of parameter name -> candidate values into the full
    cartesian product of settings.

    Returns a list with one dictionary per combination. Keys follow the order
    of `params_grid`; combinations vary fastest on the last parameter.
    """
    names = tuple(params_grid.keys())
    grid = []
    for values in itertools.product(*params_grid.values()):
        grid.append(dict(zip(names, values)))
    return grid

In [42]:
# Hyperparameter search space: every key maps to the list of values to try.
# Single-element lists are fixed settings; 1 * 1 * 3 * 4 * 2 = 24 combinations.
dxgb_params_options = {
    # NOTE(review): "gpu_hist" is presumably deprecated in newer XGBoost
    # releases in favour of tree_method="hist" plus a device setting -- confirm
    # against the installed version.
    "tree_method": ["gpu_hist" if USE_GPU else "hist"],
    "objective": ["reg:squarederror"],
    "learning_rate": [0.01, 0.05, 0.1],
    "max_depth": [5, 6, 7, 8],
    "reg_lambda": [1.0, 5.0],
}

In [43]:
import xgboost as xgb

# Every column except the target, the raw date and the un-encoded state is a
# feature. The same exclusion list is applied to the test frame later on.
feature_columns = ddf_1998.columns.difference(["SS_NET_PROFIT", "D_DATE", "S_STATE"])
X_train = ddf_1998[feature_columns]
y_train = ddf_1998["SS_NET_PROFIT"]

# One dictionary per hyperparameter combination.
dgxb_params_list = generate_grid(dxgb_params_options)

# Training matrix built once and reused for every hyperparameter set.
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)

In [46]:
# Best (lowest) training RMSE seen so far and the parameters that produced it.
min_rmse = math.inf
best_dxgb_params = None
# File that always holds the best booster found so far; read back at inference.
model_fname = "gpu_model.json" if USE_GPU else "cpu_model.json"

# Iterate over our hyperparameters and choose the model with the best rmse
# In a real world scenario it would make more sense to use some test set for evaluation

for iteration, dxgb_param in enumerate(dgxb_params_list):
    print(f"HyperParameter set {iteration}")
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments the way time.time() differences can.
    t1 = time.perf_counter()
    bst = xgb.dask.train(
        client,
        dxgb_param,
        dtrain,
        num_boost_round=200,
        evals=[(dtrain, "train")],
        verbose_eval=False,
    )
    elapsed = time.perf_counter() - t1

    # `bst` is a dict: "booster" is the trained model, "history" the per-round
    # evaluation results. Take the RMSE of the final boosting round.
    train_rmse = bst["history"]["train"]["rmse"][-1]
    print(f"Hyperparameter set {iteration} took {elapsed}s with an RMSE of {train_rmse}")

    # Strictly-better check: on ties the earlier parameter set is kept.
    if train_rmse < min_rmse:
        min_rmse = train_rmse
        best_dxgb_params = dxgb_param
        bst["booster"].save_model(model_fname)

HyperParameter set 0


[17:28:14] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:14] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 0 took 0.5619456768035889s with an RMSE of 1166.643311
HyperParameter set 1


[17:28:14] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:14] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 1 took 0.5334196090698242s with an RMSE of 1168.474854
HyperParameter set 2


[17:28:15] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 0
[17:28:15] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 1


Hyperparameter set 2 took 0.8261308670043945s with an RMSE of 1112.79187
HyperParameter set 3


[17:28:16] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:16] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 3 took 0.8297691345214844s with an RMSE of 1114.808838
HyperParameter set 4


[17:28:16] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:16] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 4 took 1.3339109420776367s with an RMSE of 1071.176636
HyperParameter set 5


[17:28:18] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:18] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 5 took 1.3080096244812012s with an RMSE of 1074.588013
HyperParameter set 6


[17:28:19] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:19] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 6 took 2.178530693054199s with an RMSE of 1032.596436
HyperParameter set 7


[17:28:21] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:21] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 7 took 2.1150012016296387s with an RMSE of 1035.16272
HyperParameter set 8


[17:28:23] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:23] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 8 took 0.5266432762145996s with an RMSE of 990.884521
HyperParameter set 9


[17:28:24] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:24] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 9 took 0.526362419128418s with an RMSE of 995.288391
HyperParameter set 10


[17:28:25] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:25] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 10 took 0.8120620250701904s with an RMSE of 925.36853
HyperParameter set 11


[17:28:25] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:25] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 11 took 0.8146116733551025s with an RMSE of 924.683533
HyperParameter set 12


[17:28:26] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:26] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 12 took 1.282700538635254s with an RMSE of 841.312195
HyperParameter set 13


[17:28:28] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:28] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 13 took 1.2612144947052002s with an RMSE of 851.612976
HyperParameter set 14


[17:28:29] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:29] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 14 took 1.9620623588562012s with an RMSE of 751.1297
HyperParameter set 15


[17:28:31] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:31] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 15 took 1.9888954162597656s with an RMSE of 765.481873
HyperParameter set 16


[17:28:33] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:33] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 16 took 0.53468918800354s with an RMSE of 912.525269
HyperParameter set 17


[17:28:33] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:33] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 17 took 0.5265071392059326s with an RMSE of 912.370667
HyperParameter set 18


[17:28:34] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:34] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 18 took 0.8098516464233398s with an RMSE of 807.171692
HyperParameter set 19


[17:28:35] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:35] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 19 took 0.8083326816558838s with an RMSE of 807.884827
HyperParameter set 20


[17:28:35] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:35] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 20 took 1.2739548683166504s with an RMSE of 689.32428
HyperParameter set 21


[17:28:37] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:37] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 21 took 1.3129804134368896s with an RMSE of 699.092407
HyperParameter set 22


[17:28:38] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 0
[17:28:38] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 1


Hyperparameter set 22 took 2.0488574504852295s with an RMSE of 565.761169
HyperParameter set 23


[17:28:40] task [xgboost.dask]:tcp://127.0.0.1:44577 got new rank 0
[17:28:40] task [xgboost.dask]:tcp://127.0.0.1:37013 got new rank 1


Hyperparameter set 23 took 1.9529192447662354s with an RMSE of 601.445984


In [47]:
# Drop the training-only names so their data can be released on the cluster.
del X_train, y_train, ddf_1998, dtrain, bst

### Step 5: Inference

Now that we're done training and have a saved model, we can use this model for inference.

Let's use the data from 1999 to predict the net profit using the model trained above

In [48]:
# Hold-out data: the sales dated 1999.
ddf_1999 = ddf[ddf.D_DATE.dt.year == 1999]
# Name the argument explicitly: the first positional parameter of
# repartition() is `divisions`, not `npartitions`. This also matches the
# keyword form used when the GPU dataframe was repartitioned earlier.
ddf_1999 = ddf_1999.repartition(npartitions=num_workers)
ddf_1999 = ddf_1999.persist()
wait(ddf_1999)  # block until the persisted partitions are ready
print(len(ddf_1999))

728281


In [49]:
# Encode `S_STATE` in the test dataframe with the encoder that was fit on the
# training data, so both frames share one state -> code mapping.
encoded = s_state_le.transform(ddf_1999["S_STATE"])
ddf_1999 = ddf_1999.assign(S_STATE_VAL=encoded)

# Same calendar features as were built for the training frame.
sale_date = ddf_1999["D_DATE"]
ddf_1999 = ddf_1999.assign(
    DAY_OF_WEEK=sale_date.dt.dayofweek,
    MONTH=sale_date.dt.month,
)

ddf_1999 = ddf_1999.persist()
wait(ddf_1999)
# Neither the encoded series nor the encoder is needed past this point.
del encoded, s_state_le

In [50]:
# Features: every column except the target, the raw date and the un-encoded
# state -- the same exclusion list that was used to build X_train.
X_test = ddf_1999[ddf_1999.columns.difference(["SS_NET_PROFIT", "D_DATE", "S_STATE"])]
# Ground-truth net profit to compare the predictions against.
y_expect = ddf_1999["SS_NET_PROFIT"]

#### GPU inferencing using FIL

RAPIDS [Forest Inference Library](https://medium.com/rapids-ai/rapids-forest-inference-library-prediction-at-100-million-rows-per-second-19558890bc35) (FIL) is a great tool for really fast inference on the GPU with any saved Treelite model (trained on the CPU or GPU).

In [51]:
# Initialize all dask workers with the xgboost model
def worker_init(
    dask_worker,
    model_file="gpu_model.json",
):
    """
    Load the saved XGBoost JSON model into a FIL `ForestInference` object and
    cache it on the given worker under the key "fil_model".

    Parameters
    ----------
    dask_worker
        The worker to initialise. When this function is invoked through
        `client.run`, the argument named `dask_worker` is filled in by Dask.
    model_file : str
        Path of the model file to load; defaults to "gpu_model.json".
    """
    fil_model = ForestInference.load(
        filename=model_file,
        model_type="xgboost_json",
    )
    # Stored in the worker's data mapping so prediction tasks can fetch it by key.
    dask_worker.data["fil_model"] = fil_model


if USE_GPU:
    # Run worker_init once on every worker; it is called with its default
    # model_file, i.e. "gpu_model.json".
    client.run(worker_init)
    print("Loaded model on all dask workers")

Loaded model on all dask workers


In [52]:
if USE_GPU:
    def fil_pred(df):
        """Predict one partition with the FIL model cached on the current worker."""
        # get_worker() returns the worker executing this task; the model was
        # stored in its data mapping under "fil_model" by worker_init.
        dask_worker = get_worker()
        return dask_worker.data["fil_model"].predict(df)


    # Apply the prediction function partition by partition.
    # NOTE(review): meta declares "float", but the recorded output shows
    # float32 predictions -- confirm whether meta should be "float32".
    y_got = X_test.map_partitions(fil_pred, meta="float")
    y_got = y_got.persist()
    wait(y_got)
    print(y_got.head())

0   -1649.329468
1    -399.225464
2    -399.225464
3    -399.225464
4     -15.344209
dtype: float32


#### Inferencing on the CPU with XGBoost

In [53]:
if not USE_GPU:
    # CPU path: reload the booster from "cpu_model.json", the file name the
    # training loop saves to when USE_GPU is False.
    bst = xgb.Booster()
    bst.load_model(fname="cpu_model.json")

In [54]:
if not USE_GPU:
    # CPU path: distributed prediction with XGBoost's Dask interface.
    y_got = xgb.dask.predict(client, model=bst, data=X_test)
    y_got = y_got.persist()
    wait(y_got)
    print(y_got.head())

#### Check the inference RMSE

In [55]:
# Gather labels and predictions onto the client and score them.
# squared=False asks for the root of the mean squared error (RMSE).
# NOTE(review): recent scikit-learn releases presumably deprecate/remove the
# `squared` argument in favour of root_mean_squared_error -- confirm for the
# CPU path against the installed version.
inference_rmse = mean_squared_error(
    y_true=y_expect.compute().values,
    # Predictions are cast to float64 before scoring.
    y_pred=y_got.compute().values.astype("float64"),
    squared=False,
)
print(f"Testing inference: {inference_rmse}")

Testing inference: 1666.5182706565467


In [56]:
# Drop the inference-only names so their data can be released on the cluster.
del ddf_1999, X_test, y_expect, y_got