<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Read-CSVs-into-Dask-Dataframe" data-toc-modified-id="Read-CSVs-into-Dask-Dataframe-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Read CSVs into Dask Dataframe</a></span></li><li><span><a href="#XGBoost-model" data-toc-modified-id="XGBoost-model-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>XGBoost model</a></span></li><li><span><a href="#XGBoost-model-with-train-test-split" data-toc-modified-id="XGBoost-model-with-train-test-split-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>XGBoost model with train-test split</a></span></li></ul></div>

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [2]:
# Start a local Dask cluster and connect to it:
#   - 2 workers
#   - 2 threads per worker
#   - 2GB memory limit per worker
from dask.distributed import Client
client = Client(
    n_workers=2,
    threads_per_worker=2,
    memory_limit='2GB',
)
client

0,1
Client  Scheduler: tcp://127.0.0.1:61210  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 2  Cores: 4  Memory: 4.00 GB


In [3]:
from dask import dataframe as dd, array as da

##### Read CSVs into Dask Dataframe

In [4]:
# Load every CSV matching the glob into one Dask DataFrame, parse the `date`
# column as datetimes, and use it as the index.
ddf = (
    dd.read_csv('../data/test/csv*.csv', parse_dates=['date'])
    .set_index('date')
)
ddf

Unnamed: 0_level_0,y,x
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1
2015-01-01,float64,float64
2019-01-01,...,...
2021-09-26,...,...


##### XGBoost model

In [5]:
import xgboost as xgb

In [8]:
# Target: the `y` column, kept as a Dask Series.
y = ddf['y']
# Feature matrix: the `x` column converted to a Dask array and reshaped into a
# single-column 2-D array. lengths=True makes Dask compute the partition
# lengths so the resulting array has known chunk sizes.
X = ddf['x'].to_dask_array(lengths=True).reshape(-1, 1)

In [9]:
# Wrap the distributed features and labels in XGBoost's Dask-aware matrix type.
dtrain = xgb.dask.DaskDMatrix(client=client, data=X, label=y)

In [53]:
# Train a squared-error regressor for up to 10 boosting rounds on `dtrain`.
# NOTE(review): the only eval set is the training data itself, so early
# stopping is driven by the training RMSE rather than by a held-out set --
# confirm this is intended.
result = xgb.dask.train(
    client,
    {"objective": "reg:squarederror"},
    dtrain,
    num_boost_round=10,
    evals=[(dtrain, "train")],
    early_stopping_rounds=5,
)

In [54]:
# Per-round evaluation metrics recorded during training, keyed by eval-set
# name and then by metric name.
print(result['history'])

{'train': {'rmse': [15.956649, 11.227454, 7.914044, 5.603489, 3.998896, 2.898554, 2.15791, 1.675652, 1.375932, 1.200241]}}


In [55]:
# Number of trees up to and including the best round. `best_ntree_limit` was
# removed from Booster in XGBoost 2.0, so derive the same value from the
# zero-based `best_iteration`, which is available in both old and new releases.
result['booster'].best_iteration + 1

10

In [56]:
# Best metric value recorded by early stopping; in the recorded run it equals
# the final train RMSE shown in the training history.
result['booster'].best_score

1.200241

In [57]:
# Zero-based index of the boosting round that achieved the best score.
result['booster'].best_iteration

9

In [58]:
# Tree count at the best round (this repeats a check made earlier in this
# section). `best_ntree_limit` was removed from Booster in XGBoost 2.0, so
# compute it from the zero-based `best_iteration` instead.
result['booster'].best_iteration + 1

10

##### XGBoost model with train-test split

In [43]:
from dask_ml.model_selection import train_test_split

In [44]:
# Split features and target into train/test parts. shuffle=False is kept from
# the original call; random_state is pinned so the partitioning -- and the
# metrics computed from it -- can be reproduced after a kernel restart.
# NOTE(review): the rows come from a date-indexed frame; confirm whether the
# hold-out set should instead be a strictly chronological tail of the data.
X_train, X_test, y_train, y_test = train_test_split(
    ddf.x,
    ddf.y,
    shuffle=False,
    random_state=42,
)

In [46]:
# Convert both feature splits from Dask Series to single-column 2-D Dask
# arrays with known chunk lengths.
# NOTE(review): this rebinds X_train / X_test to a different type, so the cell
# is not idempotent -- re-running it without first re-running the split cell
# will presumably fail. Consider distinct names for the converted arrays.
X_train, X_test = (
    split.to_dask_array(lengths=True).reshape(-1, 1)
    for split in (X_train, X_test)
)

In [47]:
# Dask-aware XGBoost matrix for the training split.
dtrain2 = xgb.dask.DaskDMatrix(client=client, data=X_train, label=y_train)

In [48]:
# Dask-aware XGBoost matrix for the held-out test split.
dtest = xgb.dask.DaskDMatrix(client=client, data=X_test, label=y_test)

In [49]:
# Train a squared-error regressor for up to 10 boosting rounds on the training
# split, recording metrics for both the train and test matrices each round.
# Per the XGBoost docs, early stopping monitors the last entry in `evals`,
# which here is the test split.
result2 = xgb.dask.train(
    client,
    {"objective": "reg:squarederror"},
    dtrain2,
    num_boost_round=10,
    evals=[(dtrain2, "train"), (dtest, "test")],
    early_stopping_rounds=5,
)

In [50]:
# Per-round evaluation metrics for both the "train" and "test" eval sets.
print(result2['history'])

{'train': {'rmse': [15.906085, 11.192538, 7.892526, 5.589144, 3.989423, 2.889521, 2.148683, 1.665983, 1.365895, 1.189484]}, 'test': {'rmse': [16.538439, 11.678191, 8.278107, 5.887934, 4.221953, 3.06977, 2.304125, 1.801274, 1.486173, 1.30199]}}
