Skip to content

Commit

Permalink
Optimize serializable memory (#3120)
Browse files Browse the repository at this point in the history
* Optimize serializable memory

* Fix lint

* Fix

* Optimize

* Rename attr_name to name of Field

* Fix

* Fix

* Fix benchmark

* Add asv bench

* Optimize copy

* Improve coverage

* Fix

Co-authored-by: 刘宝 <po.lb@antfin.com>
  • Loading branch information
fyrestone and 刘宝 committed Jun 8, 2022
1 parent 18a3af8 commit cd22c4c
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 165 deletions.
27 changes: 27 additions & 0 deletions benchmarks/asv_bench/benchmarks/graph_assigner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import random
import tracemalloc

import mars.tensor as mt
import mars.dataframe as md
Expand All @@ -30,6 +31,7 @@ class ChunkGraphAssignerSuite:
repeat = 10

def setup(self):
tracemalloc.start()
random.seed()

num_rows = 10000
Expand All @@ -45,6 +47,8 @@ def setup(self):
graph = TileableGraph([merged_df.data])
next(TileableGraphBuilder(graph).build())
self.chunk_graph = next(ChunkGraphBuilder(graph, fuse_enabled=False).build())
self.mem_size, self.mem_peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

def time_assigner(self):
start_ops = list(GraphAnalyzer._iter_start_ops(self.chunk_graph))
Expand All @@ -55,3 +59,26 @@ def time_assigner(self):
assigner = GraphAssigner(self.chunk_graph, start_ops, band_resource)
assigned_result = assigner.assign(current_assign)
assert len(assigned_result) == len(start_ops)

def peakmem_setup(self):
"""peakmem includes the memory used by setup.
Peakmem benchmarks measure the maximum amount of RAM used by a
function. However, this maximum also includes the memory used
by ``setup`` (as of asv 0.2.1; see [1]_)
Measuring an empty peakmem function might allow us to disambiguate
between the memory used by setup and the memory used by slic (see
``peakmem_slic_basic``, below).
References
----------
.. [1]: https://asv.readthedocs.io/en/stable/writing_benchmarks.html#peak-memory
"""
pass

def mem_chunk_graph(self):
return self.chunk_graph

def track_traced_mem_size(self):
return self.mem_size

def track_traced_mem_peak(self):
return self.mem_peak
14 changes: 9 additions & 5 deletions benchmarks/asv_bench/benchmarks/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
Timedelta64Field,
TupleField,
DictField,
Complex64Field,
Complex128Field,
)
from mars.services.subtask import Subtask, SubtaskResult, SubtaskStatus
from mars.services.task import new_task_id
Expand Down Expand Up @@ -72,11 +74,13 @@ class MySerializable(Serializable):
_int64_val = Int64Field("f3")
_float32_val = Float32Field("f4")
_float64_val = Float64Field("f5")
_string_val = StringField("f6")
_datetime64_val = Datetime64Field("f7")
_timedelta64_val = Timedelta64Field("f8")
_datatype_val = DataTypeField("f9")
_slice_val = SliceField("f10")
_complex64_val = Complex64Field("f6")
_complex128_val = Complex128Field("f7")
_string_val = StringField("f8")
_datetime64_val = Datetime64Field("f9")
_timedelta64_val = Timedelta64Field("f10")
_datatype_val = DataTypeField("f11")
_slice_val = SliceField("f12")
_list_val = ListField("list_val", FieldTypes.int64)
_tuple_val = TupleField("tuple_val", FieldTypes.string)
_dict_val = DictField("dict_val", FieldTypes.string, FieldTypes.bytes)
Expand Down
50 changes: 28 additions & 22 deletions mars/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,21 @@ def _copy_tags_(self):
return getattr(cls, member)
except AttributeError:
slots = sorted(
f.attr_name
for k, f in self._FIELDS.items()
if k not in self._no_copy_attrs_
f.name for k, f in self._FIELDS.items() if k not in self._no_copy_attrs_
)
setattr(cls, member, slots)
return slots

@property
def _values_(self):
return [self._FIELD_VALUES.get(k) for k in self._copy_tags_]
values = []
fields = self._FIELDS
for k in self._copy_tags_:
try:
values.append(fields[k].get(self))
except AttributeError:
values.append(None)
return values

def __mars_tokenize__(self):
try:
Expand All @@ -91,19 +96,18 @@ def copy(self):
return self.copy_to(type(self)(_key=self.key))

def copy_to(self, target: "Base"):
new_values = dict()
values = self._FIELD_VALUES
for k in self._FIELDS:
if k in self._no_copy_attrs_:
target_fields = target._FIELDS
no_copy_attrs = self._no_copy_attrs_
for k, field in self._FIELDS.items():
if k in no_copy_attrs:
continue
try:
# Slightly faster than getattr.
value = field.__get__(self, k)
target_fields[k].set(target, value)
except AttributeError:
continue
if k in values:
new_values[k] = values[k]
else:
try:
new_values[k] = getattr(self, k)
except AttributeError:
continue
target._FIELD_VALUES.update(new_values)

return target

def copy_from(self, obj):
Expand All @@ -119,12 +123,14 @@ def id(self):

def to_kv(self, exclude_fields: Tuple[str], accept_value_types: Tuple[Type]):
fields = self._FIELDS
field_values = self._FIELD_VALUES
return {
fields[attr_name].tag: value
for attr_name, value in field_values.items()
if attr_name not in exclude_fields and isinstance(value, accept_value_types)
}
kv = {}
no_value = object()
for name, field in fields.items():
if name not in exclude_fields:
value = getattr(self, name, no_value)
if value is not no_value and isinstance(value, accept_value_types):
kv[field.tag] = value
return kv


def buffered_base(func):
Expand Down
9 changes: 6 additions & 3 deletions mars/dataframe/arithmetic/tests/test_arithmetic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1523,12 +1523,15 @@ def test_arithmetic_lazy_chunk_meta():
df2 = tile(df2)

chunk = df2.chunks[0].data
assert chunk._FIELD_VALUES.get("_dtypes") is None
assert chunk._FIELDS["_dtypes"].get(chunk) is None
pd.testing.assert_series_equal(chunk.dtypes, df.dtypes)
assert chunk._FIELD_VALUES.get("_index_value") is None
assert chunk._FIELDS["_dtypes"].get(chunk) is not None
assert chunk._FIELDS["_index_value"].get(chunk) is None
pd.testing.assert_index_equal(chunk.index_value.to_pandas(), pd.RangeIndex(3))
assert chunk._FIELD_VALUES.get("_columns_value") is None
assert chunk._FIELDS["_index_value"].get(chunk) is not None
assert chunk._FIELDS["_columns_value"].get(chunk) is None
pd.testing.assert_index_equal(chunk.columns_value.to_pandas(), pd.RangeIndex(3))
assert chunk._FIELDS["_columns_value"].get(chunk) is not None


def test_datetime_arithmetic():
Expand Down
45 changes: 24 additions & 21 deletions mars/dataframe/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,15 +505,16 @@ def _gen_chunk_dtypes(instance: Chunk, index: int) -> Optional[pd.Series]:
cache[tileable_key, index] = dtypes
return dtypes

def __get__(self, instance, owner):
def __get__(self, instance, owner=None):
if not issubclass(owner, LazyMetaChunkData): # pragma: no cover
return super().__get__(instance, owner)

values = instance._FIELD_VALUES
dtypes = values.get("_dtypes", None)
if dtypes is not None:
# been set before
return dtypes
try:
value = self.get(instance, owner)
if value is not None:
return value
except AttributeError: # pragma: no cover
pass

if instance.index is None:
return super().__get__(instance, owner)
Expand All @@ -522,7 +523,7 @@ def __get__(self, instance, owner):
index = instance.index[1]
dtypes = self._gen_chunk_dtypes(instance, index)
# cache dtypes
values["_dtypes"] = dtypes
self.set(instance, dtypes)
return dtypes


Expand Down Expand Up @@ -557,15 +558,16 @@ def _gen_chunk_index_value(instance: Chunk, index: int) -> Optional[IndexValue]:
cache[tileable_key, index] = index_value
return index_value

def __get__(self, instance, owner):
def __get__(self, instance, owner=None):
if not issubclass(owner, LazyMetaChunkData): # pragma: no cover
return super().__get__(instance, owner)

values = instance._FIELD_VALUES
index_value = values.get("_index_value", None)
if index_value is not None:
# been set before
return index_value
try:
value = self.get(instance, owner)
if value is not None:
return value
except AttributeError: # pragma: no cover
pass

if instance.index is None:
return super().__get__(instance, owner)
Expand All @@ -574,7 +576,7 @@ def __get__(self, instance, owner):
index = instance.index[0]
index_value = self._gen_chunk_index_value(instance, index)
# cache index_value
values["_index_value"] = index_value
self.set(instance, index_value)
return index_value


Expand Down Expand Up @@ -605,15 +607,16 @@ def _gen_chunk_columns_value(instance: Chunk, index: int) -> Optional[IndexValue
cache[tileable_key, index] = columns_value
return columns_value

def __get__(self, instance, owner):
def __get__(self, instance, owner=None):
if not issubclass(owner, LazyMetaChunkData): # pragma: no cover
return super().__get__(instance, owner)

values = instance._FIELD_VALUES
columns_value = values.get("_columns_value", None)
if columns_value is not None:
# been set before
return columns_value
try:
value = self.get(instance, owner)
if value is not None:
return value
except AttributeError: # pragma: no cover
pass

if instance.index is None:
return super().__get__(instance, owner)
Expand All @@ -622,7 +625,7 @@ def __get__(self, instance, owner):
index = instance.index[1]
columns_value = self._gen_chunk_columns_value(instance, index)
# cache columns_value
values["_columns_value"] = columns_value
self.set(instance, columns_value)
return columns_value


Expand Down
12 changes: 8 additions & 4 deletions mars/dataframe/indexing/tests/test_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -940,16 +940,20 @@ def test_getitem_lazy_chunk_meta():
df2 = tile(df2)

chunk = df2.chunks[0].data
assert chunk._FIELD_VALUES.get("_dtypes") is None
assert chunk._FIELDS["_dtypes"].get(chunk) is None
pd.testing.assert_series_equal(chunk.dtypes, df.dtypes[[0, 2]])
assert chunk._FIELD_VALUES.get("_index_value") is None
assert chunk._FIELDS["_dtypes"].get(chunk) is not None
assert chunk._FIELDS["_index_value"].get(chunk) is None
pd.testing.assert_index_equal(chunk.index_value.to_pandas(), pd.RangeIndex(3))
assert chunk._FIELD_VALUES.get("_columns_value") is None
assert chunk._FIELDS["_index_value"].get(chunk) is not None
assert chunk._FIELDS["_columns_value"].get(chunk) is None
pd.testing.assert_index_equal(chunk.columns_value.to_pandas(), pd.Index([0, 2]))
assert chunk._FIELDS["_columns_value"].get(chunk) is not None

df2 = df[2]
df2 = tile(df2)

chunk = df2.chunks[0].data
assert chunk._FIELD_VALUES.get("_index_value") is None
assert chunk._FIELDS["_index_value"].get(chunk) is None
pd.testing.assert_index_equal(chunk.index_value.to_pandas(), pd.RangeIndex(3))
assert chunk._FIELDS["_index_value"].get(chunk) is not None

0 comments on commit cd22c4c

Please sign in to comment.