Reduce estimation time cost
wjsi committed Nov 8, 2021
1 parent 56acab7 commit df34c1d
Showing 4 changed files with 30 additions and 5 deletions.
4 changes: 4 additions & 0 deletions mars/lib/groupby_wrapper.py
@@ -20,6 +20,7 @@
 import pandas as pd
 from pandas.core.groupby import DataFrameGroupBy, SeriesGroupBy
 
+from ..utils import estimate_pandas_size
 from .version import parse as parse_version
 
 _HAS_SQUEEZE = parse_version(pd.__version__) < parse_version("1.1.0")
@@ -124,6 +125,9 @@ def __sizeof__(self):
             getattr(self.groupby_obj.grouper, "_cache", None)
         )
 
+    def estimate_size(self):
+        return estimate_pandas_size(self.obj) + estimate_pandas_size(self.obj.index)
+
     def __reduce__(self):
         return (
             type(self).from_tuple,
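A quick usage sketch of the new method (hypothetical data; assumes wrapped_groupby accepts the same by= argument as pandas groupby). estimate_size prices a wrapped groupby as the estimated size of its source frame plus its index, instead of the deep object-graph walk that __sizeof__ performs:

    import numpy as np
    import pandas as pd

    from mars.lib.groupby_wrapper import wrapped_groupby

    df = pd.DataFrame(np.random.rand(200_000, 3), columns=list("ABC"))
    grouped = wrapped_groupby(df, by="A")

    # Roughly estimate_pandas_size(df) + estimate_pandas_size(df.index);
    # per-group caches are not measured, so this is cheap and approximate.
    print(grouped.estimate_size())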
3 changes: 2 additions & 1 deletion mars/lib/tests/test_lib.py
@@ -19,7 +19,7 @@
 import numpy as np
 
 from ...tests.core import assert_groupby_equal
-from ...utils import calc_data_size
+from ...utils import calc_data_size, estimate_pandas_size
 from ..groupby_wrapper import wrapped_groupby


@@ -42,6 +42,7 @@ def test_groupby_wrapper():
     assert grouped.is_frame is True
     assert sys.getsizeof(grouped) > sys.getsizeof(grouped.groupby_obj)
     assert calc_data_size(grouped) > sys.getsizeof(grouped.groupby_obj)
+    assert grouped.estimate_size() > estimate_pandas_size(grouped.groupby_obj)
 
     grouped = conv_func(wrapped_groupby(df, level=0).C)
     assert_groupby_equal(grouped, df.groupby(level=0).C)
4 changes: 2 additions & 2 deletions mars/tests/test_utils.py
@@ -188,8 +188,8 @@ def test_lazy_import():
     old_sys_path = sys.path
     mock_mod = textwrap.dedent(
         """
-    __version__ = '0.1.0b1'
-    """.strip()
+        __version__ = '0.1.0b1'
+        """.strip()
     )

temp_dir = tempfile.mkdtemp(prefix="mars-utils-test-")
24 changes: 22 additions & 2 deletions mars/utils.py
@@ -380,8 +380,10 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
         return sum(calc_data_size(c) for c in dt)
 
     shape = getattr(dt, "shape", None) or shape
-    if hasattr(dt, "memory_usage") or hasattr(dt, "groupby_obj"):
-        return sys.getsizeof(dt)
+    if isinstance(dt, (pd.DataFrame, pd.Series)):
+        return estimate_pandas_size(dt)
+    if hasattr(dt, "estimate_size"):
+        return dt.estimate_size()
     if hasattr(dt, "nbytes"):
         return max(sys.getsizeof(dt), dt.nbytes)
     if hasattr(dt, "shape") and len(dt.shape) == 0:
@@ -404,6 +406,24 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
     return sys.getsizeof(dt)
 
 
+def estimate_pandas_size(
+    df_obj, max_samples: int = 10, min_sample_rows: int = 100
+) -> int:
+    if len(df_obj) <= min_sample_rows or isinstance(df_obj, pd.RangeIndex):
+        return sys.getsizeof(df_obj)
+    elif hasattr(df_obj, "dtype"):
+        if np.issubdtype(df_obj.dtype, np.number):
+            return sys.getsizeof(df_obj)
+    else:
+        if all(np.issubdtype(dtype, np.number) for dtype in df_obj.dtypes):
+            return sys.getsizeof(df_obj)
+
+    indices = np.sort(np.random.choice(len(df_obj), size=max_samples, replace=False))
+    iloc = df_obj if isinstance(df_obj, pd.Index) else df_obj.iloc
+    sample_size = sys.getsizeof(iloc[indices])
+    return sample_size * len(df_obj) // max_samples
+
+
 def build_fetch_chunk(
     chunk: ChunkType, input_chunk_keys: List[str] = None, **kwargs
 ) -> ChunkType:
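To see the intended behavior of estimate_pandas_size, here is an illustrative session (hypothetical data and timings; assumes the function is importable from mars.utils):

    import numpy as np
    import pandas as pd

    from mars.utils import estimate_pandas_size

    # object-dtype Series: sized from ~10 sampled rows, then extrapolated
    s = pd.Series([f"value-{i}" for i in range(1_000_000)])
    estimate_pandas_size(s)    # fast: touches only max_samples rows
    s.memory_usage(deep=True)  # exact, but walks a million strings

    # numeric data and RangeIndex keep their exact, cheap fast paths
    estimate_pandas_size(pd.Series(np.arange(1_000_000)))
    estimate_pandas_size(pd.RangeIndex(1_000_000))

With only ten sampled rows the extrapolation can vary noticeably when row sizes are heavily skewed (say, a few very long strings); that variance is the accuracy the commit trades for estimation speed.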
