Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#5394: Reduce amount of remote calls for Map operator #7136

Merged
merged 20 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.config import CpuCount, Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe
from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index
from modin.core.dataframe.pandas.dataframe.utils import (
Expand Down Expand Up @@ -2202,12 +2202,44 @@
PandasDataframe
A new dataframe.
"""
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
if self.num_parts <= 1.5 * CpuCount.get():
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
# block-wise map
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
else:
# axis-wise map
# we choose an axis for a combination of partitions whose size is closer to the number of CPUs
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
axis = 0
if abs(self._partitions.shape[0] - CpuCount.get()) < abs(
self._partitions.shape[1] - CpuCount.get()
):
axis = 1
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
column_splits = CpuCount.get() // self._partitions.shape[1]

Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
if axis == 0 and column_splits > 1:
# splitting by parts of columnar partitions
new_partitions = (

Check warning on line 2225 in modin/core/dataframe/pandas/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/dataframe/dataframe.py#L2225

Added line #L2225 was not covered by tests
self._partition_mgr_cls.map_partitions_joined_by_column(
self._partitions, column_splits, func, func_args, func_kwargs
)
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
)
else:
# splitting by full axis partitions
new_partitions = self._partition_mgr_cls.map_axis_partitions(
axis,
self._partitions,
lambda df: func(
df,
*(func_args if func_args is not None else ()),
**(func_kwargs if func_kwargs is not None else {}),
),
keep_partitioning=True,
)

if new_columns is not None and self.has_materialized_columns:
assert len(new_columns) == len(
self.columns
Expand Down
71 changes: 71 additions & 0 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,77 @@
**kwargs,
)

@classmethod
def map_partitions_joined_by_column(
cls,
partitions,
column_splits,
map_func,
map_func_args=None,
map_func_kwargs=None,
):
"""
Combine several blocks by column into one virtual partition and apply "map_func" to them.

Parameters
----------
partitions : NumPy 2D array
Partitions of Modin Frame.
column_splits : int
The number of splits by column.
map_func : callable
Function to apply.
map_func_args : iterable, optional
Positional arguments for the 'map_func'.
map_func_kwargs : dict, optional
Keyword arguments for the 'map_func'.

Returns
-------
NumPy array
An array of new partitions for Modin Frame.
"""
if column_splits < 1:
raise ValueError(

Check warning on line 779 in modin/core/dataframe/pandas/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/partitioning/partition_manager.py#L778-L779

Added lines #L778 - L779 were not covered by tests
"The value of columns_splits must be greater than or equal to 1."
)
# step cannot be less than 1
step = max(partitions.shape[0] // column_splits, 1)
new_partitions = np.array(

Check warning on line 784 in modin/core/dataframe/pandas/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/partitioning/partition_manager.py#L783-L784

Added lines #L783 - L784 were not covered by tests
[
cls.column_partitions(
partitions[i : i + step],
# full_axis=False,
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
)
for i in range(
0,
partitions.shape[0],
step,
)
]
)
preprocessed_map_func = cls.preprocess_func(map_func)
kw = {

Check warning on line 798 in modin/core/dataframe/pandas/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/partitioning/partition_manager.py#L797-L798

Added lines #L797 - L798 were not covered by tests
"num_splits": step,
}
return np.concatenate(

Check warning on line 801 in modin/core/dataframe/pandas/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/partitioning/partition_manager.py#L801

Added line #L801 was not covered by tests
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
[
np.stack(
[
part.apply(
preprocessed_map_func,
*map_func_args if map_func_args is not None else (),
**kw,
**map_func_kwargs if map_func_kwargs is not None else {},
)
for part in row_of_parts
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
],
axis=-1,
)
for row_of_parts in new_partitions
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
]
)

@classmethod
def concat(cls, axis, left_parts, right_parts):
"""
Expand Down
Loading