Skip to content

Commit

Permalink
FEAT-#5394: Reduce amount of remote calls for Map operator (#7136)
Browse files Browse the repository at this point in the history
Signed-off-by: Kirill Suvorov <kirill.suvorov@intel.com>
  • Loading branch information
Retribution98 authored May 3, 2024
1 parent 3bdbbcb commit f8bf5b4
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 8 deletions.
49 changes: 42 additions & 7 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.config import CpuCount, Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe
from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index
from modin.core.dataframe.pandas.dataframe.utils import (
Expand Down Expand Up @@ -2205,12 +2205,47 @@ def map(
PandasDataframe
A new dataframe.
"""
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
if self.num_parts <= 1.5 * CpuCount.get():
# block-wise map
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
else:
# axis-wise map
# we choose an axis for a combination of partitions
# whose size is closer to the number of CPUs
if abs(self._partitions.shape[0] - CpuCount.get()) < abs(
self._partitions.shape[1] - CpuCount.get()
):
axis = 1
else:
axis = 0

column_splits = CpuCount.get() // self._partitions.shape[1]

if axis == 0 and column_splits > 1:
# splitting by parts of columnar partitions
new_partitions = (
self._partition_mgr_cls.map_partitions_joined_by_column(
self._partitions, column_splits, func, func_args, func_kwargs
)
)
else:
# splitting by full axis partitions
new_partitions = self._partition_mgr_cls.map_axis_partitions(
axis,
self._partitions,
lambda df: func(
df,
*(func_args if func_args is not None else ()),
**(func_kwargs if func_kwargs is not None else {}),
),
keep_partitioning=True,
)

if new_columns is not None and self.has_materialized_columns:
assert len(new_columns) == len(
self.columns
Expand Down
57 changes: 57 additions & 0 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,63 @@ def map_axis_partitions(
**kwargs,
)

@classmethod
def map_partitions_joined_by_column(
cls,
partitions,
column_splits,
map_func,
map_func_args=None,
map_func_kwargs=None,
):
"""
Combine several blocks by column into one virtual partition and apply "map_func" to them.
Parameters
----------
partitions : NumPy 2D array
Partitions of Modin Frame.
column_splits : int
The number of splits by column.
map_func : callable
Function to apply.
map_func_args : iterable, optional
Positional arguments for the 'map_func'.
map_func_kwargs : dict, optional
Keyword arguments for the 'map_func'.
Returns
-------
NumPy array
An array of new partitions for Modin Frame.
"""
if column_splits < 1:
raise ValueError(
"The value of columns_splits must be greater than or equal to 1."
)
# step cannot be less than 1
step = max(partitions.shape[0] // column_splits, 1)
preprocessed_map_func = cls.preprocess_func(map_func)
kw = {
"num_splits": step,
}
result = np.empty(partitions.shape, dtype=object)
for i in range(
0,
partitions.shape[0],
step,
):
joined_column_partitions = cls.column_partitions(partitions[i : i + step])
for j in range(partitions.shape[1]):
result[i : i + step, j] = joined_column_partitions[j].apply(
preprocessed_map_func,
*map_func_args if map_func_args is not None else (),
**kw,
**map_func_kwargs if map_func_kwargs is not None else {},
)

return result

@classmethod
def concat(cls, axis, left_parts, right_parts):
"""
Expand Down
94 changes: 93 additions & 1 deletion modin/tests/core/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@
import pytest

import modin.pandas as pd
from modin.config import Engine, MinPartitionSize, NPartitions, RangePartitioning
from modin.config import (
CpuCount,
Engine,
MinPartitionSize,
NPartitions,
RangePartitioning,
context,
)
from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
from modin.core.dataframe.pandas.dataframe.utils import ColumnInfo, ShuffleSortFunctions
from modin.core.dataframe.pandas.metadata import (
Expand Down Expand Up @@ -2612,3 +2619,88 @@ def remote_func():
+ materialize(deploy(get_capturing_func(2)))
== 3
)


@pytest.mark.parametrize(
"partitioning_scheme,expected_map_approach",
[
pytest.param(
lambda df: {
"row_lengths": [df.shape[0] // CpuCount.get()] * CpuCount.get(),
"column_widths": [df.shape[1]],
},
"map_partitions",
id="one_column_partition",
),
pytest.param(
lambda df: {
"row_lengths": [df.shape[0] // (CpuCount.get() * 2)]
* (CpuCount.get() * 2),
"column_widths": [df.shape[1]],
},
"map_partitions_joined_by_column",
id="very_long_column_partition",
),
pytest.param(
lambda df: {
"row_lengths": [df.shape[0] // CpuCount.get()] * CpuCount.get(),
"column_widths": [df.shape[1] // CpuCount.get()] * CpuCount.get(),
},
"map_axis_partitions",
id="perfect_partitioning",
),
],
)
def test_map_approaches(partitioning_scheme, expected_map_approach):
data_size = MinPartitionSize.get() * CpuCount.get()
data = {f"col{i}": np.ones(data_size) for i in range(data_size)}
df = pandas.DataFrame(data)

modin_df = construct_modin_df_by_scheme(df, partitioning_scheme(df))
partition_mgr_cls = modin_df._query_compiler._modin_frame._partition_mgr_cls

with mock.patch.object(
partition_mgr_cls,
expected_map_approach,
wraps=getattr(partition_mgr_cls, expected_map_approach),
) as expected_method:
try_cast_to_pandas(modin_df.map(lambda x: x * 2))
expected_method.assert_called()


def test_map_partitions_joined_by_column():
with context(NPartitions=CpuCount.get() * 2):
ncols = MinPartitionSize.get()
nrows = MinPartitionSize.get() * CpuCount.get() * 2
data = {f"col{i}": np.ones(nrows) for i in range(ncols)}
df = pd.DataFrame(data)
partitions = df._query_compiler._modin_frame._partitions
partition_mgr_cls = df._query_compiler._modin_frame._partition_mgr_cls

def map_func(df, first_arg, extra_arg=0):
return df.map(lambda x: (x * first_arg) + extra_arg)

column_splits = 2
map_func_args = (2,)
map_func_kwargs = {"extra_arg": 1}

# this approach doesn't work if column_splits == 0
with pytest.raises(ValueError):
partition_mgr_cls.map_partitions_joined_by_column(
partitions, 0, map_func, map_func_args, map_func_kwargs
)

result_partitions = partition_mgr_cls.map_partitions_joined_by_column(
partitions,
column_splits,
map_func,
map_func_args,
map_func_kwargs,
)
assert (
result_partitions.shape == partitions.shape
), "The result has a different split than the original."
for i in range(result_partitions.shape[0]):
assert np.all(
result_partitions[i][0].to_numpy() == 3
), "Invalid map function result."

0 comments on commit f8bf5b4

Please sign in to comment.