[Data] Add performant way to read large tfrecord datasets #42277
Conversation
python/ray/data/read_api.py (outdated)
@@ -1564,13 +1567,50 @@ def read_tfrecords(
    shuffle: If setting to "files", randomly shuffle input files order before read.
        Defaults to not shuffle with ``None``.
    file_extensions: A list of file extensions to filter files by.
    schema_inference: Toggles the schema inference applied; applicable only if
        tf_schema argument is missing. Defaults to True.
    force_fast_read: Forces the fast read, failing if the proper dependencies
I added this option mostly for test purposes, let me know if there's a better way to do this
@@ -338,9 +338,13 @@ def _str2bytes(d):
    assert ds_expected.take() == ds_actual.take()


@pytest.mark.parametrize("with_tf_schema", (True, False))
@pytest.mark.parametrize(
Force-pushed from aa023a5 to 1223af6 (compare).
self.tf_schema = tf_schema
self._tf_schema = tf_schema
self._fast_read = fast_read
self._batch_size = batch_size or 2048
could we assign the default 2048 to a constant at the top of the file?
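The suggestion could look roughly like this (a hypothetical sketch, with an assumed constant name and a trimmed-down class, not the PR's actual code):

```python
# Hypothetical sketch: hoist the magic number into a module-level
# constant at the top of the file so the default is named once.
DEFAULT_BATCH_SIZE = 2048


class TFRecordDatasourceSketch:
    def __init__(self, batch_size=None):
        # Fall back to the named default instead of an inline 2048.
        self._batch_size = batch_size or DEFAULT_BATCH_SIZE
```

Any other call site that needs the default can then reference `DEFAULT_BATCH_SIZE` instead of repeating the literal.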
python/ray/data/read_api.py (outdated)
@@ -1490,6 +1490,9 @@ def read_tfrecords(
    tf_schema: Optional["schema_pb2.Schema"] = None,
    shuffle: Union[Literal["files"], None] = None,
    file_extensions: Optional[List[str]] = None,
    batch_size: Optional[int] = None,
could we name this variable something like `fast_read_batch_size`, to indicate that this value will only be used with the fast path?
sure, though we are kind of leaking to users that there's an underlying fast and slow implementation, which might be confusing.
good point. since we will mention in the docstring that this is only used for the fast path anyway, I was thinking it would be good to surface that fact explicitly in the parameter name as well.
return dataset.map_batches(
    _unwrap_single_value_lists,
    fn_kwargs={"col_lengths": list_sizes["max_list_size"]},
    batch_format="pandas",
can we rewrite `_unwrap_single_value_lists` to use the default (numpy) batch format? pandas is more memory intensive, so we would like to avoid specifying the pandas batch format wherever possible.
python/ray/data/read_api.py (outdated)
@@ -1490,6 +1490,9 @@ def read_tfrecords(
    tf_schema: Optional["schema_pb2.Schema"] = None,
    shuffle: Union[Literal["files"], None] = None,
    file_extensions: Optional[List[str]] = None,
    batch_size: Optional[int] = None,
    schema_inference: bool = True,
similarly for this, maybe something like `fast_read_auto_infer_schema`?
Force-pushed from 735fb4e to 8b29ca1 (compare).
Signed-off-by: Martin Bomio <martinbomio@spotify.com>
Force-pushed from f67ce7e to ddfca72 (compare).
ci/docker/data.build.Dockerfile (outdated)
# Dependency used for read_tfrecords function.
# Given that we only use the ExamplesToRecordBatchDecoder
# which is purely C++, we can install it with --no-dependencies.
pip install tfx-bsl==1.14.0 --no-dependencies
this feels like a rather weird way to do it.. is this the intended way to use tfx-bsl? are there other, more direct ways to import and use the logic in `ExamplesToRecordBatchDecoder`?
other more direct ways like adding the dependency like any other dependency, or are you thinking about something else? I tried adding it as a direct dependency in the data test dependencies, but it conflicted with other existing dependencies.

We do not really need to bring in transitive dependencies, since the only thing we use is the `ExamplesToRecordBatchDecoder`, which is self-contained and doesn't need any extra dependency to work. Another approach we could take is to add this class to the ray repo itself; I am not really familiar with that part of the ray codebase, but we would have to add the C++ class and a python binding.
> conflicted with other existing dependencies.

could you elaborate on the specifics? cc @can-anyscale // example that we might need multiple constraint files.

I am worried that this is a hack and might not be sustainable. Is there any guarantee that when tfx-bsl has a new version in the future, this will still work? Do we expect all users to install tfx-bsl with `--no-dependencies`?

This is not a test-only dependency, but a data dependency. At the end of the day, for it to be useful, it needs to work with other dependencies in key workflows / workloads. Tests and CI are proxies for "ray works for users".

It seems that this is only used as part of the internal implementation, not part of the ray data interface? If that is the case, can we fork https://github.com/tensorflow/tfx-bsl, give it another package name, remove the parts that we do not need, build it from source, and publish a `tfx-bsl-ray` package just for this?
or maybe we should try to resolve the dependency conflicts..
> Do we expect all users to install tfx-bsl with `--no-dependencies`?

yes, we do
> Do we expect all users to install tfx-bsl with `--no-dependencies`?
>
> yes, we do

:) this is more a question for the ray code owners. I am asking @scottjlee to help here.
I don't think we can always make the assumption that users will install with the `--no-dependencies` flag (or force them to), since they might be using the library's other features. But that would mean the dependency resolution is put on the user, which we should try to avoid.
Signed-off-by: Scott Lee <sjl@anyscale.com>
ci/docker/data.build.Dockerfile (outdated)
# Dependency used for read_tfrecords function.
# Given that we only use the ExamplesToRecordBatchDecoder
# which is purely C++, we can install it with --no-dependencies.
pip install tfx-bsl==1.14.0 --no-dependencies
I don't think we can always make the assumption that users will install with the `--no-dependencies` flag (or force them to), since they might be using the library's other features. But that would mean the dependency resolution is put on the user, which we should try to avoid.
Signed-off-by: Scott Lee <sjl@anyscale.com>
Force-pushed from 11b3bf2 to bca5bff (compare).
Signed-off-by: Martin Bomio <martinbomio@spotify.com>
Thank you @martinbomio for the contribution! Looks solid overall; I have some comments. Thanks.
@@ -23,13 +33,23 @@ def __init__(
    self,
    paths: Union[str, List[str]],
    tf_schema: Optional["schema_pb2.Schema"] = None,
    fast_read: bool = False,
Can we change the name `fast_read` to `read_with_tfx`? `fast_read` looks a bit vague to me.
fast_read: bool = False,
batch_size: Optional[int] = None,
let's also document these two parameters in the docstring.
else:
    yield from self._slow_read_stream(f, path)

def _slow_read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
`_slow_read_stream` -> `_default_read_stream`
)

def _fast_read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
`_fast_read_stream` -> `tfx_read_stream`
python/ray/data/read_api.py (outdated)
@@ -1583,6 +1583,8 @@ def read_tfrecords(
    file_extensions: Optional[List[str]] = None,
    concurrency: Optional[int] = None,
    override_num_blocks: Optional[int] = None,
    fast_read_batch_size: Optional[int] = None,
`fast_read_batch_size` -> `tfx_read_batch_size`
python/ray/data/read_api.py (outdated)
@@ -1583,6 +1583,8 @@ def read_tfrecords(
    file_extensions: Optional[List[str]] = None,
    concurrency: Optional[int] = None,
    override_num_blocks: Optional[int] = None,
    fast_read_batch_size: Optional[int] = None,
    fast_read_auto_infer_schema: bool = True,
`fast_read_auto_infer_schema` -> `tfx_read_auto_infer_schema`
python/ray/data/read_api.py (outdated)
Returns:
    A :class:`~ray.data.Dataset` that contains the example features.

Raises:
    ValueError: If a file contains a message that isn't a ``tf.train.Example``.
"""
import platform

fast_read = False
`fast_read` -> `read_with_tfx`
python/ray/data/read_api.py (outdated)
if platform.processor() == "arm":
    logger.warning(
        "The fast strategy of this function depends on tfx-bsl, which is "
        "currently not supported on devices with Apple silicon "
        "(e.g. M1) and requires an environment with x86 CPU architecture."
    )
can we check `platform.processor() == "arm"` before importing `ExamplesToRecordBatchDecoder`? We don't need to print a warning here if the user runs on arm.
python/ray/data/read_api.py (outdated)
logger.warning(
    "To use TFRecordDatasource with large datasets, please install"
    " tfx-bsl package with `pip install tfx_bsl --no-dependencies`."
)
logger.info(
    "Falling back to slower strategy for reading tf.records. This "
    "reading strategy should be avoided when reading large datasets."
)
Let's print out one warning message instead:
Please install tfx-bsl package with `pip install tfx_bsl --no-dependencies`. This can help speed up the reading of large TFRecord files.
Signed-off-by: Martin Bomio <martinbomio@spotify.com>
Thank you @martinbomio. Can you address the CI test failure?
Signed-off-by: Martin Bomio <martinbomio@spotify.com>
Returns:
    A :class:`~ray.data.Dataset` that contains the example features.

Raises:
    ValueError: If a file contains a message that isn't a ``tf.train.Example``.
"""
import platform

tfx_read = False
Hi @martinbomio, final request - can we add a config in `DataContext` (https://github.com/ray-project/ray/blob/master/python/ray/data/context.py), so users can disable the tfx read code path in case they run into any bugs? By default, the config can be set to true: `DataContext.get_current().enable_tfrecord_tfx_read`.
python/ray/data/read_api.py (outdated)
if platform.processor() != "arm":
    try:
        from tfx_bsl.cc.tfx_bsl_extension.coders import (  # noqa: F401
nit, will just `import tfx_bsl` work?
def _infer_schema_and_transform(dataset: "Dataset"):
    list_sizes = dataset.aggregate(_MaxListSize(dataset.schema().names))
can you add a comment on why this function is needed? it seems that we'll read the datasource twice, once for `aggregate` and once for `map_batches`. will that not be less efficient?
@raulchen it will indeed mean an extra pass over the data. The reason it is needed is that the tfx-bsl ExampleDecoder always returns lists of lists when no schema is provided, and what this function does is infer the schema for those fields that are single-value fields.

Performance-wise, some of our benchmarks of this implementation (we have had it running internally for a while) show more than 15x improvement compared to the current implementation. Some of our datasets take ~30m to load with the ray native implementation, compared to less than 2m with this tfx-bsl implementation. Let me know if you need more benchmark numbers, happy to provide more.
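The first pass described above boils down to a per-column max over list lengths. A plain-Python sketch of that aggregation (hypothetical; Ray's actual `_MaxListSize` is a Dataset aggregation, whereas this iterates batches directly):

```python
def max_list_sizes(batches, columns):
    # First pass over the data: record the largest list length seen for
    # each column. Columns whose max size is 1 can later be unwrapped
    # from list-of-lists to flat scalar columns in the second pass.
    sizes = {c: 0 for c in columns}
    for batch in batches:
        for c in columns:
            for value in batch[c]:
                sizes[c] = max(sizes[c], len(value))
    return sizes
```

Whether the extra pass pays off depends on the dataset, which is why the discussion above weighs it against the decoder's speedup.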
thanks for the explanation. it's okay to proceed without more benchmarks for now.
Signed-off-by: Martin Bomio <martinbomio@spotify.com>
Discussed offline w/ @martinbomio. For the 2.10 release, let's introduce 3 configs in DataContext and disable the tfx read by default (to be safe); we can enable the tfx read in a future release.
Signed-off-by: Martin Bomio <martinbomio@spotify.com>
actually, instead of adding flags in `DataContext`: since we are adding 3 parameters, it'd be cleaner to combine them into a single one, e.g. a `tfx_read_options` parameter.
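One way the combined parameter could be shaped (a hypothetical sketch; the class name, field names, and defaults are assumptions for illustration, not necessarily what the PR merged):

```python
from dataclasses import dataclass


@dataclass
class TFXReadOptions:
    # Batch size handed to the tfx-bsl decoder.
    batch_size: int = 2048
    # Whether to infer single-value columns when no tf_schema is given.
    auto_infer_schema: bool = True
```

`read_tfrecords` would then accept a single optional `tfx_read_options` argument, and passing `None` would select the default (non-tfx) code path.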
Signed-off-by: Martin Bomio <martinbomio@spotify.com>
LGTM thanks @martinbomio!
logger = DatasetLogger(__name__)


@dataclass
lint: add API annotation: `@PublicAPI(stability="alpha")`
# # if tfx read is enabled, we just return the dataset because, by default
# # tfx_read will be used in unit tests given that tfx-bsl dependency is
# # installed
# if tfx_read:
#     return tf_ds

# read_op = tf_ds._plan._logical_plan.dag
# datasource_override = read_op._datasource
# datasource_override._tfx_read = tfx_read
# parallelism = read_opts.pop("parallelism", -1)
# ds = ray.data.read_datasource(datasource_override, parallelism=parallelism)

# return ds
let's just delete this.
Some CI test failures to fix:
@@ -1654,13 +1663,31 @@ def read_tfrecords(
    By default, the number of output blocks is dynamically decided based on
    input data size and available resources. You shouldn't manually set this
    value in most cases.

    tfx_read_options: Specifies read options when reading TFRecord files with TFX.
Suggested change:
- tfx_read_options: Specifies read options when reading TFRecord files with TFX.
+ tfx_read_options: [Experimental] Specifies read options when reading TFRecord files with TFX.
python/ray/data/read_api.py (outdated)
datasets in production use cases. To use this implementation you should
install tfx-bsl with:
1. `pip install tfx_bsl --no-dependencies`
2. DatasetContext.get_current().enable_tfrecords_tfx_read = True
update this comment?
"Please install tfx-bsl package with"
" `pip install tfx_bsl --no-dependencies`."
" This can help speed up the reading of large TFRecord files."
)
when the user has specified `tfx_read_options` but `tfx_bsl` isn't installed, it'd be better to just throw an exception. And I guess it's okay to not check the platform here, as tfx_bsl may support arm in the future.
it does not support it right now; I find it a lot safer not to assume it will
> when user has specified tfx_read_options but tfx_bsl isn't installed, it'd be better to just throw an exception.

So you are suggesting I just remove the whole block of trying to import and logging a warning, which would also remove the fallback?
# parallelism = read_opts.pop("parallelism", -1)
# ds = ray.data.read_datasource(datasource_override, parallelism=parallelism)

# return ds
this seems not useful any more?
dockerfile looks fine.
[Data] Add performant way to read large tfrecord datasets (#42277)

The main motivation for this PR is that ray.data.read_tfrecords yields suboptimal performance when reading large datasets. This PR adds a default "fast" route for reading tf.records that relies on the tfx-bsl decoder. This approach also infers the schema when no tf_schema is provided by doing a pass over the data to determine the cardinality of the feature lists.

Signed-off-by: Martin Bomio <martinbomio@spotify.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Martin <martinbomio@gmail.com>
Co-authored-by: Scott Lee <sjl@anyscale.com>
Co-authored-by: Cheng Su <scnju13@gmail.com>
Why are these changes needed?
The main motivation for this PR is that ray.data.read_tfrecords yields suboptimal performance when reading large datasets.

This PR adds a default "fast" route for reading tf.records that relies on the tfx-bsl decoder. This approach also infers the schema when no tf_schema is provided by doing a pass over the data to determine the cardinality of the feature lists.
Related issue number
N/A
Checks
- I've signed off every commit (`git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If adding a new method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.