FEAT-#7118: Add range-partitioning impl for 'df.resample()' #7140

Merged: 3 commits merged into modin-project:master on Apr 4, 2024


@dchigarev dchigarev commented Apr 2, 2024

What do these changes do?

Adds a range-partitioning implementation for df.resample(). The new implementation is not always faster than the existing one, so it is enabled only when the RangePartitioning flag is set.

Script used for the measurements:
import pandas
import numpy as np
import modin.pandas as pd
import modin.config as cfg

from timeit import default_timer as timer

from modin.utils import execute

cfg.CpuCount.put(16)

nrows = [1_000_000, 5_000_000, 10_000_000]
ncols = [5, 33]
rules = [
    "500ms",  # doubles the number of rows
    "30s",  # reduces the number of rows 30-fold
    "5min",  # reduces the number of rows 300-fold
]
use_rparts = [True, False]

cols = pandas.MultiIndex.from_product([rules, ncols, use_rparts], names=["rule", "ncols", "USE RANGE PART"])
rres = pandas.DataFrame(index=nrows, columns=cols)

total_nits = len(nrows) * len(ncols) * len(rules) * len(use_rparts)
i = 0

for nrow in nrows:
    for ncol in ncols:
        index = pandas.date_range("31/12/2000", periods=nrow, freq="s")
        data = {f"col{i}": np.arange(nrow) for i in range(ncol)}
        pd_df = pandas.DataFrame(data, index=index)
        for rule in rules:
            for rparts in use_rparts:
                print(f"{round((i / total_nits) * 100, 2)}%")
                i += 1
                cfg.RangePartitioning.put(rparts)

                df = pd.DataFrame(data, index=index)
                execute(df)

                t1 = timer()
                res = df.resample(rule).sum()
                execute(res)
                ts = timer() - t1
                print(nrow, ncol, rule, rparts, ts)

                rres.loc[nrow, (rule, ncol, rparts)] = ts
                rres.to_excel("resample.xlsx")
  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves Add range-partitioning implementation for df.resample() #7118
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date

@dchigarev dchigarev added the Blocked ❌ A pull request that is blocked label Apr 2, 2024
@dchigarev dchigarev changed the title FEAT-#7718: Add range-partitioning impl for 'df.resample()' FEAT-#7118: Add range-partitioning impl for 'df.resample()' Apr 2, 2024
@@ -1049,11 +1058,23 @@
PandasQueryCompiler
New QueryCompiler containing the result of resample aggregation.
"""
from modin.core.dataframe.pandas.dataframe.utils import ShuffleResample

Check notice: Code scanning / CodeQL

Cyclic import (Note)

Import of module modin.core.dataframe.pandas.dataframe.utils begins an import cycle.
Collaborator

Why is this import not at the top of the file?

Collaborator Author

@dchigarev dchigarev Apr 4, 2024

The answer is right above, in the CodeQL warning :)

Importing it at the top of the file triggers a circular import.
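To illustrate the point about the function-level import, here is a minimal, self-contained sketch. It is not Modin code: the modules `hypo_utils` and `hypo_compiler` are hypothetical stand-ins written to a temporary directory, and the dependency between them is an assumption chosen only to reproduce a cycle. It shows that a top-level import fails while the same import deferred into the function body succeeds.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical "utils" module: it needs a name from the "compiler" module.
UTILS_SRC = """
from hypo_compiler import resample

class ShuffleHelper:
    name = "shuffle"
"""

# Variant 1: top-level import, which closes the cycle at import time.
COMPILER_TOP_LEVEL = """
from hypo_utils import ShuffleHelper

def resample():
    return ShuffleHelper.name
"""

# Variant 2: deferred import, resolved only when resample() is called.
COMPILER_DEFERRED = """
def resample():
    from hypo_utils import ShuffleHelper
    return ShuffleHelper.name
"""


def run_with(compiler_src):
    """Write both hypothetical modules to a temp dir and call resample()."""
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "hypo_utils.py").write_text(UTILS_SRC)
        Path(tmp, "hypo_compiler.py").write_text(compiler_src)
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            compiler = importlib.import_module("hypo_compiler")
            return compiler.resample()
        finally:
            # Clean up so that the two variants do not affect each other.
            sys.path.remove(tmp)
            for name in ("hypo_utils", "hypo_compiler"):
                sys.modules.pop(name, None)


try:
    run_with(COMPILER_TOP_LEVEL)
    top_level_ok = True
except ImportError:
    # hypo_utils asked for `resample` before hypo_compiler had defined it.
    top_level_ok = False

deferred_result = run_with(COMPILER_DEFERRED)
```

The deferred variant works because by the time `resample()` runs, `hypo_compiler` is fully initialized, so the import inside `hypo_utils` finds the name it needs.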

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
df, columns_info, ascending, **kwargs
)
for i, pivot in enumerate(columns_info[0].pivots):
add_attr(result[i], pivot - pandas.Timedelta(1, unit="ns"))
Collaborator Author

@dchigarev dchigarev Apr 3, 2024

An example of why this is required:

Imagine we have a time series with an Hour resolution:

>>> sh
                       a
2018-01-01 00:00:00  0.0
2018-01-01 01:00:00  1.0
2018-01-01 02:00:00  2.0
2018-01-01 03:00:00  3.0

Resampling this into 30-min intervals gives this:

>>> expected_res = sh.resample("30min").sum()
>>> expected_res
                       a
2018-01-01 00:00:00  0.0
2018-01-01 00:30:00  0.0  <---- interpolated value
2018-01-01 01:00:00  1.0
2018-01-01 01:30:00  0.0 <---- interpolated value
2018-01-01 02:00:00  2.0
2018-01-01 02:30:00  0.0 <---- interpolated value
2018-01-01 03:00:00  3.0

Let's now emulate parallel execution of resample and split sh into two partitions:

>>> pd.concat([sh.iloc[:2].resample("30min").sum(), sh.iloc[2:].resample("30min").sum()])
                       a
2018-01-01 00:00:00  0.0
2018-01-01 00:30:00  0.0  <---- interpolated value
2018-01-01 01:00:00  1.0
*should be an interpolated value here, but it's missing*
2018-01-01 02:00:00  2.0
2018-01-01 02:30:00  0.0 <---- interpolated value
2018-01-01 03:00:00  3.0

The value is missing because the first partition only sees the interval [00:00:00, 01:00:00] and the second only sees [02:00:00, 03:00:00], so neither partition knows that the gap between 01:00 and 02:00 has to be filled with a 01:30 bin.

One solution is to insert a dummy timestamp with a slight offset into each partition, so that the partition knows its real bounds.
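The idea can be sketched in plain pandas. This is only an illustration under stated assumptions, not the code from this PR: `resample_partition` is a hypothetical helper, and the dummy row is an all-NaN row placed 1 ns before the pivot (the first timestamp of the next partition), mirroring the `pivot - pandas.Timedelta(1, unit="ns")` offset in the diff above.

```python
import pandas

sh = pandas.DataFrame(
    {"a": [0.0, 1.0, 2.0, 3.0]},
    index=pandas.date_range("2018-01-01", periods=4, freq="h"),
)
expected = sh.resample("30min").sum()


def resample_partition(part, upper_bound=None):
    """Resample one partition (hypothetical helper).

    If `upper_bound` is given, an all-NaN dummy row is added at that
    timestamp so the partition produces bins up to its real bound.
    """
    if upper_bound is not None:
        new_index = part.index.append(pandas.DatetimeIndex([upper_bound]))
        part = part.reindex(new_index)  # the dummy row is filled with NaN
    # sum() skips NaN, so the dummy row contributes 0 to its bin
    return part.resample("30min").sum()


# The pivot is the first timestamp of the second partition.
pivot = sh.index[2]
bound = pivot - pandas.Timedelta(1, unit="ns")

# Without the dummy row, the 01:30 bin is missing.
naive = pandas.concat(
    [resample_partition(sh.iloc[:2]), resample_partition(sh.iloc[2:])]
)

# With the dummy row, the first partition also emits the 01:30 bin.
fixed = pandas.concat(
    [resample_partition(sh.iloc[:2], bound), resample_partition(sh.iloc[2:])]
)
```

Because the dummy timestamp falls just inside the last bin before the pivot, the first partition extends its bins up to, but not including, the start of the second one, so no bin is produced twice.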

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
@dchigarev dchigarev removed the Blocked ❌ A pull request that is blocked label Apr 3, 2024
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
"data": {"A": range(12), "B": range(12)},
"index": pandas.date_range("31/12/2000", periods=12, freq="h"),
"data": {
f"col{i}": random_state.randint(RAND_LOW, RAND_HIGH, size=NROWS)
Collaborator Author

More data is needed so that partitioning is actually used.

@@ -2438,6 +2438,7 @@ def combine_and_apply(
dtypes=new_dtypes,
)

@lazy_metadata_decorator(apply_axis="both")
Collaborator Author

@dchigarev dchigarev Apr 3, 2024

This decorator was missing before, but it is needed for the method to function properly.

resample_kwargs,
"transform",
arg=arg,
allow_range_impl=False,
Collaborator Author

This approach doesn't work well with transform operations, so range-partitioning is disabled for all of them.

@dchigarev dchigarev marked this pull request as ready for review April 3, 2024 19:25
@@ -122,6 +124,10 @@ class ShuffleSortFunctions(ShuffleFunctions):
The ideal number of new partitions.
level : list of strings or ints, or None
Index level(s) to use as a key. Can't be specified along with `columns`.
closed_on_right : bool, default: False
Collaborator

Suggested change
- closed_on_right : bool, default: False
+ close_to_right : bool, default: False

?

Collaborator Author

Here I refer to the term "closed interval": closed_on_right means that the right bound has to be included in the interval.
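A small sketch of what "closed on the right" means when keys are assigned to range partitions. The helper `assign_partition` and the pivot values are hypothetical and use only the standard library; this is not how Modin implements it, just the interval semantics described above.

```python
from bisect import bisect_left, bisect_right


def assign_partition(key, pivots, closed_on_right=False):
    """Return the index of the partition that `key` falls into.

    `pivots` must be sorted. With closed_on_right=False the intervals are
    [lo, pivot), so a key equal to a pivot goes to the partition on the right.
    With closed_on_right=True the intervals are (lo, pivot], so a key equal
    to a pivot stays in the partition on the left.
    """
    if closed_on_right:
        return bisect_left(pivots, key)
    return bisect_right(pivots, key)


pivots = [10, 20]

# A key that equals a pivot is the only case where the flag matters.
left_closed = assign_partition(10, pivots, closed_on_right=False)
right_closed = assign_partition(10, pivots, closed_on_right=True)
```

Keys that do not coincide with a pivot land in the same partition regardless of the flag.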

@@ -1039,6 +1046,8 @@ def _resample_func(
Modin frame. If not specified will be computed automaticly.
df_op : callable(pandas.DataFrame) -> [pandas.DataFrame, pandas.Series], optional
Preprocessor function to apply to the passed frame before resampling.
allow_range_impl : bool, default: True
Collaborator

Suggested change
- allow_range_impl : bool, default: True
+ range_impl : bool, default: True

or use_range_impl?

Collaborator Author

allow_range_impl=True doesn't necessarily mean that range-partitioning will be used; that also depends on the cfg.RangePartitioning value and the axis argument. So this parameter only 'allows' range-partitioning to be used, it does not dictate it.
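The distinction between "allow" and "use" can be sketched as a simple predicate. This is an assumption-laden illustration, not the actual Modin logic: `should_use_range_impl` is a hypothetical function, the config value is passed in as a plain boolean, and the assumption that only `axis == 0` qualifies is mine (the comment above only says that the axis argument matters).

```python
def should_use_range_impl(allow_range_impl, range_partitioning_enabled, axis):
    """Decide whether the range-partitioning path is taken (hypothetical).

    allow_range_impl only permits the range-partitioning path; the config
    flag and the axis must agree as well.
    Assumption: only axis == 0 is eligible.
    """
    return bool(allow_range_impl and range_partitioning_enabled and axis == 0)


# Allowed by the caller, but the config flag is off: not used.
allowed_but_flag_off = should_use_range_impl(True, False, axis=0)

# Allowed and the flag is on: used.
allowed_and_flag_on = should_use_range_impl(True, True, axis=0)

# Flag is on, but the caller forbids it (e.g. a transform operation): not used.
forbidden_by_caller = should_use_range_impl(False, True, axis=0)
```

Naming the parameter `allow_...` keeps it clear that the caller grants permission while the final decision is made elsewhere.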

@YarShev YarShev merged commit 8e79122 into modin-project:master Apr 4, 2024
37 checks passed