Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support missing argument for tensor.tosparse() and fill_value argument for sparse_tensor.todense() #1797

Merged
merged 1 commit into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions mars/learn/contrib/lightgbm/_predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def tile(cls, op: "LGBMPredict"):
@classmethod
def execute(cls, ctx, op: "LGBMPredict"):
in_data = ctx[op.data.key]
in_data = in_data.spmatrix if hasattr(in_data, 'spmatrix') else in_data
out = op.outputs[0]

if op.data.shape[0] == 0:
Expand Down
6 changes: 5 additions & 1 deletion mars/learn/contrib/lightgbm/_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ def execute(cls, ctx, op: "LGBMTrain"):
from lightgbm.basic import _safe_call, _LIB

data_val = ctx[op.data.key]
data_val = data_val.spmatrix if hasattr(data_val, 'spmatrix') else data_val

label_val = ctx[op.label.key]
sample_weight_val = ctx[op.sample_weight.key] if op.sample_weight is not None else None
init_score_val = ctx[op.init_score.key] if op.init_score is not None else None
Expand All @@ -266,7 +268,9 @@ def execute(cls, ctx, op: "LGBMTrain"):
else:
eval_set, eval_sample_weight, eval_init_score = [], [], []
for data, label in zip(op.eval_datas, op.eval_labels):
eval_set.append((ctx[data.key], ctx[label.key]))
data_eval = ctx[data.key]
data_eval = data_eval.spmatrix if hasattr(data_eval, 'spmatrix') else data_eval
eval_set.append((data_eval, ctx[label.key]))
for weight in op.eval_sample_weights:
eval_sample_weight.append(ctx[weight.key] if weight is not None else None)
for score in op.eval_init_scores:
Expand Down
16 changes: 16 additions & 0 deletions mars/learn/contrib/lightgbm/tests/test_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import unittest

import numpy as np

import mars.tensor as mt
import mars.dataframe as md
from mars.session import new_session
Expand All @@ -37,6 +39,9 @@ def setUp(self):
self.X = rs.rand(n_rows, n_columns, chunk_size=chunk_size)
self.y = rs.rand(n_rows, chunk_size=chunk_size)
self.X_df = md.DataFrame(self.X)
x_sparse = np.random.rand(n_rows, n_columns)
x_sparse[np.arange(n_rows), np.random.randint(n_columns, size=n_rows)] = np.nan
self.X_sparse = mt.tensor(x_sparse, chunk_size=chunk_size).tosparse(missing=np.nan)

self.session = new_session().as_default()
self._old_executor = self.session._sess._executor
Expand All @@ -58,6 +63,17 @@ def testLocalClassifier(self):

self.assertIsInstance(prediction, mt.Tensor)

# test sparse tensor
X_sparse = self.X_sparse
classifier = LGBMClassifier(n_estimators=2)
classifier.fit(X_sparse, y, eval_set=[(X_sparse, y)], verbose=True)
prediction = classifier.predict(X_sparse)

self.assertEqual(prediction.ndim, 1)
self.assertEqual(prediction.shape[0], len(self.X))

self.assertIsInstance(prediction, mt.Tensor)

prob = classifier.predict_proba(X)
self.assertEqual(prob.shape, X.shape)

Expand Down
1 change: 1 addition & 0 deletions mars/learn/contrib/xgboost/dmatrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ def get_xgb_dmatrix(tup):
from xgboost import DMatrix

data, label, weight, missing, feature_names, feature_types = tup
data = data.spmatrix if hasattr(data, 'spmatrix') else data
return DMatrix(data, label=label, missing=missing, weight=weight,
feature_names=feature_names, feature_types=feature_types,
nthread=-1)
Expand Down
1 change: 1 addition & 0 deletions mars/learn/contrib/xgboost/predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def execute(cls, ctx, op):
if isinstance(data, tuple):
data = ToDMatrix.get_xgb_dmatrix(data)
else:
data = data.spmatrix if hasattr(data, 'spmatrix') else data
data = DMatrix(data)
result = op.model.predict(data)

Expand Down
6 changes: 6 additions & 0 deletions mars/learn/contrib/xgboost/tests/test_predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def setUp(self):
self.y = rs.rand(n_rows, chunk_size=chunk_size)
self.X_df = md.DataFrame(self.X)
self.y_series = md.Series(self.y)
x_sparse = np.random.rand(n_rows, n_columns)
x_sparse[np.arange(n_rows), np.random.randint(n_columns, size=n_rows)] = np.nan
self.X_sparse = mt.tensor(x_sparse, chunk_size=chunk_size).tosparse(missing=np.nan)

self.session = new_session().as_default()
self._old_executor = self.session._sess._executor
Expand All @@ -58,6 +61,9 @@ def testLocalPredictTensor(self):
prediction = predict(booster, self.X)
self.assertIsInstance(prediction.to_numpy(), np.ndarray)

prediction = predict(booster, self.X_sparse)
self.assertIsInstance(prediction.to_numpy(), np.ndarray)

prediction = predict(booster, dtrain)
self.assertIsInstance(prediction.fetch(), np.ndarray)

Expand Down
10 changes: 10 additions & 0 deletions mars/learn/contrib/xgboost/tests/test_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import unittest

import numpy as np

import mars.tensor as mt
import mars.dataframe as md
from mars.session import new_session
Expand Down Expand Up @@ -45,6 +47,9 @@ def setUp(self):
self.X_df = md.DataFrame(self.X)
self.y_series = md.Series(self.y)
self.weight = rs.rand(n_rows, chunk_size=chunk_size)
x_sparse = np.random.rand(n_rows, n_columns)
x_sparse[np.arange(n_rows), np.random.randint(n_columns, size=n_rows)] = np.nan
self.X_sparse = mt.tensor(x_sparse, chunk_size=chunk_size).tosparse(missing=np.nan)

self.session = new_session().as_default()
self._old_executor = self.session._sess._executor
Expand Down Expand Up @@ -81,6 +86,11 @@ def testLocalTrainTensor(self):
booster = train({}, dtrain, num_boost_round=2)
self.assertIsInstance(booster, Booster)

def testLocalTrainSparseTensor(self):
dtrain = MarsDMatrix(self.X_sparse, self.y)
booster = train({}, dtrain, num_boost_round=2)
self.assertIsInstance(booster, Booster)

def testLocalTrainDataFrame(self):
dtrain = MarsDMatrix(self.X_df, self.y_series)
booster = train({}, dtrain, num_boost_round=2)
Expand Down
8 changes: 4 additions & 4 deletions mars/tensor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,19 +226,19 @@ def is_scalar(self):

isscalar = is_scalar

def tosparse(self):
def tosparse(self, missing=None):
if self.issparse():
return self

from .datasource import fromdense
return fromdense(self)
return fromdense(self, missing=missing)

def todense(self):
def todense(self, fill_value=None):
if not self.issparse():
return self

from .datasource import fromsparse
return fromsparse(self)
return fromsparse(self, fill_value=fill_value)

def transpose(self, *axes):
from .base import transpose
Expand Down
48 changes: 41 additions & 7 deletions mars/tensor/datasource/from_dense.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import Iterable
from functools import reduce
from operator import and_

import numpy as np
import pandas as pd

from ... import opcodes as OperandDef
from ...serialize import KeyField
from ...serialize import KeyField, AnyField
from ...lib.sparse import SparseNDArray
from ...lib.sparse.core import naked, cps, sps
from .core import TensorHasInput
Expand All @@ -25,22 +32,49 @@
class DenseToSparse(TensorHasInput):
_op_type_ = OperandDef.DENSE_TO_SPARSE

_input = KeyField('_input')
_input = KeyField('input')
_missing = AnyField('missing')

def __init__(self, dtype=None, gpu=None, missing=None, **kw):
super().__init__(_dtype=dtype, _gpu=gpu, _sparse=True,
_missing=missing, **kw)

def __init__(self, dtype=None, gpu=None, **kw):
super().__init__(_dtype=dtype, _gpu=gpu, _sparse=True, **kw)
@property
def missing(self):
return self._missing

@staticmethod
def _get_mask(data, missing):
if isinstance(missing, Iterable):
return reduce(and_, (DenseToSparse._get_mask(data, m) for m in missing))
elif pd.isna(missing):
return ~pd.isna(data)
else:
return data != missing

@classmethod
def execute(cls, ctx, op):
out = op.outputs[0]
in_data = naked(ctx[op.inputs[0].key])
missing = op.missing
shape = in_data.shape \
if any(np.isnan(s) for s in out.shape) else out.shape

xps = cps if op.gpu else sps
ctx[op.outputs[0].key] = SparseNDArray(xps.csr_matrix(in_data), shape=op.outputs[0].shape)
if missing is None:
ctx[out.key] = \
SparseNDArray(xps.csr_matrix(in_data), shape=shape)
else:
mask = cls._get_mask(in_data, missing)
spmatrix = xps.csr_matrix((in_data[mask], mask.nonzero()),
shape=shape)
ctx[out.key] = SparseNDArray(spmatrix)


def fromdense(a):
def fromdense(a, missing=None):
a = tensor(a)
if a.issparse():
return a

op = DenseToSparse(dtype=a.dtype, gpu=a.op.gpu)
op = DenseToSparse(dtype=a.dtype, gpu=a.op.gpu, missing=missing)
return op(a)
42 changes: 32 additions & 10 deletions mars/tensor/datasource/from_sparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
# limitations under the License.

from ... import opcodes as OperandDef
from ...serialize import KeyField, StringField
from ...serialize import KeyField, StringField, AnyField
from ..array_utils import as_same_device, device, get_array_module
from ..utils import get_order
from .core import TensorHasInput
from .array import tensor
Expand All @@ -24,29 +25,50 @@
class SparseToDense(TensorHasInput):
_op_type_ = OperandDef.SPARSE_TO_DENSE

_input = KeyField('_input')
_order = StringField('_order')
_input = KeyField('input')
_order = StringField('order')
_fill_value = AnyField('fill_value')

def __init__(self, dtype=None, gpu=None, order=None, **kw):
super().__init__(_dtype=dtype, _gpu=gpu, _sparse=False, _order=order, **kw)
def __init__(self, dtype=None, fill_value=None, gpu=None, order=None, **kw):
super().__init__(_dtype=dtype, _fill_value=fill_value,
_gpu=gpu, _sparse=False, _order=order, **kw)

@property
def order(self):
return self._order

@property
def fill_value(self):
return self._fill_value

@classmethod
def execute(cls, ctx, op):
ctx[op.outputs[0].key] = \
ctx[op.inputs[0].key].toarray().astype(
op.outputs[0].dtype, order=op.order, copy=False)
fill_value = op.fill_value
out = op.outputs[0]
(inp,), device_id, xp = as_same_device(
[ctx[inp.key] for inp in op.inputs], device=op.device, ret_extra=True)

with device(device_id):
if fill_value is None:
ctx[out.key] = inp.toarray().astype(
out.dtype, order=op.order, copy=False)
else:
xp = get_array_module(xp)
spmatrix = inp.spmatrix
inds = spmatrix.nonzero()
ret = xp.full(inp.shape, fill_value, dtype=out.dtype,
order=op.order)
ret[inds] = spmatrix.data
ctx[out.key] = ret


def fromsparse(a, order='C'):
def fromsparse(a, order='C', fill_value=None):
a = tensor(a)
if not a.issparse():
return a.astype(a.dtype, order=order, copy=False)

tensor_order = get_order(order, None, available_options='CF',
err_msg="only 'C' or 'F' order is permitted")
op = SparseToDense(dtype=a.dtype, gpu=a.op.gpu, order=order)
op = SparseToDense(dtype=a.dtype, gpu=a.op.gpu,
order=order, fill_value=fill_value)
return op(a, order=tensor_order)
25 changes: 25 additions & 0 deletions mars/tensor/datasource/tests/test_datasource_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,31 @@ def testCreateSparseExecution(self):
np.testing.assert_array_equal(res[0].toarray(), mat[..., :2].toarray())
np.testing.assert_array_equal(res[1].toarray(), mat[..., 2:].toarray())

# test missing argument
t4 = tensor(np.array([[0, 0, 2], [2, 0, 0]]), chunk_size=2).tosparse(missing=2)
t4 = t4 + 1
expected = mat.toarray()
raw = expected.copy()
expected[raw == 0] += 1
expected[raw != 0] = 0

res = self.executor.execute_tensor(t4, concat=True)[0]
self.assertIsInstance(res, SparseNDArray)
self.assertEqual(res.dtype, np.int_)
np.testing.assert_array_equal(res.toarray(), expected)

# test missing argument that is np.nan
t5 = tensor(np.array([[np.nan, np.nan, 2], [2, np.nan, -999]]),
chunk_size=2).tosparse(missing=[np.nan, -999])
t5 = (t5 + 1).todense(fill_value=np.nan)
expected = mat.toarray().astype(float)
expected[expected != 0] += 1
expected[expected == 0] = np.nan

res = self.executor.execute_tensor(t5, concat=True)[0]
self.assertEqual(res.dtype, np.float64)
np.testing.assert_array_equal(res, expected)

def testZerosExecution(self):
t = zeros((20, 30), dtype='i8', chunk_size=5)

Expand Down