This notebook tests how `lightgbm.dask` handles sparse inputs when calling `predict(pred_contrib=True)`.

In [1]:
import dask.array as da
import numpy as np
import lightgbm as lgb
from dask import delayed
from dask.distributed import Client, LocalCluster
from lightgbm.dask import DaskLGBMClassifier
from lightgbm.sklearn import LGBMClassifier
from scipy.sparse import csc_matrix
from sklearn.datasets import make_blobs

In [2]:
# Start a local Dask cluster, connect a client to it, and block until every
# requested worker has registered before doing any distributed work.
n_workers = 3
cluster = LocalCluster(n_workers=n_workers)
client = Client(cluster)
client.wait_for_workers(n_workers=n_workers)

# The dashboard link is handy for watching tasks while the cells below run.
print(f"View the dashboard: {cluster.dashboard_link}")

View the dashboard: http://127.0.0.1:8787/status


In [3]:
# Toy 3-centre dataset, wrapped as Dask collections whose feature blocks are
# scipy CSC matrices so that prediction is exercised on sparse input.
chunk_size = 50
X, y = make_blobs(n_samples=100, centers=3, random_state=42)
rnd = np.random.RandomState(42)

# Chunk along rows only (each block spans every column), then convert each
# dense block into a sparse CSC matrix.
dX = da.from_array(X, chunks=(chunk_size, X.shape[1]))
dX = dX.map_blocks(csc_matrix)
dy = da.from_array(y, chunks=chunk_size)

In [4]:
# Train a small distributed classifier on the sparse Dask array, then request
# feature contributions (pred_contrib=True) for the same data.
dask_clf = DaskLGBMClassifier(n_estimators=5, num_leaves=2, tree_learner="data")
dask_clf.fit(dX, dy)

preds = dask_clf.predict(dX, pred_contrib=True)
# Materialize the lazy predictions exactly once; the rest of the cell reuses it.
preds_computed = preds.compute()

# Debugging aid -- uncomment to inspect the container types and the first
# partition of the input.
# print(
#     type(preds),
#     type(preds.partitions[0].compute()),
#     type(preds_computed),
#     f"{dask_clf.n_classes_} classes, {dX.shape[1]} features",
# )
# print("---")
# print(dX.partitions[0].compute())
# print("---")

# Reuse the already-computed result instead of executing the graph a second time.
preds_computed.shape

Finding random open ports for workers




--- str(data) ---
dask.array<csc_matrix, shape=(100, 2), dtype=float64, chunksize=(50, 2), chunktype=scipy.csc_matrix>
-----------------
--- str(first part) ---
  (0, 0)	-7.726420909219675
  (1, 0)	5.453396053597771
  (2, 0)	-2.978672008987702
  (3, 0)	6.042673147164201
  (4, 0)	-6.521839830802987
  (5, 0)	3.649342511097413
  (6, 0)	-2.1779341916491863
  (7, 0)	4.4202069483905895
  (8, 0)	4.736956385576142
  (9, 0)	-3.6601912004750528
  (10, 0)	-3.053580347577933
  (11, 0)	-6.65216725654714
  (12, 0)	-6.357685625534373
  (13, 0)	-3.6155325970587784
  (14, 0)	-1.7707310430573397
  (15, 0)	-7.950519689212382
  (16, 0)	-6.602936391821251
  (17, 0)	-2.581207744633084
  (18, 0)	-7.763484627352403
  (19, 0)	-6.406389566577725
  (20, 0)	-2.9726153158652124
  (21, 0)	-6.956728900565374
  (22, 0)	-7.326142143218291
  (23, 0)	-2.147802017544336
  (24, 0)	-2.5450236621627016
  :	:
  (25, 1)	10.071408354417237
  (26, 1)	1.552524361175373
  (27, 1)	-7.737267149692229
  (28, 1)	-6.093024989533495
  

(100, 9)

In [5]:
# Materialize the Dask collections once and reuse the in-memory results below,
# rather than calling dX.compute() again for fitting and for predicting.
X = dX.compute()
y = dy.compute()

# Fit a plain (non-distributed) classifier on the same data for comparison.
local_clf = LGBMClassifier()
local_clf.fit(X=X, y=y)
# Predict on an explicit CSC view of the features, asking for contributions.
local_preds = local_clf.predict(X.tocsc(), pred_contrib=True)

print(local_clf.n_classes_, type(local_preds))
print("---")
print(local_preds)

3 <class 'list'>
---
[<100x3 sparse matrix of type '<class 'numpy.float64'>'
	with 300 stored elements in Compressed Sparse Column format>, <100x3 sparse matrix of type '<class 'numpy.float64'>'
	with 300 stored elements in Compressed Sparse Column format>, <100x3 sparse matrix of type '<class 'numpy.float64'>'
	with 300 stored elements in Compressed Sparse Column format>]


In [None]:
# import scipy
# type(dX._meta) is scipy.sparse.csc.csc_matrix
# isinstance(dX._meta, (scipy.sparse.csc.csc_matrix, scipy.sparse.csr.csr_matrix))
# m = dX.to_delayed().ravel()
# dir(m[0])

In [None]:
# l = [
#     delayed(lgb.dask._predict_part)(
#         chunk,
#         model=local_clf,
#         raw_score=False,
#         pred_proba=False,
#         pred_leaf=False,
#         pred_contrib=False,
#     )
#     for chunk in dX.to_delayed().ravel()
# ]

In [None]:
import dask.bag as db

In [None]:
# Build a bag from the delayed blocks of the sparse Dask array.
bag = db.from_delayed([*dX.to_delayed().ravel()])

In [None]:
from distributed import wait

# Inspect what kind of object a single partition of the Dask array is.
type(dX.partitions[0])

In [None]:
# Materialize the bag's contents so they can be inspected.
bag.compute()

In [None]:
# from functools import partial

# def _predict_accumulate(preds_so_far, new_chunk, pred_fn):
#     #raise RuntimeError(preds_so_far)
# #     try:
# #         preds = pred_fn(new_chunk.compute())
# #     except:
# #         raise RuntimeError(new_chunk)
#     preds = pred_fn(new_chunk.compute())
#     if preds_so_far is None:
#         return preds
#     else:
#         for i in range(len(preds)):
#             preds_so_far[i] = lgb.dask._concat([preds_so_far[i], preds[i]])
#     return preds_so_far

# bag = db.from_sequence(dX.partitions)

# predict_fn = partial(
#     lgb.dask._predict_part,
#     model=local_clf,
#     raw_score=False,
#     pred_proba=False,
#     pred_leaf=False,
#     pred_contrib=True,
# )

# pred_accum = partial(
#     _predict_accumulate,
#     pred_fn=predict_fn
# )

# v = bag.fold(
#     binop=pred_accum,
#     combine=pred_accum,
#     initial=None
# )

# v.compute()

In [6]:
# Working approach: put each partition of the sparse Dask array into a bag so
# that per-partition predictions can be folded together further down this cell.

import dask.bag as db
from functools import partial

bag = db.from_sequence(dX.partitions)

def _combine_preds(preds_so_far, new_chunk):
    #raise RuntimeError(preds_so_far)
    for i in range(len(preds_so_far)):
        preds_so_far[i] = lgb.dask._concat([preds_so_far[i], new_chunk[i]])
    return preds_so_far

# Freeze the model and every prediction flag so the resulting callable only
# needs to be handed the data to predict on.
predict_fn = partial(lgb.dask._predict_part, model=local_clf, raw_score=False,
                     pred_proba=False, pred_leaf=False, pred_contrib=True)

def _predict_part_binop(_ignore, chunk, pred_fn):
    return predict_fn(chunk.compute())

# Bind the prediction callable so the fold only has to pass (accumulator, chunk).
predict_part_binop = partial(_predict_part_binop, pred_fn=predict_fn)

# Lazy reduction: `binop` predicts within each bag partition, `combine` merges
# the per-partition results. Nothing executes until `.compute()` is called.
v = bag.fold(predict_part_binop, combine=_combine_preds, initial=None)

#v.compute()

In [8]:
# Execute the lazy fold and display the combined predictions.
v.compute()

[<100x3 sparse matrix of type '<class 'numpy.float64'>'
 	with 300 stored elements in Compressed Sparse Row format>,
 <100x3 sparse matrix of type '<class 'numpy.float64'>'
 	with 300 stored elements in Compressed Sparse Row format>,
 <100x3 sparse matrix of type '<class 'numpy.float64'>'
 	with 300 stored elements in Compressed Sparse Row format>]

In [None]:
# Compute and display whatever `v` currently refers to.
v.compute()

In [None]:
def _concat_lists(x, y):
    print("aaa")
    print("--- this ---")
    print(x)
    #print(y)
    print("--- that ---")
    if x is None:
        return y
    else:
        return([
            lgb.dask._concat([x[i], y[i]])
            for i in range(len(x))
        ])

In [None]:
# Lazily apply LightGBM's per-part predictor to every element of the bag,
# requesting feature contributions from the locally trained model.
v = bag.map(
    lgb.dask._predict_part,
    model=local_clf,
    raw_score=False,
    pred_proba=False,
    pred_leaf=False,
    pred_contrib=True,
)

In [None]:
from functools import partial

# Freeze the model and every prediction flag so the resulting callable only
# needs to be handed the data to predict on.
predict_fn = partial(lgb.dask._predict_part, model=local_clf, raw_score=False,
                     pred_proba=False, pred_leaf=False, pred_contrib=True)

In [None]:
# Running merge of the bag's elements; `None` seeds the accumulation, which is
# why `_concat_lists` special-cases a `None` left-hand side.
n = v.accumulate(_concat_lists, initial=None)

In [None]:
# Execute the lazy accumulation and display its result.
n.compute()

In [None]:
# Same predictor as used with `bag.map`, but applied once per bag partition
# instead of once per element.
v = bag.map_partitions(
    lgb.dask._predict_part,
    model=local_clf,
    raw_score=False,
    pred_proba=False,
    pred_leaf=False,
    pred_contrib=True,
)

In [None]:
# Pasted signature of `Bag.reduction`, kept as a comment so the cell is valid Python:
# reduction(perpartition, aggregate, split_every=None, out_type=<class 'dask.bag.core.Item'>, name=None) method of dask.bag.core.Bag instance

In [None]:
# Show the built-in documentation for `v.accumulate`.
help(v.accumulate)

## References

https://github.com/microsoft/LightGBM/pull/3866

https://docs.dask.org/en/latest/bag-creation.html

https://github.com/dask/dask/issues/7589

https://github.com/microsoft/LightGBM/issues/3881

https://github.com/microsoft/LightGBM/pull/4351/files#diff-4583a656084ea62b391a061e1c1f533e3e99e1d1a6021a5408032d80c6bdd394