Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#5394: Reduce amount of remote calls for Map operator #7136

Merged
merged 20 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 42 additions & 7 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pandas.core.dtypes.common import is_dtype_equal, is_list_like, is_numeric_dtype
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.config import CpuCount, Engine, IsRayCluster, MinPartitionSize, NPartitions
from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe
from modin.core.dataframe.base.dataframe.utils import Axis, JoinType, is_trivial_index
from modin.core.dataframe.pandas.dataframe.utils import (
Expand Down Expand Up @@ -2202,12 +2202,47 @@
PandasDataframe
A new dataframe.
"""
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
if self.num_parts <= 1.5 * CpuCount.get():
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
# block-wise map
map_fn = (
self._partition_mgr_cls.lazy_map_partitions
if lazy
else self._partition_mgr_cls.map_partitions
)
new_partitions = map_fn(self._partitions, func, func_args, func_kwargs)
else:
# axis-wise map
# we choose an axis for a combination of partitions
# whose size is closer to the number of CPUs
if abs(self._partitions.shape[0] - CpuCount.get()) < abs(
self._partitions.shape[1] - CpuCount.get()
):
axis = 1
else:
axis = 0

column_splits = CpuCount.get() // self._partitions.shape[1]

Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
if axis == 0 and column_splits > 1:
# splitting by parts of columnar partitions
new_partitions = (
self._partition_mgr_cls.map_partitions_joined_by_column(
self._partitions, column_splits, func, func_args, func_kwargs
)

Check warning on line 2231 in modin/core/dataframe/pandas/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/dataframe/dataframe.py#L2231

Added line #L2231 was not covered by tests
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
)
else:
# splitting by full axis partitions
new_partitions = self._partition_mgr_cls.map_axis_partitions(
axis,
self._partitions,
lambda df: func(
df,
*(func_args if func_args is not None else ()),
**(func_kwargs if func_kwargs is not None else {}),
),
keep_partitioning=True,
)

if new_columns is not None and self.has_materialized_columns:
assert len(new_columns) == len(
self.columns
Expand Down
71 changes: 71 additions & 0 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,77 @@
**kwargs,
)

@classmethod
def map_partitions_joined_by_column(
cls,
partitions,
column_splits,
map_func,
map_func_args=None,
map_func_kwargs=None,
):
"""
Combine several blocks by column into one virtual partition and apply "map_func" to them.

Parameters
----------
partitions : NumPy 2D array
Partitions of Modin Frame.
column_splits : int
The number of splits by column.
map_func : callable
Function to apply.
map_func_args : iterable, optional
Positional arguments for the 'map_func'.
map_func_kwargs : dict, optional
Keyword arguments for the 'map_func'.

Returns
-------
NumPy array
An array of new partitions for Modin Frame.
"""
if column_splits < 1:
raise ValueError(
"The value of columns_splits must be greater than or equal to 1."

Check warning on line 780 in modin/core/dataframe/pandas/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/partitioning/partition_manager.py#L779-L780

Added lines #L779 - L780 were not covered by tests
)
# step cannot be less than 1
step = max(partitions.shape[0] // column_splits, 1)
new_partitions = np.array(
[

Check warning on line 785 in modin/core/dataframe/pandas/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/partitioning/partition_manager.py#L784-L785

Added lines #L784 - L785 were not covered by tests
cls.column_partitions(
partitions[i : i + step],
# full_axis=False,
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
)
for i in range(
0,
partitions.shape[0],
step,
)
]
)
preprocessed_map_func = cls.preprocess_func(map_func)
kw = {
"num_splits": step,

Check warning on line 799 in modin/core/dataframe/pandas/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/partitioning/partition_manager.py#L798-L799

Added lines #L798 - L799 were not covered by tests
}
return np.concatenate(
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
[

Check warning on line 802 in modin/core/dataframe/pandas/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/partitioning/partition_manager.py#L802

Added line #L802 was not covered by tests
np.stack(
[
part.apply(
preprocessed_map_func,
*map_func_args if map_func_args is not None else (),
**kw,
**map_func_kwargs if map_func_kwargs is not None else {},
)
for part in row_parts
],
axis=-1,
)
for row_parts in new_partitions
]
)

@classmethod
def concat(cls, axis, left_parts, right_parts):
"""
Expand Down
42 changes: 40 additions & 2 deletions modin/tests/pandas/dataframe/test_map_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import pytest

import modin.pandas as pd
from modin.config import MinPartitionSize, NPartitions, StorageFormat
from modin.config import CpuCount, MinPartitionSize, NPartitions, StorageFormat
from modin.core.dataframe.pandas.metadata import LazyProxyCategoricalDtype
from modin.core.storage_formats.pandas.utils import split_result_of_axis_func_pandas
from modin.pandas.testing import assert_index_equal, assert_series_equal
Expand Down Expand Up @@ -223,13 +223,51 @@ def test_add_suffix(data, axis):
df_equals(new_modin_df.columns, new_pandas_df.columns)


map_strtagies = ["map", "axis_map", "splitted_axis_map"]
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
@pytest.mark.parametrize("testfunc", test_func_values, ids=test_func_keys)
@pytest.mark.parametrize(
"na_action", [None, "ignore"], ids=["no_na_action", "ignore_na"]
)
def test_applymap(data, testfunc, na_action):
@pytest.mark.parametrize("map_strtagy", map_strtagies, ids=map_strtagies)
def test_applymap(data, testfunc, na_action, map_strtagy):
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
keys = list(data.keys())
epected_shape = None
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
if map_strtagy == "map":
epected_shape = (1, 1)
max_size = max(len(keys), len(data[keys[0]]))
MinPartitionSize.put(max_size)
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
elif map_strtagy == "axis_map":
epected_shape = (CpuCount.get(), CpuCount.get())
min_size = min(len(keys), len(data[keys[0]]))
required_size = min_size // CpuCount.get()
if required_size > 0:
MinPartitionSize.put(required_size)
data = {k: v[:min_size] for k, v in data.items() if k in keys[:min_size]}
else:
pytest.skip(
"The stratagy cannot be tested with the currect data if required_size less than 1"
)
elif map_strtagy == "splitted_axis_map":
epected_shape = (2 * CpuCount.get(), 1)
required_size = len(data[keys[0]]) // (CpuCount.get() * 2)
# the stratagy cannot be tested with the currect data if required_size less than 1
if required_size > 0:
MinPartitionSize.put(required_size)
data = {k: v for k, v in data.items() if k in keys[:required_size]}
else:
pytest.skip(
"The stratagy cannot be tested with the currect data if required_size less than 1"
)
else:
raise ValueError("Incorrect map_strtagy")

modin_df, pandas_df = create_test_dfs(data)
assert (
modin_df._query_compiler._modin_frame._partitions.shape == epected_shape
Fixed Show fixed Hide fixed
), "Incorrect shape of partitions, please check data preparating."

with pytest.raises(ValueError):
x = 2
Expand Down
Loading