[data] Defer hash-shuffle schema broadcast to first row-bearing block#63136

Open
curtis-anyscale wants to merge 1 commit into
ray-project:masterfrom
curtis-anyscale:fix/data-empty-schema-hash-shuffle-broadcast

@curtis-anyscale curtis-anyscale commented May 5, 2026

Why are these changes needed?

The hash shuffle used by Dataset.join() broadcasts the input schema to aggregators. When the upstream step is itself a hash shuffle (e.g. groupby.map_groups), it can emit empty (zero-row, zero-column) blocks. If such a block is the first to reach the join's hash shuffle, its empty schema is broadcast to the aggregators. Aggregators whose partition is empty then hold only schema-less blocks, and pa.Table.join fails to resolve the join key:

pyarrow.lib.ArrowInvalid: No match or multiple matches for key field reference
FieldRef.Name(<key>) on left side of the join

This is reproducible on the latest stable (Ray 2.55.1) with a from_pandas → groupby.map_groups → join where default_hash_shuffle_parallelism is high enough relative to the distinct key cardinality to leave partitions empty.
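As a rough illustration of why high parallelism relative to key cardinality leaves partitions empty, here is a minimal sketch. It assumes a generic hash partitioner; `partition_of` is a hypothetical helper built on `hashlib`, not the hash function Ray Data actually uses.

```python
import hashlib


def partition_of(key: str, num_partitions: int) -> int:
    """Hypothetical stand-in for a hash partitioner (not Ray's actual hash)."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


keys = list("abcde")   # 5 distinct join keys, as in the repro
num_partitions = 20    # mirrors default_hash_shuffle_parallelism = 20

# Each distinct key lands in exactly one partition, so at most 5 are occupied.
occupied = {partition_of(k, num_partitions) for k in keys}
empty = num_partitions - len(occupied)
print(f"{len(occupied)} occupied, {empty} empty partitions")
```

With 5 distinct keys and 20 partitions, at least 15 partitions receive no rows regardless of the hash function, so empty blocks are guaranteed rather than an edge case.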

The fix defers the broadcast to the first row-bearing block. Zero-row marker blocks fall through with send_empty_blocks=False and are filtered as before; a row-bearing block with a real schema will arrive whenever the dataset is non-empty, and the broadcast then propagates the correct schema to every aggregator.
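The deferral described above can be sketched roughly as follows. This is a simplified model, not the code in hash_shuffle.py: `FakeBlock` and `FakeShuffle` are hypothetical stand-ins, and the only behavior modeled is "skip zero-row blocks, broadcast the schema of the first block that has rows".

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class FakeBlock:
    """Hypothetical stand-in for a block: column names plus a row count."""
    columns: Tuple[str, ...]
    num_rows: int


@dataclass
class FakeShuffle:
    """Hypothetical stand-in for one input side of the shuffle."""
    broadcast_schema: Optional[Tuple[str, ...]] = None
    forwarded: List[FakeBlock] = field(default_factory=list)

    def on_block(self, block: FakeBlock) -> None:
        if block.num_rows == 0:
            # Zero-row marker: filter it out and do not let it trigger
            # the schema broadcast.
            return
        if self.broadcast_schema is None:
            # First row-bearing block: its schema is safe to broadcast.
            self.broadcast_schema = block.columns
        self.forwarded.append(block)


shuffle = FakeShuffle()
# Two column-less markers arrive before the first block with rows.
for blk in [FakeBlock((), 0), FakeBlock((), 0), FakeBlock(("key", "lval"), 3)]:
    shuffle.on_block(blk)

print(shuffle.broadcast_schema)
```

In this model the markers never reach the broadcast path, so the aggregators only ever see the schema of a block that actually has columns.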

Minimal repro (fails on master, passes with this PR)

import pandas as pd
import ray

ray.data.DataContext.get_current().default_hash_shuffle_parallelism = 20

def noop(df: pd.DataFrame) -> pd.DataFrame:
    return df

left = (
    ray.data.from_pandas(pd.DataFrame({"key": list("abcde"), "lval": [1, 2, 3, 4, 5]}))
    .groupby("key").map_groups(noop, batch_format="pandas")
)
right = (
    ray.data.from_pandas(pd.DataFrame({"key": list("abcde"), "rval": [10, 20, 30, 40, 50]}))
    .groupby("key").map_groups(noop, batch_format="pandas")
)

left.join(right, on=("key",), join_type="inner", num_partitions=5).show()

Related issue number

N/A.

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@curtis-anyscale curtis-anyscale requested a review from a team as a code owner May 5, 2026 18:04

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request modifies the hash shuffle operator to defer schema broadcasting until the first row-bearing block is received, preventing issues where zero-row marker blocks cause downstream join failures. A regression test was added to verify this fix. Feedback suggests refining the broadcast logic to handle cases where a dataset is entirely empty but contains a valid schema by also checking for the presence of columns in the block metadata.

Comment thread python/ray/data/_internal/execution/operators/hash_shuffle.py Outdated
Comment thread python/ray/data/_internal/execution/operators/hash_shuffle.py Outdated
@ray-gardener ray-gardener Bot added the data Ray Data-related issues label May 5, 2026

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 6d27c8f.

Comment thread python/ray/data/_internal/execution/operators/hash_shuffle.py Outdated
@curtis-anyscale curtis-anyscale force-pushed the fix/data-empty-schema-hash-shuffle-broadcast branch 4 times, most recently from 7118771 to 242f1ac Compare May 7, 2026 13:18
Dataset.join()'s hash shuffle used the first input block of each side to
broadcast the input schema to every aggregator. When the upstream is
itself a hash shuffle (e.g. groupby.map_groups whose finalize emits empty
marker blocks for empty groups), the first input block can be a zero-row,
zero-column block. Using it for the broadcast caused _create_empty_table
to propagate column-less blocks to all aggregators, after which any
aggregator whose partition was empty on a side ended up with only
schema-less blocks. Downstream join finalize then failed to resolve the
join key, raising e.g. `ArrowInvalid: No match or multiple matches for
key field reference FieldRef.Name(<key>) on left side of the join`.

Defer the broadcast to the first block that actually carries a schema -
either by having rows (in which case its schema is intact) or by being
described by a RefBundle whose schema field is non-empty (e.g. an
empty-but-typed table from `from_arrow`). The streaming executor strips
per-block schema off `BlockMetadata` and hoists it to `RefBundle.schema`
in DataOpTask.on_data_ready, so the bundle is the right place to read
from.

Adds regression tests for the `from_pandas -> groupby.map_groups -> join`
shape (where the upstream emits column-less marker blocks) and for the
`from_arrow(empty_typed) U from_arrow(populated) -> join` shape (where
the first block has zero rows but a real schema).

Signed-off-by: Curtis Howard <curtis.james.howard@gmail.com>
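
The two conditions named in the commit message (the block has rows, or the bundle-level schema is non-empty) can be read as a single predicate. The sketch below is an assumption about the shape of that check, not the PR's code: `carries_schema` is a hypothetical helper, and `bundle_schema_names` stands in for the column names of the schema attached to the bundle.

```python
from typing import Optional, Sequence


def carries_schema(num_rows: int, bundle_schema_names: Optional[Sequence[str]]) -> bool:
    """Hypothetical check: is this block usable as the broadcast source?"""
    if num_rows > 0:
        # A block with rows has an intact schema.
        return True
    # A zero-row block qualifies only if the bundle still records columns.
    return bool(bundle_schema_names)


cases = {
    "marker (0 rows, 0 columns)": carries_schema(0, []),
    "empty but typed": carries_schema(0, ["key", "rval"]),
    "row-bearing": carries_schema(5, ["key", "rval"]),
    "no schema recorded": carries_schema(0, None),
}
for name, ok in cases.items():
    print(f"{name}: {ok}")
```

Under this reading, an empty-but-typed table can still trigger the broadcast, while a column-less marker block cannot.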
@curtis-anyscale curtis-anyscale force-pushed the fix/data-empty-schema-hash-shuffle-broadcast branch from 242f1ac to 51e0af6 Compare May 8, 2026 12:41
@github-actions

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions Bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label May 22, 2026