Skip to content

Commit

Permalink
Vineyard: from unordered chunks. (#3324)
Browse files Browse the repository at this point in the history
* Vineyard: from unordered chunks.

Signed-off-by: Tao He <sighingnow@gmail.com>

* Skip coverage check for those guard code.

Signed-off-by: Tao He <sighingnow@gmail.com>

Signed-off-by: Tao He <sighingnow@gmail.com>
  • Loading branch information
sighingnow committed Jan 17, 2023
1 parent 181fe51 commit 9f0bf34
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 16 deletions.
40 changes: 31 additions & 9 deletions mars/dataframe/datasource/from_vineyard.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ def execute(cls, ctx, op):
dtypes.append(dtype)
dtypes = pd.Series(dtypes, index=columns)
chunk_index = (
chunk_meta["partition_index_row_"],
chunk_meta["partition_index_column_"],
chunk_meta.get("partition_index_row_", -1),
chunk_meta.get("partition_index_column_", -1),
)
# chunk: (chunk_id, worker_address, dtype, shape, index, columns)
chunks.append(
Expand Down Expand Up @@ -173,7 +173,13 @@ def __init__(self, vineyard_socket=None, object_id=None, **kw):
super().__init__(vineyard_socket=vineyard_socket, object_id=object_id, **kw)

def __call__(self, meta):
return self.new_dataframe([meta])
return self.new_dataframe(
[meta],
shape=meta.shape,
dtypes=meta.dtypes,
index_value=meta.index_value,
columns_value=meta.columns_value,
)

@classmethod
def tile(cls, op):
Expand All @@ -182,21 +188,37 @@ def tile(cls, op):

ctx = get_context()

in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks]
out_chunks = []
chunk_map = dict()
dtypes, columns = None, None
for chunk, infos in zip(
op.inputs[0].chunks, ctx.get_chunks_result(in_chunk_keys)
):

in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks]
in_chunk_results = ctx.get_chunks_result(in_chunk_keys)

# check if chunk indexes has unknown value
has_unknown_chunk_index = False
for infos in in_chunk_results:
for _, info in infos.iterrows(): # pragma: no cover
if len(info["index"]) == 0 or -1 in info["index"]:
has_unknown_chunk_index = True
break

# assume chunks are row-splitted if chunk index is unknown
chunk_location = 0

for chunk, infos in zip(op.inputs[0].chunks, in_chunk_results):
for _, info in infos.iterrows():
chunk_op = op.copy().reset_key()
chunk_op.object_id = info["id"]
chunk_op.expect_worker = info["worker_address"]
dtypes = info["dtypes"]
columns = info["columns"]
shape = info["shape"]
chunk_index = info["index"]
if has_unknown_chunk_index: # pragma: no cover
chunk_index = (chunk_location, 0)
chunk_location += 1
else:
chunk_index = info["index"]
chunk_map[chunk_index] = info["shape"]
out_chunk = chunk_op.new_chunk(
[chunk],
Expand Down Expand Up @@ -251,7 +273,7 @@ def from_vineyard(df, vineyard_socket=None):
gpu=None,
)
meta = metaop(
shape=(np.nan,),
shape=(np.nan, np.nan),
dtypes=pd.Series([]),
index_value=parse_index(pd.Index([])),
columns_value=parse_index(pd.Index([])),
Expand Down
30 changes: 23 additions & 7 deletions mars/tensor/datasource/from_vineyard.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def execute(cls, ctx, op):
chunk_meta["value_type_"], chunk_meta.get("value_type_meta_", None)
)
shape = tuple(json.loads(chunk_meta["shape_"]))
chunk_index = tuple(json.loads(chunk_meta["partition_index_"]))
chunk_index = tuple(json.loads(chunk_meta.get("partition_index_", "[]")))
# chunk: (chunk_id, worker_address, dtype, shape, index)
chunks.append(
(repr(chunk_meta.id), ctx.worker_address, dtype, shape, chunk_index)
Expand All @@ -119,7 +119,7 @@ def __init__(self, vineyard_socket=None, object_id=None, **kw):
super().__init__(vineyard_socket=vineyard_socket, object_id=object_id, **kw)

def __call__(self, meta):
return self.new_tensor([meta], shape=(np.nan,))
return self.new_tensor([meta], shape=meta.shape, dtype=meta.dtype)

@classmethod
def tile(cls, op):
Expand All @@ -128,20 +128,35 @@ def tile(cls, op):

ctx = get_context()

in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks]
out_chunks = []
chunk_map = dict()
dtype = None
for chunk, infos in zip(
op.inputs[0].chunks, ctx.get_chunks_result(in_chunk_keys)
):
in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks]
in_chunk_results = ctx.get_chunks_result(in_chunk_keys)

# check if chunk indexes has unknown value
has_unknown_chunk_index = False
for infos in in_chunk_results:
for info in infos[0]: # pragma: no cover
if len(info[4]) == 0 or -1 in info[4]:
has_unknown_chunk_index = True
break

# assume chunks are row-splitted if chunk index is unknown
chunk_location = 0

for chunk, infos in zip(op.inputs[0].chunks, in_chunk_results):
for info in infos[0]: # n.b. 1-element ndarray
chunk_op = op.copy().reset_key()
chunk_op.object_id = info[0]
chunk_op.expect_worker = info[1]
dtype = info[2]
shape = info[3]
chunk_index = info[4]
if has_unknown_chunk_index: # pragma: no cover
chunk_index = (chunk_location,)
chunk_location += 1
else:
chunk_index = info[4]
chunk_map[chunk_index] = info[3]
out_chunk = chunk_op.new_chunk(
[chunk], shape=shape, dtype=dtype, index=chunk_index
Expand Down Expand Up @@ -181,6 +196,7 @@ def fromvineyard(tensor, vineyard_socket=None):
metaop = TensorFromVineyard(
vineyard_socket=vineyard_socket,
object_id=object_id,
shape=(np.nan,),
dtype=np.dtype("byte"),
gpu=None,
)
Expand Down

0 comments on commit 9f0bf34

Please sign in to comment.