Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#6965: Implement '.merge()' using range-partitioning implementation #6966

Merged
merged 8 commits into from Mar 1, 2024

Conversation

dchigarev
Copy link
Collaborator

@dchigarev dchigarev commented Feb 26, 2024

What do these changes do?

This PR adds a new implementation for merge() using range-partitioning mechanism. The new implementation does the following:

  1. Computes range-partition bins for left dataframe
  2. Builds range-partitioning for left dataframe using bins computed at the first step
  3. Builds range-partitioning for right dataframe using bins computed at the first step
  4. Applies the merge kernel row-wise to the left dataframe and broadcasts row partitions of the right dataframe there

The benefit of this implementation is that we never gather the whole right dataframe in one partition (as we do it now) but rather repartition it in a way so it would be correct to broadcast only row partitions of the right df. This implementation benefits when the right dataframe is relatively big.

One of the downsides of this implementation is that repartitioning changes the order of rows, meaning that the result is always sorted by keys at the end.

At the moment, one can only use range-partitioning impl for merge by specifying a special config variable (cfg.RangePartitioningMerge).

Perf measurements for h2o
Important note: original h2o dataset has several categorical columns with high cardinality (high amount of unique values), this is a problematic case for modin (see 2nd point) and it so it works terribly slow with such dtypes. In the measurements below, all categorical columns were casted to strings.

h2o joins, 500mb data

image

h2o joins, 5gb data

image

  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves Implement merge() using range-partitioning implementation #6965
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
…g implementation

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
-------
PandasQueryCompiler
"""
how = kwargs.get("how", "inner")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function was copied without changes from PandasQueryCompiler.merge

new_dtypes : ModinDtypes or None
Dtypes for the result of merge. ``None`` if not enought metadata to compute.
"""
new_columns = None
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic was copied without any changes from PandasQueryCompiler.merge and was placed in a separate method to be reused by range-partitioning impl

@dchigarev dchigarev marked this pull request as ready for review February 27, 2024 11:51
@@ -770,6 +770,19 @@ def _sibling(cls) -> type[EnvWithSibilings]:
)


class RangePartitioningMerge(EnvironmentVariable, type=bool):
Copy link
Collaborator Author

@dchigarev dchigarev Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're planning to implement more methods using range-partitioning and give users a choice to switch between implementations on their own. The approach with config variables doesn't seem to scale good enough, as creating a config variable for each method isn't a good idea IMO.

An alternative could be passing some parameter at the pandas API level specifying which implementation to use:

df1.merge(df2, on="key", impl="range-partitioning")
df.groupby(...).apply(..., impl="range-partitioning")
df.nunique(impl="range-partitioning")

The problems with this approach are:

  1. The user's code loses compatibility with pandas (switching back from modin to pandas would require removing those extra arguments)
  2. The parameter will only make sense for executions based on PandasQueryCompiler (snowflake, hdk executions won't be able to support it)

What do others think in this regard?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do others think in this regard?

Why not just use one config variable (like RangePartitioningImpl) for all the functions that support it?

Copy link
Collaborator Author

@dchigarev dchigarev Feb 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just use one config variable (like RangePartitioningImpl) for all the functions that support it?

Yep, it seems like the only option. We could have then expanded this page, giving a short description of every operation that supports range-partitioning with tips on when to use it, and then probably link this page in our optimization notes . I'm personally voting for this option, will try to implement it in this PR. cc @YarShev

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a common variable RangePartitioning, let's handle the transition from RangePartitioningGroupby as well as changes in documentation in a separate PR

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
new_columns=new_columns,
new_dtypes=new_dtypes,
)
).reset_index(drop=True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added the following in-code comment:

# pandas resets the index of the result unless we were merging on an index level,
# the current implementation only supports merging on column names, so dropping
# the index unconditionally

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
Copy link
Collaborator

@anmyachev anmyachev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@anmyachev anmyachev merged commit a966395 into modin-project:master Mar 1, 2024
37 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement merge() using range-partitioning implementation
2 participants