
[Data] - write_parquet enable both partition by & min_rows_per_file, max_rows_per_file #53930


Open
wants to merge 14 commits into master

Conversation

goutamvenkat-anyscale
Contributor

@goutamvenkat-anyscale goutamvenkat-anyscale commented Jun 18, 2025

Why are these changes needed?

Allow users to pass partition by together with both min_rows_per_file and max_rows_per_file into write_parquet.
max_rows_per_file is guaranteed, but min_rows_per_file is not (it's treated as best effort).
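
For illustration, a minimal usage sketch of what this enables (keyword names as used in this PR; max_rows_per_file is experimental, and the dataset and column below are hypothetical):

```python
import ray

# Hypothetical dataset with a low-cardinality column to partition on.
ds = ray.data.from_items(
    [{"country": "US" if i % 2 == 0 else "CA", "value": i} for i in range(100_000)]
)

ds.write_parquet(
    "/tmp/out",
    partition_cols=["country"],   # one directory per country value
    min_rows_per_file=10_000,     # best effort: files may end up smaller
    max_rows_per_file=50_000,     # takes precedence when both can't be satisfied
)
```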

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Goutam V <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale requested a review from a team as a code owner June 18, 2025 19:54
Signed-off-by: Goutam V <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale changed the title [Data] - write_parquet enable both partition by & min_rows_per_file [Data] - write_parquet enable both partition by & min_rows_per_file, max_rows_per_file Jun 18, 2025
Signed-off-by: Goutam V <goutam@anyscale.com>
Signed-off-by: Goutam V <goutam@anyscale.com>
Signed-off-by: Goutam V <goutam@anyscale.com>
Signed-off-by: Goutam V <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale added the go add ONLY when ready to merge, run all tests label Jun 19, 2025
Comment on lines +3415 to +3422
max_rows_per_file: [Experimental] The target maximum number of rows to write
to each file. If ``None``, Ray Data writes a system-chosen number of
rows to each file. If the number of rows per block is smaller than the
specified value, Ray Data writes the number of rows per block to each file.
The specified value is a hint, not a strict limit. Ray Data
might write more or fewer rows to each file. If both ``min_rows_per_file``
and ``max_rows_per_file`` are specified, ``max_rows_per_file`` takes
precedence when they cannot both be satisfied.
Member

Are we adding this parameter for all the other APIs that support min_rows_per_file, or just for write_parquet?

Contributor Author

Good question. We probably should add this paging mechanism to other APIs, but for now I believe it's just write_parquet.

Maybe a better design for this approach is to leverage tagged unions so that we can more easily swap between options.
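
A rough sketch of the tagged-union idea (hypothetical types, not part of this PR): each variant fully describes one file-sizing policy, so callers pick exactly one instead of combining loose keyword arguments.

```python
from dataclasses import dataclass
from typing import Optional, Union

@dataclass
class RowCountPerFile:
    # Row-count based policy; either bound may be omitted.
    min_rows: Optional[int] = None
    max_rows: Optional[int] = None

@dataclass
class BytesPerFile:
    # Size-based policy as an alternative variant.
    target_bytes: int

FileSizePolicy = Union[RowCountPerFile, BytesPerFile]

def describe_policy(policy: FileSizePolicy) -> str:
    # Dispatch on the variant; a new policy means one new branch here.
    if isinstance(policy, RowCountPerFile):
        return f"rows per file in [{policy.min_rows}, {policy.max_rows}]"
    return f"roughly {policy.target_bytes} bytes per file"
```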

Collaborator

I think it would apply to FileDatasinks, because I don't think SQL or any table-centric sink makes sense here.

Signed-off-by: Goutam V <goutam@anyscale.com>

-    effective_min_rows = _validate_rows_per_file_args(
-        num_rows_per_file=num_rows_per_file, min_rows_per_file=min_rows_per_file
+    effective_min_rows, effective_max_rows = _validate_rows_per_file_args(
Contributor

Out of curiosity: should we try to keep it simple and only allow users to specify either min or max? It looks like in write_parquet max takes precedence anyway.

Signed-off-by: Goutam V <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale removed the go add ONLY when ready to merge, run all tests label Jun 19, 2025
Signed-off-by: Goutam V <goutam@anyscale.com>
Signed-off-by: Goutam V <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale added the go add ONLY when ready to merge, run all tests label Jun 20, 2025
@goutamvenkat-anyscale goutamvenkat-anyscale added go add ONLY when ready to merge, run all tests and removed go add ONLY when ready to merge, run all tests labels Jun 23, 2025
Comment on lines 50 to +51
self.min_rows_per_file = min_rows_per_file
self.max_rows_per_file = max_rows_per_file
Contributor

Let's assert min <= max
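
A minimal sketch of that check (hypothetical helper name; in this PR the validation lives in _validate_rows_per_file_args):

```python
from typing import Optional

def _check_rows_per_file(
    min_rows_per_file: Optional[int], max_rows_per_file: Optional[int]
) -> None:
    # Reject non-positive values before comparing the two bounds.
    for name, value in (
        ("min_rows_per_file", min_rows_per_file),
        ("max_rows_per_file", max_rows_per_file),
    ):
        if value is not None and value <= 0:
            raise ValueError(f"{name} must be positive, got {value}")
    # The suggested assertion: a minimum above the maximum is unsatisfiable.
    if min_rows_per_file is not None and max_rows_per_file is not None:
        assert min_rows_per_file <= max_rows_per_file, (
            f"min_rows_per_file ({min_rows_per_file}) must not exceed "
            f"max_rows_per_file ({max_rows_per_file})"
        )
```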

Comment on lines 135 to 164
# Determine the effective row limit based on priority: max takes precedence
if self.max_rows_per_file is not None:
    # Split based on max_rows_per_file
    if total_rows <= self.max_rows_per_file:
        # Single file is sufficient
        self._write_single_file(
            path, [table], filename, output_schema, write_kwargs
        )
    else:
        # Need to split into multiple files
        self._split_and_write_table(
            table,
            path,
            filename,
            output_schema,
            write_kwargs,
        )
elif self.min_rows_per_file is not None:
    # Only min_rows_per_file is set
    if total_rows >= self.min_rows_per_file:
        # Single file meets minimum requirement
        self._write_single_file(
            path, [table], filename, output_schema, write_kwargs
        )
    else:
        # This case should be handled at a higher level by combining blocks
        # For now, write as single file
        self._write_single_file(
            path, [table], filename, output_schema, write_kwargs
        )
Contributor

Not following this logic: there should be no precedence. We should enforce both min and max when both are specified (right now you're enforcing either one or the other).
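
One way to honor both bounds at once, as an illustrative sketch rather than this PR's implementation: derive the number of output files from the max bound, then check the resulting per-file size against the min bound.

```python
import math

def plan_rows_per_file(total_rows: int, min_rows: int, max_rows: int) -> int:
    assert 0 < min_rows <= max_rows
    # Smallest number of files that keeps every file at or under max_rows.
    num_files = max(1, math.ceil(total_rows / max_rows))
    rows_per_file = math.ceil(total_rows / num_files)
    if rows_per_file >= min_rows:
        return rows_per_file
    # Too few rows to satisfy both bounds for this block; fall back to the
    # max bound, which this PR treats as the strict one.
    return max_rows

plan_rows_per_file(120_000, min_rows=10_000, max_rows=50_000)  # -> 40_000
```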

Comment on lines +166 to +173
def _split_and_write_table(
    self,
    table: "pyarrow.Table",
    path: str,
    base_filename: str,
    output_schema: "pyarrow.Schema",
    write_kwargs: Dict[str, Any],
) -> None:
Contributor

nit: Let's make this method more explicit about all of its dependencies (i.e., min/max).

Comment on lines +182 to +184
estimated_max_rows = _get_max_chunk_size(
    table, self._data_context.target_max_block_size
)
Contributor

Why do we need this?

Contributor Author

We need to know the number of rows that corresponds to the target block size.
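
A sketch of the estimate being described (assumed logic, not necessarily what _get_max_chunk_size does): derive a row count from the average row size and the target block size.

```python
import pyarrow as pa

def estimate_max_rows(table: pa.Table, target_max_block_size: int) -> int:
    # Average bytes per row tells us roughly how many rows fit in one block.
    if table.num_rows == 0:
        return 0
    bytes_per_row = table.nbytes / table.num_rows
    return max(1, int(target_max_block_size // bytes_per_row))
```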

Comment on lines +225 to +230
# Generate filename with index suffix
name_parts = base_filename.rsplit(".", 1)
if len(name_parts) == 2:
    chunk_filename = f"{name_parts[0]}_{file_idx:06d}.{name_parts[1]}"
else:
    chunk_filename = f"{base_filename}_{file_idx:06d}"
Contributor

This should be deferred to PyArrow.
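
For context, PyArrow can already number output files and cap their row count, so deferring to it might look like this sketch with pyarrow.dataset.write_dataset (not this PR's code):

```python
import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"x": list(range(100_000))})

ds.write_dataset(
    table,
    "/tmp/out",
    format="parquet",
    basename_template="part-{i}.parquet",  # PyArrow fills in the file index
    max_rows_per_file=25_000,
    max_rows_per_group=25_000,             # must not exceed max_rows_per_file
)
```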

Signed-off-by: Goutam V <goutam@anyscale.com>
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
5 participants