[data] Normalize upstream blocks for zip/map/reduce/etc. operations using inferred block accessors #39960

Open
wants to merge 17 commits into
base: master

Conversation

michaelhly
Contributor

@michaelhly michaelhly commented Sep 28, 2023

Signed-off-by: Michael Huang <michaelhly@gmail.com>

Why are these changes needed?

When reducing mapped outputs (or zipping datasets), the BlockAccessor is inferred from the first block of mapper_outputs. However, mapper_outputs can inadvertently contain heterogeneous block types because of:

  • the user setting inconsistent batch formats in the task graph
  • internal fallback conversion when encountering data types that pyarrow does not support
    a.)
    try:
        return ArrowBlockAccessor.numpy_to_block(batch)
    except (pa.ArrowNotImplementedError, pa.ArrowInvalid, pa.ArrowTypeError):
        import pandas as pd
        # TODO(ekl) once we support Python objects within Arrow blocks, we
        # don't need this fallback path.
        return pd.DataFrame(dict(batch))

    b.)
    if self._builder is None:
        try:
            check = ArrowBlockBuilder()
            check.add(item)
            check.build()
            self._builder = ArrowBlockBuilder()
        except (TypeError, pyarrow.lib.ArrowInvalid):
            # Can also handle nested Python objects, which Arrow cannot.
            self._builder = PandasBlockBuilder()
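
To illustrate how snippet b.) can yield different block types within one pipeline, here is a toy sketch using only the standard library. The classes `StrictBuilder`, `PermissiveBuilder`, and `DelegatingBuilderSketch` are hypothetical stand-ins, not Ray's builders; the strict one only mimics "rejects values it cannot represent", which is an assumption made for illustration.

```python
# Toy stand-ins only: these are NOT Ray's builders, just hypothetical
# classes that mimic "strict columnar" vs. "accepts anything".
class StrictBuilder:
    """Accepts rows whose values are all int, float, str, bool, or None."""

    _ALLOWED = (int, float, str, bool, type(None))

    def __init__(self):
        self.rows = []

    def add(self, row):
        for value in row.values():
            if not isinstance(value, self._ALLOWED):
                raise TypeError(f"unsupported value type: {type(value).__name__}")
        self.rows.append(row)

    def build(self):
        return ("strict", list(self.rows))


class PermissiveBuilder:
    """Accepts any Python object as a value."""

    def __init__(self):
        self.rows = []

    def add(self, row):
        self.rows.append(row)

    def build(self):
        return ("permissive", list(self.rows))


class DelegatingBuilderSketch:
    """Chooses the concrete builder lazily, from the first row it sees."""

    def __init__(self):
        self._builder = None

    def add(self, row):
        if self._builder is None:
            try:
                # Probe with a throwaway builder, mirroring snippet b.)
                check = StrictBuilder()
                check.add(row)
                check.build()
                self._builder = StrictBuilder()
            except TypeError:
                self._builder = PermissiveBuilder()
        self._builder.add(row)

    def build(self):
        return self._builder.build()


def build_block(rows):
    """Build one block from an iterable of row dicts."""
    builder = DelegatingBuilderSketch()
    for row in rows:
        builder.add(row)
    return builder.build()


# Two tasks in the same pipeline can therefore emit different block kinds.
block_a = build_block([{"x": 1}, {"x": 2}])
block_b = build_block([{"x": object()}])
kinds = [block_a[0], block_b[0]]
```

Because the choice is made per block, any downstream step that assumes all blocks share the first block's type can break.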

We want to normalize heterogeneous blocks in mapper_outputs to Arrow blocks before using the first block's BlockAccessor to compute the reduced results.
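
A minimal, standard-library-only sketch of the idea, not the implementation in this PR. `ColumnarBlock` and `RecordBlock` are hypothetical stand-ins for Arrow and pandas blocks, and `normalize_blocks_sketch` is an illustrative name; the real `normalize_blocks` body is not shown in this conversation.

```python
from dataclasses import dataclass


@dataclass
class ColumnarBlock:
    """Hypothetical stand-in for an Arrow block: column name -> values."""
    columns: dict


@dataclass
class RecordBlock:
    """Hypothetical stand-in for a pandas block: a list of row dicts."""
    records: list


class ColumnarAccessor:
    @staticmethod
    def sum_column(blocks, name):
        # Assumes every block exposes `.columns`.
        return sum(sum(b.columns[name]) for b in blocks)


class RecordAccessor:
    @staticmethod
    def sum_column(blocks, name):
        # Assumes every block exposes `.records`.
        return sum(r[name] for b in blocks for r in b.records)


def accessor_for(block):
    """Pick an accessor from a single block's type."""
    return ColumnarAccessor if isinstance(block, ColumnarBlock) else RecordAccessor


def to_columnar(block):
    """Convert a block to the columnar stand-in (no-op if already columnar)."""
    if isinstance(block, ColumnarBlock):
        return block
    names = list(block.records[0].keys()) if block.records else []
    return ColumnarBlock({n: [r[n] for r in block.records] for n in names})


def normalize_blocks_sketch(blocks):
    """Return blocks unchanged if homogeneous, else convert all to columnar."""
    blocks = list(blocks)
    if len({type(b) for b in blocks}) <= 1:
        return blocks
    return [to_columnar(b) for b in blocks]


def reduce_sum(blocks, name, normalize):
    blocks = normalize_blocks_sketch(blocks) if normalize else list(blocks)
    accessor = accessor_for(blocks[0])  # inferred from the first block only
    return accessor.sum_column(blocks, name)


mixed = [ColumnarBlock({"x": [1, 2]}), RecordBlock([{"x": 3}, {"x": 4}])]

# Without normalization, the accessor inferred from the first block is
# applied to a block of a different type and fails.
try:
    reduce_sum(mixed, "x", normalize=False)
    failed = False
except AttributeError:
    failed = True

# With normalization, every block matches the inferred accessor.
total = reduce_sum(mixed, "x", normalize=True)
```

The sketch leaves homogeneous inputs untouched so the common case pays no conversion cost; whether the actual change does the same is not stated here.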

Related issue number

Closes #39155
Closes #39206
Closes #39291
Closes #31550

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Michael Huang <michaelhly@gmail.com>
Signed-off-by: Michael Huang <michaelhly@gmail.com>
Signed-off-by: Michael Huang <michaelhly@gmail.com>
@michaelhly michaelhly changed the title from "[data] Normalize mapper outputs when reducing blocks" to "[data] Normalize mapper_outputs when reducing blocks" Sep 28, 2023
Signed-off-by: Michael Huang <michaelhly@gmail.com>
@michaelhly michaelhly changed the title from "[data] Normalize mapper_outputs when reducing blocks" to "[data] Normalize mapper_outputs if block reduction fails on AttributeError" Sep 29, 2023
Signed-off-by: Michael Huang <michaelhly@gmail.com>
@michaelhly michaelhly changed the title from "[data] Normalize mapper_outputs if block reduction fails on AttributeError" to "[data] Normalize mapper_outputs before reducing blocks" Sep 29, 2023
Signed-off-by: Michael Huang <michaelhly@gmail.com>
@michaelhly michaelhly force-pushed the normalize-mapper-outputs branch 3 times, most recently from 036d720 to 1409bc1 September 29, 2023 03:08
Signed-off-by: Michael Huang <michaelhly@gmail.com>
Signed-off-by: Michael Huang <michaelhly@gmail.com>
Contributor

@scottjlee scottjlee left a comment

This will make our outputs much more consistent, thanks!

I also left a related comment here; we could even move the implementation from that PR into this one if it's easier (or wait for this one to merge, then update the other PR).

python/ray/data/_internal/util.py (resolved)
Signed-off-by: Michael Huang <michaelhly@gmail.com>
Signed-off-by: Michael Huang <michaelhly@gmail.com>
Signed-off-by: Michael Huang <michaelhly@gmail.com>
@michaelhly michaelhly changed the title from "[data] Normalize mapper_outputs before reducing blocks" to "[data] Normalize blocks before zipping/reducing outputs" Sep 30, 2023
@michaelhly michaelhly changed the title from "[data] Normalize blocks before zipping/reducing outputs" to "[data] Normalize mismatched blocks before zipping/reducing outputs" Sep 30, 2023
@michaelhly michaelhly changed the title from "[data] Normalize mismatched blocks before zipping/reducing outputs" to "[data] Normalize mismatched blocks before zip/reduce operations" Sep 30, 2023
Signed-off-by: Michael Huang <michaelhly@gmail.com>
Signed-off-by: Michael Huang <michaelhly@gmail.com>
Signed-off-by: Michael Huang <michaelhly@gmail.com>
Signed-off-by: Michael Huang <michaelhly@gmail.com>
@michaelhly michaelhly changed the title from "[data] Normalize upstream blocks before zip/reduce/map operations" to "[data] Normalize upstream blocks before zip/reduce/map operations" Sep 30, 2023
@michaelhly michaelhly changed the title from "[data] Normalize upstream blocks before zip/reduce/map operations" to "[data] Normalize upstream blocks for zip/reduce/map operations with inferred block accessor or delayed block building" Sep 30, 2023
@michaelhly michaelhly changed the title from "[data] Normalize upstream blocks for zip/reduce/map operations with inferred block accessor or delayed block building" to "[data] Normalize upstream blocks for zip/reduce/map operations with inferred block accessor or delegated block building" Sep 30, 2023
@michaelhly michaelhly changed the title from "[data] Normalize upstream blocks for zip/reduce/map operations with inferred block accessor or delegated block building" to "[data] Normalize upstream blocks for zip/reduce/map operations using inferred block accessors" Sep 30, 2023
@@ -95,7 +97,7 @@ def sample_boundaries(
 samples = sample_bar.fetch_until_complete(sample_results)
 sample_bar.close()
 del sample_results
-samples = [s for s in samples if len(s) > 0]
+samples = normalize_blocks([s for s in samples if len(s) > 0])
Contributor Author

Normalize here because the later to_numpy call can be ambiguous when block types are mixed.

@@ -82,7 +83,7 @@ def reduce(
 # TODO: Support fusion with other downstream operators.
 stats = BlockExecStats.builder()
 builder = DelegatingBlockBuilder()
-for block in mapper_outputs:
+for block in normalize_blocks(mapper_outputs):
Contributor Author

Normalize here because the later random_shuffle call can be ambiguous when block types are mixed.

@@ -80,7 +81,7 @@ def reduce(
 ) -> (Block, BlockMetadata):
 stats = BlockExecStats.builder()
 builder = DelegatingBlockBuilder()
-for block in mapper_outputs:
+for block in normalize_blocks(mapper_outputs):
Contributor Author

Normalize here because the later random_shuffle call can be ambiguous when block types are mixed.

@@ -165,7 +167,7 @@ def sample_boundaries(
 if should_close_bar:
 sample_bar.close()
 del sample_results
-samples = [s for s in samples if len(s) > 0]
+samples = normalize_blocks([s for s in samples if len(s) > 0])
Contributor Author

Normalize here because the later to_numpy call can be ambiguous when block types are mixed.

@michaelhly michaelhly changed the title from "[data] Normalize upstream blocks for zip/reduce/map operations using inferred block accessors" to "[data] Normalize upstream blocks for zip/map/reduce operations using inferred block accessors" Oct 1, 2023
@michaelhly michaelhly changed the title from "[data] Normalize upstream blocks for zip/map/reduce operations using inferred block accessors" to "[data] Normalize upstream blocks for zip/map/reduce/etc. operations using inferred block accessors" Oct 1, 2023
@michaelhly michaelhly marked this pull request as ready for review October 2, 2023 15:28
@scottjlee scottjlee assigned c21 and unassigned amogkam Oct 3, 2023
@michaelhly
Contributor Author

Hi @c21. Would you mind taking a look at this?

stale bot commented Dec 15, 2023

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 14 days if no further activity occurs. Thank you for your contributions.

  • If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@stale stale bot added the stale label Dec 15, 2023
@stale stale bot removed the stale label Dec 25, 2023
@anyscalesam anyscalesam added the triage and data labels May 15, 2024
Labels
data Ray Data-related issues triage Needs triage (eg: priority, bug/not-bug, and owning component)
Projects
None yet
6 participants