Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TileDB support as external storage #227

Merged
merged 12 commits into from Feb 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -55,7 +55,7 @@ install:
fi
- pip install -r requirements-dev.txt &&
pip install -r requirements-extra.txt &&
pip install virtualenv coveralls flake8 etcd-gevent
pip install virtualenv coveralls flake8 etcd-gevent tiledb
qinxuye marked this conversation as resolved.
Show resolved Hide resolved
- virtualenv testenv && source testenv/bin/activate
- pip install -r requirements.txt && pip install pytest pytest-timeout
- if [ -z "$DEFAULT_VENV" ]; then deactivate; else source $DEFAULT_VENV/bin/activate; fi
Expand Down
6 changes: 3 additions & 3 deletions mars/lib/sparse/__init__.py
Expand Up @@ -26,11 +26,11 @@
from .coo import COONDArray


def asarray(x, shape=None):
    """Wrap ``x`` in a SparseNDArray if it is a sparse matrix.

    Parameters
    ----------
    x : object
        Candidate array; anything ``issparse`` rejects is returned untouched.
    shape : tuple, optional
        Logical shape to record on the wrapper (lets a 2-d sparse matrix
        represent a 1-d vector).

    Returns
    -------
    SparseNDArray or the original ``x``.
    """
    from .core import issparse

    if issparse(x):
        return SparseNDArray(x, shape=shape)

    # Dense (or otherwise non-sparse) input passes through unchanged.
    return x

Expand Down Expand Up @@ -460,7 +460,7 @@ def matmul(a, b, sparse=True, **_):
def concatenate(tensors, axis=0):
    """Concatenate a sequence of (possibly sparse) tensors along ``axis``.

    If any input is sparse, every input is first converted to a CSR-backed
    SparseNDArray; the original logical shape is preserved by passing
    ``t.shape`` through to ``asarray``.
    """
    has_sparse = any(issparse(t) for t in tensors)
    if has_sparse:
        # Normalize all operands to the same sparse representation so the
        # pairwise binary 'concatenate' kernel sees homogeneous inputs.
        tensors = [asarray(get_sparse_module(t).csr_matrix(t), t.shape) for t in tensors]

    # Fold the pairwise concatenation over the whole sequence.
    return reduce(lambda a, b: _call_bin('concatenate', a, b, axis=axis), tensors)

Expand Down
3 changes: 3 additions & 0 deletions mars/lib/sparse/matrix.py
Expand Up @@ -106,6 +106,9 @@ def __init__(self, spmatrix, shape=()):
def ndim(self):
return self.spmatrix.ndim

def tocsr(self):
    # No conversion is performed here: the wrapper itself is handed back,
    # so callers that expect a ``tocsr`` method get this object unchanged.
    return self

def toarray(self):
    # Densify by delegating to the underlying sparse matrix's ``toarray``.
    return self.spmatrix.toarray()

Expand Down
24 changes: 23 additions & 1 deletion mars/lib/sparse/vector.py
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.

from .array import SparseNDArray
from .core import get_array_module, cp, cps, naked, issparse
from .core import get_array_module, get_sparse_module, cp, cps, naked, issparse


class SparseVector(SparseNDArray):
Expand All @@ -39,6 +39,9 @@ def toarray(self):
def todense(self):
    # Densify the backing 2-d sparse matrix, then reshape to the vector's
    # logical 1-d shape (the storage is 2-d even for vectors).
    return self.spmatrix.toarray().reshape(self.shape)

def tocsr(self):
    # Return the vector wrapper itself without converting the underlying
    # matrix; callers expecting a ``tocsr`` method receive this object.
    return self

def ascupy(self):
is_cp = get_array_module(self.spmatrix) is cp
if is_cp:
Expand Down Expand Up @@ -118,3 +121,22 @@ def dot(self, other, sparse=True):
shape = (other.shape[1],)
return SparseNDArray(x, shape=shape)
return get_array_module(x).asarray(x)

def concatenate(self, other, axis=0):
    """Concatenate this sparse vector with ``other`` along ``axis``.

    Sparse ``other`` is stacked sparsely (axis must be 0); dense ``other``
    forces densification of this vector first. Returns a SparseNDArray
    for a sparse result, otherwise a dense array.
    """
    try:
        other = naked(other)
    except TypeError:
        # Unknown operand type: let Python try the reflected operation.
        return NotImplemented

    if issparse(other):
        if axis != 0:
            raise ValueError('axis can only be 0')
        # Both operands sparse: stack the underlying 1 x n matrices side by side.
        result = get_sparse_module(self.spmatrix).hstack((self.spmatrix, other))
    else:
        # Dense operand: densify self to its logical 1-d shape and concatenate.
        dense_self = self.spmatrix.toarray().reshape(self.shape)
        result = get_array_module(self.spmatrix).concatenate((dense_self, other), axis=axis)

    if issparse(result):
        # hstack produced a 1 x m matrix; record the logical vector length.
        return SparseNDArray(result, shape=(result.shape[1],))
    return get_array_module(result).asarray(result)
2 changes: 2 additions & 0 deletions mars/opcodes.py
Expand Up @@ -16,6 +16,8 @@
TENSOR_LINSPACE = 14
TENSOR_TRIU = 15
TENSOR_TRIL = 16
TENSOR_FROM_TILEDB = 18
TENSOR_STORE_TILEDB = 19
DATAFRAME_DATA_SOURCE = 17
RAND_RAND = 41
RAND_RANDN = 42
Expand Down
4 changes: 2 additions & 2 deletions mars/operands/datastore.py
Expand Up @@ -14,8 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .core import Operand
from .base import HasInput


class DataStore(HasInput):
    """Base operand for storing tensor data to external storage.

    Extends ``HasInput``: the tensor being stored is tracked as the
    operand's single input (execution handlers read ``chunk.op.input``).
    """
    __slots__ = ()
3 changes: 3 additions & 0 deletions mars/serialize/protos/operand.proto
Expand Up @@ -26,6 +26,9 @@ message OperandDef {
TENSOR_LINSPACE = 14;
TENSOR_TRIU = 15;
TENSOR_TRIL = 16;
// external storage
TENSOR_FROM_TILEDB = 18;
TENSOR_STORE_TILEDB = 19;
// dataframe
DATAFRAME_DATA_SOURCE = 17;

Expand Down
596 changes: 302 additions & 294 deletions mars/serialize/protos/operand_pb2.py

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion mars/tensor/__init__.py
Expand Up @@ -18,7 +18,8 @@
from .expressions.datasource import tensor, array, asarray, scalar, \
empty, empty_like, ones, ones_like, zeros, zeros_like, \
full, arange, diag, diagflat, eye, identity, linspace, \
meshgrid, indices, tril, triu
meshgrid, indices, tril, triu, fromtiledb
from .expressions.datastore import totiledb
from .expressions.base import result_type, copyto, transpose, where, broadcast_to, broadcast_arrays, \
expand_dims, rollaxis, swapaxes, moveaxis, ravel, atleast_1d, atleast_2d, atleast_3d, argwhere, \
array_split, split, hsplit, vsplit, dsplit, roll, squeeze, ptp, diff, ediff1d, digitize, \
Expand Down
10 changes: 10 additions & 0 deletions mars/tensor/core.py
Expand Up @@ -173,6 +173,11 @@ def ravel(self):
def _equals(self, o):
return self is o

def totiledb(self, uri, ctx=None, key=None, timestamp=None):
    """Store this tensor into a TileDB array located at ``uri``.

    Parameters
    ----------
    uri : str
        TileDB array URI to write into.
    ctx : optional
        TileDB context configuration; passed through to the datastore op.
    key : optional
        Encryption key for the TileDB array, if any.
    timestamp : optional
        TileDB timestamp to write at.

    Returns the tensor produced by the ``totiledb`` datastore expression.
    """
    from .expressions.datastore import totiledb

    return totiledb(uri, self, ctx=ctx, key=key, timestamp=timestamp)

def execute(self, session=None, **kw):
from ..session import Session

Expand Down Expand Up @@ -361,6 +366,11 @@ def reshape(self, shape, *shapes):
"""
return self._data.reshape(shape, *shapes)

def totiledb(self, uri, ctx=None, key=None, timestamp=None):
    """Store this tensor into the TileDB array at ``uri``.

    ``ctx``, ``key`` and ``timestamp`` are forwarded to the datastore
    expression (TileDB context config, encryption key, write timestamp).
    """
    # Local import avoids a circular dependency between core and expressions.
    from .expressions.datastore import totiledb

    return totiledb(uri, self, ctx=ctx, key=key, timestamp=timestamp)


class SparseTensor(Tensor):
__slots__ = ()
Expand Down
2 changes: 2 additions & 0 deletions mars/tensor/execution/core.py
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

from .datasource import register_data_source_handler
from .datastore import register_data_store_handler
from .random import register_random_handler
from .base import register_basic_handler
from .arithmetic import register_arithmetic_handler
Expand Down Expand Up @@ -48,6 +49,7 @@ def register_tensor_execution_handler():
from .cp import register_cp_handler
register_cp_handler()
register_data_source_handler()
register_data_store_handler()
register_random_handler()
register_basic_handler()
register_arithmetic_handler()
Expand Down
49 changes: 49 additions & 0 deletions mars/tensor/execution/datasource.py
Expand Up @@ -21,6 +21,7 @@
from ...lib.sparse.core import get_sparse_module, get_array_module, cps, sps, naked
from ...lib.sparse import SparseNDArray
from ..expressions import datasource
from .utils import get_tiledb_ctx

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -155,6 +156,52 @@ def _tensor_dense_to_sparse(ctx, chunk):
ctx[chunk.key] = SparseNDArray(xps.csr_matrix(in_data))


def _tensor_tiledb(ctx, chunk):
    """Execution handler: read one chunk's region out of a TileDB array.

    The chunk's operand carries the TileDB connection details (uri, key,
    timestamp, config) and this chunk's offsets; the result is placed into
    the execution context under ``chunk.key``.
    """
    import tiledb

    xp = array_module(chunk.op.gpu)

    # Absolute offsets inside the TileDB domain: chunk offset plus the
    # start of each TileDB dimension (dimensions need not start at 0).
    axis_offsets = [offset + dim_start for offset, dim_start
                    in zip(chunk.op.axis_offsets, chunk.op.tiledb_dim_starts)]
    tiledb_ctx = get_tiledb_ctx(chunk.op.tiledb_config)
    uri = chunk.op.tiledb_uri
    key = chunk.op.tiledb_key
    timestamp = chunk.op.tiledb_timestamp

    # Build one slice per axis covering exactly this chunk's region.
    slcs = []
    for axis in range(chunk.ndim):
        axis_offset = axis_offsets[axis]
        axis_length = chunk.shape[axis]
        slcs.append(slice(axis_offset, axis_offset + axis_length))

    if not chunk.issparse():
        # read dense array from tiledb
        with tiledb.DenseArray(tiledb_ctx, uri, key=key, timestamp=timestamp) as tiledb_arr:
            ctx[chunk.key] = tiledb_arr[tuple(slcs)]
    else:
        # read sparse array from tiledb
        with tiledb.SparseArray(tiledb_ctx, uri, key=key, timestamp=timestamp) as tiledb_arr:
            if tiledb_arr.ndim > 2:
                raise NotImplementedError(
                    'Does not support to read array with more than 2 dimensions')

            # Sparse reads return coordinates plus the first attribute's values.
            data = tiledb_arr[tuple(slcs)]
            coords = data['coords']
            value = data[tiledb_arr.attr(0).name]
            if tiledb_arr.ndim == 2:
                # 2-d: shift global coordinates back to chunk-local indices.
                ij = tuple(coords[tiledb_arr.domain.dim(k).name] - axis_offsets[k]
                           for k in range(tiledb_arr.ndim))
                spmatrix = sps.coo_matrix((value, ij), shape=chunk.shape)
                ctx[chunk.key] = SparseNDArray(spmatrix)
            else:
                # 1-d: store as a 1 x n COO matrix (row indices all zero),
                # but record the logical 1-d shape on the wrapper.
                ij = xp.zeros(coords.shape), \
                    coords[tiledb_arr.domain.dim(0).name] - axis_offsets[0]
                spmatrix = sps.coo_matrix((value, ij), shape=(1,) + chunk.shape)
                ctx[chunk.key] = SparseNDArray(spmatrix, shape=chunk.shape)


def _tensor_fetch_chunk(ctx, chunk):
# nothing need to do
return
Expand Down Expand Up @@ -187,5 +234,7 @@ def register_data_source_handler():
register(datasource.CSRMatrixDataSource, _tensor_csr_matrix_data_source)
register(datasource.SparseToDense, _tensor_sparse_to_dense)
register(datasource.DenseToSparse, _tensor_dense_to_sparse)
register(datasource.TensorTileDBDataSource, _tensor_tiledb)
register(datasource.TensorFetch, _tensor_fetch_chunk)
register(datasource.Scalar, _scalar)

51 changes: 48 additions & 3 deletions mars/tensor/execution/datastore.py
Expand Up @@ -14,10 +14,55 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
try:
import tiledb
except ImportError: # pragma: no cover
tiledb = None

def _data_store(ctx, chunk):
raise NotImplementedError
from ...lib.sparse import SparseNDArray
from ...lib.sparse.core import sps
from ..expressions import datastore
from .utils import get_tiledb_ctx


def _store_tiledb(ctx, chunk):
    """Execution handler: write one chunk's data into a TileDB array.

    The operand carries the TileDB connection details (uri, key, timestamp,
    config) and this chunk's offsets inside the whole array. An empty
    placeholder (dense or sparse, matching the chunk) is stored into the
    context as the chunk's result, since the real payload now lives in
    TileDB.
    """
    tiledb_ctx = get_tiledb_ctx(chunk.op.tiledb_config)
    uri = chunk.op.tiledb_uri
    key = chunk.op.tiledb_key
    timestamp = chunk.op.tiledb_timestamp
    axis_offsets = chunk.op.axis_offsets

    if not chunk.issparse():
        # dense: TileDB requires contiguous buffers for direct writes.
        to_store = np.ascontiguousarray(ctx[chunk.op.input.key])
        # One slice per axis addressing this chunk's region of the array.
        slcs = []
        for axis in range(chunk.ndim):
            axis_offset = axis_offsets[axis]
            axis_length = chunk.op.input.shape[axis]
            slcs.append(slice(axis_offset, axis_offset + axis_length))
        with tiledb.DenseArray(tiledb_ctx, uri, mode='w',
                               key=key, timestamp=timestamp) as arr:
            arr[tuple(slcs)] = to_store
        # Placeholder result: a 0-sized array with the chunk's dtype/ndim.
        ctx[chunk.key] = np.empty((0,) * chunk.ndim, dtype=chunk.dtype)
    else:
        # sparse: write COO coordinates/values; skip the write entirely
        # when the chunk holds no nonzeros.
        to_store = ctx[chunk.op.input.key].spmatrix.tocoo()
        if to_store.nnz > 0:
            with tiledb.SparseArray(tiledb_ctx, uri, mode='w',
                                    key=key, timestamp=timestamp) as arr:
                if chunk.ndim == 1:
                    # A 1-d chunk is stored as 1 x n or n x 1; pick the
                    # coordinate axis that actually varies.
                    vec = to_store.col if to_store.shape[0] == 1 else to_store.row
                    vec += axis_offsets[0]
                    arr[vec] = to_store.data
                else:
                    i, j = to_store.row + axis_offsets[0], to_store.col + axis_offsets[1]
                    arr[i, j] = to_store.data
        # Placeholder sparse result with the chunk's logical shape.
        ctx[chunk.key] = SparseNDArray(sps.csr_matrix((0, 0), dtype=chunk.dtype),
                                       shape=chunk.shape)


def register_data_store_handler():
    """Register execution handlers for data-store operands with the executor."""
    # Local import avoids a circular dependency at module import time.
    from ...executor import register

    register(datastore.TensorTileDBDataStore, _store_tiledb)
81 changes: 80 additions & 1 deletion mars/tensor/execution/tests/test_datasource_execute.py
Expand Up @@ -14,14 +14,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import tempfile
import shutil
import unittest

import numpy as np
import scipy.sparse as sps
try:
import tiledb
except ImportError: # pragma: no cover
tiledb = None

from mars.executor import Executor
from mars.tests.core import TestBase
from mars.tensor.expressions.datasource import tensor, ones_like, zeros, zeros_like, full, \
arange, empty, empty_like, diag, diagflat, eye, linspace, meshgrid, indices, \
triu, tril
triu, tril, fromtiledb
from mars.lib.sparse import SparseNDArray
from mars.tensor.expressions.lib import nd_grid

Expand Down Expand Up @@ -652,3 +660,74 @@ def testIndexTrickExecution(self):
expected = np.lib.index_tricks.nd_grid(sparse=True)[0:5, 0:5]
[np.testing.assert_equal(r, e) for r, e in zip(res, expected)]

@unittest.skipIf(tiledb is None, 'tiledb not installed')
def testReadTileDBExecution(self):
    """End-to-end check of ``fromtiledb``: dense 3-d, sparse 2-d, sparse 1-d."""
    ctx = tiledb.Ctx()

    tempdir = tempfile.mkdtemp()
    try:
        # create TileDB dense array
        # Note the domains deliberately do not start at 0 everywhere
        # (first dim starts at 1) to exercise dim-start offset handling.
        dom = tiledb.Domain(ctx,
                            tiledb.Dim(ctx, domain=(1, 100), tile=30, dtype=np.int32),
                            tiledb.Dim(ctx, domain=(0, 90), tile=22, dtype=np.int32),
                            tiledb.Dim(ctx, domain=(0, 9), tile=8, dtype=np.int32),
                            )
        schema = tiledb.ArraySchema(ctx, domain=dom, sparse=False,
                                    attrs=[tiledb.Attr(ctx, dtype=np.float64)])
        tiledb.DenseArray.create(tempdir, schema)

        expected = np.random.rand(100, 91, 10)
        with tiledb.DenseArray(ctx, tempdir, mode='w') as arr:
            arr.write_direct(expected)

        a = fromtiledb(tempdir, ctx=ctx)
        result = self.executor.execute_tensor(a, concat=True)[0]

        np.testing.assert_allclose(expected, result)
    finally:
        shutil.rmtree(tempdir)

    tempdir = tempfile.mkdtemp()
    try:
        # create 2-d TileDB sparse array
        # Second dim starts at 2, so stored coordinates are shifted by +2.
        dom = tiledb.Domain(ctx,
                            tiledb.Dim(ctx, domain=(0, 99), tile=30, dtype=np.int32),
                            tiledb.Dim(ctx, domain=(2, 11), tile=8, dtype=np.int32),
                            )
        schema = tiledb.ArraySchema(ctx, domain=dom, sparse=True,
                                    attrs=[tiledb.Attr(ctx, dtype=np.float64)])
        tiledb.SparseArray.create(tempdir, schema)

        expected = sps.rand(100, 10, density=0.01)
        with tiledb.SparseArray(ctx, tempdir, mode='w') as arr:
            # Shift column coordinates into the (2, 11) domain before writing.
            I, J = expected.row, expected.col + 2
            arr[I, J] = {arr.attr(0).name: expected.data}

        a = fromtiledb(tempdir, ctx=ctx)
        result = self.executor.execute_tensor(a, concat=True)[0]

        np.testing.assert_allclose(expected.toarray(), result.toarray())
    finally:
        shutil.rmtree(tempdir)

    tempdir = tempfile.mkdtemp()
    try:
        # create 1-d TileDB sparse array
        # Domain starts at 1, so coordinates are shifted by +1 on write.
        dom = tiledb.Domain(ctx,
                            tiledb.Dim(ctx, domain=(1, 100), tile=30, dtype=np.int32),
                            )
        schema = tiledb.ArraySchema(ctx, domain=dom, sparse=True,
                                    attrs=[tiledb.Attr(ctx, dtype=np.float64)])
        tiledb.SparseArray.create(tempdir, schema)

        # A 1 x 100 sparse matrix stands in for a length-100 sparse vector.
        expected = sps.rand(1, 100, density=0.05)
        with tiledb.SparseArray(ctx, tempdir, mode='w') as arr:
            I = expected.col + 1
            arr[I] = expected.data

        a = fromtiledb(tempdir, ctx=ctx)
        result = self.executor.execute_tensor(a, concat=True)[0]

        # Compare against row 0 since 'expected' is stored as 1 x 100.
        np.testing.assert_allclose(expected.toarray()[0], result.toarray())
    finally:
        shutil.rmtree(tempdir)