Reduce estimation time cost
wjsi committed Nov 8, 2021
1 parent 56acab7 commit ccc4af7
Showing 3 changed files with 26 additions and 3 deletions.
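
In short: calc_data_size previously answered pandas objects with sys.getsizeof, which pandas implements as a deep memory_usage scan over every cell. This commit adds a sampling-based estimate_dataframe_size helper to mars/utils.py, routes DataFrame and Series inputs through it, and gives the groupby wrapper an estimate_size method that estimates the wrapped frame and its index the same way.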
mars/lib/groupby_wrapper.py (6 additions, 0 deletions)
@@ -20,6 +20,7 @@
 import pandas as pd
 from pandas.core.groupby import DataFrameGroupBy, SeriesGroupBy
 
+from ..utils import estimate_dataframe_size
 from .version import parse as parse_version
 
 _HAS_SQUEEZE = parse_version(pd.__version__) < parse_version("1.1.0")
@@ -124,6 +125,11 @@ def __sizeof__(self):
             getattr(self.groupby_obj.grouper, "_cache", None)
         )
 
+    def estimate_size(self):
+        return estimate_dataframe_size(self.obj) + estimate_dataframe_size(
+            self.obj.index
+        )
+
     def __reduce__(self):
         return (
             type(self).from_tuple,
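
A minimal usage sketch of the new method (illustrative only; the frame below is not from the commit, and it assumes a Mars checkout that contains this change):

    import pandas as pd

    from mars.lib.groupby_wrapper import wrapped_groupby

    df = pd.DataFrame({"a": range(1000), "b": ["x"] * 1000})

    # wrapped_groupby returns a GroupByWrapper; the new estimate_size()
    # approximates the size of the wrapped source frame plus its index by
    # row sampling instead of deep-measuring every cell.
    grouped = wrapped_groupby(df, level=0)
    print(grouped.estimate_size())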
mars/lib/tests/test_lib.py (2 additions, 1 deletion)
@@ -19,7 +19,7 @@
 import numpy as np
 
 from ...tests.core import assert_groupby_equal
-from ...utils import calc_data_size
+from ...utils import calc_data_size, estimate_dataframe_size
 from ..groupby_wrapper import wrapped_groupby
 
 
@@ -42,6 +42,7 @@ def test_groupby_wrapper():
     assert grouped.is_frame is True
     assert sys.getsizeof(grouped) > sys.getsizeof(grouped.groupby_obj)
     assert calc_data_size(grouped) > sys.getsizeof(grouped.groupby_obj)
+    assert grouped.estimate_size() > estimate_dataframe_size(grouped.groupby_obj)
 
     grouped = conv_func(wrapped_groupby(df, level=0).C)
     assert_groupby_equal(grouped, df.groupby(level=0).C)
mars/utils.py (18 additions, 2 deletions)
@@ -380,8 +380,10 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
         return sum(calc_data_size(c) for c in dt)
 
     shape = getattr(dt, "shape", None) or shape
-    if hasattr(dt, "memory_usage") or hasattr(dt, "groupby_obj"):
-        return sys.getsizeof(dt)
+    if isinstance(dt, (pd.DataFrame, pd.Series)):
+        return estimate_dataframe_size(dt)
+    if hasattr(dt, "estimate_size"):
+        return dt.estimate_size()
     if hasattr(dt, "nbytes"):
         return max(sys.getsizeof(dt), dt.nbytes)
     if hasattr(dt, "shape") and len(dt.shape) == 0:
@@ -404,6 +406,20 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
     return sys.getsizeof(dt)
 
 
+def estimate_dataframe_size(
+    df_obj, max_samples: int = 10, min_sample_rows: int = 100
+) -> int:
+    if len(df_obj) <= min_sample_rows or isinstance(df_obj, pd.RangeIndex):
+        return sys.getsizeof(df_obj)
+    else:
+        indices = np.sort(
+            np.random.choice(len(df_obj), size=max_samples, replace=False)
+        )
+        iloc = df_obj if isinstance(df_obj, pd.Index) else df_obj.iloc
+        sample_size = sys.getsizeof(iloc[indices])
+        return sample_size * len(df_obj) // max_samples
+
+
 def build_fetch_chunk(
     chunk: ChunkType, input_chunk_keys: List[str] = None, **kwargs
 ) -> ChunkType:
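
How the new helper behaves in isolation (a sketch; byte counts are machine-dependent, and the import assumes a Mars checkout that contains this change):

    import sys

    import numpy as np
    import pandas as pd

    from mars.utils import estimate_dataframe_size

    # Object-dtype columns are what make exact measurement slow: pandas'
    # __sizeof__ calls memory_usage(deep=True), which touches every cell.
    df = pd.DataFrame({"key": np.arange(200_000), "val": ["payload"] * 200_000})

    # Above min_sample_rows, the helper measures max_samples=10 sampled rows
    # and scales the result by the total row count.
    approx = estimate_dataframe_size(df)
    print(approx)

    # At or below min_sample_rows=100 rows (or for a RangeIndex), it still
    # measures exactly with sys.getsizeof.
    small = pd.DataFrame({"key": np.arange(50)})
    assert estimate_dataframe_size(small) == sys.getsizeof(small)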
