Skip to content

Commit

Permalink
Reduce size estimation
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi committed Nov 10, 2021
1 parent e784d41 commit 4822b99
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 26 deletions.
25 changes: 15 additions & 10 deletions mars/core/operand/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,20 +335,22 @@ def post_execute(cls, ctx: Union[dict, Context], op: OperandType):
def estimate_size(cls, ctx: dict, op: OperandType):
from .fetch import FetchShuffle

exec_size = 0
# when sizes of all outputs are deterministic, return directly
outputs = op.outputs
pure_dep_keys = set(
inp.key
for inp, is_dep in zip(op.inputs or (), op.pure_depends or ())
if is_dep
)
if all(
not c.is_sparse() and hasattr(c, "nbytes") and not np.isnan(c.nbytes)
for c in outputs
):
for out in outputs:
ctx[out.key] = (out.nbytes, out.nbytes)
return

pure_dep_keys = set(
inp.key
for inp, is_dep in zip(op.inputs or (), op.pure_depends or ())
if is_dep
)
exec_sizes = [0]
for inp in op.inputs or ():
if inp.key in pure_dep_keys:
continue
Expand All @@ -361,13 +363,16 @@ def estimate_size(cls, ctx: dict, op: OperandType):
# execution size of a specific data chunk may be
# larger than stored type due to objects
for key, shape in keys_and_shapes:
exec_size += ctx[key][0]
exec_sizes.append(ctx[key][0])
except KeyError:
if not op.sparse:
inp_size = calc_data_size(inp)
if not np.isnan(inp_size):
exec_size += inp_size
exec_size = int(exec_size)
exec_sizes.append(inp_size)
if any(c.is_sparse() for c in op.inputs):
exec_size = sum(exec_sizes)
else:
exec_size = max(exec_sizes)

total_out_size = 0
chunk_sizes = dict()
Expand Down Expand Up @@ -408,7 +413,7 @@ def estimate_size(cls, ctx: dict, op: OperandType):
max_sparse_size = np.nan
if not np.isnan(max_sparse_size):
result_size = min(result_size, max_sparse_size)
ctx[out.key] = (result_size, exec_size * memory_scale // len(outputs))
ctx[out.key] = (result_size, int(exec_size * memory_scale // len(outputs)))

@classmethod
def concat_tileable_chunks(cls, tileable: TileableType):
Expand Down
16 changes: 11 additions & 5 deletions mars/services/scheduling/worker/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,19 +207,22 @@ async def _collect_input_sizes(
*(storage_api.get_infos.delay(k) for k in fetch_keys)
)

# compute memory quota size. when data located in shared memory, the cost
# should be differences between deserialized memory cost and serialized cost,
# otherwise we should take deserialized memory cost
for key, meta, infos in zip(fetch_keys, fetch_metas, data_infos):
level = functools.reduce(operator.or_, (info.level for info in infos))
if level & StorageLevel.MEMORY:
mem_cost = max(0, meta["memory_size"] - meta["store_size"])
else:
mem_cost = meta["memory_size"]
sizes[key] = (mem_cost, mem_cost)
sizes[key] = (meta["store_size"], mem_cost)

return sizes

@classmethod
def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):
size_context = {k: (s, 0) for k, (s, _c) in input_sizes.items()}
size_context = dict(input_sizes.items())
graph = subtask.chunk_graph

key_to_ops = defaultdict(set)
Expand All @@ -243,7 +246,7 @@ def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):

visited_op_keys = set()
total_memory_cost = 0
max_memory_cost = 0
max_memory_cost = sum(calc_size for _, calc_size in size_context.values())
while key_stack:
key = key_stack.pop()
op = key_to_ops[key][0]
Expand All @@ -255,15 +258,18 @@ def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):
total_memory_cost += calc_cost
max_memory_cost = max(total_memory_cost, max_memory_cost)

# when calculation result is stored, memory cost of calculation
# can be replaced with result memory cost
result_cost = sum(size_context[out.key][0] for out in op.outputs)
total_memory_cost += result_cost - calc_cost

visited_op_keys.add(op.key)
visited_op_keys.add(key)

for succ_op_key in op_key_graph.iter_successors(key):
pred_ref_count[succ_op_key] -= 1
if pred_ref_count[succ_op_key] == 0:
key_stack.append(succ_op_key)

for pred_op_key in op_key_graph.iter_predecessors(key):
succ_ref_count[pred_op_key] -= 1
if succ_ref_count[pred_op_key] == 0:
Expand All @@ -272,7 +278,7 @@ def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):
for out in key_to_ops[pred_op_key][0].outputs
)
total_memory_cost -= pop_result_cost
return sum(t[1] for t in size_context.values()), max_memory_cost
return sum(t[0] for t in size_context.values()), max_memory_cost

@classmethod
def _check_cancelling(cls, subtask_info: SubtaskExecutionInfo):
Expand Down
50 changes: 49 additions & 1 deletion mars/services/scheduling/worker/tests/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@
from typing import Tuple

import numpy as np
import pandas as pd
import pytest

from ..... import oscar as mo
from ..... import remote as mr
from .....core import ChunkGraph, ChunkGraphBuilder, TileableGraph, TileableGraphBuilder
from .....core import (
ChunkGraph,
ChunkGraphBuilder,
TileableGraph,
TileableGraphBuilder,
OutputType,
)
from .....remote.core import RemoteFunction
from .....tensor.fetch import TensorFetch
from .....tensor.arithmetic import TensorTreeAdd
Expand Down Expand Up @@ -384,6 +391,47 @@ def delay_fun(delay, _inp1):
)


def test_estimate_size():
from ..execution import SubtaskExecutionActor
from .....dataframe.arithmetic import DataFrameAdd
from .....dataframe.fetch import DataFrameFetch
from .....dataframe.utils import parse_index

index_value = parse_index(pd.Int64Index([10, 20, 30]))

input1 = DataFrameFetch(output_types=[OutputType.series],).new_chunk(
[], _key="INPUT1", shape=(np.nan,), dtype=np.dtype("O"), index_value=index_value
)
input2 = DataFrameFetch(output_types=[OutputType.series],).new_chunk(
[], _key="INPUT2", shape=(np.nan,), dtype=np.dtype("O"), index_value=index_value
)
result_chunk = DataFrameAdd(
axis=0, output_types=[OutputType.series], lhs=input1, rhs=input2
).new_chunk(
[input1, input2],
_key="ADD_RESULT",
shape=(np.nan,),
dtype=np.dtype("O"),
index_value=index_value,
)

chunk_graph = ChunkGraph([result_chunk])
chunk_graph.add_node(input1)
chunk_graph.add_node(input2)
chunk_graph.add_node(result_chunk)
chunk_graph.add_edge(input1, result_chunk)
chunk_graph.add_edge(input2, result_chunk)

input_sizes = {
"INPUT1": (1024, 1024),
"INPUT2": (1024, 1024),
}

subtask = Subtask("test_subtask", session_id="session_id", chunk_graph=chunk_graph)
result = SubtaskExecutionActor._estimate_sizes(subtask, input_sizes)
assert result[0] == 1024


@pytest.mark.asyncio
@pytest.mark.parametrize("actor_pool", [(1, False)], indirect=True)
async def test_cancel_without_kill(actor_pool):
Expand Down
26 changes: 21 additions & 5 deletions mars/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,16 +489,32 @@ def test_estimate_pandas_size():
df2 = pd.DataFrame(np.random.rand(1000, 10))
assert utils.estimate_pandas_size(df2) == sys.getsizeof(df2)

df3 = pd.DataFrame({
"A": np.random.choice(["abcd", "def", "gh"], size=(1000,)),
"B": np.random.rand(1000),
"C": np.random.rand(1000),
})
df3 = pd.DataFrame(
{
"A": np.random.choice(["abcd", "def", "gh"], size=(1000,)),
"B": np.random.rand(1000),
"C": np.random.rand(1000),
}
)
assert utils.estimate_pandas_size(df3) != sys.getsizeof(df3)

s1 = pd.Series(np.random.rand(1000))
assert utils.estimate_pandas_size(s1) == sys.getsizeof(s1)

from ..dataframe.arrays import ArrowStringArray

array = ArrowStringArray(np.random.choice(["abcd", "def", "gh"], size=(1000,)))
s2 = pd.Series(array)
assert utils.estimate_pandas_size(s2) == sys.getsizeof(s2)

s3 = pd.Series(np.random.choice(["abcd", "def", "gh"], size=(1000,)))
assert utils.estimate_pandas_size(s3) != sys.getsizeof(s3)

idx1 = pd.MultiIndex.from_arrays(
[np.arange(0, 1000), np.random.choice(["abcd", "def", "gh"], size=(1000,))]
)
assert utils.estimate_pandas_size(idx1) != sys.getsizeof(idx1)


@require_ray
def test_web_serialize_lambda():
Expand Down
22 changes: 17 additions & 5 deletions mars/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,12 +420,24 @@ def _is_fast_dtype(dtype):
else:
return isinstance(dtype, ArrowDtype)

if hasattr(df_obj, "dtype"):
if _is_fast_dtype(df_obj.dtype):
return sys.getsizeof(df_obj)
dtypes = []
if isinstance(df_obj, pd.DataFrame):
dtypes.extend(df_obj.dtypes)
index_obj = df_obj.index
elif isinstance(df_obj, pd.Series):
dtypes.append(df_obj.dtype)
index_obj = df_obj.index
else:
if all(_is_fast_dtype(dtype) for dtype in df_obj.dtypes):
return sys.getsizeof(df_obj)
index_obj = df_obj

# handling possible MultiIndex
if hasattr(index_obj, "dtypes"):
dtypes.extend(index_obj.dtypes)
else:
dtypes.append(index_obj.dtype)

if all(_is_fast_dtype(dtype) for dtype in dtypes):
return sys.getsizeof(df_obj)

indices = np.sort(np.random.choice(len(df_obj), size=max_samples, replace=False))
iloc = df_obj if isinstance(df_obj, pd.Index) else df_obj.iloc
Expand Down

0 comments on commit 4822b99

Please sign in to comment.