
FEAT-#7090: Add range-partitioning implementation for '.unique()' and '.drop_duplicates()' #7091

Merged: 12 commits into modin-project:master on Mar 19, 2024

Conversation

dchigarev (Collaborator) commented on Mar 13, 2024

What do these changes do?

This PR merges the implementations of .unique()/.drop_duplicates() into a single query compiler method called unique(). Some parts of the DataFrame.drop_duplicates() implementation were moved to BaseQueryCompiler.unique().

The performance highly depends on the number of duplicated values in the input data, so it's hard to devise a solid heuristic for when to enable the range-partitioning implementation:
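For reference, a minimal usage sketch of switching implementations (the config calls mirror the benchmark scripts below; the data is purely illustrative):

import numpy as np
import modin.pandas as pd
import modin.config as cfg

# Enable the range-partitioning implementation; equivalent to setting
# MODIN_RANGE_PARTITIONING=1 in the environment.
cfg.RangePartitioning.put(True)

sr = pd.Series(np.random.randint(0, 100, size=1_000_000))
unique_vals = sr.unique()        # uses the range-partitioning path
deduped = sr.drop_duplicates()   # per this PR, routed through the same qc.unique()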

Perf for '.unique()':

MODIN_CPUS=16
[performance chart image]

MODIN_CPUS=44
[performance chart image]

Script to measure:
import modin.pandas as pd
import numpy as np
import modin.config as cfg

from modin.utils import execute
from timeit import default_timer as timer
import pandas

cfg.CpuCount.put(16)

def get_data(nrows, dtype):
    if dtype == int:
        return np.arange(nrows)
    elif dtype == float:
        return np.arange(nrows).astype(float)
    elif dtype == str:
        return np.array([f"value{i}" for i in range(nrows)])
    else:
        raise NotImplementedError(dtype)

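# Warm up the execution engine so its initialization cost is not included in the timings.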
pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())).to_numpy()

nrows = [1_000_000, 5_000_000, 10_000_000, 25_000_000, 50_000_000, 100_000_000]
duplicate_rate = [0, 0.1, 0.5, 0.95]
dtypes = [int, str]
use_range_part = [True, False]

columns = pandas.MultiIndex.from_product([dtypes, duplicate_rate, use_range_part], names=["dtype", "duplicate rate", "use range-part"])
result = pandas.DataFrame(index=nrows, columns=columns)

i = 0
total_its = len(nrows) * len(duplicate_rate) * len(dtypes) * len(use_range_part)

for dt in dtypes:
    for nrow in nrows:
        data = get_data(nrow, dt)
        np.random.shuffle(data)
        for dpr in duplicate_rate:
            data_c = data.copy()
            dupl_val = data_c[0]

            num_duplicates = int(dpr * nrow)
            dupl_indices = np.random.choice(np.arange(nrow), num_duplicates, replace=False)
            data_c[dupl_indices] = dupl_val

            for impl in use_range_part:
                print(f"{round((i / total_its) * 100, 2)}%")
                i += 1
                cfg.RangePartitioning.put(impl)

                sr = pd.Series(data_c)
                execute(sr)

                t1 = timer()
                # the result is already materialized, so no execute() call is needed
                sr.unique()
                tm = timer() - t1
                print(nrow, dpr, dt, impl, tm)
                result.loc[nrow, (dt, dpr, impl)] = tm
                result.to_excel("unique.xlsx")
Perf for '.drop_duplicates()':

MODIN_CPUS=16
[performance chart image]

MODIN_CPUS=44
[performance chart image]

Script to measure:
import modin.pandas as pd
import numpy as np
import modin.config as cfg

from modin.utils import execute
from timeit import default_timer as timer
import pandas

cfg.CpuCount.put(16)

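# Warm up the execution engine so its initialization cost is not included in the timings.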
pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())).to_numpy()

nrows = [1_000_000, 5_000_000, 10_000_000, 25_000_000]
duplicate_rate = [0, 0.1, 0.5, 0.95]
subset = [["col0"], ["col1", "col2", "col3", "col4"], None]
ncols = 15
use_range_part = [True, False]

columns = pandas.MultiIndex.from_product(
    [
        [len(sbs) if sbs is not None else ncols for sbs in subset],
        duplicate_rate,
        use_range_part
    ],
    names=["subset size", "duplicate rate", "use range-part"]
)
result = pandas.DataFrame(index=nrows, columns=columns)

i = 0
total_its = len(nrows) * len(duplicate_rate) * len(subset) * len(use_range_part)

for sbs in subset:
    for nrow in nrows:
        data = {f"col{i}": np.arange(nrow) for i in range(ncols)}
        pandas_df = pandas.DataFrame(data)

        for dpr in duplicate_rate:
            pandas_df_c = pandas_df.copy()
            dupl_val = pandas_df_c.iloc[0]

            num_duplicates = int(dpr * nrow)
            dupl_indices = np.random.choice(np.arange(nrow), num_duplicates, replace=False)
            pandas_df_c.iloc[dupl_indices] = dupl_val

            for impl in use_range_part:
                print(f"{round((i / total_its) * 100, 2)}%")
                i += 1
                cfg.RangePartitioning.put(impl)

                md_df = pd.DataFrame(pandas_df_c)
                execute(md_df)

                t1 = timer()
                res = md_df.drop_duplicates(subset=sbs)
                execute(res)
                tm = timer() - t1

                sbs_s = len(sbs) if sbs is not None else ncols
                print("len()", res.shape, nrow, dpr, sbs_s, impl, tm)
                result.loc[nrow, (sbs_s, dpr, impl)] = tm
                result.to_excel("drop_dupl.xlsx")
  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves Add range-partitioning implementation for .unique()/.drop_duplicates() #7090
  • tests added and are passing
  • module layout described at docs/development/architecture.rst is up-to-date

@dchigarev changed the title to FEAT-#7090: Add range-partitioning implementation for '.unique()' and '.drop_duplicates()' on Mar 13, 2024
Comment on lines -23 to -24
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
shell: bash -l {0}
dchigarev (Author):

Testing of range-partitioning implementations is now performed in a separate CI action.

)
modin_res = modin_series.drop_duplicates(keep=keep, inplace=inplace)
pandas_res = pandas_series.drop_duplicates(keep=keep, inplace=inplace)
if inplace:
dchigarev (Author):

Otherwise it was comparing None values in the inplace=True case.

Comment on lines +85 to +86
Range-partitioning implementation of '.unique()'/'.drop_duplicates()' works best when the input data size is big (more than
5_000_000 rows) and when the output size is also expected to be big (no more than 80% of the values are duplicates).
dchigarev (Author):

This is not very descriptive, so as part of #6987 I'm also planning to include the perf measurements I've made for range-partitioning PRs in the docs.

@dchigarev marked this pull request as ready for review on March 13, 2024.
mask = self
without_duplicates = self.getitem_array(mask.duplicated(keep=keep).invert())
if ignore_index:
without_duplicates.index = pandas.RangeIndex(len(without_duplicates.index))
Collaborator:

Suggested change:
- without_duplicates.index = pandas.RangeIndex(len(without_duplicates.index))
+ without_duplicates.index = pandas.RangeIndex(len(without_duplicates))

dchigarev (Author):

Good note, but here we're operating on a query compiler, which doesn't have a .__len__() method, so the only way to get its length is via its index.

Collaborator:

So is there a point in keeping this line, given that the old index would have to be computed just to set the new one? Can it be removed?

dchigarev (Author):

Indeed, we can use .reset_index() here instead, which resets the index without computing the old one.
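A minimal sketch of the trade-off being discussed (assuming the query compiler's reset_index accepts drop=True, as the pandas API does):

# Before: building the RangeIndex forces the old index to be materialized,
# because the query compiler has no __len__.
without_duplicates.index = pandas.RangeIndex(len(without_duplicates.index))

# After: a fresh default index is assigned without computing the old one.
without_duplicates = without_duplicates.reset_index(drop=True)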

MODIN_RANGE_PARTITIONING_GROUPBY=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_groupby.py
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_series.py -k "test_unique or drop_duplicates"
Collaborator:

Can we already deprecate MODIN_RANGE_PARTITIONING_GROUPBY?

dchigarev (Author):

This could be a non-trivial process, so it's tracked in #7105.

Collaborator:

If someone uses the old config, a deprecation message should be enough to point them to the new config.

dchigarev (Author):

We also need to replace the old variable everywhere in our code and make sure the config values are always synced:

cfg.ExperimentalGroupby == cfg.RangePartitioningGroupby == cfg.RangePartitioning

The last part is non-trivial, so I would do the deprecation in a separate PR.
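As an illustration only, keeping the three aliases in sync could look roughly like the hypothetical helper below (only .get()/.put() are confirmed by this thread; the helper itself is not Modin code):

import modin.config as cfg

def sync_range_partitioning_configs(new_value: bool) -> None:
    # Hypothetical helper: propagate a value to all three aliases so that
    # ExperimentalGroupby == RangePartitioningGroupby == RangePartitioning
    # holds no matter which alias the user set.
    for param in (
        cfg.ExperimentalGroupby,
        cfg.RangePartitioningGroupby,
        cfg.RangePartitioning,
    ):
        if param.get() != new_value:
            param.put(new_value)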

@@ -78,3 +78,9 @@ Range-partitioning Merge

It is recommended to use this implementation if the right dataframe in merge is as big as
the left dataframe. In this case, range-partitioning implementation works faster and consumes less RAM.

'.unique()' and '.drop_duplicates()'
Collaborator:

Should we refactor this doc page for 0.29.0?

dchigarev (Author):

Yes, and there is an issue for that: #6987.

return np.sort(data)


def sort_if_range_partitioning(df1, df2, comparator=None):
Collaborator:

Do we have a note somewhere that range partitioning can produce a row-order mismatch?

dchigarev (Author):

Not yet; it should be done as part of #6987.
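For context, a sketch of what a helper like sort_if_range_partitioning might do (an illustration of the idea, not the repository's actual implementation; df_equals from modin's test utils is assumed as the default comparator):

from modin.config import RangePartitioning
from modin.pandas.test.utils import df_equals

def sort_if_range_partitioning(df1, df2, comparator=None):
    # Range-partitioning kernels do not preserve the original row order,
    # so bring both frames into a canonical order before comparing.
    if comparator is None:
        comparator = df_equals
    if RangePartitioning.get():
        df1 = df1.sort_values(list(df1.columns), ignore_index=True)
        df2 = df2.sort_values(list(df2.columns), ignore_index=True)
    comparator(df1, df2)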

if ignore_index:
result.index = pandas.RangeIndex(stop=len(result))
if len(diff := pandas.Index(subset).difference(self.columns)) > 0:
raise KeyError(diff)
Collaborator:

Does pandas raise the same error with this message?

dchigarev (Author):

Yes, it raises exactly the same error:

>>> pd_df
   a  b
0  1  2
>>> pd_df.drop_duplicates(subset=["b", "c"])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "python3.9/site-packages/pandas/core/frame.py", line 6805, in drop_duplicates
    result = self[-self.duplicated(subset, keep=keep)]
  File "python3.9/site-packages/pandas/core/frame.py", line 6937, in duplicated
    raise KeyError(Index(diff))
KeyError: Index(['c'], dtype='object')

@doc_utils.add_one_column_warning
- @doc_utils.add_refer_to("Series.unique")
+ @doc_utils.add_refer_to("Series.drop_duplicates")
def unique(self, **kwargs):
Collaborator:

Suggested change:
- @doc_utils.add_refer_to("Series.drop_duplicates")
+ @doc_utils.add_refer_to("Series.unique")

dchigarev (Author):

This was done on purpose: qc.unique() now takes most of its arguments from Series.drop_duplicates(), so it makes sense to refer to that method instead.

Collaborator:

Since we want to have a stable QC API, having this kind of mismatch doesn't seem like a good idea. I would also like to have separate QCs: one responsible for d2p (default-to-pandas), and a second serving as a base class for descendants. BaseQC currently does both, which I don't think is great. What do you think?

dchigarev (Author):

> Since we want to have a stable QC API, having this kind of mismatch doesn't seem like a good idea.

So I can rename qc.unique -> qc.drop_duplicates, would that be better?

> I would also like to have separate QCs: one responsible for d2p, and a second serving as a base class for descendants.

Looks like unnecessary overcomplication to me. I don't see a problem in making the base class implement its methods as d2p. I believe all the backends would inherit from the d2p QC and ignore the abstract one; what's the point, then, in having and supporting both of them?

Collaborator:

> So I can rename qc.unique -> qc.drop_duplicates, would that be better?

It seems we already have drop_duplicates in the base qc. Maybe just put a comment for now on why we refer to drop_duplicates docs?

> Looks like unnecessary overcomplication to me. I don't see a problem in making the base class implement its methods as d2p. I believe all the backends would inherit from the d2p QC and ignore the abstract one; what's the point, then, in having and supporting both of them?

I am talking about a d2p QC and a base QC, where the base is not abstract but rather holds functionality common to all storage formats. Would that make sense? It looks like the base QC does double duty at the moment.

dchigarev (Author):

> It seems we already have drop_duplicates in the base qc. Maybe just put a comment for now on why we refer to drop_duplicates docs?

No, there's no drop_duplicates() method at the QC level, only .duplicated(). I will add a comment, though, on why we're referring to drop_duplicates.

dchigarev (Author):

> I am talking about a d2p QC and a base QC, where the base is not abstract but rather holds functionality common to all storage formats. Would that make sense? It looks like the base QC does double duty at the moment.

I don't know; I don't see why that would be a problem, or why we should support two QCs for that matter.

return self.__constructor__(new_modin_frame)

if not can_use_unique_kernel and not RangePartitioning.get():
return super().unique(keep=keep, ignore_index=ignore_index, subset=subset)
Collaborator:

Is this branch for d2p?

dchigarev (Author):

No, this branch is for the general unique() implementation that uses the QC's public API.
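For intuition on what the range-partitioning path does instead, here is a toy NumPy sketch of the idea (illustrative only; Modin's actual kernel operates on partitioned dataframes):

import numpy as np

def range_partition_unique(values, n_partitions=4):
    # Pick partition boundaries from quantiles so that equal values always
    # land in the same partition.
    bounds = np.quantile(values, np.linspace(0, 1, n_partitions + 1)[1:-1])
    parts = [[] for _ in range(n_partitions)]
    for v in values:
        parts[np.searchsorted(bounds, v)].append(v)
    # Deduplicate each partition independently; duplicates are colocated,
    # so the union of per-partition results is globally unique.
    return np.concatenate([np.unique(p) for p in parts if p])

print(range_partition_unique(np.array([3, 1, 3, 7, 1, 9, 7])))  # [1 3 7 9]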

YarShev (Collaborator) commented on Mar 19, 2024:

Please resolve conflicts.

@YarShev merged commit 629bf9d into modin-project:master on Mar 19, 2024. 37 checks passed.