
FEAT-#7090: Add range-partitioning implementation for '.unique()' and '.drop_duplicates()' #7091

Merged — 12 commits, Mar 19, 2024
2 changes: 0 additions & 2 deletions .github/actions/run-core-tests/group_2/action.yml
@@ -20,5 +20,3 @@ runs:
modin/pandas/test/dataframe/test_pickle.py
echo "::endgroup::"
shell: bash -l {0}
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
shell: bash -l {0}
Comment on lines -23 to -24

Collaborator (Author): Testing of range-partitioning implementations is now performed in a separate action.

6 changes: 5 additions & 1 deletion .github/actions/run-core-tests/group_3/action.yml
@@ -18,7 +18,11 @@ runs:
echo "::endgroup::"
shell: bash -l {0}
- run: |
echo "::group::Running experimental groupby tests (group 3)..."
echo "::group::Running range-partitioning tests (group 3)..."
MODIN_RANGE_PARTITIONING_GROUPBY=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_groupby.py
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_series.py -k "test_unique or drop_duplicates"
Collaborator: Can we already deprecate MODIN_RANGE_PARTITIONING_GROUPBY?

Collaborator (Author): This could be a non-trivial process, so it is tracked separately in #7105.

Collaborator: If someone uses the old config, a deprecation message should be enough to point them to the new config.

Collaborator (Author): We would also need to replace the old variable everywhere in our code and make sure the config values are always kept in sync:

cfg.ExperimentalGroupby == cfg.RangePartitioningGroupby == cfg.RangePartitioning

The last part is non-trivial, so I would do the deprecation in a separate PR.

MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_general.py -k "test_unique"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/dataframe/test_map_metadata.py -k "drop_duplicates"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/dataframe/test_join_sort.py -k "merge"
echo "::endgroup::"
shell: bash -l {0}
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
@@ -188,7 +188,6 @@ jobs:
- run: python -m pytest modin/pandas/test/dataframe/test_binary.py
- run: python -m pytest modin/pandas/test/dataframe/test_reduce.py
- run: python -m pytest modin/pandas/test/dataframe/test_join_sort.py
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
- run: python -m pytest modin/pandas/test/test_general.py
- run: python -m pytest modin/pandas/test/dataframe/test_indexing.py
- run: python -m pytest modin/pandas/test/test_series.py
1 change: 0 additions & 1 deletion .github/workflows/push-to-master.yml
@@ -46,7 +46,6 @@ jobs:
python -m pytest modin/pandas/test/dataframe/test_indexing.py
python -m pytest modin/pandas/test/dataframe/test_iter.py
python -m pytest modin/pandas/test/dataframe/test_join_sort.py
MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
python -m pytest modin/pandas/test/dataframe/test_map_metadata.py
python -m pytest modin/pandas/test/dataframe/test_reduce.py
python -m pytest modin/pandas/test/dataframe/test_udf.py
6 changes: 6 additions & 0 deletions docs/flow/modin/experimental/range_partitioning_groupby.rst
@@ -78,3 +78,9 @@ Range-partitioning Merge

It is recommended to use this implementation when the right dataframe in a merge is roughly as big as
the left one. In that case, the range-partitioning implementation works faster and consumes less RAM.
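For context, the range-partitioning backend described here is toggled through the `MODIN_RANGE_PARTITIONING` environment variable; the commands below are the ones this PR moves around in CI, shown as a usage sketch:

```shell
# Enable range-partitioning implementations for supported operations,
# then run the merge tests against that backend (paths taken from this PR's CI).
export MODIN_RANGE_PARTITIONING=1
python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
```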

'.unique()' and '.drop_duplicates()'
""""""""""""""""""""""""""""""""""""

Collaborator: Should we refactor this doc page for 0.29.0?

Collaborator (Author): Yes, and there is an issue for that: #6987.

Range-partitioning implementation of '.unique()'/'.drop_duplicates()' works best when the input data size is big (more than
5_000_000 rows) and when the output is also expected to be big, i.e. no more than 80% of the values are duplicates.
Comment on lines +85 to +86

Collaborator (Author): This is not very descriptive, so as part of #6987 I'm also planning to include in the docs the perf measurements that I've made for the range-partitioning PRs.
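To make the "partition, then deduplicate" idea above concrete, here is a minimal pure-Python sketch (the function name and bucket count are made up for illustration; Modin's real implementation partitions by key ranges and runs per-partition in parallel). The key property: rows with equal keys always land in the same partition, so deduplicating each partition independently never misses a cross-partition duplicate.

```python
# Illustrative sketch only: partition-wise drop_duplicates (keep="first").
# Equal keys always hash to the same bucket, so per-bucket deduplication
# is equivalent to a global one (up to row ordering).

def partitioned_drop_duplicates(rows, key, n_partitions=4):
    """Deduplicate `rows` by `key(row)` using hash partitioning."""
    buckets = [[] for _ in range(n_partitions)]
    for row in rows:
        # Route each row to a bucket determined only by its key.
        buckets[hash(key(row)) % n_partitions].append(row)
    result = []
    for bucket in buckets:  # buckets could be processed in parallel
        seen = set()
        for row in bucket:
            if key(row) not in seen:  # keep the first occurrence in the bucket
                seen.add(key(row))
                result.append(row)
    return result

rows = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
deduped = partitioned_drop_duplicates(rows, key=lambda r: r[0])
# Same rows as a global "keep first" dedup, but ordering follows the buckets.
```

This also shows why the tests in this PR sort results before comparing: the partition-wise output preserves order only within a bucket, not globally.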

25 changes: 17 additions & 8 deletions modin/core/storage_formats/base/query_compiler.py
@@ -1768,24 +1768,33 @@ def to_timedelta(self, unit="ns", errors="raise"): # noqa: PR02
self, unit=unit, errors=errors
)

# FIXME: get rid of `**kwargs` parameter (Modin issue #3108).
@doc_utils.add_one_column_warning
@doc_utils.add_refer_to("Series.unique")
def unique(self, **kwargs):
@doc_utils.add_refer_to("Series.drop_duplicates")
Collaborator (suggested change):
@doc_utils.add_refer_to("Series.drop_duplicates")
@doc_utils.add_refer_to("Series.unique")

Collaborator (Author): This was done on purpose: qc.unique() now takes most of its arguments from Series.drop_duplicates(), so it makes sense to refer to that method instead.

Collaborator: Since we want a stable QC API, having this kind of mismatch does not seem like a good idea. I would also like to have two different QCs: one responsible for defaulting to pandas (d2p), and a second serving as a base class for descendants. The base QC does both things now, which is not great. What do you think?

Collaborator (Author): On the mismatch: I can rename qc.unique -> qc.drop_duplicates, would that be better? On splitting the QC: that looks like unnecessary overcomplication to me. I don't see a problem in making the base class implement its methods as d2p. I believe all the backends would inherit from the d2p QC and ignore the abstract one, so what would be the point of having and supporting both?

Collaborator: Maybe just put a comment for now on why we refer to the drop_duplicates docs, since it seems we already have drop_duplicates in the base QC. As for the split: I am talking about d2p and the base QC, which is not abstract but rather common functionality for all storage formats. Would that make sense? It looks like the base QC does double duty at the moment.

Collaborator (Author): No, there is no drop_duplicates() method at the QC level, only .duplicated(). I will add a comment on why we refer to drop_duplicates, though.

Collaborator (Author): On the two-QC idea: I don't see why the current setup is a problem, or why we should support two QCs for that matter.

def unique(self, keep="first", ignore_index=True, subset=None):
"""
Get unique values of `self`.
Get unique rows of `self`.

Parameters
----------
**kwargs : dict
Serves compatibility purpose. Does not affect the result.
keep : {"first", "last", False}, default: "first"
Which duplicates to keep.
ignore_index : bool, default: True
If ``True``, the resulting axis will be labeled ``0, 1, …, n - 1``.
subset : list, optional
Only consider certain columns for identifying duplicates, if `None`, use all of the columns.

Returns
-------
BaseQueryCompiler
New QueryCompiler with unique values.
"""
return SeriesDefault.register(pandas.Series.unique)(self, **kwargs)
if subset is not None:
mask = self.getitem_column_array(subset, ignore_order=True)
else:
mask = self
without_duplicates = self.getitem_array(mask.duplicated(keep=keep).invert())
if ignore_index:
without_duplicates = without_duplicates.reset_index(drop=True)
return without_duplicates
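In plain pandas terms, the base implementation above amounts to the following mask-based deduplication (a sketch of the same logic, not Modin code; `unique_like` is a hypothetical name):

```python
import pandas as pd

def unique_like(df, keep="first", ignore_index=True, subset=None):
    # Columns used to identify duplicates; None means all columns.
    mask_source = df[subset] if subset is not None else df
    # Keep rows whose duplicated-mask is False
    # (mirrors `.duplicated(keep=keep).invert()` in the QC code).
    result = df[~mask_source.duplicated(keep=keep)]
    if ignore_index:
        result = result.reset_index(drop=True)
    return result

df = pd.DataFrame({"a": [1, 1, 2], "b": [3, 4, 4]})
unique_like(df)                # all rows are unique as (a, b) pairs
unique_like(df, subset=["a"])  # keeps rows 0 and 2 only
```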

@doc_utils.add_one_column_warning
@doc_utils.add_refer_to("Series.searchsorted")
35 changes: 29 additions & 6 deletions modin/core/storage_formats/pandas/query_compiler.py
@@ -1900,13 +1900,36 @@ def str_split(self, pat=None, n=-1, expand=False, regex=None):

# END String map partitions operations

def unique(self):
new_modin_frame = self._modin_frame.apply_full_axis(
0,
lambda x: x.squeeze(axis=1).unique(),
new_columns=self.columns,
def unique(self, keep="first", ignore_index=True, subset=None):
# kernels with 'pandas.Series.unique()' work faster
can_use_unique_kernel = (
subset is None and ignore_index and len(self.columns) == 1 and keep
)
return self.__constructor__(new_modin_frame)

if not can_use_unique_kernel and not RangePartitioning.get():
return super().unique(keep=keep, ignore_index=ignore_index, subset=subset)
Collaborator: Is this branch for d2p?

Collaborator (Author): No, this branch is for the general unique() implementation that uses the QC's public API.


if RangePartitioning.get():
new_modin_frame = self._modin_frame._apply_func_to_range_partitioning(
key_columns=self.columns.tolist() if subset is None else subset,
func=(
(lambda df: pandas.DataFrame(df.squeeze(axis=1).unique()))
if can_use_unique_kernel
else (
lambda df: df.drop_duplicates(
keep=keep, ignore_index=ignore_index, subset=subset
)
)
),
preserve_columns=True,
)
else:
new_modin_frame = self._modin_frame.apply_full_axis(
0,
lambda x: x.squeeze(axis=1).unique(),
new_columns=self.columns,
)
return self.__constructor__(new_modin_frame, shape_hint=self._shape_hint)
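The kernel selection above can be read as: with a single column, no subset, default index handling, and a truthy `keep`, a plain `Series.unique()` call suffices; anything else needs the full `drop_duplicates`. A pandas sketch of that decision (hypothetical helper, not the Modin kernel itself):

```python
import pandas as pd

def dedup_kernel(df, keep="first", ignore_index=True, subset=None):
    # Mirrors `can_use_unique_kernel`: `keep` must be truthy, i.e. "first"
    # or "last" -- `keep=False` drops *all* duplicated rows and cannot be
    # expressed via Series.unique().
    can_use_unique = (
        subset is None and ignore_index and len(df.columns) == 1 and bool(keep)
    )
    if can_use_unique:
        # Fast path: Series.unique() returns first occurrences, in order.
        return pd.DataFrame(df.squeeze(axis=1).unique(), columns=df.columns)
    # General path.
    return df.drop_duplicates(keep=keep, ignore_index=ignore_index, subset=subset)

df = pd.DataFrame({"a": [2, 1, 2]})
fast = dedup_kernel(df)                 # takes the Series.unique() path
general = dedup_kernel(df, keep=False)  # keep=False forces drop_duplicates
```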

def searchsorted(self, **kwargs):
def searchsorted(df):
13 changes: 6 additions & 7 deletions modin/pandas/base.py
@@ -1511,13 +1511,12 @@ def drop_duplicates(
subset = list(subset)
else:
subset = [subset]
df = self[subset]
else:
df = self
duplicated = df.duplicated(keep=keep)
result = self[~duplicated]
if ignore_index:
result.index = pandas.RangeIndex(stop=len(result))
if len(diff := pandas.Index(subset).difference(self.columns)) > 0:
raise KeyError(diff)
Collaborator: Does pandas raise the same error with this message?

Collaborator (Author): Yes, it raises exactly the same error:

>>> pd_df
   a  b
0  1  2
>>> pd_df.drop_duplicates(subset=["b", "c"])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "python3.9/site-packages/pandas/core/frame.py", line 6805, in drop_duplicates
    result = self[-self.duplicated(subset, keep=keep)]
  File "python3.9/site-packages/pandas/core/frame.py", line 6937, in duplicated
    raise KeyError(Index(diff))
KeyError: Index(['c'], dtype='object')

result_qc = self._query_compiler.unique(
keep=keep, ignore_index=ignore_index, subset=subset
)
result = self.__constructor__(query_compiler=result_qc)
if inplace:
self._update_inplace(result._query_compiler)
else:
27 changes: 12 additions & 15 deletions modin/pandas/test/dataframe/test_join_sort.py
@@ -34,6 +34,7 @@
generate_multiindex,
random_state,
rotate_decimal_digits_or_symbols,
sort_if_range_partitioning,
test_data,
test_data_keys,
test_data_values,
@@ -247,10 +248,6 @@ def test_join_6602():
],
)
def test_merge(test_data, test_data2):
# RangePartitioning merge always produces sorted result, so we have to sort
# pandas' result as well in order them to match
comparator = df_equals_and_sort if RangePartitioning.get() else df_equals

modin_df = pd.DataFrame(
test_data,
columns=["col{}".format(i) for i in range(test_data.shape[1])],
@@ -283,7 +280,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=hows[i], on=ons[j], sort=sorts[j]
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

modin_result = modin_df.merge(
modin_df2,
@@ -299,7 +296,7 @@ def test_merge(test_data, test_data2):
right_on="key",
sort=sorts[j],
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# Test for issue #1771
modin_df = pd.DataFrame({"name": np.arange(40)})
@@ -308,7 +305,7 @@ def test_merge(test_data, test_data2):
pandas_df2 = pandas.DataFrame({"name": [39], "position": [0]})
modin_result = modin_df.merge(modin_df2, on="name", how="inner")
pandas_result = pandas_df.merge(pandas_df2, on="name", how="inner")
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

frame_data = {
"col1": [0, 1, 2, 3],
@@ -329,7 +326,7 @@ def test_merge(test_data, test_data2):
# Defaults
modin_result = modin_df.merge(modin_df2, how=how)
pandas_result = pandas_df.merge(pandas_df2, how=how)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_on and right_index
modin_result = modin_df.merge(
@@ -338,7 +335,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col1", right_index=True
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_index and right_on
modin_result = modin_df.merge(
@@ -347,7 +344,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_index=True, right_on="col1"
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_on and right_on col1
modin_result = modin_df.merge(
@@ -356,7 +353,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col1", right_on="col1"
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_on and right_on col2
modin_result = modin_df.merge(
@@ -365,7 +362,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col2", right_on="col2"
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_index and right_index
modin_result = modin_df.merge(
@@ -374,7 +371,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_index=True, right_index=True
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# Cannot merge a Series without a name
ps = pandas.Series(frame_data2.get("col1"))
@@ -383,7 +380,7 @@ def test_merge(test_data, test_data2):
modin_df,
pandas_df,
lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps),
comparator=comparator,
comparator=sort_if_range_partitioning,
expected_exception=ValueError("Cannot merge a Series without a name"),
)

@@ -394,7 +391,7 @@ def test_merge(test_data, test_data2):
modin_df,
pandas_df,
lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps),
comparator=comparator,
comparator=sort_if_range_partitioning,
)

with pytest.raises(TypeError):
20 changes: 13 additions & 7 deletions modin/pandas/test/dataframe/test_map_metadata.py
@@ -38,6 +38,7 @@
name_contains,
numeric_dfs,
random_state,
sort_if_range_partitioning,
test_data,
test_data_keys,
test_data_values,
@@ -1068,7 +1069,7 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
keep=keep, inplace=False, subset=subset, ignore_index=ignore_index
)
else:
df_equals(
sort_if_range_partitioning(
pandas_df.drop_duplicates(
keep=keep, inplace=False, subset=subset, ignore_index=ignore_index
),
@@ -1078,7 +1079,7 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
)

try:
pandas_results = pandas_df.drop_duplicates(
pandas_df.drop_duplicates(
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
)
except Exception as err:
@@ -1087,10 +1088,10 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
)
else:
modin_results = modin_df.drop_duplicates(
modin_df.drop_duplicates(
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
)
df_equals(modin_results, pandas_results)
sort_if_range_partitioning(modin_df, pandas_df)


def test_drop_duplicates_with_missing_index_values():
@@ -1168,7 +1169,7 @@ def test_drop_duplicates_with_missing_index_values():
modin_df = pd.DataFrame(data["data"], index=data["index"], columns=data["columns"])
modin_result = modin_df.sort_values(["id", "time"]).drop_duplicates(["id"])
pandas_result = pandas_df.sort_values(["id", "time"]).drop_duplicates(["id"])
df_equals(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)


def test_drop_duplicates_after_sort():
@@ -1183,15 +1184,20 @@ def test_drop_duplicates_after_sort():

modin_result = modin_df.sort_values(["value", "time"]).drop_duplicates(["value"])
pandas_result = pandas_df.sort_values(["value", "time"]).drop_duplicates(["value"])
df_equals(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)


def test_drop_duplicates_with_repeated_index_values():
# This tests for issue #4467: https://github.com/modin-project/modin/issues/4467
data = [[0], [1], [0]]
index = [0, 0, 0]
modin_df, pandas_df = create_test_dfs(data, index=index)
eval_general(modin_df, pandas_df, lambda df: df.drop_duplicates())
eval_general(
modin_df,
pandas_df,
lambda df: df.drop_duplicates(),
comparator=sort_if_range_partitioning,
)


@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
Expand Down