Skip to content

Commit

Permalink
Add TileDB support as external storage (#227)
Browse files Browse the repository at this point in the history
* add tiledb relative expressions

* support execute fromtiledb

* support executeo totiledb
  • Loading branch information
qinxuye authored and wjsi committed Feb 23, 2019
1 parent c76ef4b commit 00fc3a0
Show file tree
Hide file tree
Showing 25 changed files with 1,172 additions and 308 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -55,7 +55,7 @@ install:
fi
- pip install -r requirements-dev.txt &&
pip install -r requirements-extra.txt &&
pip install virtualenv coveralls flake8 etcd-gevent
pip install virtualenv coveralls flake8 etcd-gevent tiledb
- virtualenv testenv && source testenv/bin/activate
- pip install -r requirements.txt && pip install pytest pytest-timeout
- if [ -z "$DEFAULT_VENV" ]; then deactivate; else source $DEFAULT_VENV/bin/activate; fi
Expand Down
6 changes: 3 additions & 3 deletions mars/lib/sparse/__init__.py
Expand Up @@ -26,11 +26,11 @@
from .coo import COONDArray


def asarray(x):
def asarray(x, shape=None):
from .core import issparse

if issparse(x):
return SparseNDArray(x)
return SparseNDArray(x, shape=shape)

return x

Expand Down Expand Up @@ -460,7 +460,7 @@ def matmul(a, b, sparse=True, **_):
def concatenate(tensors, axis=0):
has_sparse = any(issparse(t) for t in tensors)
if has_sparse:
tensors = [asarray(get_sparse_module(t).csr_matrix(t)) for t in tensors]
tensors = [asarray(get_sparse_module(t).csr_matrix(t), t.shape) for t in tensors]

return reduce(lambda a, b: _call_bin('concatenate', a, b, axis=axis), tensors)

Expand Down
3 changes: 3 additions & 0 deletions mars/lib/sparse/matrix.py
Expand Up @@ -106,6 +106,9 @@ def __init__(self, spmatrix, shape=()):
def ndim(self):
return self.spmatrix.ndim

def tocsr(self):
return self

def toarray(self):
return self.spmatrix.toarray()

Expand Down
24 changes: 23 additions & 1 deletion mars/lib/sparse/vector.py
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.

from .array import SparseNDArray
from .core import get_array_module, cp, cps, naked, issparse
from .core import get_array_module, get_sparse_module, cp, cps, naked, issparse


class SparseVector(SparseNDArray):
Expand All @@ -39,6 +39,9 @@ def toarray(self):
def todense(self):
return self.spmatrix.toarray().reshape(self.shape)

def tocsr(self):
return self

def ascupy(self):
is_cp = get_array_module(self.spmatrix) is cp
if is_cp:
Expand Down Expand Up @@ -118,3 +121,22 @@ def dot(self, other, sparse=True):
shape = (other.shape[1],)
return SparseNDArray(x, shape=shape)
return get_array_module(x).asarray(x)

def concatenate(self, other, axis=0):
try:
other = naked(other)
except TypeError:
return NotImplemented

if issparse(other):
xps = get_sparse_module(self.spmatrix)
if axis != 0:
raise ValueError('axis can only be 0')
x = xps.hstack((self.spmatrix, other))
else:
xp = get_array_module(self.spmatrix)
x = xp.concatenate((self.spmatrix.toarray().reshape(self.shape), other), axis=axis)

if issparse(x):
return SparseNDArray(x, shape=(x.shape[1],))
return get_array_module(x).asarray(x)
2 changes: 2 additions & 0 deletions mars/opcodes.py
Expand Up @@ -16,6 +16,8 @@
TENSOR_LINSPACE = 14
TENSOR_TRIU = 15
TENSOR_TRIL = 16
TENSOR_FROM_TILEDB = 18
TENSOR_STORE_TILEDB = 19
DATAFRAME_DATA_SOURCE = 17
RAND_RAND = 41
RAND_RANDN = 42
Expand Down
4 changes: 2 additions & 2 deletions mars/operands/datastore.py
Expand Up @@ -14,8 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .core import Operand
from .base import HasInput


class DataStore(Operand):
class DataStore(HasInput):
__slots__ = ()
3 changes: 3 additions & 0 deletions mars/serialize/protos/operand.proto
Expand Up @@ -26,6 +26,9 @@ message OperandDef {
TENSOR_LINSPACE = 14;
TENSOR_TRIU = 15;
TENSOR_TRIL = 16;
// external storage
TENSOR_FROM_TILEDB = 18;
TENSOR_STORE_TILEDB = 19;
// dataframe
DATAFRAME_DATA_SOURCE = 17;

Expand Down
596 changes: 302 additions & 294 deletions mars/serialize/protos/operand_pb2.py

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion mars/tensor/__init__.py
Expand Up @@ -18,7 +18,8 @@
from .expressions.datasource import tensor, array, asarray, scalar, \
empty, empty_like, ones, ones_like, zeros, zeros_like, \
full, arange, diag, diagflat, eye, identity, linspace, \
meshgrid, indices, tril, triu
meshgrid, indices, tril, triu, fromtiledb
from .expressions.datastore import totiledb
from .expressions.base import result_type, copyto, transpose, where, broadcast_to, broadcast_arrays, \
expand_dims, rollaxis, swapaxes, moveaxis, ravel, atleast_1d, atleast_2d, atleast_3d, argwhere, \
array_split, split, hsplit, vsplit, dsplit, roll, squeeze, ptp, diff, ediff1d, digitize, \
Expand Down
10 changes: 10 additions & 0 deletions mars/tensor/core.py
Expand Up @@ -173,6 +173,11 @@ def ravel(self):
def _equals(self, o):
return self is o

def totiledb(self, uri, ctx=None, key=None, timestamp=None):
from .expressions.datastore import totiledb

return totiledb(uri, self, ctx=ctx, key=key, timestamp=timestamp)

def execute(self, session=None, **kw):
from ..session import Session

Expand Down Expand Up @@ -361,6 +366,11 @@ def reshape(self, shape, *shapes):
"""
return self._data.reshape(shape, *shapes)

def totiledb(self, uri, ctx=None, key=None, timestamp=None):
from .expressions.datastore import totiledb

return totiledb(uri, self, ctx=ctx, key=key, timestamp=timestamp)


class SparseTensor(Tensor):
__slots__ = ()
Expand Down
2 changes: 2 additions & 0 deletions mars/tensor/execution/core.py
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

from .datasource import register_data_source_handler
from .datastore import register_data_store_handler
from .random import register_random_handler
from .base import register_basic_handler
from .arithmetic import register_arithmetic_handler
Expand Down Expand Up @@ -48,6 +49,7 @@ def register_tensor_execution_handler():
from .cp import register_cp_handler
register_cp_handler()
register_data_source_handler()
register_data_store_handler()
register_random_handler()
register_basic_handler()
register_arithmetic_handler()
Expand Down
49 changes: 49 additions & 0 deletions mars/tensor/execution/datasource.py
Expand Up @@ -21,6 +21,7 @@
from ...lib.sparse.core import get_sparse_module, get_array_module, cps, sps, naked
from ...lib.sparse import SparseNDArray
from ..expressions import datasource
from .utils import get_tiledb_ctx

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -155,6 +156,52 @@ def _tensor_dense_to_sparse(ctx, chunk):
ctx[chunk.key] = SparseNDArray(xps.csr_matrix(in_data))


def _tensor_tiledb(ctx, chunk):
import tiledb

xp = array_module(chunk.op.gpu)

axis_offsets = [offset + dim_start for offset, dim_start
in zip(chunk.op.axis_offsets, chunk.op.tiledb_dim_starts)]
tiledb_ctx = get_tiledb_ctx(chunk.op.tiledb_config)
uri = chunk.op.tiledb_uri
key = chunk.op.tiledb_key
timestamp = chunk.op.tiledb_timestamp

slcs = []
for axis in range(chunk.ndim):
axis_offset = axis_offsets[axis]
axis_length = chunk.shape[axis]
slcs.append(slice(axis_offset, axis_offset + axis_length))

if not chunk.issparse():
# read dense array from tiledb
with tiledb.DenseArray(tiledb_ctx, uri, key=key, timestamp=timestamp) as tiledb_arr:
ctx[chunk.key] = tiledb_arr[tuple(slcs)]
else:
# read sparse array from tiledb
with tiledb.SparseArray(tiledb_ctx, uri, key=key, timestamp=timestamp) as tiledb_arr:
if tiledb_arr.ndim > 2:
raise NotImplementedError(
'Does not support to read array with more than 2 dimensions')

data = tiledb_arr[tuple(slcs)]
coords = data['coords']
value = data[tiledb_arr.attr(0).name]
if tiledb_arr.ndim == 2:
# 2-d
ij = tuple(coords[tiledb_arr.domain.dim(k).name] - axis_offsets[k]
for k in range(tiledb_arr.ndim))
spmatrix = sps.coo_matrix((value, ij), shape=chunk.shape)
ctx[chunk.key] = SparseNDArray(spmatrix)
else:
# 1-d
ij = xp.zeros(coords.shape), \
coords[tiledb_arr.domain.dim(0).name] - axis_offsets[0]
spmatrix = sps.coo_matrix((value, ij), shape=(1,) + chunk.shape)
ctx[chunk.key] = SparseNDArray(spmatrix, shape=chunk.shape)


def _tensor_fetch_chunk(ctx, chunk):
# nothing need to do
return
Expand Down Expand Up @@ -187,5 +234,7 @@ def register_data_source_handler():
register(datasource.CSRMatrixDataSource, _tensor_csr_matrix_data_source)
register(datasource.SparseToDense, _tensor_sparse_to_dense)
register(datasource.DenseToSparse, _tensor_dense_to_sparse)
register(datasource.TensorTileDBDataSource, _tensor_tiledb)
register(datasource.TensorFetch, _tensor_fetch_chunk)
register(datasource.Scalar, _scalar)

51 changes: 48 additions & 3 deletions mars/tensor/execution/datastore.py
Expand Up @@ -14,10 +14,55 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
try:
import tiledb
except ImportError: # pragma: no cover
tiledb = None

def _data_store(ctx, chunk):
raise NotImplementedError
from ...lib.sparse import SparseNDArray
from ...lib.sparse.core import sps
from ..expressions import datastore
from .utils import get_tiledb_ctx


def _store_tiledb(ctx, chunk):
tiledb_ctx = get_tiledb_ctx(chunk.op.tiledb_config)
uri = chunk.op.tiledb_uri
key = chunk.op.tiledb_key
timestamp = chunk.op.tiledb_timestamp
axis_offsets = chunk.op.axis_offsets

if not chunk.issparse():
# dense
to_store = np.ascontiguousarray(ctx[chunk.op.input.key])
slcs = []
for axis in range(chunk.ndim):
axis_offset = axis_offsets[axis]
axis_length = chunk.op.input.shape[axis]
slcs.append(slice(axis_offset, axis_offset + axis_length))
with tiledb.DenseArray(tiledb_ctx, uri, mode='w',
key=key, timestamp=timestamp) as arr:
arr[tuple(slcs)] = to_store
ctx[chunk.key] = np.empty((0,) * chunk.ndim, dtype=chunk.dtype)
else:
# sparse
to_store = ctx[chunk.op.input.key].spmatrix.tocoo()
if to_store.nnz > 0:
with tiledb.SparseArray(tiledb_ctx, uri, mode='w',
key=key, timestamp=timestamp) as arr:
if chunk.ndim == 1:
vec = to_store.col if to_store.shape[0] == 1 else to_store.row
vec += axis_offsets[0]
arr[vec] = to_store.data
else:
i, j = to_store.row + axis_offsets[0], to_store.col + axis_offsets[1]
arr[i, j] = to_store.data
ctx[chunk.key] = SparseNDArray(sps.csr_matrix((0, 0), dtype=chunk.dtype),
shape=chunk.shape)


def register_data_store_handler():
pass
from ...executor import register

register(datastore.TensorTileDBDataStore, _store_tiledb)
81 changes: 80 additions & 1 deletion mars/tensor/execution/tests/test_datasource_execute.py
Expand Up @@ -14,14 +14,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import tempfile
import shutil
import unittest

import numpy as np
import scipy.sparse as sps
try:
import tiledb
except ImportError: # pragma: no cover
tiledb = None

from mars.executor import Executor
from mars.tests.core import TestBase
from mars.tensor.expressions.datasource import tensor, ones_like, zeros, zeros_like, full, \
arange, empty, empty_like, diag, diagflat, eye, linspace, meshgrid, indices, \
triu, tril
triu, tril, fromtiledb
from mars.lib.sparse import SparseNDArray
from mars.tensor.expressions.lib import nd_grid

Expand Down Expand Up @@ -652,3 +660,74 @@ def testIndexTrickExecution(self):
expected = np.lib.index_tricks.nd_grid(sparse=True)[0:5, 0:5]
[np.testing.assert_equal(r, e) for r, e in zip(res, expected)]

@unittest.skipIf(tiledb is None, 'tiledb not installed')
def testReadTileDBExecution(self):
ctx = tiledb.Ctx()

tempdir = tempfile.mkdtemp()
try:
# create TileDB dense array
dom = tiledb.Domain(ctx,
tiledb.Dim(ctx, domain=(1, 100), tile=30, dtype=np.int32),
tiledb.Dim(ctx, domain=(0, 90), tile=22, dtype=np.int32),
tiledb.Dim(ctx, domain=(0, 9), tile=8, dtype=np.int32),
)
schema = tiledb.ArraySchema(ctx, domain=dom, sparse=False,
attrs=[tiledb.Attr(ctx, dtype=np.float64)])
tiledb.DenseArray.create(tempdir, schema)

expected = np.random.rand(100, 91, 10)
with tiledb.DenseArray(ctx, tempdir, mode='w') as arr:
arr.write_direct(expected)

a = fromtiledb(tempdir, ctx=ctx)
result = self.executor.execute_tensor(a, concat=True)[0]

np.testing.assert_allclose(expected, result)
finally:
shutil.rmtree(tempdir)

tempdir = tempfile.mkdtemp()
try:
# create 2-d TileDB sparse array
dom = tiledb.Domain(ctx,
tiledb.Dim(ctx, domain=(0, 99), tile=30, dtype=np.int32),
tiledb.Dim(ctx, domain=(2, 11), tile=8, dtype=np.int32),
)
schema = tiledb.ArraySchema(ctx, domain=dom, sparse=True,
attrs=[tiledb.Attr(ctx, dtype=np.float64)])
tiledb.SparseArray.create(tempdir, schema)

expected = sps.rand(100, 10, density=0.01)
with tiledb.SparseArray(ctx, tempdir, mode='w') as arr:
I, J = expected.row, expected.col + 2
arr[I, J] = {arr.attr(0).name: expected.data}

a = fromtiledb(tempdir, ctx=ctx)
result = self.executor.execute_tensor(a, concat=True)[0]

np.testing.assert_allclose(expected.toarray(), result.toarray())
finally:
shutil.rmtree(tempdir)

tempdir = tempfile.mkdtemp()
try:
# create 1-d TileDB sparse array
dom = tiledb.Domain(ctx,
tiledb.Dim(ctx, domain=(1, 100), tile=30, dtype=np.int32),
)
schema = tiledb.ArraySchema(ctx, domain=dom, sparse=True,
attrs=[tiledb.Attr(ctx, dtype=np.float64)])
tiledb.SparseArray.create(tempdir, schema)

expected = sps.rand(1, 100, density=0.05)
with tiledb.SparseArray(ctx, tempdir, mode='w') as arr:
I= expected.col + 1
arr[I] = expected.data

a = fromtiledb(tempdir, ctx=ctx)
result = self.executor.execute_tensor(a, concat=True)[0]

np.testing.assert_allclose(expected.toarray()[0], result.toarray())
finally:
shutil.rmtree(tempdir)

0 comments on commit 00fc3a0

Please sign in to comment.