
FEAT-#7090: Add range-partitioning implementation for '.unique()' and '.drop_duplicates()' #7091

Merged: 12 commits, Mar 19, 2024
2 changes: 0 additions & 2 deletions .github/actions/run-core-tests/group_2/action.yml
@@ -20,5 +20,3 @@ runs:
modin/pandas/test/dataframe/test_pickle.py
echo "::endgroup::"
shell: bash -l {0}
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
shell: bash -l {0}
Comment on lines -23 to -24
Collaborator Author: testing of range-partitioning implementations is now performed in a separate action

5 changes: 4 additions & 1 deletion .github/actions/run-core-tests/group_3/action.yml
@@ -20,6 +20,9 @@ runs:
- run: |
echo "::group::Running range-partitioning tests (group 3)..."
MODIN_RANGE_PARTITIONING_GROUPBY=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_groupby.py
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_series.py -k "test_nunique"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_series.py -k "test_unique or test_nunique or drop_duplicates"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_general.py -k "test_unique"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/dataframe/test_map_metadata.py -k "drop_duplicates"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/dataframe/test_join_sort.py -k "merge"
echo "::endgroup::"
shell: bash -l {0}
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
@@ -188,7 +188,6 @@ jobs:
- run: python -m pytest modin/pandas/test/dataframe/test_binary.py
- run: python -m pytest modin/pandas/test/dataframe/test_reduce.py
- run: python -m pytest modin/pandas/test/dataframe/test_join_sort.py
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
- run: python -m pytest modin/pandas/test/test_general.py
- run: python -m pytest modin/pandas/test/dataframe/test_indexing.py
- run: python -m pytest modin/pandas/test/test_series.py
1 change: 0 additions & 1 deletion .github/workflows/push-to-master.yml
@@ -46,7 +46,6 @@ jobs:
python -m pytest modin/pandas/test/dataframe/test_indexing.py
python -m pytest modin/pandas/test/dataframe/test_iter.py
python -m pytest modin/pandas/test/dataframe/test_join_sort.py
MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
python -m pytest modin/pandas/test/dataframe/test_map_metadata.py
python -m pytest modin/pandas/test/dataframe/test_reduce.py
python -m pytest modin/pandas/test/dataframe/test_udf.py
6 changes: 6 additions & 0 deletions docs/flow/modin/experimental/range_partitioning_groupby.rst
@@ -79,6 +79,12 @@ Range-partitioning Merge
It is recommended to use this implementation when the right dataframe in the merge is about as big as
the left dataframe. In this case, the range-partitioning implementation works faster and consumes less RAM.

'.unique()' and '.drop_duplicates()'
""""""""""""""""""""""""""""""""""""

Collaborator: Should we refactor this doc page for 0.29.0?
Collaborator Author: Yes, and there is an issue for that: #6987

The range-partitioning implementation of '.unique()'/'.drop_duplicates()' works best when the input data size is large (more than
5_000_000 rows) and when the output is also expected to be large, i.e. no more than 80% of the values are duplicates.
Comment on lines +85 to +86
Collaborator Author: this is not very descriptive, so as part of #6987 I'm also planning to include the perf measurements that I've made for the range-partitioning PRs in the docs
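The paragraph above can be made concrete with the pandas API that Modin mirrors (plain pandas is used here so the snippet runs anywhere; with Modin the same calls apply after `import modin.pandas as pd`, and the range-partitioning implementation kicks in when `MODIN_RANGE_PARTITIONING=1` is set):

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2, 2, 3], "b": [10, 10, 20, 25, 30]})

# Whole-row deduplication: only exact duplicate rows are dropped.
full = df.drop_duplicates(ignore_index=True)
print(len(full))  # 4 -- rows (1, 10), (2, 20), (2, 25), (3, 30)

# subset= restricts the key columns; keep= picks which duplicate survives.
by_a = df.drop_duplicates(subset=["a"], keep="last", ignore_index=True)
print(by_a["a"].tolist())  # [1, 2, 3]
```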


'.nunique()'
""""""""""""""""""""""""""""""""""""

Expand Down
26 changes: 18 additions & 8 deletions modin/core/storage_formats/base/query_compiler.py
@@ -1768,24 +1768,34 @@
self, unit=unit, errors=errors
)

# FIXME: get rid of `**kwargs` parameter (Modin issue #3108).
@doc_utils.add_one_column_warning
@doc_utils.add_refer_to("Series.unique")
def unique(self, **kwargs):
# 'qc.unique()' uses most of the arguments from 'df.drop_duplicates()', so referring to that method
@doc_utils.add_refer_to("DataFrame.drop_duplicates")
def unique(self, keep="first", ignore_index=True, subset=None):
"""
Get unique values of `self`.
Get unique rows of `self`.

Parameters
----------
**kwargs : dict
Serves compatibility purpose. Does not affect the result.
keep : {"first", "last", False}, default: "first"
Which duplicates to keep.
ignore_index : bool, default: True
If ``True``, the resulting axis will be labeled ``0, 1, …, n - 1``.
subset : list, optional
Only consider certain columns for identifying duplicates; if `None`, use all of the columns.

Returns
-------
BaseQueryCompiler
New QueryCompiler with unique values.
"""
return SeriesDefault.register(pandas.Series.unique)(self, **kwargs)
if subset is not None:
mask = self.getitem_column_array(subset, ignore_order=True)

else:
mask = self
without_duplicates = self.getitem_array(mask.duplicated(keep=keep).invert())
if ignore_index:
without_duplicates = without_duplicates.reset_index(drop=True)
return without_duplicates

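The default implementation above is the classic pandas mask idiom: build the `duplicated` mask from the subset columns, invert it, and filter the full frame. A plain-pandas sketch of the same steps — `unique_rows` is a hypothetical standalone helper for illustration, not part of Modin:

```python
import pandas as pd

def unique_rows(df, keep="first", ignore_index=True, subset=None):
    # Build the duplicate mask from the subset columns (or all columns)...
    mask_source = df[subset] if subset is not None else df
    # ...then filter the *full* frame with the inverted mask, mirroring
    # the getitem_array / duplicated / invert chain in the query compiler.
    result = df[~mask_source.duplicated(keep=keep)]
    if ignore_index:
        result = result.reset_index(drop=True)
    return result

df = pd.DataFrame({"a": [1, 1, 2], "b": [5, 5, 6]})
print(unique_rows(df).values.tolist())              # [[1, 5], [2, 6]]
print(unique_rows(df, keep=False).values.tolist())  # [[2, 6]]
```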

@doc_utils.add_one_column_warning
@doc_utils.add_refer_to("Series.searchsorted")
Expand Down
35 changes: 29 additions & 6 deletions modin/core/storage_formats/pandas/query_compiler.py
@@ -1933,13 +1933,36 @@

# END String map partitions operations

def unique(self):
new_modin_frame = self._modin_frame.apply_full_axis(
0,
lambda x: x.squeeze(axis=1).unique(),
new_columns=self.columns,
def unique(self, keep="first", ignore_index=True, subset=None):
# kernels with 'pandas.Series.unique()' work faster
can_use_unique_kernel = (

subset is None and ignore_index and len(self.columns) == 1 and keep
)
return self.__constructor__(new_modin_frame)

if not can_use_unique_kernel and not RangePartitioning.get():
return super().unique(keep=keep, ignore_index=ignore_index, subset=subset)

Collaborator: Is this branch for d2p?
Collaborator Author: no, this branch is for general unique() implementation that uses QC's public API
if RangePartitioning.get():
new_modin_frame = self._modin_frame._apply_func_to_range_partitioning(

key_columns=self.columns.tolist() if subset is None else subset,
func=(
(lambda df: pandas.DataFrame(df.squeeze(axis=1).unique()))
if can_use_unique_kernel
else (
lambda df: df.drop_duplicates(
keep=keep, ignore_index=ignore_index, subset=subset
)
)
),
preserve_columns=True,
)
else:
new_modin_frame = self._modin_frame.apply_full_axis(

0,
lambda x: x.squeeze(axis=1).unique(),
new_columns=self.columns,
)
return self.__constructor__(new_modin_frame, shape_hint=self._shape_hint)

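The `can_use_unique_kernel` fast path above assumes that, for a single column with `ignore_index=True`, `pandas.Series.unique()` matches `drop_duplicates(keep="first")` with a reset index. A quick plain-pandas sanity check of that equivalence:

```python
import pandas as pd

s = pd.Series([3, 1, 3, 2, 1])

# Series.unique() returns the values in first-occurrence order as an ndarray...
via_unique = pd.Series(s.unique())

# ...which equals drop_duplicates(keep="first") with the index reset
# (the ignore_index=True behavior the kernel relies on).
via_drop = s.drop_duplicates(keep="first").reset_index(drop=True)

print(via_unique.tolist())  # [3, 1, 2]
assert via_unique.equals(via_drop)
```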

def searchsorted(self, **kwargs):
def searchsorted(df):
13 changes: 6 additions & 7 deletions modin/pandas/base.py
@@ -1511,13 +1511,12 @@
subset = list(subset)
else:
subset = [subset]
df = self[subset]
else:
df = self
duplicated = df.duplicated(keep=keep)
result = self[~duplicated]
if ignore_index:
result.index = pandas.RangeIndex(stop=len(result))
if len(diff := pandas.Index(subset).difference(self.columns)) > 0:
raise KeyError(diff)
Collaborator: Does pandas raise the same error with this message?
Collaborator Author: yes, it raises exactly the same error:

>>> pd_df
   a  b
0  1  2
>>> pd_df.drop_duplicates(subset=["b", "c"])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "python3.9/site-packages/pandas/core/frame.py", line 6805, in drop_duplicates
    result = self[-self.duplicated(subset, keep=keep)]
  File "python3.9/site-packages/pandas/core/frame.py", line 6937, in duplicated
    raise KeyError(Index(diff))
KeyError: Index(['c'], dtype='object')

result_qc = self._query_compiler.unique(

keep=keep, ignore_index=ignore_index, subset=subset
)
result = self.__constructor__(query_compiler=result_qc)

if inplace:
self._update_inplace(result._query_compiler)
else:
27 changes: 12 additions & 15 deletions modin/pandas/test/dataframe/test_join_sort.py
@@ -34,6 +34,7 @@
generate_multiindex,
random_state,
rotate_decimal_digits_or_symbols,
sort_if_range_partitioning,
test_data,
test_data_keys,
test_data_values,
@@ -247,10 +248,6 @@ def test_join_6602():
],
)
def test_merge(test_data, test_data2):
# RangePartitioning merge always produces sorted result, so we have to sort
# pandas' result as well in order them to match
comparator = df_equals_and_sort if RangePartitioning.get() else df_equals

modin_df = pd.DataFrame(
test_data,
columns=["col{}".format(i) for i in range(test_data.shape[1])],
Expand Down Expand Up @@ -283,7 +280,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=hows[i], on=ons[j], sort=sorts[j]
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

modin_result = modin_df.merge(
modin_df2,
@@ -299,7 +296,7 @@ def test_merge(test_data, test_data2):
right_on="key",
sort=sorts[j],
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# Test for issue #1771
modin_df = pd.DataFrame({"name": np.arange(40)})
@@ -308,7 +305,7 @@ def test_merge(test_data, test_data2):
pandas_df2 = pandas.DataFrame({"name": [39], "position": [0]})
modin_result = modin_df.merge(modin_df2, on="name", how="inner")
pandas_result = pandas_df.merge(pandas_df2, on="name", how="inner")
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

frame_data = {
"col1": [0, 1, 2, 3],
@@ -329,7 +326,7 @@ def test_merge(test_data, test_data2):
# Defaults
modin_result = modin_df.merge(modin_df2, how=how)
pandas_result = pandas_df.merge(pandas_df2, how=how)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_on and right_index
modin_result = modin_df.merge(
@@ -338,7 +335,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col1", right_index=True
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_index and right_on
modin_result = modin_df.merge(
@@ -347,7 +344,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_index=True, right_on="col1"
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_on and right_on col1
modin_result = modin_df.merge(
@@ -356,7 +353,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col1", right_on="col1"
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_on and right_on col2
modin_result = modin_df.merge(
@@ -365,7 +362,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col2", right_on="col2"
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_index and right_index
modin_result = modin_df.merge(
@@ -374,7 +371,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_index=True, right_index=True
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# Cannot merge a Series without a name
ps = pandas.Series(frame_data2.get("col1"))
@@ -383,7 +380,7 @@ def test_merge(test_data, test_data2):
modin_df,
pandas_df,
lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps),
comparator=comparator,
comparator=sort_if_range_partitioning,
expected_exception=ValueError("Cannot merge a Series without a name"),
)

@@ -394,7 +391,7 @@ def test_merge(test_data, test_data2):
modin_df,
pandas_df,
lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps),
comparator=comparator,
comparator=sort_if_range_partitioning,
)

with pytest.raises(TypeError):
20 changes: 13 additions & 7 deletions modin/pandas/test/dataframe/test_map_metadata.py
@@ -38,6 +38,7 @@
name_contains,
numeric_dfs,
random_state,
sort_if_range_partitioning,
test_data,
test_data_keys,
test_data_values,
@@ -1068,7 +1069,7 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
keep=keep, inplace=False, subset=subset, ignore_index=ignore_index
)
else:
df_equals(
sort_if_range_partitioning(
pandas_df.drop_duplicates(
keep=keep, inplace=False, subset=subset, ignore_index=ignore_index
),
@@ -1078,7 +1079,7 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
)

try:
pandas_results = pandas_df.drop_duplicates(
pandas_df.drop_duplicates(
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
)
except Exception as err:
@@ -1087,10 +1088,10 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
)
else:
modin_results = modin_df.drop_duplicates(
modin_df.drop_duplicates(
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
)
df_equals(modin_results, pandas_results)
sort_if_range_partitioning(modin_df, pandas_df)


def test_drop_duplicates_with_missing_index_values():
@@ -1168,7 +1169,7 @@ def test_drop_duplicates_with_missing_index_values():
modin_df = pd.DataFrame(data["data"], index=data["index"], columns=data["columns"])
modin_result = modin_df.sort_values(["id", "time"]).drop_duplicates(["id"])
pandas_result = pandas_df.sort_values(["id", "time"]).drop_duplicates(["id"])
df_equals(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)


def test_drop_duplicates_after_sort():
@@ -1183,15 +1184,20 @@ def test_drop_duplicates_after_sort():

modin_result = modin_df.sort_values(["value", "time"]).drop_duplicates(["value"])
pandas_result = pandas_df.sort_values(["value", "time"]).drop_duplicates(["value"])
df_equals(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)


def test_drop_duplicates_with_repeated_index_values():
# This tests for issue #4467: https://github.com/modin-project/modin/issues/4467
data = [[0], [1], [0]]
index = [0, 0, 0]
modin_df, pandas_df = create_test_dfs(data, index=index)
eval_general(modin_df, pandas_df, lambda df: df.drop_duplicates())
eval_general(
modin_df,
pandas_df,
lambda df: df.drop_duplicates(),
comparator=sort_if_range_partitioning,
)


@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
Expand Down