Skip to content

Commit

Permalink
[BACKPORT] Add TileDB support as external storage (#245)
Browse files Browse the repository at this point in the history
* Add TileDB support as external storage  (#227)

* fix .travis.yml
  • Loading branch information
qinxuye authored and wjsi committed Feb 28, 2019
1 parent fb5cbe8 commit c373075
Show file tree
Hide file tree
Showing 26 changed files with 1,187 additions and 327 deletions.
20 changes: 10 additions & 10 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,31 @@ matrix:
services:
- docker
env: PYTHON=2.7 DOCKER_IMAGE=quay.io/pypa/manylinux1_x86_64 PYVER=cp27-cp27mu
- os: osx
language: generic
env: PYTHON=2.7.12
- os: linux
python: "3.5"
services:
- docker
env: PYTHON=3.5 DOCKER_IMAGE=quay.io/pypa/manylinux1_x86_64 PYVER=cp35-cp35m
- os: osx
language: generic
env: PYTHON=3.5.3
- os: linux
python: "3.6"
services:
- docker
env: PYTHON=3.6 DOCKER_IMAGE=quay.io/pypa/manylinux1_x86_64 PYVER=cp36-cp36m
- os: osx
language: generic
env: PYTHON=3.6.1
- os: linux
python: "3.7"
dist: xenial
services:
- docker
env: PYTHON=3.7 DOCKER_IMAGE=quay.io/pypa/manylinux1_x86_64 PYVER=cp37-cp37m
- os: osx
language: generic
env: PYTHON=2.7.12
- os: osx
language: generic
env: PYTHON=3.5.3
- os: osx
language: generic
env: PYTHON=3.6.1
- os: osx
language: generic
env: PYTHON=3.7.0
Expand All @@ -49,7 +49,7 @@ install:
- export DEFAULT_VENV=$VIRTUAL_ENV
- pip install -r requirements-dev.txt
- pip install -r requirements-extra.txt
- pip install virtualenv coveralls flake8 etcd-gevent
- pip install virtualenv coveralls flake8 etcd-gevent tiledb
- virtualenv testenv && source testenv/bin/activate
- pip install -r requirements.txt
- pip install pytest pytest-timeout
Expand Down
6 changes: 3 additions & 3 deletions mars/lib/sparse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
from .coo import COONDArray


def asarray(x):
def asarray(x, shape=None):
from .core import issparse

if issparse(x):
return SparseNDArray(x)
return SparseNDArray(x, shape=shape)

return x

Expand Down Expand Up @@ -460,7 +460,7 @@ def matmul(a, b, sparse=True, **_):
def concatenate(tensors, axis=0):
has_sparse = any(issparse(t) for t in tensors)
if has_sparse:
tensors = [asarray(get_sparse_module(t).csr_matrix(t)) for t in tensors]
tensors = [asarray(get_sparse_module(t).csr_matrix(t), t.shape) for t in tensors]

return reduce(lambda a, b: _call_bin('concatenate', a, b, axis=axis), tensors)

Expand Down
3 changes: 3 additions & 0 deletions mars/lib/sparse/matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ def __init__(self, spmatrix, shape=()):
def ndim(self):
return self.spmatrix.ndim

def tocsr(self):
return self

def toarray(self):
return self.spmatrix.toarray()

Expand Down
24 changes: 23 additions & 1 deletion mars/lib/sparse/vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.

from .array import SparseNDArray
from .core import get_array_module, cp, cps, naked, issparse
from .core import get_array_module, get_sparse_module, cp, cps, naked, issparse


class SparseVector(SparseNDArray):
Expand All @@ -39,6 +39,9 @@ def toarray(self):
def todense(self):
return self.spmatrix.toarray().reshape(self.shape)

def tocsr(self):
return self

def ascupy(self):
is_cp = get_array_module(self.spmatrix) is cp
if is_cp:
Expand Down Expand Up @@ -118,3 +121,22 @@ def dot(self, other, sparse=True):
shape = (other.shape[1],)
return SparseNDArray(x, shape=shape)
return get_array_module(x).asarray(x)

def concatenate(self, other, axis=0):
try:
other = naked(other)
except TypeError:
return NotImplemented

if issparse(other):
xps = get_sparse_module(self.spmatrix)
if axis != 0:
raise ValueError('axis can only be 0')
x = xps.hstack((self.spmatrix, other))
else:
xp = get_array_module(self.spmatrix)
x = xp.concatenate((self.spmatrix.toarray().reshape(self.shape), other), axis=axis)

if issparse(x):
return SparseNDArray(x, shape=(x.shape[1],))
return get_array_module(x).asarray(x)
2 changes: 2 additions & 0 deletions mars/opcodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
TENSOR_LINSPACE = 14
TENSOR_TRIU = 15
TENSOR_TRIL = 16
TENSOR_FROM_TILEDB = 18
TENSOR_STORE_TILEDB = 19
RAND_RAND = 41
RAND_RANDN = 42
RAND_RANDINT = 43
Expand Down
4 changes: 2 additions & 2 deletions mars/operands/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .core import Operand
from .base import HasInput


class DataStore(Operand):
class DataStore(HasInput):
__slots__ = ()
4 changes: 4 additions & 0 deletions mars/serialize/protos/operand.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ message OperandDef {
NULL = 0;

// creation
// tensor
SCALAR = 1;
TENSOR_DATA_SOURCE = 2;
TENSOR_ONES = 3;
Expand All @@ -25,6 +26,9 @@ message OperandDef {
TENSOR_LINSPACE = 14;
TENSOR_TRIU = 15;
TENSOR_TRIL = 16;
// external storage
TENSOR_FROM_TILEDB = 18;
TENSOR_STORE_TILEDB = 19;

// random
RAND_RAND = 41;
Expand Down
592 changes: 294 additions & 298 deletions mars/serialize/protos/operand_pb2.py

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion mars/tensor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
from .expressions.datasource import tensor, array, asarray, scalar, \
empty, empty_like, ones, ones_like, zeros, zeros_like, \
full, arange, diag, diagflat, eye, identity, linspace, \
meshgrid, indices, tril, triu
meshgrid, indices, tril, triu, fromtiledb
from .expressions.datastore import totiledb
from .expressions.base import result_type, copyto, transpose, where, broadcast_to, broadcast_arrays, \
expand_dims, rollaxis, swapaxes, moveaxis, ravel, atleast_1d, atleast_2d, atleast_3d, argwhere, \
array_split, split, hsplit, vsplit, dsplit, roll, squeeze, ptp, diff, ediff1d, digitize, \
Expand Down
5 changes: 5 additions & 0 deletions mars/tensor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,11 @@ def ravel(self):

flatten = ravel

def totiledb(self, uri, ctx=None, key=None, timestamp=None):
from .expressions.datastore import totiledb

return totiledb(uri, self, ctx=ctx, key=key, timestamp=timestamp)

def _equals(self, o):
return self is o

Expand Down
7 changes: 5 additions & 2 deletions mars/tensor/execution/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ def register(op, handler):


from .datasource import register_data_source_handler
from .datastore import register_data_store_handler
from .random import register_random_handler
from .base import register_basic_handler
from .arithmetic import register_arithmetic_handler
Expand All @@ -382,13 +383,14 @@ def register(op, handler):
from .fft import register_fft_handler
from .linalg import register_linalg_handler


NUMEXPR_INSTALLED = False
try:
import numexpr # noqa: F401
NUMEXPR_INSTALLED = True
from .ne import register_numexpr_handler
register_numexpr_handler()
except ImportError:
except ImportError: # pragma: no cover
pass

CP_INSTALLED = False
Expand All @@ -397,10 +399,11 @@ def register(op, handler):
CP_INSTALLED = True
from .cp import register_cp_handler
register_cp_handler()
except ImportError:
except ImportError: # pragma: no cover
pass

register_data_source_handler()
register_data_store_handler()
register_random_handler()
register_basic_handler()
register_arithmetic_handler()
Expand Down
48 changes: 48 additions & 0 deletions mars/tensor/execution/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from ...lib.sparse.core import get_sparse_module, get_array_module, cps, sps, naked
from ...lib.sparse import SparseNDArray
from ..expressions import datasource
from .utils import get_tiledb_ctx

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -155,6 +156,52 @@ def _tensor_dense_to_sparse(ctx, chunk):
ctx[chunk.key] = SparseNDArray(xps.csr_matrix(in_data))


def _tensor_tiledb(ctx, chunk):
import tiledb

xp = array_module(chunk.op.gpu)

axis_offsets = [offset + dim_start for offset, dim_start
in zip(chunk.op.axis_offsets, chunk.op.tiledb_dim_starts)]
tiledb_ctx = get_tiledb_ctx(chunk.op.tiledb_config)
uri = chunk.op.tiledb_uri
key = chunk.op.tiledb_key
timestamp = chunk.op.tiledb_timestamp

slcs = []
for axis in range(chunk.ndim):
axis_offset = axis_offsets[axis]
axis_length = chunk.shape[axis]
slcs.append(slice(axis_offset, axis_offset + axis_length))

if not chunk.issparse():
# read dense array from tiledb
with tiledb.DenseArray(tiledb_ctx, uri, key=key, timestamp=timestamp) as tiledb_arr:
ctx[chunk.key] = tiledb_arr[tuple(slcs)]
else:
# read sparse array from tiledb
with tiledb.SparseArray(tiledb_ctx, uri, key=key, timestamp=timestamp) as tiledb_arr:
if tiledb_arr.ndim > 2:
raise NotImplementedError(
'Does not support to read array with more than 2 dimensions')

data = tiledb_arr[tuple(slcs)]
coords = data['coords']
value = data[tiledb_arr.attr(0).name]
if tiledb_arr.ndim == 2:
# 2-d
ij = tuple(coords[tiledb_arr.domain.dim(k).name] - axis_offsets[k]
for k in range(tiledb_arr.ndim))
spmatrix = sps.coo_matrix((value, ij), shape=chunk.shape)
ctx[chunk.key] = SparseNDArray(spmatrix)
else:
# 1-d
ij = xp.zeros(coords.shape), \
coords[tiledb_arr.domain.dim(0).name] - axis_offsets[0]
spmatrix = sps.coo_matrix((value, ij), shape=(1,) + chunk.shape)
ctx[chunk.key] = SparseNDArray(spmatrix, shape=chunk.shape)


def _tensor_fetch_chunk(ctx, chunk):
# nothing need to do
return
Expand Down Expand Up @@ -187,5 +234,6 @@ def register_data_source_handler():
register(datasource.CSRMatrixDataSource, _tensor_csr_matrix_data_source)
register(datasource.SparseToDense, _tensor_sparse_to_dense)
register(datasource.DenseToSparse, _tensor_dense_to_sparse)
register(datasource.TensorTileDBDataSource, _tensor_tiledb)
register(datasource.TensorFetchChunk, _tensor_fetch_chunk)
register(datasource.Scalar, _scalar)
51 changes: 48 additions & 3 deletions mars/tensor/execution/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,55 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
try:
import tiledb
except ImportError: # pragma: no cover
tiledb = None

def _data_store(ctx, chunk):
raise NotImplementedError
from ...lib.sparse import SparseNDArray
from ...lib.sparse.core import sps
from ..expressions import datastore
from .utils import get_tiledb_ctx


def _store_tiledb(ctx, chunk):
tiledb_ctx = get_tiledb_ctx(chunk.op.tiledb_config)
uri = chunk.op.tiledb_uri
key = chunk.op.tiledb_key
timestamp = chunk.op.tiledb_timestamp
axis_offsets = chunk.op.axis_offsets

if not chunk.issparse():
# dense
to_store = np.ascontiguousarray(ctx[chunk.op.input.key])
slcs = []
for axis in range(chunk.ndim):
axis_offset = axis_offsets[axis]
axis_length = chunk.op.input.shape[axis]
slcs.append(slice(axis_offset, axis_offset + axis_length))
with tiledb.DenseArray(tiledb_ctx, uri, mode='w',
key=key, timestamp=timestamp) as arr:
arr[tuple(slcs)] = to_store
ctx[chunk.key] = np.empty((0,) * chunk.ndim, dtype=chunk.dtype)
else:
# sparse
to_store = ctx[chunk.op.input.key].spmatrix.tocoo()
if to_store.nnz > 0:
with tiledb.SparseArray(tiledb_ctx, uri, mode='w',
key=key, timestamp=timestamp) as arr:
if chunk.ndim == 1:
vec = to_store.col if to_store.shape[0] == 1 else to_store.row
vec += axis_offsets[0]
arr[vec] = to_store.data
else:
i, j = to_store.row + axis_offsets[0], to_store.col + axis_offsets[1]
arr[i, j] = to_store.data
ctx[chunk.key] = SparseNDArray(sps.csr_matrix((0, 0), dtype=chunk.dtype),
shape=chunk.shape)


def register_data_store_handler():
pass
from .core import register

register(datastore.TensorTileDBDataStore, _store_tiledb)
Loading

0 comments on commit c373075

Please sign in to comment.