Skip to content

Commit

Permalink
FEAT-#7090: Add range-partitioning implementation for '.unique()' and…
Browse files Browse the repository at this point in the history
… '.drop_duplicates()' (#7091)

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
  • Loading branch information
dchigarev committed Mar 19, 2024
1 parent f98f050 commit 629bf9d
Show file tree
Hide file tree
Showing 13 changed files with 139 additions and 64 deletions.
2 changes: 0 additions & 2 deletions .github/actions/run-core-tests/group_2/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,3 @@ runs:
modin/pandas/test/dataframe/test_pickle.py
echo "::endgroup::"
shell: bash -l {0}
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
shell: bash -l {0}
5 changes: 4 additions & 1 deletion .github/actions/run-core-tests/group_3/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ runs:
- run: |
echo "::group::Running range-partitioning tests (group 3)..."
MODIN_RANGE_PARTITIONING_GROUPBY=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_groupby.py
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_series.py -k "test_nunique"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_series.py -k "test_unique or test_nunique or drop_duplicates"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_general.py -k "test_unique"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/dataframe/test_map_metadata.py -k "drop_duplicates"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/dataframe/test_join_sort.py -k "merge"
echo "::endgroup::"
shell: bash -l {0}
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ jobs:
- run: python -m pytest modin/pandas/test/dataframe/test_binary.py
- run: python -m pytest modin/pandas/test/dataframe/test_reduce.py
- run: python -m pytest modin/pandas/test/dataframe/test_join_sort.py
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
- run: python -m pytest modin/pandas/test/test_general.py
- run: python -m pytest modin/pandas/test/dataframe/test_indexing.py
- run: python -m pytest modin/pandas/test/test_series.py
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/push-to-master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ jobs:
python -m pytest modin/pandas/test/dataframe/test_indexing.py
python -m pytest modin/pandas/test/dataframe/test_iter.py
python -m pytest modin/pandas/test/dataframe/test_join_sort.py
MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
python -m pytest modin/pandas/test/dataframe/test_map_metadata.py
python -m pytest modin/pandas/test/dataframe/test_reduce.py
python -m pytest modin/pandas/test/dataframe/test_udf.py
Expand Down
6 changes: 6 additions & 0 deletions docs/flow/modin/experimental/range_partitioning_groupby.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ Range-partitioning Merge
It is recommended to use this implementation if the right dataframe in merge is as big as
the left dataframe. In this case, range-partitioning implementation works faster and consumes less RAM.

'.unique()' and '.drop_duplicates()'
""""""""""""""""""""""""""""""""""""

Range-partitioning implementation of '.unique()'/'.drop_duplicates()' works best when the input data size is big (more than
5,000,000 rows) and when the output size is also expected to be big, i.e. when no more than 80% of the values are duplicates.

'.nunique()'
""""""""""""""""""""""""""""""""""""

Expand Down
26 changes: 18 additions & 8 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1768,24 +1768,34 @@ def to_timedelta(self, unit="ns", errors="raise"): # noqa: PR02
self, unit=unit, errors=errors
)

# FIXME: get rid of `**kwargs` parameter (Modin issue #3108).
@doc_utils.add_one_column_warning
@doc_utils.add_refer_to("Series.unique")
def unique(self, **kwargs):
# 'qc.unique()' uses most of the arguments from 'df.drop_duplicates()', so refering to this method
@doc_utils.add_refer_to("DataFrame.drop_duplicates")
def unique(self, keep="first", ignore_index=True, subset=None):
"""
Get unique values of `self`.
Get unique rows of `self`.
Parameters
----------
**kwargs : dict
Serves compatibility purpose. Does not affect the result.
keep : {"first", "last", False}, default: "first"
Which duplicates to keep.
ignore_index : bool, default: True
If ``True``, the resulting axis will be labeled ``0, 1, …, n - 1``.
subset : list, optional
Only consider certain columns for identifying duplicates, if `None`, use all of the columns.
Returns
-------
BaseQueryCompiler
New QueryCompiler with unique values.
"""
return SeriesDefault.register(pandas.Series.unique)(self, **kwargs)
if subset is not None:
mask = self.getitem_column_array(subset, ignore_order=True)
else:
mask = self
without_duplicates = self.getitem_array(mask.duplicated(keep=keep).invert())
if ignore_index:
without_duplicates = without_duplicates.reset_index(drop=True)
return without_duplicates

@doc_utils.add_one_column_warning
@doc_utils.add_refer_to("Series.searchsorted")
Expand Down
35 changes: 29 additions & 6 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1933,13 +1933,36 @@ def str_split(self, pat=None, n=-1, expand=False, regex=None):

# END String map partitions operations

def unique(self):
new_modin_frame = self._modin_frame.apply_full_axis(
0,
lambda x: x.squeeze(axis=1).unique(),
new_columns=self.columns,
def unique(self, keep="first", ignore_index=True, subset=None):
# kernels with 'pandas.Series.unique()' work faster
can_use_unique_kernel = (
subset is None and ignore_index and len(self.columns) == 1 and keep
)
return self.__constructor__(new_modin_frame)

if not can_use_unique_kernel and not RangePartitioning.get():
return super().unique(keep=keep, ignore_index=ignore_index, subset=subset)

if RangePartitioning.get():
new_modin_frame = self._modin_frame._apply_func_to_range_partitioning(
key_columns=self.columns.tolist() if subset is None else subset,
func=(
(lambda df: pandas.DataFrame(df.squeeze(axis=1).unique()))
if can_use_unique_kernel
else (
lambda df: df.drop_duplicates(
keep=keep, ignore_index=ignore_index, subset=subset
)
)
),
preserve_columns=True,
)
else:
new_modin_frame = self._modin_frame.apply_full_axis(
0,
lambda x: x.squeeze(axis=1).unique(),
new_columns=self.columns,
)
return self.__constructor__(new_modin_frame, shape_hint=self._shape_hint)

def searchsorted(self, **kwargs):
def searchsorted(df):
Expand Down
13 changes: 6 additions & 7 deletions modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1511,13 +1511,12 @@ def drop_duplicates(
subset = list(subset)
else:
subset = [subset]
df = self[subset]
else:
df = self
duplicated = df.duplicated(keep=keep)
result = self[~duplicated]
if ignore_index:
result.index = pandas.RangeIndex(stop=len(result))
if len(diff := pandas.Index(subset).difference(self.columns)) > 0:
raise KeyError(diff)
result_qc = self._query_compiler.unique(
keep=keep, ignore_index=ignore_index, subset=subset
)
result = self.__constructor__(query_compiler=result_qc)
if inplace:
self._update_inplace(result._query_compiler)
else:
Expand Down
27 changes: 12 additions & 15 deletions modin/pandas/test/dataframe/test_join_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
generate_multiindex,
random_state,
rotate_decimal_digits_or_symbols,
sort_if_range_partitioning,
test_data,
test_data_keys,
test_data_values,
Expand Down Expand Up @@ -247,10 +248,6 @@ def test_join_6602():
],
)
def test_merge(test_data, test_data2):
# RangePartitioning merge always produces sorted result, so we have to sort
# pandas' result as well in order them to match
comparator = df_equals_and_sort if RangePartitioning.get() else df_equals

modin_df = pd.DataFrame(
test_data,
columns=["col{}".format(i) for i in range(test_data.shape[1])],
Expand Down Expand Up @@ -283,7 +280,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=hows[i], on=ons[j], sort=sorts[j]
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

modin_result = modin_df.merge(
modin_df2,
Expand All @@ -299,7 +296,7 @@ def test_merge(test_data, test_data2):
right_on="key",
sort=sorts[j],
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# Test for issue #1771
modin_df = pd.DataFrame({"name": np.arange(40)})
Expand All @@ -308,7 +305,7 @@ def test_merge(test_data, test_data2):
pandas_df2 = pandas.DataFrame({"name": [39], "position": [0]})
modin_result = modin_df.merge(modin_df2, on="name", how="inner")
pandas_result = pandas_df.merge(pandas_df2, on="name", how="inner")
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

frame_data = {
"col1": [0, 1, 2, 3],
Expand All @@ -329,7 +326,7 @@ def test_merge(test_data, test_data2):
# Defaults
modin_result = modin_df.merge(modin_df2, how=how)
pandas_result = pandas_df.merge(pandas_df2, how=how)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_on and right_index
modin_result = modin_df.merge(
Expand All @@ -338,7 +335,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col1", right_index=True
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_index and right_on
modin_result = modin_df.merge(
Expand All @@ -347,7 +344,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_index=True, right_on="col1"
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_on and right_on col1
modin_result = modin_df.merge(
Expand All @@ -356,7 +353,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col1", right_on="col1"
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_on and right_on col2
modin_result = modin_df.merge(
Expand All @@ -365,7 +362,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col2", right_on="col2"
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# left_index and right_index
modin_result = modin_df.merge(
Expand All @@ -374,7 +371,7 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_index=True, right_index=True
)
comparator(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)

# Cannot merge a Series without a name
ps = pandas.Series(frame_data2.get("col1"))
Expand All @@ -383,7 +380,7 @@ def test_merge(test_data, test_data2):
modin_df,
pandas_df,
lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps),
comparator=comparator,
comparator=sort_if_range_partitioning,
expected_exception=ValueError("Cannot merge a Series without a name"),
)

Expand All @@ -394,7 +391,7 @@ def test_merge(test_data, test_data2):
modin_df,
pandas_df,
lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps),
comparator=comparator,
comparator=sort_if_range_partitioning,
)

with pytest.raises(TypeError):
Expand Down
20 changes: 13 additions & 7 deletions modin/pandas/test/dataframe/test_map_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
name_contains,
numeric_dfs,
random_state,
sort_if_range_partitioning,
test_data,
test_data_keys,
test_data_values,
Expand Down Expand Up @@ -1068,7 +1069,7 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
keep=keep, inplace=False, subset=subset, ignore_index=ignore_index
)
else:
df_equals(
sort_if_range_partitioning(
pandas_df.drop_duplicates(
keep=keep, inplace=False, subset=subset, ignore_index=ignore_index
),
Expand All @@ -1078,7 +1079,7 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
)

try:
pandas_results = pandas_df.drop_duplicates(
pandas_df.drop_duplicates(
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
)
except Exception as err:
Expand All @@ -1087,10 +1088,10 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
)
else:
modin_results = modin_df.drop_duplicates(
modin_df.drop_duplicates(
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
)
df_equals(modin_results, pandas_results)
sort_if_range_partitioning(modin_df, pandas_df)


def test_drop_duplicates_with_missing_index_values():
Expand Down Expand Up @@ -1168,7 +1169,7 @@ def test_drop_duplicates_with_missing_index_values():
modin_df = pd.DataFrame(data["data"], index=data["index"], columns=data["columns"])
modin_result = modin_df.sort_values(["id", "time"]).drop_duplicates(["id"])
pandas_result = pandas_df.sort_values(["id", "time"]).drop_duplicates(["id"])
df_equals(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)


def test_drop_duplicates_after_sort():
Expand All @@ -1183,15 +1184,20 @@ def test_drop_duplicates_after_sort():

modin_result = modin_df.sort_values(["value", "time"]).drop_duplicates(["value"])
pandas_result = pandas_df.sort_values(["value", "time"]).drop_duplicates(["value"])
df_equals(modin_result, pandas_result)
sort_if_range_partitioning(modin_result, pandas_result)


def test_drop_duplicates_with_repeated_index_values():
# This tests for issue #4467: https://github.com/modin-project/modin/issues/4467
data = [[0], [1], [0]]
index = [0, 0, 0]
modin_df, pandas_df = create_test_dfs(data, index=index)
eval_general(modin_df, pandas_df, lambda df: df.drop_duplicates())
eval_general(
modin_df,
pandas_df,
lambda df: df.drop_duplicates(),
comparator=sort_if_range_partitioning,
)


@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
Expand Down

0 comments on commit 629bf9d

Please sign in to comment.