
FEAT-#7090: Add range-partitioning implementation for '.unique()' and '.drop_duplicates()' #7091

Merged: 12 commits, Mar 19, 2024
2 changes: 0 additions & 2 deletions .github/actions/run-core-tests/group_2/action.yml
@@ -20,5 +20,3 @@ runs:
modin/pandas/test/dataframe/test_pickle.py
echo "::endgroup::"
shell: bash -l {0}
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
shell: bash -l {0}
Comment on lines -23 to -24 (Collaborator Author): Testing of range-partitioning implementations is now performed in a separate action.

5 changes: 4 additions & 1 deletion .github/actions/run-core-tests/group_3/action.yml
@@ -20,6 +20,9 @@ runs:
- run: |
echo "::group::Running range-partitioning tests (group 3)..."
MODIN_RANGE_PARTITIONING_GROUPBY=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_groupby.py
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_series.py -k "test_nunique"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_series.py -k "test_unique or test_nunique or drop_duplicates"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_general.py -k "test_unique"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/dataframe/test_map_metadata.py -k "drop_duplicates"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/dataframe/test_join_sort.py -k "merge"
echo "::endgroup::"
shell: bash -l {0}
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
@@ -188,7 +188,6 @@ jobs:
- run: python -m pytest modin/pandas/test/dataframe/test_binary.py
- run: python -m pytest modin/pandas/test/dataframe/test_reduce.py
- run: python -m pytest modin/pandas/test/dataframe/test_join_sort.py
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
- run: python -m pytest modin/pandas/test/test_general.py
- run: python -m pytest modin/pandas/test/dataframe/test_indexing.py
- run: python -m pytest modin/pandas/test/test_series.py
1 change: 0 additions & 1 deletion .github/workflows/push-to-master.yml
@@ -46,7 +46,6 @@ jobs:
python -m pytest modin/pandas/test/dataframe/test_indexing.py
python -m pytest modin/pandas/test/dataframe/test_iter.py
python -m pytest modin/pandas/test/dataframe/test_join_sort.py
MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
python -m pytest modin/pandas/test/dataframe/test_map_metadata.py
python -m pytest modin/pandas/test/dataframe/test_reduce.py
python -m pytest modin/pandas/test/dataframe/test_udf.py
6 changes: 6 additions & 0 deletions docs/flow/modin/experimental/range_partitioning_groupby.rst
@@ -79,6 +79,12 @@ Range-partitioning Merge
It is recommended to use this implementation when the right dataframe in the merge is about as big as
the left one. In that case, the range-partitioning implementation works faster and consumes less RAM.
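As context for the test changes in this PR: range-partitioning merge produces its result sorted by the merge key, so an order-insensitive comparison has to sort both sides before comparing. The following is a minimal pandas-only sketch of that idea (Modin itself is not required here); the helper name `sort_and_compare` is hypothetical, not a Modin API:

```python
import pandas as pd

def sort_and_compare(left: pd.DataFrame, right: pd.DataFrame) -> bool:
    # Sort both frames by all columns and re-number the rows, so that two
    # merge results that differ only in row order compare as equal.
    cols = list(left.columns)
    left_sorted = left.sort_values(cols).reset_index(drop=True)
    right_sorted = right.sort_values(cols).reset_index(drop=True)
    return left_sorted.equals(right_sorted)

df1 = pd.DataFrame({"key": [2, 1, 3], "a": [20, 10, 30]})
df2 = pd.DataFrame({"key": [1, 2, 3], "b": [100, 200, 300]})

hash_like = df1.merge(df2, on="key")                      # rows in left-frame order
range_like = df1.merge(df2, on="key").sort_values("key")  # rows in key order
```

With this helper, `sort_and_compare(hash_like, range_like)` holds even though a plain `equals` comparison of the two results fails on row order.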

'.unique()' and '.drop_duplicates()'
""""""""""""""""""""""""""""""""""""

Comment (Collaborator): Should we refactor this doc page for 0.29.0?

Reply (Collaborator Author): Yes, and there is an issue for that #6987

The range-partitioning implementation of '.unique()'/'.drop_duplicates()' works best when the input data is big (more than
5_000_000 rows) and when the output is also expected to be big (no more than 80% of the values are duplicates).
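The semantics that both implementations have to match are those of pandas itself. A small pandas-only sketch of the `keep`/`ignore_index`/`subset` parameters the new query-compiler signature mirrors (the data is illustrative):

```python
import pandas as pd

df = pd.DataFrame({
    "a": [1, 1, 2, 2, 3],
    "b": ["x", "y", "x", "x", "z"],
})

# Deduplicate on column "a" only, keep the *last* occurrence of each
# duplicate group, and relabel the resulting index 0..n-1.
deduped = df.drop_duplicates(subset=["a"], keep="last", ignore_index=True)
```

Here the surviving rows are the last occurrence per value of `a`, in their original relative order, with a fresh `RangeIndex`.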
Comment on lines +85 to +86 (Collaborator Author): This is not very descriptive, so as part of #6987 I'm also planning to include the perf measurements I've made for range-partitioning PRs in the docs.

'.nunique()'
""""""""""""""""""""""""""""""""""""

26 changes: 18 additions & 8 deletions modin/core/storage_formats/base/query_compiler.py
@@ -1768,24 +1768,34 @@ def to_timedelta(self, unit="ns", errors="raise"):  # noqa: PR02
self, unit=unit, errors=errors
)

# FIXME: get rid of `**kwargs` parameter (Modin issue #3108).
@doc_utils.add_one_column_warning
@doc_utils.add_refer_to("Series.unique")
def unique(self, **kwargs):
    # 'qc.unique()' uses most of the arguments from 'df.drop_duplicates()', so referring to that method
@doc_utils.add_refer_to("DataFrame.drop_duplicates")
def unique(self, keep="first", ignore_index=True, subset=None):
"""
Get unique values of `self`.
Get unique rows of `self`.

Parameters
----------
**kwargs : dict
Serves compatibility purpose. Does not affect the result.
keep : {"first", "last", False}, default: "first"
Which duplicates to keep.
ignore_index : bool, default: True
If ``True``, the resulting axis will be labeled ``0, 1, …, n - 1``.
subset : list, optional
Only consider certain columns for identifying duplicates, if `None`, use all of the columns.

Returns
-------
BaseQueryCompiler
New QueryCompiler with unique values.
"""
return SeriesDefault.register(pandas.Series.unique)(self, **kwargs)
if subset is not None:
mask = self.getitem_column_array(subset, ignore_order=True)
else:
mask = self
without_duplicates = self.getitem_array(mask.duplicated(keep=keep).invert())
if ignore_index:
without_duplicates = without_duplicates.reset_index(drop=True)
return without_duplicates
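The default path added above can be restated in plain pandas: select the `subset` columns (or use the whole frame), build an inverted `duplicated` mask, filter with it, and optionally reset the index. This is an illustrative sketch of that algorithm, not the query-compiler code itself:

```python
import pandas as pd

def unique_rows(df: pd.DataFrame, keep="first",
                ignore_index=True, subset=None) -> pd.DataFrame:
    # Columns used to identify duplicates; None means all columns.
    mask_source = df[subset] if subset is not None else df
    # Invert the duplicated mask to select the rows to keep.
    result = df[~mask_source.duplicated(keep=keep)]
    if ignore_index:
        result = result.reset_index(drop=True)
    return result

df = pd.DataFrame({"a": [1, 1, 2], "b": [9, 8, 9]})
```

For any `keep`/`subset` combination this reproduces `df.drop_duplicates(...)`, which is what lets the base query compiler express `unique` entirely through its public API.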

@doc_utils.add_one_column_warning
@doc_utils.add_refer_to("Series.searchsorted")
35 changes: 29 additions & 6 deletions modin/core/storage_formats/pandas/query_compiler.py
@@ -1933,13 +1933,36 @@ def str_split(self, pat=None, n=-1, expand=False, regex=None):

# END String map partitions operations

def unique(self):
new_modin_frame = self._modin_frame.apply_full_axis(
0,
lambda x: x.squeeze(axis=1).unique(),
new_columns=self.columns,
def unique(self, keep="first", ignore_index=True, subset=None):
# kernels with 'pandas.Series.unique()' work faster
can_use_unique_kernel = (
subset is None and ignore_index and len(self.columns) == 1 and keep
)
return self.__constructor__(new_modin_frame)

if not can_use_unique_kernel and not RangePartitioning.get():
return super().unique(keep=keep, ignore_index=ignore_index, subset=subset)
Comment (Collaborator): Is this branch for d2p?

Reply (Collaborator Author): No, this branch is for the general unique() implementation that uses QC's public API.

if RangePartitioning.get():
new_modin_frame = self._modin_frame._apply_func_to_range_partitioning(
key_columns=self.columns.tolist() if subset is None else subset,
func=(
(lambda df: pandas.DataFrame(df.squeeze(axis=1).unique()))
if can_use_unique_kernel
else (
lambda df: df.drop_duplicates(
keep=keep, ignore_index=ignore_index, subset=subset
)
)
),
preserve_columns=True,
)
else:
new_modin_frame = self._modin_frame.apply_full_axis(
0,
lambda x: x.squeeze(axis=1).unique(),
new_columns=self.columns,
)
return self.__constructor__(new_modin_frame, shape_hint=self._shape_hint)

def searchsorted(self, **kwargs):
def searchsorted(df):
13 changes: 6 additions & 7 deletions modin/pandas/base.py
@@ -1511,13 +1511,12 @@ def drop_duplicates(
subset = list(subset)
else:
subset = [subset]
df = self[subset]
else:
df = self
duplicated = df.duplicated(keep=keep)
result = self[~duplicated]
if ignore_index:
result.index = pandas.RangeIndex(stop=len(result))
if len(diff := pandas.Index(subset).difference(self.columns)) > 0:
raise KeyError(diff)
Comment (Collaborator): Does pandas raise the same error with this message?

Reply (Collaborator Author): Yes, it raises exactly the same error:

>>> pd_df
   a  b
0  1  2
>>> pd_df.drop_duplicates(subset=["b", "c"])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "python3.9/site-packages/pandas/core/frame.py", line 6805, in drop_duplicates
    result = self[-self.duplicated(subset, keep=keep)]
  File "python3.9/site-packages/pandas/core/frame.py", line 6937, in duplicated
    raise KeyError(Index(diff))
KeyError: Index(['c'], dtype='object')

result_qc = self._query_compiler.unique(
keep=keep, ignore_index=ignore_index, subset=subset
)
result = self.__constructor__(query_compiler=result_qc)
if inplace:
self._update_inplace(result._query_compiler)
else:
27 changes: 12 additions & 15 deletions modin/pandas/test/dataframe/test_join_sort.py
@@ -34,6 +34,7 @@
generate_multiindex,
random_state,
rotate_decimal_digits_or_symbols,
sort_if_range_partitioning,
test_data,
test_data_keys,
test_data_values,
@@ -247,10 +248,6 @@ def test_join_6602():
],
)
def test_merge(test_data, test_data2):
    # RangePartitioning merge always produces a sorted result, so we have to sort
    # pandas' result as well in order for them to match
comparator = df_equals_and_sort if RangePartitioning.get() else df_equals

modin_df = pd.DataFrame(
test_data,
columns=["col{}".format(i) for i in range(test_data.shape[1])],
Expand Down Expand Up @@ -283,7 +280,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=hows[i], on=ons[j], sort=sorts[j]
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

modin_result = modin_df.merge(
modin_df2,
@@ -299,7 +296,7 @@ def test_merge(test_data, test_data2):
right_on="key",
sort=sorts[j],
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# Test for issue #1771
modin_df = pd.DataFrame({"name": np.arange(40)})
@@ -308,7 +305,7 @@ def test_merge(test_data, test_data2):
pandas_df2 = pandas.DataFrame({"name": [39], "position": [0]})
modin_result = modin_df.merge(modin_df2, on="name", how="inner")
pandas_result = pandas_df.merge(pandas_df2, on="name", how="inner")
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

frame_data = {
"col1": [0, 1, 2, 3],
@@ -329,7 +326,7 @@ def test_merge(test_data, test_data2):
# Defaults
modin_result = modin_df.merge(modin_df2, how=how)
pandas_result = pandas_df.merge(pandas_df2, how=how)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_on and right_index
modin_result = modin_df.merge(
@@ -338,7 +335,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col1", right_index=True
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_index and right_on
modin_result = modin_df.merge(
Expand All @@ -347,7 +344,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_index=True, right_on="col1"
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_on and right_on col1
modin_result = modin_df.merge(
Expand All @@ -356,7 +353,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col1", right_on="col1"
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_on and right_on col2
modin_result = modin_df.merge(
Expand All @@ -365,7 +362,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col2", right_on="col2"
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_index and right_index
modin_result = modin_df.merge(
Expand All @@ -374,7 +371,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_index=True, right_index=True
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# Cannot merge a Series without a name
ps = pandas.Series(frame_data2.get("col1"))
Expand All @@ -383,7 +380,7 @@ def test_merge(test_data, test_data2):
modin_df,
pandas_df,
lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps),
comparator=comparator,
comparator=sort_if_range_partitioning,
expected_exception=ValueError("Cannot merge a Series without a name"),
)

@@ -394,7 +391,7 @@ def test_merge(test_data, test_data2):
modin_df,
pandas_df,
lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps),
comparator=comparator,
comparator=sort_if_range_partitioning,
)

with pytest.raises(TypeError):
20 changes: 13 additions & 7 deletions modin/pandas/test/dataframe/test_map_metadata.py
@@ -38,6 +38,7 @@
name_contains,
numeric_dfs,
random_state,
sort_if_range_partitioning,
test_data,
test_data_keys,
test_data_values,
@@ -1068,7 +1069,7 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
keep=keep, inplace=False, subset=subset, ignore_index=ignore_index
)
else:
df_equals(
sort_if_range_partitioning(
pandas_df.drop_duplicates(
keep=keep, inplace=False, subset=subset, ignore_index=ignore_index
),
@@ -1078,7 +1079,7 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
)

try:
pandas_results = pandas_df.drop_duplicates(
pandas_df.drop_duplicates(
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
)
except Exception as err:
@@ -1087,10 +1088,10 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
)
else:
modin_results = modin_df.drop_duplicates(
modin_df.drop_duplicates(
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
)
df_equals(modin_results, pandas_results)
sort_if_range_partitioning(modin_df, pandas_df)


def test_drop_duplicates_with_missing_index_values():
@@ -1168,7 +1169,7 @@ def test_drop_duplicates_with_missing_index_values():
modin_df = pd.DataFrame(data["data"], index=data["index"], columns=data["columns"])
modin_result = modin_df.sort_values(["id", "time"]).drop_duplicates(["id"])
pandas_result = pandas_df.sort_values(["id", "time"]).drop_duplicates(["id"])
df_equals(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)


def test_drop_duplicates_after_sort():
@@ -1183,15 +1184,20 @@

modin_result = modin_df.sort_values(["value", "time"]).drop_duplicates(["value"])
pandas_result = pandas_df.sort_values(["value", "time"]).drop_duplicates(["value"])
df_equals(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)


def test_drop_duplicates_with_repeated_index_values():
# This tests for issue #4467: https://github.com/modin-project/modin/issues/4467
data = [[0], [1], [0]]
index = [0, 0, 0]
modin_df, pandas_df = create_test_dfs(data, index=index)
eval_general(modin_df, pandas_df, lambda df: df.drop_duplicates())
eval_general(
modin_df,
pandas_df,
lambda df: df.drop_duplicates(),
comparator=sort_if_range_partitioning,
)


@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)