In [None]:
import dask.array as da
import lightgbm as lgb
import numpy as np
from dask import delayed
from sklearn.datasets import load_svmlight_file
from distributed import Client, LocalCluster, wait

In [None]:
# Load the ranking example: features and labels come from the svmlight
# file, the per-query row counts from the companion .query file.
X, y = load_svmlight_file(f="../lambdarank/rank.train")
group = np.loadtxt(fname="../lambdarank/rank.train.query")

# Local two-worker cluster plus a client connected to it.
cluster = LocalCluster(n_workers=2)
client = Client(address=cluster)

In [None]:
# Split the training data into two partitions along a query-group boundary:
# the first `groups_in_part1` groups form partition 1, the rest partition 2.
groups_in_part1 = 100
rows_in_part1 = int(np.sum(group[:groups_in_part1]))
num_features = X.shape[1]

# Densify the sparse feature matrix before chunking it by rows.
# `toarray()` gives a plain ndarray (`todense()` would give an np.matrix),
# and the hasattr guard keeps this cell safe to re-run after X has already
# been converted.
if hasattr(X, "toarray"):
    X = X.toarray()

rows_in_part2 = X.shape[0] - rows_in_part1

# One tuple of block sizes per axis: two row blocks, one block of all columns.
dX = da.from_array(
    x=X,
    chunks=[
        (rows_in_part1, rows_in_part2),
        (num_features,),
    ],
)

# Labels are chunked at the same row boundary as the features.
dy = da.from_array(
    x=y,
    chunks=[
        (rows_in_part1, rows_in_part2),
    ],
)

# Group sizes are chunked by group count, so each chunk covers the groups
# whose rows were summed to obtain the matching row block above.
dg = da.from_array(
    x=group,
    chunks=[
        (groups_in_part1, group.size - groups_in_part1),
    ],
)

In [None]:
# Train a Dask ranker on the chunked features, labels and group sizes.
dask_model = lgb.DaskLGBMRanker()
dask_model.fit(X=dX, y=dy, group=dg)

In [None]:
# Alternative construction: slice at the same boundaries, wrap every slice
# as its own dask array, and concatenate the pieces.
rows_in_part1 = int(group[:100].sum())
num_features = X.shape[1]

dX = da.concatenate(
    [da.from_array(rows) for rows in (X[:rows_in_part1], X[rows_in_part1:])]
)
dy = da.concatenate(
    [da.from_array(labels) for labels in (y[:rows_in_part1], y[rows_in_part1:])]
)
dg = da.concatenate(
    [da.from_array(sizes) for sizes in (group[:100], group[100:])]
)

In [None]:
def _split_to_parts(data, is_matrix):
    parts = data.to_delayed()
    if isinstance(parts, np.ndarray):
        if is_matrix:
            assert parts.shape[1] == 1
        else:
            assert parts.ndim == 1 or parts.shape[1] == 1
        parts = parts.flatten().tolist()
    return parts

def _concat(seq):
    if isinstance(seq[0], np.ndarray):
        return np.concatenate(seq, axis=0)
    elif isinstance(seq[0], (pd_DataFrame, pd_Series)):
        return concat(seq, axis=0)
    elif isinstance(seq[0], ss.spmatrix):
        return ss.vstack(seq, format='csr')
    else:
        raise TypeError('Data must be one of: numpy arrays, pandas dataframes, sparse matrices (from scipy). Got %s.' % str(type(seq[0])))

In [None]:
# Break each dask collection into its delayed chunks, then bundle the chunks
# that belong together into one dict per partition to enforce co-locality.
data_parts = _split_to_parts(data=dX, is_matrix=True)
label_parts = _split_to_parts(data=dy, is_matrix=False)

parts = [
    {'data': data_part, 'label': label_part}
    for data_part, label_part in zip(data_parts, label_parts)
]
n_parts = len(parts)

# Attach the matching group-size chunk to every partition dict.
group_parts = _split_to_parts(data=dg, is_matrix=False)
for i, part in enumerate(parts):
    part['group'] = group_parts[i]

# Submit every partition to the cluster, then block until all are finished.
parts = [delayed(part) for part in parts]
parts = client.compute(parts)
wait(parts)

In [None]:
from collections import defaultdict

# Index the computed parts by key so who_has() results can be mapped back.
key_to_part_dict = dict((part.key, part) for part in parts)  # type: ignore
who_has = client.who_has(parts)

# Assign each part to the first worker reported as holding it.
worker_map = defaultdict(list)
for key, workers in who_has.items():
    first_worker = next(iter(workers))
    worker_map[first_worker].append(key_to_part_dict[key])

In [None]:
# `parts` holds futures once client.compute() has run, and a future cannot
# be indexed directly. Gather the computed dicts first, then pull out each
# partition's feature block.
[part['data'] for part in client.gather(parts)]

In [None]:
def _group_chunk(group, start, end):
    """Return the slice ``group[start:end]``."""
    return group[start:end]


# Build the group-size array from one delayed slice per partition, split
# after the first 100 groups like the dX / dy partitions above.
group_bounds = [(0, 100), (100, group.size)]
group_chunks = [
    delayed(_group_chunk)(group, start, end)
    for start, end in group_bounds
]

# from_delayed() wraps a single delayed value and must be told its shape,
# so each chunk is wrapped separately and the results are concatenated.
dg = da.concatenate(
    [
        da.from_delayed(chunk, shape=(end - start,), dtype=group.dtype)
        for chunk, (start, end) in zip(group_chunks, group_bounds)
    ]
)

In [None]:
# Total number of entries in `group`; the partitioning above puts the first
# 100 entries in one chunk and the remaining `group.size - 100` in the other.
group.size

In [None]:
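# Disabled earlier attempt at splitting the training data into two partitions.
# NOTE(review): the `chunks` below look like one (rows, cols) pair per block,
# whereas dask expects one tuple of block sizes per axis -- confirm before
# re-enabling.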
# rows_in_part1 = int(np.sum(group[:100]))
# num_features = X.shape[1]

# dX = da.from_array(
#   x=X,
#   chunks=[
#     (rows_in_part1, num_features),
#     (X.shape[0] - rows_in_part1, num_features)
#   ]
# )

# dy = da.from_array(y)
# dg = da.from_array(group)

# Fit a fresh ranker on whichever dX / dy / dg the preceding cells defined.
dask_model = lgb.DaskLGBMRanker()
dask_model.fit(X=dX, y=dy, group=dg)