Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#5394: Reduce amount of remote calls for Map operator #7136

Merged
merged 20 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.config import CpuCount, Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe
from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index
from modin.core.dataframe.pandas.dataframe.utils import (
Expand Down Expand Up @@ -2202,12 +2202,44 @@ def map(
PandasDataframe
A new dataframe.
"""
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
if self.num_parts <= 1.5 * CpuCount.get():
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
# block-wise map
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
else:
# axis-wise map
# we choose an axis for a combination of partitions whose size is closer to the number of CPUs
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
axis = 0
if abs(self._partitions.shape[0] - CpuCount.get()) < abs(
self._partitions.shape[1] - CpuCount.get()
):
axis = 1
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved

column_splits = (
self._partitions.shape[0] * self._partitions.shape[1]
) // CpuCount.get()

if axis == 1 or column_splits <= 1:
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
# splitting by full axis partitions
new_partitions = self._partition_mgr_cls.map_axis_partitions(
axis,
self._partitions,
func,
keep_partitioning=True,
map_func_args=func_args,
**(func_kwargs if func_kwargs is None else {}),
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
)
else:
# splitting by parts of columnar partitions
new_partitions = (
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
anmyachev marked this conversation as resolved.
Show resolved Hide resolved
self._partition_mgr_cls.map_partitions_joined_by_column(
self._partitions, column_splits, func, func_args, func_kwargs
)
)
if new_columns is not None and self.has_materialized_columns:
assert len(new_columns) == len(
self.columns
Expand Down
69 changes: 69 additions & 0 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ def map_axis_partitions(
num_splits=None,
lengths=None,
enumerate_partitions=False,
map_func_args=None,
**kwargs,
):
"""
Expand Down Expand Up @@ -720,6 +721,8 @@ def map_axis_partitions(
enumerate_partitions : bool, default: False
Whether or not to pass partition index into `map_func`.
Note that `map_func` must be able to accept `partition_idx` kwarg.
map_func_args : list-like, optional
Positional arguments to pass to the `map_func`.
**kwargs : dict
Additional options that could be used by different engines.

Expand All @@ -742,9 +745,75 @@ def map_axis_partitions(
right=None,
lengths=lengths,
enumerate_partitions=enumerate_partitions,
apply_func_args=map_func_args,
**kwargs,
)

@classmethod
def map_partitions_joined_by_column(
    cls,
    partitions,
    column_splits,
    map_func,
    map_func_args=None,
    map_func_kwargs=None,
):
    """
    Combine several blocks by column into one virtual partition and apply "map_func" to them.

    The rows of `partitions` are walked in consecutive groups of
    `column_splits` rows. Each group is turned into column partitions via
    ``cls.column_partitions`` and `map_func` is applied to every one of them
    with ``num_splits=column_splits``.

    Parameters
    ----------
    partitions : NumPy 2D array
        Partitions of Modin Frame.
    column_splits : int
        The number of row partitions joined into one column partition. The
        same value is passed as ``num_splits`` to each ``apply`` call.
        Must be greater than or equal to 1.
    map_func : callable
        Function to apply.
    map_func_args : iterable, optional
        Positional arguments for the 'map_func'.
    map_func_kwargs : dict, optional
        Keyword arguments for the 'map_func'.

    Returns
    -------
    NumPy array
        An array of new partitions for Modin Frame.

    Raises
    ------
    ValueError
        If `column_splits` is less than 1.

    Notes
    -----
    NOTE(review): the last group may hold fewer than `column_splits` rows
    when ``partitions.shape[0]`` is not a multiple of `column_splits`, yet it
    is still applied with ``num_splits=column_splits`` - confirm this is the
    intended partitioning of the result.
    """
    # Fail early with a clear message; a zero or negative step would otherwise
    # surface as an unrelated ValueError from `range` or `np.concatenate`.
    if column_splits < 1:
        raise ValueError(
            "The value of column_splits must be greater than or equal to 1."
        )

    # Normalize the optional arguments once instead of on every `apply` call.
    args = () if map_func_args is None else tuple(map_func_args)
    kwargs = {} if map_func_kwargs is None else dict(map_func_kwargs)
    preprocessed_map_func = cls.preprocess_func(map_func)

    result_chunks = []
    for start in range(0, partitions.shape[0], column_splits):
        # Join `column_splits` consecutive row partitions of every column.
        joined_columns = cls.column_partitions(
            partitions[start : start + column_splits]
        )
        applied = [
            part.apply(
                preprocessed_map_func,
                *args,
                num_splits=column_splits,
                **kwargs,
            )
            for part in joined_columns
        ]
        # Each `apply` result holds the splits of one column; stacking on the
        # last axis lays the splits out along rows and the columns along columns.
        result_chunks.append(np.stack(applied, axis=-1))
    return np.concatenate(result_chunks)

@classmethod
def concat(cls, axis, left_parts, right_parts):
"""
Expand Down
Loading