[Data] - write_parquet enable both partition by & min_rows_per_file, max_rows_per_file #53930
base: master
Conversation
Signed-off-by: Goutam V <goutam@anyscale.com>
max_rows_per_file: [Experimental] The target maximum number of rows to write
    to each file. If ``None``, Ray Data writes a system-chosen number of
    rows to each file. If the number of rows per block is smaller than the
    specified value, Ray Data writes the number of rows per block to each file.
    The specified value is a hint, not a strict limit. Ray Data
    might write more or fewer rows to each file. If both ``min_rows_per_file``
    and ``max_rows_per_file`` are specified, ``max_rows_per_file`` takes
    precedence when they cannot both be satisfied.
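A usage sketch of what this PR enables. Parameter names such as partition_cols, min_rows_per_file, and max_rows_per_file are assumed from this PR's docstring and the current Ray Data API; check the released signature before relying on them.

import ray

# Hypothetical usage sketch; parameter names are assumptions based on this PR.
ds = ray.data.from_items(
    [{"group": i % 4, "value": i} for i in range(100_000)]
)
ds.write_parquet(
    "/tmp/output",
    partition_cols=["group"],   # write one directory per distinct group value
    min_rows_per_file=5_000,    # best-effort lower bound per file
    max_rows_per_file=20_000,   # takes precedence if both can't be satisfied
)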
Are we adding this parameter for all the other APIs that support min_rows_per_file, or just for write_parquet?
Good question. We probably should add this paging mechanism to other APIs, but for now I believe it's just write_parquet.
Maybe a better design for this approach is to leverage tagged unions so that we can more easily swap between options
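For illustration, the tagged-union idea could look something like this sketch. The names here are entirely hypothetical and not part of this PR; the point is that each variant spells out one row-count policy instead of two nullable integers.

from dataclasses import dataclass
from typing import Optional, Tuple, Union

# Hypothetical variants for the row-count policy (not code from this PR).
@dataclass
class ExactRowsPerFile:
    rows: int

@dataclass
class RowRangePerFile:
    min_rows: int
    max_rows: int

    def __post_init__(self):
        if self.min_rows > self.max_rows:
            raise ValueError("min_rows must be <= max_rows")

RowsPerFilePolicy = Union[ExactRowsPerFile, RowRangePerFile, None]

def resolve_policy(policy: RowsPerFilePolicy) -> Tuple[Optional[int], Optional[int]]:
    # Normalize the policy into (min_rows, max_rows) for the writer.
    if policy is None:
        return (None, None)
    if isinstance(policy, ExactRowsPerFile):
        return (policy.rows, policy.rows)
    return (policy.min_rows, policy.max_rows)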
I think it would apply to FileDatasinks, because I don't think SQL or any table-centric sink makes sense.
Signed-off-by: Goutam V <goutam@anyscale.com>
- effective_min_rows = _validate_rows_per_file_args(
-     num_rows_per_file=num_rows_per_file, min_rows_per_file=min_rows_per_file
+ effective_min_rows, effective_max_rows = _validate_rows_per_file_args(
Out of curiosity, should we try to keep it simple and only allow users to specify either min or max? It looks like in write_parquet, max takes precedence anyway.
Signed-off-by: Goutam V <goutam@anyscale.com>
self.min_rows_per_file = min_rows_per_file
self.max_rows_per_file = max_rows_per_file
Let's assert min <= max
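A validation sketch for this suggestion. The helper name _validate_rows_per_file_args comes from the diff above, but this body is illustrative only, and treating num_rows_per_file as a deprecated alias for the minimum is an assumption.

from typing import Optional, Tuple

def _validate_rows_per_file_args(
    num_rows_per_file: Optional[int] = None,
    min_rows_per_file: Optional[int] = None,
    max_rows_per_file: Optional[int] = None,
) -> Tuple[Optional[int], Optional[int]]:
    # Assumption: `num_rows_per_file` is the deprecated alias for the minimum.
    effective_min = (
        min_rows_per_file if min_rows_per_file is not None else num_rows_per_file
    )
    effective_max = max_rows_per_file
    if effective_min is not None and effective_max is not None:
        assert effective_min <= effective_max, (
            f"min_rows_per_file ({effective_min}) must be <= "
            f"max_rows_per_file ({effective_max})"
        )
    return effective_min, effective_max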
# Determine the effective row limit based on priority: max takes precedence
if self.max_rows_per_file is not None:
    # Split based on max_rows_per_file
    if total_rows <= self.max_rows_per_file:
        # Single file is sufficient
        self._write_single_file(
            path, [table], filename, output_schema, write_kwargs
        )
    else:
        # Need to split into multiple files
        self._split_and_write_table(
            table,
            path,
            filename,
            output_schema,
            write_kwargs,
        )
elif self.min_rows_per_file is not None:
    # Only min_rows_per_file is set
    if total_rows >= self.min_rows_per_file:
        # Single file meets minimum requirement
        self._write_single_file(
            path, [table], filename, output_schema, write_kwargs
        )
    else:
        # This case should be handled at a higher level by combining blocks
        # For now, write as single file
        self._write_single_file(
            path, [table], filename, output_schema, write_kwargs
        )
Not following this logic: there should be no precedence -- we enforce both min and max if both are specified (right now you're enforcing either one or the other).
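To illustrate the point, enforcing both bounds at once could look like the hypothetical helper below (not code from this PR): split into the fewest files that respect the max, then check whether the resulting per-file size still respects the min.

import math
from typing import Optional

def plan_rows_per_file(
    total_rows: int, min_rows: Optional[int], max_rows: Optional[int]
) -> int:
    # Fewest files that keep every file at or under max_rows.
    num_files = 1
    if max_rows is not None:
        num_files = max(1, math.ceil(total_rows / max_rows))
    rows_per_file = math.ceil(total_rows / num_files)
    if min_rows is not None and rows_per_file < min_rows:
        # Both bounds can't be met for this block alone; the caller must
        # decide which wins (this PR lets max_rows_per_file take precedence).
        pass
    return rows_per_file

# Example: 100 rows with min=30, max=40 -> 3 files of roughly 34 rows each,
# which satisfies both bounds.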
def _split_and_write_table(
    self,
    table: "pyarrow.Table",
    path: str,
    base_filename: str,
    output_schema: "pyarrow.Schema",
    write_kwargs: Dict[str, Any],
) -> None:
nit: Let's make this method more explicit relative to all of its deps (i.e. min/max).
estimated_max_rows = _get_max_chunk_size(
    table, self._data_context.target_max_block_size
)
Why do we need this?
We need to know the number of rows that corresponds to the target block size.
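For context, a rows-from-bytes estimate along these lines is presumably what _get_max_chunk_size does; this sketch is an assumption, not the actual implementation.

import pyarrow

def estimate_max_chunk_rows(
    table: "pyarrow.Table", target_max_block_size_bytes: int
) -> int:
    # Derive a row count from a byte-based block-size target using the
    # table's average bytes per row.
    if table.num_rows == 0 or table.nbytes == 0:
        return table.num_rows
    bytes_per_row = table.nbytes / table.num_rows
    return max(1, int(target_max_block_size_bytes / bytes_per_row))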
# Generate filename with index suffix
name_parts = base_filename.rsplit(".", 1)
if len(name_parts) == 2:
    chunk_filename = f"{name_parts[0]}_{file_idx:06d}.{name_parts[1]}"
else:
    chunk_filename = f"{base_filename}_{file_idx:06d}"
This should be deferred to PyArrow.
[I'm assuming you're referring to this API](https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html)
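If so, a sketch of letting pyarrow.dataset.write_dataset handle the file splitting and filename indexing might look like this; the specific values are placeholders, and max_rows_per_file support depends on the PyArrow version.

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"group": [1, 1, 2, 2], "value": [10, 20, 30, 40]})

ds.write_dataset(
    table,
    base_dir="/tmp/output",
    format="parquet",
    partitioning=["group"],                 # directory partitioning by column
    basename_template="part-{i}.parquet",   # PyArrow fills in the file index
    max_rows_per_file=1_000_000,
    max_rows_per_group=100_000,             # must be <= max_rows_per_file
)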
Signed-off-by: Goutam V <goutam@anyscale.com>
Why are these changes needed?
Allow users to pass both partition by & min_rows_per_file, max_rows_per_file into write_parquet. max_rows_per_file is guaranteed, but not min_rows_per_file (it's deemed best effort).
Related issue number
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.