Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Normalize block types before internal multi-block operations #43764

Merged
merged 4 commits into from Mar 8, 2024

Conversation

scottjlee
Copy link
Contributor

@scottjlee scottjlee commented Mar 6, 2024

Why are these changes needed?

Applying grouping operations on Datasets with different underlying Block types can cause exceptions (e.g. various AttributeErrors) due to BlockAccessors assuming that all input block types are of the same type.

We handle this case by normalizing the blocks (either ArrowBlock or PandasBlock) to the first block type before applying the rest of the grouping/aggregation logic.

Related issue number

Closes #31550
Closes #39206
Closes #39155
Closes #39291

Inspired by #39960

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Comment on lines 225 to 227
# If block types are different, but still both of TableBlock type, try
# converting both to default block type before zipping.
self_default, other_default = self.to_default(), acc.to_default()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call normalize_block_types instead to keep code to be consistent?

self_default, other_default = self.to_default(), acc.to_default()
return BlockAccessor.for_block(self_default).zip(other_default)
else:
raise ValueError(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In whic case, this ValueError will be triggered?

Copy link
Contributor Author

@scottjlee scottjlee Mar 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for Blocks which do not extend TableBlock class, i think this will be the case. since both ArrowBlock and PandasBlock are TableBlocks themselves, this isn't an issue for these classes, but this would cover any case in which we have other types of Blocks in the future. i can also remove this if we think it's not useful

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, it's fine to keep it.

seen_types = set()
for block in blocks:
acc = BlockAccessor.for_block(block)
assert isinstance(acc, TableBlockAccessor), type(acc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to throw an actionable error message instead of assert

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@c21 do we still use non-table blocks anywhere?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still use non-table blocks anywhere?

Actually I don't think we use non-table blocks anywhere.

else:
results = [BlockAccessor.for_block(block).to_default() for block in blocks]

assert all(isinstance(block, type(results[0])) for block in results)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here.

seen_types = set()
for block in blocks:
acc = BlockAccessor.for_block(block)
assert isinstance(acc, TableBlockAccessor), type(acc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@c21 do we still use non-table blocks anywhere?

Comment on lines 124 to 128
def test_zip_multiple_block_types(ray_start_regular_shared):
df = pd.DataFrame({"spam": [0]})
ds_pd = ray.data.from_pandas(df)
ds2_arrow = ray.data.from_items([{"ham": [0]}])
assert ds_pd.zip(ds2_arrow).take_all() == [{"spam": 0, "ham": [0]}]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move this to a different test module? I don't think zip is an all-to-all operation?

@c21 c21 added the release-blocker P0 Issue that blocks the release label Mar 7, 2024
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Copy link
Contributor

@c21 c21 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LG

@c21 c21 merged commit 3e4f21f into ray-project:master Mar 8, 2024
9 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
release-blocker P0 Issue that blocks the release
Projects
None yet
3 participants