FEAT-#6965: Implement '.merge()' using range-partitioning implementation #6966

Merged: 8 commits, Mar 1, 2024
2 changes: 2 additions & 0 deletions .github/actions/run-core-tests/group_2/action.yml
@@ -20,3 +20,5 @@ runs:
modin/pandas/test/dataframe/test_pickle.py
echo "::endgroup::"
shell: bash -l {0}
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
shell: bash -l {0}
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -188,6 +188,7 @@ jobs:
- run: python -m pytest modin/pandas/test/dataframe/test_binary.py
- run: python -m pytest modin/pandas/test/dataframe/test_reduce.py
- run: python -m pytest modin/pandas/test/dataframe/test_join_sort.py
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
- run: python -m pytest modin/pandas/test/test_general.py
- run: python -m pytest modin/pandas/test/dataframe/test_indexing.py
- run: python -m pytest modin/pandas/test/test_series.py
1 change: 1 addition & 0 deletions .github/workflows/push-to-master.yml
@@ -46,6 +46,7 @@ jobs:
python -m pytest modin/pandas/test/dataframe/test_indexing.py
python -m pytest modin/pandas/test/dataframe/test_iter.py
python -m pytest modin/pandas/test/dataframe/test_join_sort.py
MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
python -m pytest modin/pandas/test/dataframe/test_map_metadata.py
python -m pytest modin/pandas/test/dataframe/test_reduce.py
python -m pytest modin/pandas/test/dataframe/test_udf.py
2 changes: 1 addition & 1 deletion docs/flow/modin/experimental/index.rst
@@ -15,7 +15,7 @@ and provides a limited set of functionality:
* :doc:`xgboost <xgboost>`
* :doc:`sklearn <sklearn>`
* :doc:`batch <batch>`
* :doc:`Range-partitioning GroupBy implementation <range_partitioning_groupby>`
* :doc:`Range-partitioning implementations <range_partitioning_groupby>`


.. toctree::
6 changes: 6 additions & 0 deletions docs/flow/modin/experimental/range_partitioning_groupby.rst
@@ -72,3 +72,9 @@ implementation with the respective warning if it meets an unsupported case:
... # Range-partitioning groupby is only supported when grouping on a column(s) of the same frame.
... # https://github.com/modin-project/modin/issues/5926
... # Falling back to a TreeReduce implementation.

Range-partitioning Merge
""""""""""""""""""""""""

It is recommended to use this implementation when the right dataframe in the merge is about as big as
the left one: in that case, the range-partitioning implementation works faster and consumes less RAM.
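The idea behind range-partitioning merge can be sketched without Modin at all: both tables are bucketed by where the join key falls among a set of pivots, and then each pair of matching buckets is merged independently. A minimal, purely illustrative sketch (plain dict rows stand in for dataframe partitions; the pivots and helper names are made up for this example):

```python
# Illustrative sketch (not Modin's actual code): merge two row sets by first
# range-partitioning both of them on the join key, then merging bin-by-bin.
from bisect import bisect_left

def range_partition(rows, key, pivots):
    """Assign each row to a bin based on where its key falls among the pivots."""
    bins = [[] for _ in range(len(pivots) + 1)]
    for row in rows:
        bins[bisect_left(pivots, row[key])].append(row)
    return bins

def range_partitioned_merge(left, right, key, pivots):
    """Inner-merge `left` and `right` on `key`, one key-range bin at a time."""
    result = []
    for lbin, rbin in zip(
        range_partition(left, key, pivots), range_partition(right, key, pivots)
    ):
        for lrow in lbin:
            for rrow in rbin:
                if lrow[key] == rrow[key]:
                    result.append({**lrow, **rrow})
    return result

left = [{"k": 1, "a": 10}, {"k": 5, "a": 50}]
right = [{"k": 1, "b": 100}, {"k": 5, "b": 500}]
print(range_partitioned_merge(left, right, "k", pivots=[3]))
# [{'k': 1, 'a': 10, 'b': 100}, {'k': 5, 'a': 50, 'b': 500}]
```

Because matching keys are guaranteed to land in matching bins, each bin can be merged in parallel without ever materializing the full right table on every worker — which is why this approach pays off when both sides are large.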
2 changes: 2 additions & 0 deletions modin/config/__init__.py
@@ -44,6 +44,7 @@
NPartitions,
PersistentPickle,
ProgressBar,
RangePartitioning,
RangePartitioningGroupby,
RayRedisAddress,
RayRedisPassword,
@@ -92,6 +93,7 @@
"ModinNumpy",
"ExperimentalNumPyAPI",
"RangePartitioningGroupby",
"RangePartitioning",
"ExperimentalGroupbyImpl",
"AsyncReadMode",
"ReadSqlEngine",
12 changes: 12 additions & 0 deletions modin/config/envvars.py
@@ -770,6 +770,18 @@ def _sibling(cls) -> type[EnvWithSibilings]:
)


class RangePartitioning(EnvironmentVariable, type=bool):
"""
Set to true to use Modin's range-partitioning implementation where possible.

Please refer to the documentation for cases where enabling this option would be beneficial:
https://modin.readthedocs.io/en/stable/flow/modin/experimental/range_partitioning_groupby.html
"""

varname = "MODIN_RANGE_PARTITIONING"
default = False
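Modin's `EnvironmentVariable` base class handles reading and caching `MODIN_RANGE_PARTITIONING`; the core of what such a boolean variable does can be sketched standalone (the helper name here is hypothetical, not part of Modin's API):

```python
# Minimal sketch of reading a boolean environment variable such as
# MODIN_RANGE_PARTITIONING; Modin's EnvironmentVariable base class does this
# (plus caching and validation) internally.
import os

def read_bool_env(varname: str, default: bool = False) -> bool:
    """Return True for "1"/"true"/"yes" (case-insensitive), else the default."""
    raw = os.environ.get(varname)
    if raw is None:
        return default
    return raw.strip().lower() in ("1", "true", "yes")

os.environ["MODIN_RANGE_PARTITIONING"] = "1"
print(read_bool_env("MODIN_RANGE_PARTITIONING"))  # True
```

In user code, the same switch can also be flipped programmatically via `modin.config.RangePartitioning.put(True)` rather than through the environment.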


class CIAWSSecretAccessKey(EnvironmentVariable, type=str):
"""Set to AWS_SECRET_ACCESS_KEY when running mock S3 tests for Modin in GitHub CI."""

68 changes: 68 additions & 0 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -3859,6 +3859,74 @@
new_partitions, new_index, new_columns, new_lengths, new_widths, new_dtypes
)

def _apply_func_to_range_partitioning_broadcast(
self, right, func, key, new_index=None, new_columns=None, new_dtypes=None
):
"""
Apply `func` against two dataframes using the range-partitioning implementation.

The method first builds range-partitioning for both dataframes using the data from
`self[key]`; after that, it broadcasts the row parts of `right` to the matching row
parts of `self` and applies `func` row-wise.

Parameters
----------
right : PandasDataframe
func : callable(left : pandas.DataFrame, right : pandas.DataFrame) -> pandas.DataFrame
key : list of labels
Columns to use to build range-partitioning. Must be present in both dataframes.
new_index : pandas.Index, optional
Index values to write to the result's cache.
new_columns : pandas.Index, optional
Column values to write to the result's cache.
new_dtypes : pandas.Series or ModinDtypes, optional
Dtype values to write to the result's cache.

Returns
-------
PandasDataframe
"""
if self._partitions.shape[0] == 1:
result = self.broadcast_apply_full_axis(
axis=1,
func=func,
new_columns=new_columns,
dtypes=new_dtypes,
other=right,
)
return result

if not isinstance(key, list):
key = [key]

shuffling_functions = ShuffleSortFunctions(
self,
key,
ascending=True,
ideal_num_new_partitions=self._partitions.shape[0],
)

# here we want to get indices of those partitions that hold the key columns
key_indices = self.columns.get_indexer_for(key)
partition_indices = np.unique(
np.digitize(key_indices, np.cumsum(self.column_widths))
)

new_partitions = self._partition_mgr_cls.shuffle_partitions(
self._partitions,
partition_indices,
shuffling_functions,
func,
right_partitions=right._partitions,
)

return self.__constructor__(
new_partitions,
index=new_index,
columns=new_columns,
dtypes=new_dtypes,
)
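The key-column lookup in the method above (`np.digitize` over the cumulative column widths) can be illustrated standalone with hypothetical widths:

```python
# Standalone illustration of the partition lookup used above: given the column
# widths of each block partition, find which blocks hold the key columns.
import numpy as np

column_widths = [3, 4, 2]          # three column partitions, 9 columns total
key_indices = np.array([1, 5])     # positions of the key columns in `self.columns`

# np.cumsum gives the exclusive upper bound of each block: [3, 7, 9];
# np.digitize maps each key position to the block whose range contains it.
partition_indices = np.unique(np.digitize(key_indices, np.cumsum(column_widths)))
print(partition_indices)  # [0 1]
```

Column 1 falls in the first block (columns 0-2) and column 5 in the second (columns 3-6), so only those two blocks need to participate in building the range-partitioning bins.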

@lazy_metadata_decorator(apply_axis="both")
def groupby(
self,
57 changes: 50 additions & 7 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -1722,6 +1722,7 @@
index,
shuffle_functions: "ShuffleFunctions",
final_shuffle_func,
right_partitions=None,
):
"""
Return shuffled partitions.
@@ -1736,6 +1737,9 @@
An object implementing the functions that we will be using to perform this shuffle.
final_shuffle_func : Callable(pandas.DataFrame) -> pandas.DataFrame
Function that shuffles the data within each new partition.
right_partitions : np.ndarray, optional
Partitions to broadcast to `self` partitions. If specified, the method builds range-partitioning
for `right_partitions` based on the bins calculated for `partitions` and then performs the broadcast.

Returns
-------
@@ -1774,18 +1778,57 @@
for partition in row_partitions
]
).T
if right_partitions is None:
    # We need to convert every partition that came from the splits into a column partition.
    return np.array(
        [
            [
                cls._column_partitions_class(
                    row_partition, full_axis=False
                ).apply(final_shuffle_func)
            ]
            for row_partition in split_row_partitions
        ]
    )

right_row_parts = cls.row_partitions(right_partitions)
right_split_row_partitions = np.array(
    [
        partition.split(
            shuffle_functions.split_fn,
            num_splits=num_bins,
            extract_metadata=False,
        )
        for partition in right_row_parts
    ]
).T
return np.array(
    [
        [
            cls._column_partitions_class(row_partition, full_axis=False).apply(
                final_shuffle_func,
                other_axis_partition=cls._column_partitions_class(
                    right_row_partitions
                ),
            )
        ]
        for right_row_partitions, row_partition in zip(
            right_split_row_partitions, split_row_partitions
        )
    ]
)

else:
    # If there are no pivots we can simply apply the function row-wise
    if right_partitions is None:
        return np.array(
            [row_part.apply(final_shuffle_func) for row_part in row_partitions]
        )
    right_row_parts = cls.row_partitions(right_partitions)
    return np.array(
        [
            row_part.apply(
                final_shuffle_func, other_axis_partition=right_row_part
            )
            for right_row_part, row_part in zip(right_row_parts, row_partitions)
        ]
    )
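The broadcast path of `shuffle_partitions` boils down to: split both the left and the right row partitions with the same split function, so matching key ranges land in matching bins, then apply the final function to each aligned pair. A simplified model, with plain lists standing in for partition objects and made-up bin/final functions:

```python
# Simplified model of the broadcast shuffle: both sides are split with the
# same bin function, then `final_func` is applied to each aligned pair of bins.
def split(rows, num_bins, bin_fn):
    """Scatter rows into num_bins buckets according to bin_fn(row)."""
    bins = [[] for _ in range(num_bins)]
    for row in rows:
        bins[bin_fn(row)].append(row)
    return bins

def shuffle_with_broadcast(left_rows, right_rows, num_bins, bin_fn, final_func):
    left_bins = split(left_rows, num_bins, bin_fn)
    right_bins = split(right_rows, num_bins, bin_fn)
    # Because both sides used the same bin function, bin i of the left can
    # only ever match bin i of the right.
    return [final_func(l, r) for l, r in zip(left_bins, right_bins)]

# Example: two bins by parity; the final function keeps left values that also
# appear on the right (a toy stand-in for a per-bin merge).
out = shuffle_with_broadcast(
    [1, 2, 3, 4],
    [2, 3],
    num_bins=2,
    bin_fn=lambda x: x % 2,
    final_func=lambda l, r: [x for x in l if x in r],
)
print(out)  # [[2], [3]]
```

This pairwise alignment is what lets the real implementation pass each right-side bin as `other_axis_partition` to exactly one left-side column partition instead of broadcasting the whole right frame.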