Skip to content

Commit

Permalink
Support missing argument for tensor.tosparse() and fill_value a…
Browse files Browse the repository at this point in the history
…rgument for `sparse_tensor.todense()` (#1797)
  • Loading branch information
Xuye (Chris) Qin committed Dec 17, 2020
1 parent 6d958c4 commit 641418d
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 22 deletions.
1 change: 1 addition & 0 deletions mars/learn/contrib/lightgbm/_predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def tile(cls, op: "LGBMPredict"):
@classmethod
def execute(cls, ctx, op: "LGBMPredict"):
in_data = ctx[op.data.key]
in_data = in_data.spmatrix if hasattr(in_data, 'spmatrix') else in_data
out = op.outputs[0]

if op.data.shape[0] == 0:
Expand Down
6 changes: 5 additions & 1 deletion mars/learn/contrib/lightgbm/_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ def execute(cls, ctx, op: "LGBMTrain"):
from lightgbm.basic import _safe_call, _LIB

data_val = ctx[op.data.key]
data_val = data_val.spmatrix if hasattr(data_val, 'spmatrix') else data_val

label_val = ctx[op.label.key]
sample_weight_val = ctx[op.sample_weight.key] if op.sample_weight is not None else None
init_score_val = ctx[op.init_score.key] if op.init_score is not None else None
Expand All @@ -266,7 +268,9 @@ def execute(cls, ctx, op: "LGBMTrain"):
else:
eval_set, eval_sample_weight, eval_init_score = [], [], []
for data, label in zip(op.eval_datas, op.eval_labels):
eval_set.append((ctx[data.key], ctx[label.key]))
data_eval = ctx[data.key]
data_eval = data_eval.spmatrix if hasattr(data_eval, 'spmatrix') else data_eval
eval_set.append((data_eval, ctx[label.key]))
for weight in op.eval_sample_weights:
eval_sample_weight.append(ctx[weight.key] if weight is not None else None)
for score in op.eval_init_scores:
Expand Down
16 changes: 16 additions & 0 deletions mars/learn/contrib/lightgbm/tests/test_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import unittest

import numpy as np

import mars.tensor as mt
import mars.dataframe as md
from mars.session import new_session
Expand All @@ -37,6 +39,9 @@ def setUp(self):
self.X = rs.rand(n_rows, n_columns, chunk_size=chunk_size)
self.y = rs.rand(n_rows, chunk_size=chunk_size)
self.X_df = md.DataFrame(self.X)
x_sparse = np.random.rand(n_rows, n_columns)
x_sparse[np.arange(n_rows), np.random.randint(n_columns, size=n_rows)] = np.nan
self.X_sparse = mt.tensor(x_sparse, chunk_size=chunk_size).tosparse(missing=np.nan)

self.session = new_session().as_default()
self._old_executor = self.session._sess._executor
Expand All @@ -58,6 +63,17 @@ def testLocalClassifier(self):

self.assertIsInstance(prediction, mt.Tensor)

# test sparse tensor
X_sparse = self.X_sparse
classifier = LGBMClassifier(n_estimators=2)
classifier.fit(X_sparse, y, eval_set=[(X_sparse, y)], verbose=True)
prediction = classifier.predict(X_sparse)

self.assertEqual(prediction.ndim, 1)
self.assertEqual(prediction.shape[0], len(self.X))

self.assertIsInstance(prediction, mt.Tensor)

prob = classifier.predict_proba(X)
self.assertEqual(prob.shape, X.shape)

Expand Down
1 change: 1 addition & 0 deletions mars/learn/contrib/xgboost/dmatrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ def get_xgb_dmatrix(tup):
from xgboost import DMatrix

data, label, weight, missing, feature_names, feature_types = tup
data = data.spmatrix if hasattr(data, 'spmatrix') else data
return DMatrix(data, label=label, missing=missing, weight=weight,
feature_names=feature_names, feature_types=feature_types,
nthread=-1)
Expand Down
1 change: 1 addition & 0 deletions mars/learn/contrib/xgboost/predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def execute(cls, ctx, op):
if isinstance(data, tuple):
data = ToDMatrix.get_xgb_dmatrix(data)
else:
data = data.spmatrix if hasattr(data, 'spmatrix') else data
data = DMatrix(data)
result = op.model.predict(data)

Expand Down
6 changes: 6 additions & 0 deletions mars/learn/contrib/xgboost/tests/test_predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def setUp(self):
self.y = rs.rand(n_rows, chunk_size=chunk_size)
self.X_df = md.DataFrame(self.X)
self.y_series = md.Series(self.y)
x_sparse = np.random.rand(n_rows, n_columns)
x_sparse[np.arange(n_rows), np.random.randint(n_columns, size=n_rows)] = np.nan
self.X_sparse = mt.tensor(x_sparse, chunk_size=chunk_size).tosparse(missing=np.nan)

self.session = new_session().as_default()
self._old_executor = self.session._sess._executor
Expand All @@ -58,6 +61,9 @@ def testLocalPredictTensor(self):
prediction = predict(booster, self.X)
self.assertIsInstance(prediction.to_numpy(), np.ndarray)

prediction = predict(booster, self.X_sparse)
self.assertIsInstance(prediction.to_numpy(), np.ndarray)

prediction = predict(booster, dtrain)
self.assertIsInstance(prediction.fetch(), np.ndarray)

Expand Down
10 changes: 10 additions & 0 deletions mars/learn/contrib/xgboost/tests/test_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import unittest

import numpy as np

import mars.tensor as mt
import mars.dataframe as md
from mars.session import new_session
Expand Down Expand Up @@ -45,6 +47,9 @@ def setUp(self):
self.X_df = md.DataFrame(self.X)
self.y_series = md.Series(self.y)
self.weight = rs.rand(n_rows, chunk_size=chunk_size)
x_sparse = np.random.rand(n_rows, n_columns)
x_sparse[np.arange(n_rows), np.random.randint(n_columns, size=n_rows)] = np.nan
self.X_sparse = mt.tensor(x_sparse, chunk_size=chunk_size).tosparse(missing=np.nan)

self.session = new_session().as_default()
self._old_executor = self.session._sess._executor
Expand Down Expand Up @@ -81,6 +86,11 @@ def testLocalTrainTensor(self):
booster = train({}, dtrain, num_boost_round=2)
self.assertIsInstance(booster, Booster)

def testLocalTrainSparseTensor(self):
dtrain = MarsDMatrix(self.X_sparse, self.y)
booster = train({}, dtrain, num_boost_round=2)
self.assertIsInstance(booster, Booster)

def testLocalTrainDataFrame(self):
dtrain = MarsDMatrix(self.X_df, self.y_series)
booster = train({}, dtrain, num_boost_round=2)
Expand Down
8 changes: 4 additions & 4 deletions mars/tensor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,19 +226,19 @@ def is_scalar(self):

isscalar = is_scalar

def tosparse(self):
def tosparse(self, missing=None):
if self.issparse():
return self

from .datasource import fromdense
return fromdense(self)
return fromdense(self, missing=missing)

def todense(self):
def todense(self, fill_value=None):
if not self.issparse():
return self

from .datasource import fromsparse
return fromsparse(self)
return fromsparse(self, fill_value=fill_value)

def transpose(self, *axes):
from .base import transpose
Expand Down
48 changes: 41 additions & 7 deletions mars/tensor/datasource/from_dense.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import Iterable
from functools import reduce
from operator import and_

import numpy as np
import pandas as pd

from ... import opcodes as OperandDef
from ...serialize import KeyField
from ...serialize import KeyField, AnyField
from ...lib.sparse import SparseNDArray
from ...lib.sparse.core import naked, cps, sps
from .core import TensorHasInput
Expand All @@ -25,22 +32,49 @@
class DenseToSparse(TensorHasInput):
_op_type_ = OperandDef.DENSE_TO_SPARSE

_input = KeyField('_input')
_input = KeyField('input')
_missing = AnyField('missing')

def __init__(self, dtype=None, gpu=None, missing=None, **kw):
super().__init__(_dtype=dtype, _gpu=gpu, _sparse=True,
_missing=missing, **kw)

def __init__(self, dtype=None, gpu=None, **kw):
super().__init__(_dtype=dtype, _gpu=gpu, _sparse=True, **kw)
@property
def missing(self):
return self._missing

@staticmethod
def _get_mask(data, missing):
if isinstance(missing, Iterable):
return reduce(and_, (DenseToSparse._get_mask(data, m) for m in missing))
elif pd.isna(missing):
return ~pd.isna(data)
else:
return data != missing

@classmethod
def execute(cls, ctx, op):
out = op.outputs[0]
in_data = naked(ctx[op.inputs[0].key])
missing = op.missing
shape = in_data.shape \
if any(np.isnan(s) for s in out.shape) else out.shape

xps = cps if op.gpu else sps
ctx[op.outputs[0].key] = SparseNDArray(xps.csr_matrix(in_data), shape=op.outputs[0].shape)
if missing is None:
ctx[out.key] = \
SparseNDArray(xps.csr_matrix(in_data), shape=shape)
else:
mask = cls._get_mask(in_data, missing)
spmatrix = xps.csr_matrix((in_data[mask], mask.nonzero()),
shape=shape)
ctx[out.key] = SparseNDArray(spmatrix)


def fromdense(a):
def fromdense(a, missing=None):
a = tensor(a)
if a.issparse():
return a

op = DenseToSparse(dtype=a.dtype, gpu=a.op.gpu)
op = DenseToSparse(dtype=a.dtype, gpu=a.op.gpu, missing=missing)
return op(a)
42 changes: 32 additions & 10 deletions mars/tensor/datasource/from_sparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
# limitations under the License.

from ... import opcodes as OperandDef
from ...serialize import KeyField, StringField
from ...serialize import KeyField, StringField, AnyField
from ..array_utils import as_same_device, device, get_array_module
from ..utils import get_order
from .core import TensorHasInput
from .array import tensor
Expand All @@ -24,29 +25,50 @@
class SparseToDense(TensorHasInput):
_op_type_ = OperandDef.SPARSE_TO_DENSE

_input = KeyField('_input')
_order = StringField('_order')
_input = KeyField('input')
_order = StringField('order')
_fill_value = AnyField('fill_value')

def __init__(self, dtype=None, gpu=None, order=None, **kw):
super().__init__(_dtype=dtype, _gpu=gpu, _sparse=False, _order=order, **kw)
def __init__(self, dtype=None, fill_value=None, gpu=None, order=None, **kw):
super().__init__(_dtype=dtype, _fill_value=fill_value,
_gpu=gpu, _sparse=False, _order=order, **kw)

@property
def order(self):
return self._order

@property
def fill_value(self):
return self._fill_value

@classmethod
def execute(cls, ctx, op):
ctx[op.outputs[0].key] = \
ctx[op.inputs[0].key].toarray().astype(
op.outputs[0].dtype, order=op.order, copy=False)
fill_value = op.fill_value
out = op.outputs[0]
(inp,), device_id, xp = as_same_device(
[ctx[inp.key] for inp in op.inputs], device=op.device, ret_extra=True)

with device(device_id):
if fill_value is None:
ctx[out.key] = inp.toarray().astype(
out.dtype, order=op.order, copy=False)
else:
xp = get_array_module(xp)
spmatrix = inp.spmatrix
inds = spmatrix.nonzero()
ret = xp.full(inp.shape, fill_value, dtype=out.dtype,
order=op.order)
ret[inds] = spmatrix.data
ctx[out.key] = ret


def fromsparse(a, order='C'):
def fromsparse(a, order='C', fill_value=None):
a = tensor(a)
if not a.issparse():
return a.astype(a.dtype, order=order, copy=False)

tensor_order = get_order(order, None, available_options='CF',
err_msg="only 'C' or 'F' order is permitted")
op = SparseToDense(dtype=a.dtype, gpu=a.op.gpu, order=order)
op = SparseToDense(dtype=a.dtype, gpu=a.op.gpu,
order=order, fill_value=fill_value)
return op(a, order=tensor_order)
25 changes: 25 additions & 0 deletions mars/tensor/datasource/tests/test_datasource_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,31 @@ def testCreateSparseExecution(self):
np.testing.assert_array_equal(res[0].toarray(), mat[..., :2].toarray())
np.testing.assert_array_equal(res[1].toarray(), mat[..., 2:].toarray())

# test missing argument
t4 = tensor(np.array([[0, 0, 2], [2, 0, 0]]), chunk_size=2).tosparse(missing=2)
t4 = t4 + 1
expected = mat.toarray()
raw = expected.copy()
expected[raw == 0] += 1
expected[raw != 0] = 0

res = self.executor.execute_tensor(t4, concat=True)[0]
self.assertIsInstance(res, SparseNDArray)
self.assertEqual(res.dtype, np.int_)
np.testing.assert_array_equal(res.toarray(), expected)

# test missing argument that is np.nan
t5 = tensor(np.array([[np.nan, np.nan, 2], [2, np.nan, -999]]),
chunk_size=2).tosparse(missing=[np.nan, -999])
t5 = (t5 + 1).todense(fill_value=np.nan)
expected = mat.toarray().astype(float)
expected[expected != 0] += 1
expected[expected == 0] = np.nan

res = self.executor.execute_tensor(t5, concat=True)[0]
self.assertEqual(res.dtype, np.float64)
np.testing.assert_array_equal(res, expected)

def testZerosExecution(self):
t = zeros((20, 30), dtype='i8', chunk_size=5)

Expand Down

0 comments on commit 641418d

Please sign in to comment.