Skip to content

Commit

Permalink
FEAT-#5394: Reduce amount of remote calls for TreeReduce and GroupByR…
Browse files Browse the repository at this point in the history
…educe operators (#7245)

Signed-off-by: Kirill Suvorov <kirill.suvorov@intel.com>
  • Loading branch information
Retribution98 committed May 14, 2024
1 parent 55f331f commit f81bbe6
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 43 deletions.
48 changes: 7 additions & 41 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import CpuCount, Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.config import Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe
from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index
from modin.core.dataframe.pandas.dataframe.utils import (
Expand Down Expand Up @@ -2212,46 +2212,12 @@ def map(
PandasDataframe
A new dataframe.
"""
if self.num_parts <= 1.5 * CpuCount.get():
# block-wise map
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
else:
# axis-wise map
# we choose an axis for a combination of partitions
# whose size is closer to the number of CPUs
if abs(self._partitions.shape[0] - CpuCount.get()) < abs(
self._partitions.shape[1] - CpuCount.get()
):
axis = 1
else:
axis = 0

column_splits = CpuCount.get() // self._partitions.shape[1]

if axis == 0 and column_splits > 1:
# splitting by parts of columnar partitions
new_partitions = (
self._partition_mgr_cls.map_partitions_joined_by_column(
self._partitions, column_splits, func, func_args, func_kwargs
)
)
else:
# splitting by full axis partitions
new_partitions = self._partition_mgr_cls.map_axis_partitions(
axis,
self._partitions,
lambda df: func(
df,
*(func_args if func_args is not None else ()),
**(func_kwargs if func_kwargs is not None else {}),
),
keep_partitioning=True,
)
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)

if new_columns is not None and self.has_materialized_columns:
assert len(new_columns) == len(
Expand Down
68 changes: 67 additions & 1 deletion modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from modin.config import (
BenchmarkMode,
CpuCount,
Engine,
MinPartitionSize,
NPartitions,
Expand Down Expand Up @@ -603,7 +604,7 @@ def broadcast_axis_partitions(

@classmethod
@wait_computations_if_benchmark_mode
def map_partitions(
def base_map_partitions(
cls,
partitions,
map_func,
Expand Down Expand Up @@ -644,6 +645,71 @@ def map_partitions(
]
)

@classmethod
@wait_computations_if_benchmark_mode
def map_partitions(
cls,
partitions,
map_func,
func_args=None,
func_kwargs=None,
):
"""
Apply `map_func` to `partitions` using different approaches to achieve the best performance.
Parameters
----------
partitions : NumPy 2D array
Partitions housing the data of Modin Frame.
map_func : callable
Function to apply.
func_args : iterable, optional
Positional arguments for the 'map_func'.
func_kwargs : dict, optional
Keyword arguments for the 'map_func'.
Returns
-------
NumPy array
An array of partitions
"""
if np.prod(partitions.shape) <= 1.5 * CpuCount.get():
# block-wise map
new_partitions = cls.base_map_partitions(
partitions, map_func, func_args, func_kwargs
)
else:
# axis-wise map
# we choose an axis for a combination of partitions
# whose size is closer to the number of CPUs
if abs(partitions.shape[0] - CpuCount.get()) < abs(
partitions.shape[1] - CpuCount.get()
):
axis = 1
else:
axis = 0

column_splits = CpuCount.get() // partitions.shape[1]

if axis == 0 and column_splits > 1:
# splitting by parts of columnar partitions
new_partitions = cls.map_partitions_joined_by_column(
partitions, column_splits, map_func, func_args, func_kwargs
)
else:
# splitting by full axis partitions
new_partitions = cls.map_axis_partitions(
axis,
partitions,
lambda df: map_func(
df,
*(func_args if func_args is not None else ()),
**(func_kwargs if func_kwargs is not None else {}),
),
keep_partitioning=True,
)
return new_partitions

@classmethod
@wait_computations_if_benchmark_mode
def lazy_map_partitions(
Expand Down
3 changes: 2 additions & 1 deletion modin/tests/core/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2657,14 +2657,15 @@ def test_map_approaches(partitioning_scheme, expected_map_approach):
df = pandas.DataFrame(data)

modin_df = construct_modin_df_by_scheme(df, partitioning_scheme(df))
partitions = modin_df._query_compiler._modin_frame._partitions
partition_mgr_cls = modin_df._query_compiler._modin_frame._partition_mgr_cls

with mock.patch.object(
partition_mgr_cls,
expected_map_approach,
wraps=getattr(partition_mgr_cls, expected_map_approach),
) as expected_method:
try_cast_to_pandas(modin_df.map(lambda x: x * 2))
partition_mgr_cls.map_partitions(partitions, lambda x: x * 2)
expected_method.assert_called()


Expand Down

0 comments on commit f81bbe6

Please sign in to comment.