FEAT-#5394: Reduce amount of remote calls for Map operator #7136

Merged (20 commits) on May 3, 2024
Changes from 19 commits
49 changes: 42 additions & 7 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -28,7 +28,7 @@
from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.config import CpuCount, Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe
from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index
from modin.core.dataframe.pandas.dataframe.utils import (
@@ -2202,12 +2202,47 @@ def map(
        PandasDataframe
            A new dataframe.
        """
        map_fn = (
            self._partition_mgr_cls.lazy_map_partitions
            if lazy
            else self._partition_mgr_cls.map_partitions
        )
        new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
        if self.num_parts <= 1.5 * CpuCount.get():
            # block-wise map
            map_fn = (
                self._partition_mgr_cls.lazy_map_partitions
                if lazy
                else self._partition_mgr_cls.map_partitions
            )
            new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
        else:
            # axis-wise map
            # choose the axis so that the number of resulting virtual
            # partitions is as close as possible to the number of CPUs
            if abs(self._partitions.shape[0] - CpuCount.get()) < abs(
                self._partitions.shape[1] - CpuCount.get()
            ):
                axis = 1
            else:
                axis = 0

            column_splits = CpuCount.get() // self._partitions.shape[1]

            if axis == 0 and column_splits > 1:
                # splitting by parts of columnar partitions
                new_partitions = (
                    self._partition_mgr_cls.map_partitions_joined_by_column(
                        self._partitions, column_splits, func, func_args, func_kwargs
                    )
                )
            else:
                # splitting by full axis partitions
                new_partitions = self._partition_mgr_cls.map_axis_partitions(
                    axis,
                    self._partitions,
                    lambda df: func(
                        df,
                        *(func_args if func_args is not None else ()),
                        **(func_kwargs if func_kwargs is not None else {}),
                    ),
                    keep_partitioning=True,
                )

        if new_columns is not None and self.has_materialized_columns:
            assert len(new_columns) == len(
                self.columns
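The branch above keeps the block-wise map as long as the total partition count is at most 1.5x the CPU count and otherwise switches to an axis-wise map. A minimal standalone sketch of that dispatch heuristic follows; `choose_map_strategy`, its parameters, and the returned labels are illustrative names rather than Modin API, and `n_cpus` stands in for `CpuCount.get()`.

```python
# Standalone sketch of the dispatch heuristic above; names are illustrative, not Modin API.
def choose_map_strategy(partitions_shape, n_cpus):
    """Return which mapping routine the heuristic would pick for a (rows, cols) partition grid."""
    num_parts = partitions_shape[0] * partitions_shape[1]
    if num_parts <= 1.5 * n_cpus:
        # few partitions: one remote call per block is already cheap
        return "map_partitions"
    # pick the axis whose resulting number of virtual partitions is closest to the CPU count
    if abs(partitions_shape[0] - n_cpus) < abs(partitions_shape[1] - n_cpus):
        axis = 1
    else:
        axis = 0
    column_splits = n_cpus // partitions_shape[1]
    if axis == 0 and column_splits > 1:
        # tall frames with few column partitions: split each column into several row chunks
        return "map_partitions_joined_by_column"
    return "map_axis_partitions"


print(choose_map_strategy((4, 4), n_cpus=16))    # map_partitions
print(choose_map_strategy((32, 1), n_cpus=16))   # map_partitions_joined_by_column
print(choose_map_strategy((16, 16), n_cpus=16))  # map_axis_partitions
```

The three calls reproduce the outcomes exercised by `test_map_approaches` in the test changes further down.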
57 changes: 57 additions & 0 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -745,6 +745,63 @@ def map_axis_partitions(
            **kwargs,
        )

    @classmethod
    def map_partitions_joined_by_column(
        cls,
        partitions,
        column_splits,
        map_func,
        map_func_args=None,
        map_func_kwargs=None,
    ):
        """
        Combine several blocks by column into one virtual partition and apply "map_func" to them.

        Parameters
        ----------
        partitions : NumPy 2D array
            Partitions of Modin Frame.
        column_splits : int
            The number of splits by column.
        map_func : callable
            Function to apply.
        map_func_args : iterable, optional
            Positional arguments for the 'map_func'.
        map_func_kwargs : dict, optional
            Keyword arguments for the 'map_func'.

        Returns
        -------
        NumPy array
            An array of new partitions for Modin Frame.
        """
        if column_splits < 1:
            raise ValueError(
                "The value of column_splits must be greater than or equal to 1."
            )
        # step cannot be less than 1
        step = max(partitions.shape[0] // column_splits, 1)
        preprocessed_map_func = cls.preprocess_func(map_func)
        kw = {
            "num_splits": step,
        }
        result = np.empty(partitions.shape, dtype=cls._partition_class)
Collaborator review comment: These are equivalent actions, but let's make it more explicit.

Suggested change:
-        result = np.empty(partitions.shape, dtype=cls._partition_class)
+        result = np.empty(partitions.shape, dtype=object)

        for i in range(
            0,
            partitions.shape[0],
            step,
        ):
            joined_column_partitions = cls.column_partitions(partitions[i : i + step])
            for j in range(partitions.shape[1]):
                result[i : i + step, j] = joined_column_partitions[j].apply(
                    preprocessed_map_func,
                    *map_func_args if map_func_args is not None else (),
                    **kw,
                    **map_func_kwargs if map_func_kwargs is not None else {},
                )

        return result

    @classmethod
    def concat(cls, axis, left_parts, right_parts):
        """
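As a rough local illustration of what `map_partitions_joined_by_column` does, the sketch below stacks the blocks of each column of the partition grid into row groups of size `step`, calls the function once per group instead of once per block, and then slices the result back into blocks of the original heights. It works on plain pandas DataFrames instead of Modin's remote partition objects and omits `preprocess_func` and the `num_splits` plumbing, so it approximates the logic rather than reproducing the implementation.

```python
# Local-only toy version of the joined-by-column mapping; assumes a NumPy object
# array of pandas DataFrames in place of Modin's remote partitions.
import numpy as np
import pandas as pd


def map_joined_by_column_local(blocks, column_splits, func):
    """Apply ``func`` once per group of vertically stacked blocks and re-split the result."""
    step = max(blocks.shape[0] // column_splits, 1)
    result = np.empty(blocks.shape, dtype=object)
    for i in range(0, blocks.shape[0], step):
        for j in range(blocks.shape[1]):
            group = list(blocks[i : i + step, j])
            # one function call per group of blocks instead of one per block
            mapped = func(pd.concat(group, axis=0))
            # split the mapped frame back into blocks of the original heights
            start = 0
            for k, block in enumerate(group):
                result[i + k, j] = mapped.iloc[start : start + len(block)]
                start += len(block)
    return result


# a 4x2 grid of tiny blocks; column_splits=2 means each column is handled in two calls
blocks = np.empty((4, 2), dtype=object)
for r in range(4):
    for c in range(2):
        blocks[r, c] = pd.DataFrame({"a": [r, r]})

out = map_joined_by_column_local(blocks, column_splits=2, func=lambda df: df * 10)
print(out[3, 0])  # the block that held 3s now holds 30s
```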
102 changes: 101 additions & 1 deletion modin/tests/core/storage_formats/pandas/test_internals.py
@@ -20,7 +20,14 @@
import pytest

import modin.pandas as pd
from modin.config import Engine, MinPartitionSize, NPartitions, RangePartitioning
from modin.config import (
    CpuCount,
    Engine,
    MinPartitionSize,
    NPartitions,
    RangePartitioning,
    context,
)
from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
from modin.core.dataframe.pandas.dataframe.utils import ColumnInfo, ShuffleSortFunctions
from modin.core.dataframe.pandas.metadata import (
@@ -49,12 +56,14 @@
    from modin.core.execution.ray.implementations.pandas_on_ray.partitioning import (
        PandasOnRayDataframeColumnPartition,
        PandasOnRayDataframePartition,
        PandasOnRayDataframePartitionManager,
        PandasOnRayDataframeRowPartition,
    )

    block_partition_class = PandasOnRayDataframePartition
    virtual_column_partition_class = PandasOnRayDataframeColumnPartition
    virtual_row_partition_class = PandasOnRayDataframeRowPartition
    partition_manager_class = PandasOnRayDataframePartitionManager
    put = RayWrapper.put
    deploy = RayWrapper.deploy
    materialize = RayWrapper.materialize
@@ -63,6 +72,7 @@
    from modin.core.execution.dask.implementations.pandas_on_dask.partitioning import (
        PandasOnDaskDataframeColumnPartition,
        PandasOnDaskDataframePartition,
        PandasOnDaskDataframePartitionManager,
        PandasOnDaskDataframeRowPartition,
    )

@@ -75,25 +85,29 @@ def put(x):
    block_partition_class = PandasOnDaskDataframePartition
    virtual_column_partition_class = PandasOnDaskDataframeColumnPartition
    virtual_row_partition_class = PandasOnDaskDataframeRowPartition
    partition_manager_class = PandasOnDaskDataframePartitionManager
    deploy = DaskWrapper.deploy
    materialize = DaskWrapper.materialize
elif Engine.get() == "Unidist":
    from modin.core.execution.unidist.common import UnidistWrapper
    from modin.core.execution.unidist.implementations.pandas_on_unidist.partitioning import (
        PandasOnUnidistDataframeColumnPartition,
        PandasOnUnidistDataframePartition,
        PandasOnUnidistDataframePartitionManager,
        PandasOnUnidistDataframeRowPartition,
    )

    block_partition_class = PandasOnUnidistDataframePartition
    virtual_column_partition_class = PandasOnUnidistDataframeColumnPartition
    virtual_row_partition_class = PandasOnUnidistDataframeRowPartition
    partition_manager_class = PandasOnUnidistDataframePartitionManager
    put = UnidistWrapper.put
elif Engine.get() == "Python":
    from modin.core.execution.python.common import PythonWrapper
    from modin.core.execution.python.implementations.pandas_on_python.partitioning import (
        PandasOnPythonDataframeColumnPartition,
        PandasOnPythonDataframePartition,
        PandasOnPythonDataframePartitionManager,
        PandasOnPythonDataframeRowPartition,
    )

@@ -109,6 +123,7 @@ def materialize(arg):
    block_partition_class = PandasOnPythonDataframePartition
    virtual_column_partition_class = PandasOnPythonDataframeColumnPartition
    virtual_row_partition_class = PandasOnPythonDataframeRowPartition
    partition_manager_class = PandasOnPythonDataframePartitionManager
else:
    raise NotImplementedError(
        f"These test suites are not implemented for the '{Engine.get()}' engine"
@@ -2577,3 +2592,88 @@ def remote_func():
        + materialize(deploy(get_capturing_func(2)))
        == 3
    )


@pytest.mark.parametrize(
    "partitioning_scheme,expected_map_approach",
    [
        pytest.param(
            lambda df: {
                "row_lengths": [df.shape[0] // CpuCount.get()] * CpuCount.get(),
                "column_widths": [df.shape[1]],
            },
            "map_partitions",
            id="one_column_partition",
        ),
        pytest.param(
            lambda df: {
                "row_lengths": [df.shape[0] // (CpuCount.get() * 2)]
                * (CpuCount.get() * 2),
                "column_widths": [df.shape[1]],
            },
            "map_partitions_joined_by_column",
            id="very_long_column_partition",
        ),
        pytest.param(
            lambda df: {
                "row_lengths": [df.shape[0] // CpuCount.get()] * CpuCount.get(),
                "column_widths": [df.shape[1] // CpuCount.get()] * CpuCount.get(),
            },
            "map_axis_partitions",
            id="perfect_partitioning",
        ),
    ],
)
def test_map_approaches(partitioning_scheme, expected_map_approach):
    data_size = MinPartitionSize.get() * CpuCount.get()
    data = {f"col{i}": np.ones(data_size) for i in range(data_size)}
    df = pandas.DataFrame(data)

    modin_df = construct_modin_df_by_scheme(df, partitioning_scheme(df))
    partition_mgr_cls = modin_df._query_compiler._modin_frame._partition_mgr_cls

    with mock.patch.object(
        partition_mgr_cls,
        expected_map_approach,
        wraps=getattr(partition_mgr_cls, expected_map_approach),
    ) as expected_method:
        try_cast_to_pandas(modin_df.map(lambda x: x * 2))
        expected_method.assert_called()


def test_map_partitions_joined_by_column():
    # Set the config to 'True' inside of the context-manager
Collaborator review comment: What does it mean?

    with context(NPartitions=CpuCount.get() * 2):
        ncols = MinPartitionSize.get()
        nrows = MinPartitionSize.get() * CpuCount.get() * 2
        data = {f"col{i}": np.ones(nrows) for i in range(ncols)}
        df = pd.DataFrame(data)
        partitions = df._query_compiler._modin_frame._partitions
Collaborator review comment: Instead of partition_manager_class?

Suggested change:
         partitions = df._query_compiler._modin_frame._partitions
+        partition_mgr_cls = df._query_compiler._modin_frame._partition_mgr_cls


        def map_func(df, first_arg, extra_arg=0):
            return df.map(lambda x: (x * first_arg) + extra_arg)

        column_splits = 2
        map_func_args = (2,)
        map_func_kwargs = {"extra_arg": 1}

        # this approach doesn't work if column_splits == 0
        with pytest.raises(ValueError):
            partition_manager_class.map_partitions_joined_by_column(
                partitions, 0, map_func, map_func_args, map_func_kwargs
            )

        result_partitions = partition_manager_class.map_partitions_joined_by_column(
            partitions,
            column_splits,
            map_func,
            map_func_args,
            map_func_kwargs,
        )
        assert (
            result_partitions.shape == partitions.shape
        ), "The result has a different split than the original."
        for i in range(result_partitions.shape[0]):
            assert np.all(
                result_partitions[i][0].to_numpy() == 3
            ), "Invalid map function result."
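A hypothetical end-to-end check (not part of the PR) of how the new path is reached from the user-facing API: with NPartitions raised above the CPU count, an elementwise `DataFrame.map` on a tall, narrow frame should be dispatched through the joined-by-column branch added above.

```python
import numpy as np

import modin.pandas as pd
from modin.config import CpuCount, MinPartitionSize, NPartitions, context

# More row partitions than CPUs and a single column partition: the heuristic in
# PandasDataframe.map should pick map_partitions_joined_by_column for this frame.
with context(NPartitions=CpuCount.get() * 2):
    nrows = MinPartitionSize.get() * CpuCount.get() * 2
    data = {f"col{i}": np.ones(nrows) for i in range(MinPartitionSize.get())}
    df = pd.DataFrame(data)
    print(df.map(lambda x: x + 1).head())
```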