Reduce estimation time cost (mars-project#2577)
wjsi committed Dec 7, 2021
1 parent d53df46 commit 739b0b2
Showing 7 changed files with 167 additions and 24 deletions.
25 changes: 15 additions & 10 deletions mars/core/operand/core.py
```diff
@@ -335,20 +335,22 @@ def post_execute(cls, ctx: Union[dict, Context], op: OperandType):
     def estimate_size(cls, ctx: dict, op: OperandType):
         from .fetch import FetchShuffle
 
-        exec_size = 0
+        # when sizes of all outputs are deterministic, return directly
         outputs = op.outputs
-        pure_dep_keys = set(
-            inp.key
-            for inp, is_dep in zip(op.inputs or (), op.pure_depends or ())
-            if is_dep
-        )
         if all(
             not c.is_sparse() and hasattr(c, "nbytes") and not np.isnan(c.nbytes)
             for c in outputs
         ):
             for out in outputs:
                 ctx[out.key] = (out.nbytes, out.nbytes)
             return
+
+        pure_dep_keys = set(
+            inp.key
+            for inp, is_dep in zip(op.inputs or (), op.pure_depends or ())
+            if is_dep
+        )
+        exec_sizes = [0]
         for inp in op.inputs or ():
             if inp.key in pure_dep_keys:
                 continue
@@ -361,13 +363,16 @@ def estimate_size(cls, ctx: dict, op: OperandType):
                 # execution size of a specific data chunk may be
                 # larger than stored type due to objects
                 for key, shape in keys_and_shapes:
-                    exec_size += ctx[key][0]
+                    exec_sizes.append(ctx[key][0])
             except KeyError:
                 if not op.sparse:
                     inp_size = calc_data_size(inp)
                     if not np.isnan(inp_size):
-                        exec_size += inp_size
-        exec_size = int(exec_size)
+                        exec_sizes.append(inp_size)
+        if any(c.is_sparse() for c in op.inputs):
+            exec_size = sum(exec_sizes)
+        else:
+            exec_size = max(exec_sizes)
 
         total_out_size = 0
         chunk_sizes = dict()
@@ -408,7 +413,7 @@ def estimate_size(cls, ctx: dict, op: OperandType):
                 max_sparse_size = np.nan
             if not np.isnan(max_sparse_size):
                 result_size = min(result_size, max_sparse_size)
-            ctx[out.key] = (result_size, exec_size * memory_scale // len(outputs))
+            ctx[out.key] = (result_size, int(exec_size * memory_scale // len(outputs)))
 
     @classmethod
     def concat_tileable_chunks(cls, tileable: TileableType):
```
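The behavioral core of this file's change: per-input execution costs are now collected into `exec_sizes`, and dense inputs contribute only the largest single cost rather than the total, since input chunks are typically consumed and released one at a time; the conservative sum is kept only when sparse inputs are involved. A minimal standalone sketch of that aggregation rule (function name and sizes are illustrative, not Mars code):

```python
# Sketch of the new aggregation rule in estimate_size; illustrative only.
def aggregate_exec_size(exec_sizes, any_sparse):
    # sparse inputs may all need to stay resident at once: stay conservative
    if any_sparse:
        return sum(exec_sizes)
    # dense inputs: peak cost is assumed to be dominated by the largest input
    return max(exec_sizes)

sizes = [100 << 20, 80 << 20, 60 << 20]  # 100 MiB, 80 MiB, 60 MiB inputs
print(aggregate_exec_size(sizes, any_sparse=False))  # 104857600 (max)
print(aggregate_exec_size(sizes, any_sparse=True))   # 251658240 (sum)
```

For operands with wide fan-in, the max rule keeps the estimated memory quota close to one chunk's cost instead of growing linearly with the number of inputs.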
4 changes: 4 additions & 0 deletions mars/lib/groupby_wrapper.py
```diff
@@ -20,6 +20,7 @@
 import pandas as pd
 from pandas.core.groupby import DataFrameGroupBy, SeriesGroupBy
 
+from ..utils import estimate_pandas_size
 from .version import parse as parse_version
 
 _HAS_SQUEEZE = parse_version(pd.__version__) < parse_version("1.1.0")
@@ -124,6 +125,9 @@ def __sizeof__(self):
             getattr(self.groupby_obj.grouper, "_cache", None)
         )
 
+    def estimate_size(self):
+        return estimate_pandas_size(self.obj) + estimate_pandas_size(self.obj.index)
+
     def __reduce__(self):
         return (
             type(self).from_tuple,
```
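`estimate_size` on the wrapper delegates to the sampling-based `estimate_pandas_size` (added in `mars/utils.py` below) for the wrapped frame and its index, and `calc_data_size` now prefers it over walking the whole object with `sys.getsizeof`. A usage sketch, assuming a Mars source tree is importable; the grouping arguments mirror the existing test:

```python
# Usage sketch (assumes Mars is importable); compares the new estimate_size()
# with the exact-but-slower __sizeof__ path.
import sys

import numpy as np
import pandas as pd

from mars.lib.groupby_wrapper import wrapped_groupby

df = pd.DataFrame(
    {"C": np.random.choice(["u", "v", "w"], size=1000)},
    index=np.random.randint(0, 100, size=1000),
)
grouped = wrapped_groupby(df, level=0)
print(grouped.estimate_size())  # sampled estimate of obj + index, in bytes
print(sys.getsizeof(grouped))   # exact, but traverses the whole object
```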
3 changes: 2 additions & 1 deletion mars/lib/tests/test_lib.py
```diff
@@ -19,7 +19,7 @@
 import numpy as np
 
 from ...tests.core import assert_groupby_equal
-from ...utils import calc_data_size
+from ...utils import calc_data_size, estimate_pandas_size
 from ..groupby_wrapper import wrapped_groupby
 
 
@@ -42,6 +42,7 @@ def test_groupby_wrapper():
     assert grouped.is_frame is True
     assert sys.getsizeof(grouped) > sys.getsizeof(grouped.groupby_obj)
     assert calc_data_size(grouped) > sys.getsizeof(grouped.groupby_obj)
+    assert grouped.estimate_size() > estimate_pandas_size(grouped.groupby_obj)
 
     grouped = conv_func(wrapped_groupby(df, level=0).C)
     assert_groupby_equal(grouped, df.groupby(level=0).C)
```
26 changes: 18 additions & 8 deletions mars/services/scheduling/worker/execution.py
```diff
@@ -207,19 +207,22 @@ async def _collect_input_sizes(
             *(storage_api.get_infos.delay(k) for k in fetch_keys)
         )
 
+        # compute memory quota size. when data located in shared memory, the cost
+        # should be differences between deserialized memory cost and serialized cost,
+        # otherwise we should take deserialized memory cost
        for key, meta, infos in zip(fetch_keys, fetch_metas, data_infos):
             level = functools.reduce(operator.or_, (info.level for info in infos))
             if level & StorageLevel.MEMORY:
                 mem_cost = max(0, meta["memory_size"] - meta["store_size"])
             else:
                 mem_cost = meta["memory_size"]
-            sizes[key] = (mem_cost, mem_cost)
+            sizes[key] = (meta["store_size"], mem_cost)
 
         return sizes
 
     @classmethod
     def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):
-        size_context = {k: (s, 0) for k, (s, _c) in input_sizes.items()}
+        size_context = dict(input_sizes.items())
         graph = subtask.chunk_graph
 
         key_to_ops = defaultdict(set)
@@ -243,7 +246,7 @@ def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):
 
         visited_op_keys = set()
         total_memory_cost = 0
-        max_memory_cost = 0
+        max_memory_cost = sum(calc_size for _, calc_size in size_context.values())
         while key_stack:
             key = key_stack.pop()
             op = key_to_ops[key][0]
@@ -255,24 +258,31 @@
             total_memory_cost += calc_cost
             max_memory_cost = max(total_memory_cost, max_memory_cost)
 
-            result_cost = sum(size_context[out.key][0] for out in op.outputs)
-            total_memory_cost += result_cost - calc_cost
+            if not isinstance(op, Fetch):
+                # when calculation result is stored, memory cost of calculation
+                # can be replaced with result memory cost
+                result_cost = sum(size_context[out.key][0] for out in op.outputs)
+                total_memory_cost += result_cost - calc_cost
 
-            visited_op_keys.add(op.key)
+            visited_op_keys.add(key)
+
             for succ_op_key in op_key_graph.iter_successors(key):
                 pred_ref_count[succ_op_key] -= 1
                 if pred_ref_count[succ_op_key] == 0:
                     key_stack.append(succ_op_key)
 
             for pred_op_key in op_key_graph.iter_predecessors(key):
                 succ_ref_count[pred_op_key] -= 1
                 if succ_ref_count[pred_op_key] == 0:
+                    pred_op = key_to_ops[pred_op_key][0]
+                    # when clearing fetches, subtract memory size, otherwise subtract store size
+                    account_idx = 1 if isinstance(pred_op, Fetch) else 0
                     pop_result_cost = sum(
-                        size_context.pop(out.key, (0, 0))[0]
+                        size_context.pop(out.key, (0, 0))[account_idx]
                         for out in key_to_ops[pred_op_key][0].outputs
                     )
                     total_memory_cost -= pop_result_cost
-        return sum(t[1] for t in size_context.values()), max_memory_cost
+        return sum(t[0] for t in size_context.values()), max_memory_cost
 
     @classmethod
     def _check_cancelling(cls, subtask_info: SubtaskExecutionInfo):
```
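`_estimate_sizes` visits the subtask's operands in topological order, keeping a running `total_memory_cost` and its peak. The new accounting seeds the peak with the calc cost of everything already resident, skips the store-for-calc substitution for `Fetch` ops (their data is already in storage), and, once all successors of an operand are visited, releases its outputs at memory cost (slot 1) for fetches or store cost (slot 0) otherwise. A hand-worked sketch of that accounting on the two-fetches-plus-add graph used by the new `test_estimate_size` below; the add's `(store, calc)` estimate of `(1024, 1024)` is an assumption consistent with the max rule above:

```python
# Hand-worked accounting for: INPUT1, INPUT2 (Fetch) -> ADD_RESULT.
# Sizes are (store_size, calc_size) pairs; numbers mirror the test below.
input_sizes = {"INPUT1": (1024, 1024), "INPUT2": (1024, 1024)}

total = 0
# peak starts at the calc cost of all resident inputs
peak = sum(calc for _store, calc in input_sizes.values())  # 2048

# visit the two Fetch ops: calc cost joins the running total, but fetched
# data is already stored, so there is no result-cost substitution
for _store, calc in input_sizes.values():
    total += calc
    peak = max(peak, total)                                # still 2048

# visit ADD_RESULT: assumed estimate (store=1024, calc=max(1024, 1024))
add_store, add_calc = 1024, 1024
total += add_calc
peak = max(peak, total)                                    # 3072
# the result is stored: swap its calc cost for its store cost
total += add_store - add_calc

# both Fetch predecessors fully consumed: release their memory cost (slot 1)
total -= sum(mem for _store, mem in input_sizes.values())
assert total == 1024  # the add result's store size is all that remains
```

The final sum of remaining store sizes is what the test asserts as `result[0] == 1024`.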
50 changes: 49 additions & 1 deletion mars/services/scheduling/worker/tests/test_execution.py
```diff
@@ -21,11 +21,18 @@
 from typing import Tuple
 
 import numpy as np
+import pandas as pd
 import pytest
 
 from ..... import oscar as mo
 from ..... import remote as mr
-from .....core import ChunkGraph, ChunkGraphBuilder, TileableGraph, TileableGraphBuilder
+from .....core import (
+    ChunkGraph,
+    ChunkGraphBuilder,
+    TileableGraph,
+    TileableGraphBuilder,
+    OutputType,
+)
 from .....remote.core import RemoteFunction
 from .....tensor.fetch import TensorFetch
 from .....tensor.arithmetic import TensorTreeAdd
@@ -384,6 +391,47 @@ def delay_fun(delay, _inp1):
     )
 
 
+def test_estimate_size():
+    from ..execution import SubtaskExecutionActor
+    from .....dataframe.arithmetic import DataFrameAdd
+    from .....dataframe.fetch import DataFrameFetch
+    from .....dataframe.utils import parse_index
+
+    index_value = parse_index(pd.Int64Index([10, 20, 30]))
+
+    input1 = DataFrameFetch(output_types=[OutputType.series],).new_chunk(
+        [], _key="INPUT1", shape=(np.nan,), dtype=np.dtype("O"), index_value=index_value
+    )
+    input2 = DataFrameFetch(output_types=[OutputType.series],).new_chunk(
+        [], _key="INPUT2", shape=(np.nan,), dtype=np.dtype("O"), index_value=index_value
+    )
+    result_chunk = DataFrameAdd(
+        axis=0, output_types=[OutputType.series], lhs=input1, rhs=input2
+    ).new_chunk(
+        [input1, input2],
+        _key="ADD_RESULT",
+        shape=(np.nan,),
+        dtype=np.dtype("O"),
+        index_value=index_value,
+    )
+
+    chunk_graph = ChunkGraph([result_chunk])
+    chunk_graph.add_node(input1)
+    chunk_graph.add_node(input2)
+    chunk_graph.add_node(result_chunk)
+    chunk_graph.add_edge(input1, result_chunk)
+    chunk_graph.add_edge(input2, result_chunk)
+
+    input_sizes = {
+        "INPUT1": (1024, 1024),
+        "INPUT2": (1024, 1024),
+    }
+
+    subtask = Subtask("test_subtask", session_id="session_id", chunk_graph=chunk_graph)
+    result = SubtaskExecutionActor._estimate_sizes(subtask, input_sizes)
+    assert result[0] == 1024
+
+
 @pytest.mark.asyncio
 @pytest.mark.parametrize("actor_pool", [(1, False)], indirect=True)
 async def test_cancel_without_kill(actor_pool):
```
38 changes: 36 additions & 2 deletions mars/tests/test_utils.py
```diff
@@ -188,8 +188,8 @@ def test_lazy_import():
     old_sys_path = sys.path
     mock_mod = textwrap.dedent(
         """
-            __version__ = '0.1.0b1'
-            """.strip()
+        __version__ = '0.1.0b1'
+        """.strip()
     )
 
     temp_dir = tempfile.mkdtemp(prefix="mars-utils-test-")
@@ -482,6 +482,40 @@ def test_readable_size():
     assert utils.readable_size(14354000000000) == "13.05T"
 
 
+def test_estimate_pandas_size():
+    df1 = pd.DataFrame(np.random.rand(50, 10))
+    assert utils.estimate_pandas_size(df1) == sys.getsizeof(df1)
+
+    df2 = pd.DataFrame(np.random.rand(1000, 10))
+    assert utils.estimate_pandas_size(df2) == sys.getsizeof(df2)
+
+    df3 = pd.DataFrame(
+        {
+            "A": np.random.choice(["abcd", "def", "gh"], size=(1000,)),
+            "B": np.random.rand(1000),
+            "C": np.random.rand(1000),
+        }
+    )
+    assert utils.estimate_pandas_size(df3) != sys.getsizeof(df3)
+
+    s1 = pd.Series(np.random.rand(1000))
+    assert utils.estimate_pandas_size(s1) == sys.getsizeof(s1)
+
+    from ..dataframe.arrays import ArrowStringArray
+
+    array = ArrowStringArray(np.random.choice(["abcd", "def", "gh"], size=(1000,)))
+    s2 = pd.Series(array)
+    assert utils.estimate_pandas_size(s2) == sys.getsizeof(s2)
+
+    s3 = pd.Series(np.random.choice(["abcd", "def", "gh"], size=(1000,)))
+    assert utils.estimate_pandas_size(s3) != sys.getsizeof(s3)
+
+    idx1 = pd.MultiIndex.from_arrays(
+        [np.arange(0, 1000), np.random.choice(["abcd", "def", "gh"], size=(1000,))]
+    )
+    assert utils.estimate_pandas_size(idx1) != sys.getsizeof(idx1)
+
+
 @require_ray
 def test_web_serialize_lambda():
     register_ray_serializers()
```
45 changes: 43 additions & 2 deletions mars/utils.py
```diff
@@ -380,8 +380,10 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
         return sum(calc_data_size(c) for c in dt)
 
     shape = getattr(dt, "shape", None) or shape
-    if hasattr(dt, "memory_usage") or hasattr(dt, "groupby_obj"):
-        return sys.getsizeof(dt)
+    if isinstance(dt, (pd.DataFrame, pd.Series)):
+        return estimate_pandas_size(dt)
+    if hasattr(dt, "estimate_size"):
+        return dt.estimate_size()
     if hasattr(dt, "nbytes"):
         return max(sys.getsizeof(dt), dt.nbytes)
     if hasattr(dt, "shape") and len(dt.shape) == 0:
@@ -404,6 +406,45 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
     return sys.getsizeof(dt)
 
 
+def estimate_pandas_size(
+    df_obj, max_samples: int = 10, min_sample_rows: int = 100
+) -> int:
+    if len(df_obj) <= min_sample_rows or isinstance(df_obj, pd.RangeIndex):
+        return sys.getsizeof(df_obj)
+
+    from .dataframe.arrays import ArrowDtype
+
+    def _is_fast_dtype(dtype):
+        if isinstance(dtype, np.dtype):
+            return np.issubdtype(dtype, np.number)
+        else:
+            return isinstance(dtype, ArrowDtype)
+
+    dtypes = []
+    if isinstance(df_obj, pd.DataFrame):
+        dtypes.extend(df_obj.dtypes)
+        index_obj = df_obj.index
+    elif isinstance(df_obj, pd.Series):
+        dtypes.append(df_obj.dtype)
+        index_obj = df_obj.index
+    else:
+        index_obj = df_obj
+
+    # handling possible MultiIndex
+    if hasattr(index_obj, "dtypes"):
+        dtypes.extend(index_obj.dtypes)
+    else:
+        dtypes.append(index_obj.dtype)
+
+    if all(_is_fast_dtype(dtype) for dtype in dtypes):
+        return sys.getsizeof(df_obj)
+
+    indices = np.sort(np.random.choice(len(df_obj), size=max_samples, replace=False))
+    iloc = df_obj if isinstance(df_obj, pd.Index) else df_obj.iloc
+    sample_size = sys.getsizeof(iloc[indices])
+    return sample_size * len(df_obj) // max_samples
+
+
 def build_fetch_chunk(
     chunk: ChunkType, input_chunk_keys: List[str] = None, **kwargs
 ) -> ChunkType:
```
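`estimate_pandas_size` short-circuits to `sys.getsizeof` for small objects, `RangeIndex`, and all-numeric or Arrow-backed dtypes (whose sizes are cheap to compute exactly), and otherwise measures a small random row sample and scales it by the row count. A standalone sketch of that sampling arithmetic on illustrative data:

```python
# Standalone illustration of the sampling arithmetic; not Mars code.
import sys

import numpy as np
import pandas as pd

df = pd.DataFrame({"A": np.random.choice(["abcd", "def", "gh"], size=1000)})
max_samples = 10

# measure 10 random rows and scale up to the full length
indices = np.sort(np.random.choice(len(df), size=max_samples, replace=False))
sample_size = sys.getsizeof(df.iloc[indices])
estimate = sample_size * len(df) // max_samples

# object dtype: the estimate tracks the true size without touching every row
print(estimate, sys.getsizeof(df))
```

Sampling makes the estimate O(max_samples) instead of O(rows) for object-dtype data, which is the estimation time cost this commit reduces.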
