From 42714cb37f26ed95055c6664f783cb189b82ffdf Mon Sep 17 00:00:00 2001
From: Frank Chen
Date: Tue, 7 Jan 2020 15:12:48 -0800
Subject: [PATCH 01/11] Add tf.data Snapshot Public RFC

---
 rfcs/20200107-tf-data-snapshot.md | 368 ++++++++++++++++++++++++++++++
 1 file changed, 368 insertions(+)
 create mode 100644 rfcs/20200107-tf-data-snapshot.md

diff --git a/rfcs/20200107-tf-data-snapshot.md b/rfcs/20200107-tf-data-snapshot.md
new file mode 100644
index 000000000..cb0105844
--- /dev/null
+++ b/rfcs/20200107-tf-data-snapshot.md
@@ -0,0 +1,368 @@
+# tf.data Snapshot
+
+| Status | Proposed |
| :------------ | :------------------------------------------------------ |
| **RFC #** | [193](https://github.com/tensorflow/community/pull/193) |
| **Author(s)** | Frank Chen (frankchn@google.com), Rohan Jain |
: : (rohanj@google.com) :
| **Sponsor** | Jiri Simsa (jsimsa@google.com) |
| **Updated** | 2020-01-07 |

## Objective

With ever faster accelerators available in Cloud and hyperparameter tuning
consuming larger chunks of accelerator time, TensorFlow users are increasingly
finding that they don’t have enough CPU resources to keep up with these
accelerators, leaving valuable accelerator resources idle.

To alleviate this problem, we are proposing a `snapshot` API within `tf.data`,
to allow users to transparently persist the output of their preprocessing
pipeline to disk, and materialize the pre-processed data on a different
training run.

This API enables repeated preprocessing steps to be consolidated, allowing
re-use of already processed data, trading off disk storage and network
bandwidth for freeing up more valuable CPU resources and accelerator compute
time.

## Motivation

Large TensorFlow users have indicated that they have complicated input
processing pipelines which saturate their CPUs before saturating their
accelerators (TPUs in particular). Since they often experiment with
hyperparameter tuning or tweaks to an existing model without affecting their
input pipeline, they are asking for ways to avoid repeated preprocessing of
the same data, by either saving a dataset or caching it to disk.

## User Benefit

Users will be able to transparently persist partially or fully processed data
from `tf.data` input pipelines to disk or Cloud storage systems, and
materialize the pre-processed data during subsequent runs from the same
pipeline. This will cut down on the input pipeline processing overheads during
second and subsequent runs.

## Design Proposal

We propose that we add a new `snapshot` transformation to tf.data. To
illustrate the usage of the transformation, we can start with some sample
code:

```python
dataset = Dataset.list_files("/raw/data/*").shard(num_workers, i)
dataset = dataset.parallel_interleave(TFRecordDataset)
dataset = dataset.map(my_preprocessing_fn)
dataset = dataset.apply(tf.data.snapshot("/saved/data", options...))
dataset = dataset.repeat()

model = ...
model.fit(dataset)
```

As we can see, the end user simply has to add this transformation in order to
use this functionality. In essence, the transformation is similar to the
existing `tf.data.Dataset.cache`, the key difference being that, unlike
`cache`, `snapshot` is intended to be re-used across different executions of
the same input pipeline.

### Proposed API

We are proposing the following API for the snapshot transformation.
```python
def snapshot(path,
             compression=None,
             reader_path_prefix=None,
             writer_path_prefix=None,
             shard_size_bytes=None,
             pending_snapshot_expiry_seconds=None,
             num_reader_threads=None,
             reader_buffer_size=None,
             num_writer_threads=None,
             writer_buffer_size=None,
             shuffle_on_read=None,
             shuffle_seed=None,
             mode=None,
             snapshot_name=None):
  pass  # Implementation goes here.
```

1. `path`: Required. A directory where we want to save our snapshots and/or
   read from a previously saved snapshot.

2. `compression`: Optional. The type of compression to apply to the snapshot
   written to disk. This will support `GZIP`, `SNAPPY` or None. Defaults to
   None.

3. `reader_path_prefix`: Optional. A prefix to add to the path when reading
   from snapshots. This is useful for filesystems where configuration is
   passed in through the path. Defaults to None.

4. `writer_path_prefix`: Optional. A prefix to add to the path when writing to
   snapshots. This is useful for filesystems where configuration is passed in
   through the path. Defaults to None.

5. `shard_size_bytes`: Optional. The maximum size of each data file to be
   written by the snapshot dataset op. Defaults to 10 GiB.

6. `pending_snapshot_expiry_seconds`: Optional. How long to wait (in seconds)
   before the snapshot op considers a previously unfinished snapshot to be
   stale and starts writing a snapshot from scratch again. Defaults to 86400
   seconds (1 day).

7. `num_reader_threads`: Optional. Number of threads to parallelize reading
   from snapshot. Especially useful if compression is turned on since the
   decompression operation tends to be intensive. Defaults to 1. If > 1, this
   might introduce non-determinism, i.e., the order in which elements are read
   from the snapshot may differ from the order in which they were written.

8. `reader_buffer_size`: Optional. Maximum number of elements we can prefetch
   reading from the snapshot. Defaults to 1. Increasing this might improve
   performance but will increase memory consumption.

9. `num_writer_threads`: Optional. Number of threads to parallelize writing
   the snapshot. We'll open up `num_writer_threads` files and write to them in
   parallel. Especially useful if compression is turned on since the
   compression operation tends to be intensive. Defaults to 1. If > 1, this
   might introduce non-determinism, i.e., the order in which elements are read
   from the upstream iterator may differ from the order in which they are
   written.

10. `writer_buffer_size`: Optional. Maximum number of pipeline elements to fill
    up the buffer before writing them out using `num_writer_threads`.

11. `shuffle_on_read`: Optional. If this is True, then the order in which
    examples are produced when reading from a snapshot will be random. Defaults
    to False.

12. `shuffle_seed`: Optional. If `shuffle_seed` is set, the random number
    generator used for shuffling (when `shuffle_on_read` is turned on) is
    seeded by the given seed. Otherwise, it is seeded by a random seed that
    differs for every run.

13. `mode`: Optional. The mode in which snapshot should operate. Valid options
    are `auto`, `read`, `write`, and `passthrough`. The default mode is `auto`,
    where the snapshot op will automatically determine what mode to operate in.

    1. `write` mode forces the snapshot transformation to write a new
       materialization to disk, regardless of whether a complete and valid
       materialization currently exists. In other words, we enter the **WRITE**
       state immediately.
    2. `read` mode forces the snapshot transformation to read from the latest
       version of the materialization on disk, regardless of whether the data
       stored on disk is complete and valid. In other words, we enter the
       **READ** state immediately.

    3. `passthrough` mode turns the snapshot transformation into a no-op. In
       other words, we enter the **PASSTHROUGH** state immediately.

    4. `auto` retains the default behavior of snapshot. See the "Standard
       Kernel Workflow" section for the default behavior.

14. `snapshot_name`: Optional. If set, use the supplied string as a named
    snapshot name instead of introspecting the data pipeline and automatically
    generating a unique identifier for the specific data pipeline.

    1. Instead of generating a new fingerprint of the input processing graph
       and a `run_id` (see the _Detailed Design_ section for details), we will
       use the `snapshot_name` to uniquely identify the snapshot.

### External API Guarantees

Externally, we guarantee that snapshots written by a particular version of
TensorFlow will be readable by that specific version of TensorFlow. Eventually,
we can also guarantee that snapshots written will be readable by all future
versions of TensorFlow.

We are not currently handling the case where workers do not go through the
entire training set at least once.

### Alternatives Considered

An alternative proposal for an API would be `save()` and `load()`, where the
saving and loading of the input pipeline would be made more explicit, avoiding
some of the logic needed in determining whether to snapshot or read from a
snapshot of a model.

The downside here would be that the user would have to split the preprocessing
and training into potentially different files, and users would be forced to
select whether to train or preprocess on their own, which is not ideal.

### Performance Implications

* Do you expect any (speed / memory)? How will you confirm?
* There should be microbenchmarks. Are there?
* There should be end-to-end tests and benchmarks. If there are not (since
  this is still a design), how will you track that these will be created?

### Dependencies

No new dependencies will be introduced as part of this project to TensorFlow.
Dependent projects may be able to use this additional op, but there should be
no significant changes otherwise.

### Engineering Impact

Binary size increases slightly with the inclusion of this new op, and this
code will be maintained by the `tf.data` team.

### Platforms and Environments

This op will work on all TensorFlow-supported platforms. We do not anticipate
this to work on embedded systems as it is not useful in resource-constrained
environments.

### Best Practices, Tutorials and Examples

A user guide for snapshot will be published to guide new users in using this
feature.

### Compatibility

This introduces a new op, which will impact future backwards compatibility.

### User Impact

A new Python function and a new op are the only user-facing changes visible.

## Detailed Design

### Implementation Assumptions

The following implementation is based on the following assumptions that define
the MVP this is designed for:

1. We assume that at least for one pipeline run, you can go through the entire
   training dataset and be able to store that data on disk. Otherwise, a
   snapshot will never get created.
2. In case there are multiple workers and the dataset is sharded across
   workers, we assume that the number of workers remains the same from one run
   to another. If the number changes, we’ll trigger another snapshot.

3. Any `repeat`s in the dataset should be moved to after the `snapshot` op, to
   avoid writing large (or infinite) amounts of data during a snapshot writing
   run.

### New `SnapshotDatasetOp`

To implement the transformation, we are introducing a new `SnapshotDatasetOp`
dataset kernel that will implement all of the functionality in TensorFlow C++.
Python code is mostly glue code to pass relevant parameters into the op
kernel.

### Internal Directory / File Structure

Given a user directory path (e.g. `/path/to/snapshot`), the directory will
look like:

* /path/to/snapshot
  * `fingerprint`/
    * snapshot.metadata
    * `run-id`/
      * 0000000.snapshot
      * 0000001.snapshot

The `fingerprint` is a hash of the input processing graph. The `run-id` is a
unique training run ID that is generated for each run.

### Standard Kernel Workflow

_Note: This is an implementation detail, and may change in the future. This
should not be relied upon except as a reference to the current
implementation._

By default, the `snapshot` operation will, upon startup, make a determination
using the following algorithm as to whether the operation should be in the
WRITE, PASSTHROUGH, or READ state.

1. We will compute a graph fingerprint containing all the information from the
   Dataset preprocessing graph before the `snapshot` op. We’ll use the
   `AsGraphDefInternal` method on DatasetBase for this.

1. We will attempt to enter the corresponding fingerprint directory. For
   instance, if the computed fingerprint is `f-abc123` and the base snapshot
   directory is `/saved/data`, then we will attempt to enter
   `/saved/data/f-abc123`.

1. If the snapshot directory is non-existent, empty, or does not contain a
   `metadata` file, we will enter the **WRITE** state.

1. If the snapshot directory contains a `metadata` file, we will read the
   metadata file.

   1. The metadata file contains the following fields:
      1. A training run ID
      1. A boolean indicating if the snapshot is complete
      1. A training run start-time.

1. If the training run start-time is older than the (configurable) training
   run timeout (set with the `pending_snapshot_expiry_seconds` parameter), we
   will enter the **WRITE** state.

1. If the training run start-time is within the training run timeout, but the
   snapshot is not complete, then we will enter the **PASSTHROUGH** state.

1. If the snapshot is complete, we will enter the **READ** state.

#### WRITE State

1. We generate a random training run ID.

1. We write (possibly overwriting) the `snapshot.metadata` file.

1. We proceed to create a subdirectory containing the training run ID, and
   start writing data asynchronously in chunks.

1. At the end of the dataset (when `end_of_sequence == true`), we will check
   the `snapshot.metadata` file to determine whether it contains the same
   training run ID.

   1. If it does, we set the complete bit to true to finalize the directory.
   1. If it does not, it means that someone else is concurrently writing the
      snapshot and we lost the race to them. We delete all data in the
      training run directory.

For the current implementation, we will store the data in chunked TFRecord
files. Eventually we may move to other, higher-performance data stores or
support additional storage systems such as Cloud BigTable.
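To make the end-of-dataset check above concrete, here is a minimal Python
sketch of the finalize-or-abandon logic. It is purely illustrative: the real
logic lives in the C++ `SnapshotDatasetOp` kernel, and the JSON-backed
metadata and the `finalize_snapshot` helper below are stand-ins for the actual
metadata format and kernel code.

```python
import json
import os
import shutil

def finalize_snapshot(fingerprint_dir, my_run_id):
  # Hypothetical sketch; assumes the metadata is stored as JSON with the
  # three fields described above (run ID, complete bit, start-time).
  metadata_file = os.path.join(fingerprint_dir, "snapshot.metadata")
  with open(metadata_file) as f:
    metadata = json.load(f)
  if metadata["run_id"] == my_run_id:
    # We still own this snapshot: set the complete bit so that subsequent
    # runs of the same pipeline enter the READ state.
    metadata["complete"] = True
    with open(metadata_file, "w") as f:
      json.dump(metadata, f)
  else:
    # Another worker overwrote the metadata and won the race: discard our
    # partially written training run directory.
    shutil.rmtree(os.path.join(fingerprint_dir, my_run_id))
```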
#### PASSTHROUGH State

1. This is a no-op, where we simply pass through the tensors to the downstream
   operations.

#### READ State

1. We will read from the snapshots contained within the subfolder with the
   correct graph fingerprint and specified training run ID.

1. Optionally, the user may specify that the snapshots should be read back in
   shuffled order.

### Concurrency: Handling Multiple Input Workers

If input workers are sharded, then they will generate different graph
fingerprints as their shard indexes will be different. This will result in
each worker writing to a different subdirectory.

If input workers are not sharded, then this will result in a race and
potentially multiple workers writing data (still with different training run
IDs). Eventually, if each worker finishes, we will be left with one copy of
the data as all the other workers will determine that they have lost the race
and delete their own copy of the snapshot data.

## Questions and Discussion Topics

* Should we implement this as three ops (a control op to determine whether a
  snapshot is to be read from or written to, plus a write op and a read op to
  do the respective operations)?
  * Pros include:
    * Modularizes the implementation into smaller chunks
    * Allows someone else to do the "control"
  * Challenges include:
    * Where/how does the "control" run?
    * How do we construct the dataset graph properly?
* How should autotuning be integrated into the snapshot transformation?
* Are the configuration options well named? Is it possible to consolidate some
  of these options?
* What other compression/decompression options would you like to see
  supported?
* Any other performance / feature tuning knobs we should make available?

From 958ae634eff74de3a71e9991e6fea892c1e3f4af Mon Sep 17 00:00:00 2001
From: Frank Chen
Date: Tue, 7 Jan 2020 15:15:20 -0800
Subject: [PATCH 02/11] Minor formatting changes

---
 rfcs/20200107-tf-data-snapshot.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rfcs/20200107-tf-data-snapshot.md b/rfcs/20200107-tf-data-snapshot.md
index cb0105844..9350e84d1 100644
--- a/rfcs/20200107-tf-data-snapshot.md
+++ b/rfcs/20200107-tf-data-snapshot.md
@@ -4,7 +4,7 @@
 | :------------ | :------------------------------------------------------ |
 | **RFC #** | [193](https://github.com/tensorflow/community/pull/193) |
 | **Author(s)** | Frank Chen (frankchn@google.com), Rohan Jain |
-: : (rohanj@google.com) :
+| | (rohanj@google.com) |
 | **Sponsor** | Jiri Simsa (jsimsa@google.com) |
 | **Updated** | 2020-01-07 |

From 2bee40f645d49445efe548ab8c59d58586e10d24 Mon Sep 17 00:00:00 2001
From: Frank Chen
Date: Thu, 9 Jan 2020 16:38:15 -0800
Subject: [PATCH 03/11] Updated design doc after comments from various folks

---
 rfcs/20200107-tf-data-snapshot.md | 51 +++++++++++++++++++------------
 1 file changed, 31 insertions(+), 20 deletions(-)

diff --git a/rfcs/20200107-tf-data-snapshot.md b/rfcs/20200107-tf-data-snapshot.md
index 9350e84d1..f535eeb72 100644
--- a/rfcs/20200107-tf-data-snapshot.md
+++ b/rfcs/20200107-tf-data-snapshot.md
@@ -90,7 +90,7 @@ def snapshot(path,

2. `compression`: Optional. The type of compression to apply to the snapshot
   written to disk. This will support `GZIP`, `SNAPPY` or None. Defaults to
-  None.
+  AUTO.

3. `reader_path_prefix`: Optional. A prefix to add to the path when reading
   from snapshots. This is useful for filesystems where configuration is
   passed in through the path. Defaults to None.

4. `writer_path_prefix`: Optional. A prefix to add to the path when writing to
   snapshots. This is useful for filesystems where configuration is passed in
   through the path.
   Defaults to None.

5. `shard_size_bytes`: Optional. The maximum size of each data file to be
   written by the snapshot dataset op. Defaults to AUTO.

6. `pending_snapshot_expiry_seconds`: Optional. How long to wait (in seconds)
   before the snapshot op considers a previously unfinished snapshot to be
   stale and starts writing a snapshot from scratch again. Defaults to 86400
   seconds (1 day).

7. `num_reader_threads`: Optional. Number of threads to parallelize reading
   from snapshot. Especially useful if compression is turned on since the
   decompression operation tends to be intensive. If > 1, this might introduce
   non-determinism, i.e., the order in which elements are read from the
   snapshot may differ from the order in which they were written. Defaults to
   AUTO.

8. `reader_buffer_size`: Optional. Maximum number of elements we can prefetch
   reading from the snapshot. Increasing this might improve performance but
   will increase memory consumption. Defaults to AUTO.

9. `num_writer_threads`: Optional. Number of threads to parallelize writing
   the snapshot. We'll open up `num_writer_threads` files and write to them in
   parallel. Especially useful if compression is turned on since the
   compression operation tends to be intensive. If > 1, this might introduce
   non-determinism, i.e., the order in which elements are read from the
   upstream iterator may differ from the order in which they are written.
   Defaults to AUTO.

10. `writer_buffer_size`: Optional. Maximum number of pipeline elements to fill
    up the buffer before writing them out using `num_writer_threads`. Defaults
    to AUTO.

11. `shuffle_on_read`: Optional. If this is True, then snapshot randomizes the
    order in which the snapshot files are read back. This emulates shuffling
    of the input files during a training run (e.g. when `Dataset.list_files`
    is called with `shuffle` turned on). Defaults to False.

12. `shuffle_seed`: Optional. If `shuffle_seed` is set, the random number
    generator used for shuffling (when `shuffle_on_read` is turned on) is
    seeded by the given seed. Otherwise, it is seeded by a random seed that
    differs for every run.

   and `run_id` (see the _Detailed Design_ section for details), we will
   use the `snapshot_name` to uniquely identify the snapshot.

Note: `AUTO` in the options above indicates that snapshot will attempt to pick
a reasonable default that is suitable for most use cases. We will eventually
add tf.data autotuning to pick the right parameters for the best performance
for individual workloads.

### External API Guarantees

Externally, we guarantee that snapshots written by a particular version of
TensorFlow will be readable by that specific version of TensorFlow.

We are not currently handling the case where workers do not go through the
entire training set at least once.

@@ -285,14 +291,17 @@ WRITE, PASSTHROUGH, or READ state.
1. If the snapshot directory is non-existent, empty, or does not contain a
   `metadata` file, we will enter the **WRITE** state.

1. If the snapshot directory contains a `metadata.final` file, we will read
   the final metadata file and proceed to the **READ** state.

   1. The file contains the following fields:
      1. A training run ID.
      1. A boolean indicating if the snapshot is complete.
      1. A training run start-time.

1. If the snapshot directory contains a `metadata` file but not a
   `metadata.final` file, we will read the metadata file.

1. If the training run start-time is older than the (configurable) training
   run timeout (set with the `pending_snapshot_expiry_seconds` parameter), we
   will enter the **WRITE** state.

1. If the training run start-time is within the training run timeout, but the
   snapshot is not complete, then we will enter the **PASSTHROUGH** state.

1. If the snapshot is complete, we will enter the **READ** state.

1. At the end of the dataset (when `end_of_sequence == true`), we will check
   the `snapshot.metadata` file to determine whether it contains the same
   training run ID.

   1. If it does, we write a `metadata.final` file containing the
      same information as the `metadata` file but with the complete
      bit set to true.
   1. If it does not, it means that someone else is concurrently writing the
      snapshot and we lost the race to them. We delete all data in the
      training run directory.

From d941c33eb309d23ec79ba891e490da1abc1fb141 Mon Sep 17 00:00:00 2001
From: Frank Chen
Date: Thu, 9 Jan 2020 17:04:48 -0800
Subject: [PATCH 04/11] Add clarification regarding number of workers being the same

---
 rfcs/20200107-tf-data-snapshot.md | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/rfcs/20200107-tf-data-snapshot.md b/rfcs/20200107-tf-data-snapshot.md
index f535eeb72..2d18d21e0 100644
--- a/rfcs/20200107-tf-data-snapshot.md
+++ b/rfcs/20200107-tf-data-snapshot.md
@@ -241,9 +241,17 @@ the MVP this is designed for:
    training dataset and be able to store that data on disk. Otherwise, a
    snapshot will never get created.

2. In cases where there are multiple workers and the dataset is sharded with
   `Dataset.shard`, we assume that the number of workers remains the same from
   the initial (writing) run through to the reading runs.

   If the number of workers changes, then the `num_shards` parameter to
   `Dataset.shard` will change, and this will result in a different graph
   fingerprint and another snapshot write will be triggered.

   If all workers use the exact same input pipeline with no sharding (e.g. all
   workers will read from all the files), then snapshot will still be able to
   read from previous snapshots even if the number of workers is different.

3. Any `repeat`s in the dataset should be moved to after the `snapshot` op, to
   avoid writing large (or infinite) amounts of data during a snapshot writing
   run, as illustrated in the sketch below.
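As an illustration of the third assumption (a sketch only, using the
`tf.data.snapshot` API proposed in this document and a placeholder
`my_preprocessing_fn`):

```python
# Good: `repeat` comes after `snapshot`, so exactly one epoch of
# pre-processed data is materialized to disk.
dataset = dataset.map(my_preprocessing_fn)
dataset = dataset.apply(tf.data.snapshot("/saved/data"))
dataset = dataset.repeat()

# Bad: repeating before the snapshot would ask the writer to materialize
# many (here, infinitely many) copies of the same data.
dataset = dataset.map(my_preprocessing_fn)
dataset = dataset.repeat()
dataset = dataset.apply(tf.data.snapshot("/saved/data"))
```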
From beff086b7c870c346e30d64562c370d0c80e795d Mon Sep 17 00:00:00 2001
From: Frank Chen
Date: Fri, 10 Jan 2020 15:52:40 -0800
Subject: [PATCH 05/11] Updated design doc by removing some unneeded parameters

---
 rfcs/20200107-tf-data-snapshot.md | 58 ++++++++++++------------------
 1 file changed, 22 insertions(+), 36 deletions(-)

diff --git a/rfcs/20200107-tf-data-snapshot.md b/rfcs/20200107-tf-data-snapshot.md
index 2d18d21e0..272744986 100644
--- a/rfcs/20200107-tf-data-snapshot.md
+++ b/rfcs/20200107-tf-data-snapshot.md
@@ -70,14 +70,10 @@ We are proposing the following API for the snapshot transformation.

```python
def snapshot(path,
             compression=None,
-            reader_path_prefix=None,
-            writer_path_prefix=None,
             shard_size_bytes=None,
             pending_snapshot_expiry_seconds=None,
             num_reader_threads=None,
-            reader_buffer_size=None,
             num_writer_threads=None,
-            writer_buffer_size=None,
             shuffle_on_read=None,
             shuffle_seed=None,
             mode=None,
             snapshot_name=None):
  pass  # Implementation goes here.
```

1. `path`: Required. A directory where we want to save our snapshots and/or
   read from a previously saved snapshot.

1. `compression`: Optional. The type of compression to apply to the snapshot
   written to disk. This will support `GZIP`, `SNAPPY` or None. Defaults to
   AUTO.

1. `shard_size_bytes`: Optional. The maximum size of each data file to be
   written by the snapshot dataset op. Defaults to AUTO.

1. `pending_snapshot_expiry_seconds`: Optional. How long to wait (in seconds)
   before the snapshot op considers a previously unfinished snapshot to be
   stale and starts writing a snapshot from scratch again. Defaults to 86400
   seconds (1 day).

1. `num_reader_threads`: Optional. Number of threads to parallelize reading
   from snapshot. Especially useful if compression is turned on since the
   decompression operation tends to be intensive. If > 1, this might introduce
   non-determinism, i.e., the order in which elements are read from the
   snapshot may differ from the order in which they were written. Defaults to
   AUTO.

1. `num_writer_threads`: Optional. Number of threads to parallelize writing
   the snapshot. We'll open up `num_writer_threads` files and write to them in
   parallel. Especially useful if compression is turned on since the
   compression operation tends to be intensive. If > 1, this might introduce
   non-determinism, i.e., the order in which elements are read from the
   upstream iterator may differ from the order in which they are written.
   Defaults to AUTO.
-10. `writer_buffer_size`: Optional. Maximum number of pipeline elements to fill - up the buffer before writing them out using `num_writer_threads`. Defaults - to AUTO. - -11. `shuffle_on_read`: Optional. If this is True, then snapshot randomizes the +1. `shuffle_on_read`: Optional. If this is True, then snapshot randomizes the order in which the snapshot files are read back. This emulates shuffling of the input files during a training run (e.g. when `Dataset.list_files` is called with `shuffle` turned on). Defaults to False. -12. `shuffle_seed`: Optional. If shuffle_seed is set, the random number +1. `shuffle_seed`: Optional. If shuffle_seed is set, the random number generator used for shuffling (when `shuffle_on_read` is turned on) is seeded by the given seed. Otherwise, it is seeded by a random seed that differs for every run. -13. `mode`: Optional. The mode at which snapshot should operate. Valid options +1. `mode`: Optional. The mode at which snapshot should operate. Valid options are `auto`, `read`, `write`, and `passthrough`. The default mode is `auto`, where the snapshot op will automatically determine what mode to operate in. @@ -150,18 +130,18 @@ def snapshot(path, materialization currently exists. In other words, we enter the **WRITE** state immediately. - 2. `read` mode forces the snapshot transformation to read from the latest + 1. `read` mode forces the snapshot transformation to read from the latest version of the materialization on disk, regardless of whether the data stored on disk is complete and valid. In other words, we enter the **READ** state immediately. - 3. `passthrough` mode turns the snapshot transformation into a no-op. In + 1. `passthrough` mode turns the snapshot transformation into a no-op. In other words, we enter the **PASSTHROUGH** state immediately. - 4. `auto` retains the default behavior of snapshot. See the "Standard + 1. `auto` retains the default behavior of snapshot. See the "Standard Kernel Workflow" section for the default behavior. -14. `snapshot_name`: Optional. If set, use the supplied string as a named +1. `snapshot_name`: Optional. If set, use the supplied string as a named snapshot name instead of introspecting the data pipeline and automatically generating a unique identifier for the specific data pipeline. @@ -169,6 +149,15 @@ def snapshot(path, and `run_id` (see the _Detailed Design_ section for details), we will use the `snapshot_name` to uniquely identify the snapshot. + 1. Multiple concurrent training jobs with the same "snapshot_name" may + result in concurrent write collisions and a potentially invalid snapshot + if the jobs tries to read from and then write to the metadata file at + exactly the same time. + + The user is expected to handle these cases and explicitly specify `mode`s + to ensure that only one run is set to `write` mode at any point if + collisions are a possibility. + Note: `AUTO` options above indicates that snapshot will attempt to pick a reasonable default that is suitable for most use cases. We will eventually add tf.data autotuning to pick the right parameters for the best performance for @@ -195,10 +184,7 @@ select whether to train or preprocess on their own, which is not good. ### Performance Implications -* Do you expect any (speed / memory)? How will you confirm? -* There should be microbenchmarks. Are there? -* There should be end-to-end tests and benchmarks. If there are not (since - this is still a design), how will you track that these will be created? 
+Benchmarks for this feature will be included as part of Dataset microbenchmarks.

From a7a7c5b553cd7d7264d2f5730a368db5f709ad79 Mon Sep 17 00:00:00 2001
From: Frank Chen
Date: Mon, 13 Jan 2020 16:59:06 -0800
Subject: [PATCH 06/11] Removed miscellaneous reader options and added a reader_fn parameter

---
 rfcs/20200107-tf-data-snapshot.md | 61 +++++++++++++++++++++----------
 1 file changed, 42 insertions(+), 19 deletions(-)

diff --git a/rfcs/20200107-tf-data-snapshot.md b/rfcs/20200107-tf-data-snapshot.md
index 272744986..2659333f8 100644
--- a/rfcs/20200107-tf-data-snapshot.md
+++ b/rfcs/20200107-tf-data-snapshot.md
@@ -72,10 +72,8 @@ def snapshot(path,

```python
def snapshot(path,
             compression=None,
             shard_size_bytes=None,
             pending_snapshot_expiry_seconds=None,
-            num_reader_threads=None,
             num_writer_threads=None,
-            shuffle_on_read=None,
-            shuffle_seed=None,
+            reader_fn=None,
             mode=None,
             snapshot_name=None):
  pass  # Implementation goes here.
```

1. `reader_fn`: Optional. A user-provided reader function to use when reading
   the snapshot back. This allows the user to specify the concurrency and
   randomization required when reading from the snapshot.

   `reader_fn` should be a function that accepts two arguments: (1) a list of
   snapshot file paths, and (2) a reference to a `SnapshotDataset` class. The
   function should return a `Dataset`.

   The `SnapshotDataset` class is a `Dataset` (similar to other source
   datasets like `TFRecordDataset` or `TextLineDataset`) with the following
   constructor:
   ```python
   class SnapshotDataset(dataset_ops.DatasetSource):
     def __init__(self, filenames):
       """Creates a `SnapshotDataset`.

       Args:
         filenames: A `tf.string` tensor or a `tf.data.Dataset` containing one
           or more filenames.
+ """ + pass + ``` + + If the `reader_fn` is not specified, a default equivalent to the following + will be used: + ```python + def reader_fn(filenames, SnapshotDataset): + return SnapshotDataset(filenames) + ``` + + Users can optionally add snapshot file shuffling and parallelism by passing + a `reader_fn` similar to the one here: + ```python + def reader_fn(filenames, SnapshotDataset): + file_ds = Dataset.from_tensor_slices(filenames) + file_ds = file_ds.shuffle(1000) + reader_ds = dataset.interleave( + lambda x: SnapshotDataset(x), + cycle_length=32, + num_parallel_calls=32) + return reader_ds + ``` 1. `mode`: Optional. The mode at which snapshot should operate. Valid options are `auto`, `read`, `write`, and `passthrough`. The default mode is `auto`, From 149a6f9532900224bfb951e5a70a8e75d2e6ecb9 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Mon, 20 Jan 2020 21:35:18 -0800 Subject: [PATCH 07/11] Rename SnapshotDataset to SnapshotReaderDataset --- rfcs/20200107-tf-data-snapshot.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rfcs/20200107-tf-data-snapshot.md b/rfcs/20200107-tf-data-snapshot.md index 2659333f8..e370fae33 100644 --- a/rfcs/20200107-tf-data-snapshot.md +++ b/rfcs/20200107-tf-data-snapshot.md @@ -110,7 +110,7 @@ def snapshot(path, snapshot file paths, and (2) a reference to a `SnapshotDataset` class. The function should return a `Dataset` class. - The `SnapshotDataset` class is a `Dataset` (similar to other source datasets + The `SnapshotReaderDataset` class is a `Dataset` (similar to other source datasets like `TFRecordDataset` or `TextLineDataset`) with the following constructor: ```python class SnapshotDataset(dataset_ops.DatasetSource): From 0b28765eb58e03eeed229c1cca8b6309abeee184 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Mon, 27 Jan 2020 16:47:01 -0800 Subject: [PATCH 08/11] Revision to the API after TensorFlow Design Review --- rfcs/20200107-tf-data-snapshot.md | 169 +++++++++++++++--------------- 1 file changed, 84 insertions(+), 85 deletions(-) diff --git a/rfcs/20200107-tf-data-snapshot.md b/rfcs/20200107-tf-data-snapshot.md index e370fae33..31868c326 100644 --- a/rfcs/20200107-tf-data-snapshot.md +++ b/rfcs/20200107-tf-data-snapshot.md @@ -70,12 +70,9 @@ We are proposing the following API for the snapshot transformation. ```python def snapshot(path, compression=None, - shard_size_bytes=None, - pending_snapshot_expiry_seconds=None, - num_writer_threads=None, reader_fn=None, - mode=None, - snapshot_name=None): + writer_fn=None, + pending_snapshot_expiry_seconds=None): pass # Implementation goes here. ``` @@ -86,105 +83,107 @@ def snapshot(path, written to disk. This will support `GZIP`, `SNAPPY` or None. Defaults to AUTO. -1. `shard_size_bytes`: Optional. The maximum size of each data file to be - written by the snapshot dataset op. Defaults to AUTO. +1. `reader_fn`: Optional. The input pipeline transformation specified by + `reader_fn` is executed when the snapshot detects that there is an existing, + valid snapshot available. -1. `pending_snapshot_expiry_seconds`: Optional. How long to wait (in seconds) - before the snapshot op considers a previously unfinished snapshot to be - stale and starts writing a snapshot from scratch again. Defaults to 86400 - seconds (1 day). - -1. `num_writer_threads`: Optional. Number of threads to parallelize writing - from snapshot. We'll open up `num_writer_threads` files and write to them in - parallel. Especially useful if compression is turned on since the - compression operation tends to be intensive. 
   If > 1, this might introduce non-determinism, i.e., the order in which
   elements are read from the upstream iterator may differ from the order in
   which they are written. Defaults to AUTO.

1. `reader_fn`: Optional. The input pipeline transformation specified by
   `reader_fn` is executed when the snapshot detects that there is an
   existing, valid snapshot available.

   `reader_fn` is a user-specified function that accepts a single argument:
   (1) a Dataset of Datasets, each representing a "split" of elements of the
   original dataset. The cardinality of the input dataset matches the
   cardinality of the output of `writer_fn` (see below). The function should
   return a Dataset of elements of the original dataset.

   A default `reader_fn` will look like the following:

   ```python
   def default_reader_fn(datasets):
     # shuffle the dataset splits
     datasets = datasets.shuffle(NUM_DATASETS)
     # read datasets in parallel and interleave their elements
     return datasets.interleave(lambda x: x, num_parallel_calls=AUTOTUNE)
   ```

1. `writer_fn`: Optional. The input pipeline specified by `writer_fn` is
   executed when the snapshot op detects that there are no valid snapshots
   and no other threads are currently attempting to write a snapshot.

   `writer_fn` is a user-specified function that accepts a single argument:
   (1) a Dataset of elements to be written out. The function should return
   a Dataset of Datasets, each representing a "split" of elements of the
   original dataset. The tf.data snapshot implementation will then persist
   splits in parallel.

   A default writer_fn will look like the following:

   ```python
   def default_writer_fn(dataset):
     # add a component with element index
     dataset = dataset.enumerate()
     # split input dataset in a round-robin fashion
     return dataset.split(num_splits=NUM_CORES,
                          key_fn=lambda i, _: i % NUM_CORES)
   ```

1. `pending_snapshot_expiry_seconds`: Optional. How long to wait (in seconds)
   before the snapshot op considers a previously unfinished snapshot to be
   stale and starts writing a snapshot from scratch again. Defaults to 86400
   seconds (1 day).
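Putting these pieces together, a training pipeline that opts into parallel
writes and shuffled, parallel reads might look like the sketch below. Note
that this is illustrative only: `tf.data.snapshot` and `Dataset.split` are
APIs proposed in this document (they do not exist yet), and `NUM_CORES` and
`my_preprocessing_fn` are placeholders.

```python
import tensorflow as tf

NUM_CORES = 8  # placeholder; would typically match the host's core count

def my_writer_fn(dataset):
  # Round-robin elements into NUM_CORES splits so that the splits can be
  # persisted in parallel.
  dataset = dataset.enumerate()
  return dataset.split(num_splits=NUM_CORES, key_fn=lambda i, _: i % NUM_CORES)

def my_reader_fn(datasets):
  # Randomize the order of the splits, then read them in parallel,
  # interleaving their elements.
  datasets = datasets.shuffle(NUM_CORES)
  return datasets.interleave(
      lambda x: x, num_parallel_calls=tf.data.experimental.AUTOTUNE)

dataset = tf.data.Dataset.list_files("/raw/data/*")
dataset = dataset.interleave(tf.data.TFRecordDataset)
dataset = dataset.map(my_preprocessing_fn)
dataset = dataset.apply(
    tf.data.snapshot("/saved/data",
                     reader_fn=my_reader_fn,
                     writer_fn=my_writer_fn))
dataset = dataset.repeat()
```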
#### Achieving Parallelism

`reader_fn` and `writer_fn` default to passing the dataset through unchanged.
In other words, the default implementation results in single-threaded reads
and writes on snapshots. Parallelism can be achieved in `writer_fn` by
splitting up the dataset into multiple datasets, and by using
`num_parallel_calls` in the `interleave` function of the `reader_fn`.

#### Computing Graph Fingerprints

Snapshot attempts to determine whether a run of an input pipeline is the same
as a previous run by computing the fingerprint of the nodes within the
pipeline.

However, some input pipelines might vary in insignificant ways from run to
run, causing their fingerprints to differ. For instance, consider the
following preprocessing function:

```python
features_to_multiply = {"feature1", "feature2", "feature3", "feature4"}

def preprocessing_fn(value):
  keys_to_features = {
      "feature1": tf.FixedLenFeature([], tf.float32, 0.0),
      "feature2": tf.FixedLenFeature([], tf.float32, 0.0),
      "feature3": tf.FixedLenFeature([], tf.float32, 0.0),
      "feature4": tf.FixedLenFeature([], tf.float32, 0.0)
  }

  parsed = tf.parse_single_example(value, keys_to_features)
  combined_feature = 1.0
  for item in features_to_multiply:
    combined_feature *= parsed[item]

  return combined_feature

dataset = ...
dataset = dataset.map(preprocessing_fn)
```

In the above example, our `features_to_multiply` variable uses a `set`, which
is not guaranteed to be ordered in Python 2. When we iterate over the set in
the for loop within `preprocessing_fn`, we may get a different graph on each
run (i.e.
one run could have us multiplying `feature2` first, then `feature4`,
etc..., while another run may have us multiplying `feature1`, then `feature3`,
and so on).

In cases like these, we can ask fingerprinting to use a fixed value for the
fingerprint of the map function with a new `set_fingerprint`
transformation, which asks the fingerprinting function to not compute the
fingerprint of the previous node but to use a user-specified value instead:

```python
dataset = ...
dataset = dataset.map(preprocessing_fn)
dataset = tf.data.set_fingerprint(dataset, fingerprint="my_fixed_fp")
```

From 149a6f9532900224bfb951e5a70a8e75d2e6ecb9 Mon Sep 17 00:00:00 2001
From: Frank Chen
Date: Tue, 28 Jan 2020 13:51:47 -0800
Subject: [PATCH 09/11] Change fingerprint function to set_snapshot_fingerprint

---
 rfcs/20200107-tf-data-snapshot.md | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/rfcs/20200107-tf-data-snapshot.md b/rfcs/20200107-tf-data-snapshot.md
index 31868c326..b266be9e7 100644
--- a/rfcs/20200107-tf-data-snapshot.md
+++ b/rfcs/20200107-tf-data-snapshot.md

In the above example, our `features_to_multiply` variable uses a `set`, which
is not guaranteed to be ordered in Python. When we iterate over the set in the
for loop within `preprocessing_fn`, we may get a different graph on each
run (i.e. one run could have us multiplying `feature2` first, then `feature4`,
etc..., while another run may have us multiplying `feature1`, then `feature3`,
and so on).

In cases like these, we can ask fingerprinting to use a fixed value for the
fingerprint of the map function with a new `set_snapshot_fingerprint`
transformation, which asks the fingerprinting function to not compute the
fingerprint of the previous node but to use a user-specified value instead:

```python
dataset = ...
dataset = dataset.map(preprocessing_fn)
dataset = tf.data.experimental.set_snapshot_fingerprint(
    dataset, fingerprint="my_fixed_fp")
```

From 9a13bd427158d7da0b4d6e4c3300b00842eed5c5 Mon Sep 17 00:00:00 2001
From: Frank Chen
Date: Tue, 28 Jan 2020 14:39:14 -0800
Subject: [PATCH 10/11] Update set_snapshot_fingerprint to with_snapshot_fingerprint

---
 rfcs/20200107-tf-data-snapshot.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/rfcs/20200107-tf-data-snapshot.md b/rfcs/20200107-tf-data-snapshot.md
index b266be9e7..e9b8af3b8 100644
--- a/rfcs/20200107-tf-data-snapshot.md
+++ b/rfcs/20200107-tf-data-snapshot.md

In cases like these, we can ask fingerprinting to use a fixed value for the
fingerprint of the map function with a new `with_snapshot_fingerprint`
transformation, which asks the fingerprinting function to not compute the
fingerprint of the previous node but to use a user-specified value instead:

```python
dataset = ...
dataset = dataset.map(preprocessing_fn) -dataset = tf.data.experimental.set_snapshot_fingerprint( +dataset = tf.data.experimental.with_snapshot_fingerprint( dataset, fingerprint="my_fixed_fp") ``` From 41396e8ce17b89d712a8bbda51c009ff8ca8475c Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Mon, 10 Feb 2020 16:49:42 -0800 Subject: [PATCH 11/11] Update status to accepted. --- rfcs/20200107-tf-data-snapshot.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rfcs/20200107-tf-data-snapshot.md b/rfcs/20200107-tf-data-snapshot.md index e9b8af3b8..a0f6c3358 100644 --- a/rfcs/20200107-tf-data-snapshot.md +++ b/rfcs/20200107-tf-data-snapshot.md @@ -1,12 +1,12 @@ # tf.data Snapshot -| Status | Proposed | +| Status | Accepted | | :------------ | :------------------------------------------------------ | | **RFC #** | [193](https://github.com/tensorflow/community/pull/193) | | **Author(s)** | Frank Chen (frankchn@google.com), Rohan Jain | | | (rohanj@google.com) | | **Sponsor** | Jiri Simsa (jsimsa@google.com) | -| **Updated** | 2020-01-07 | +| **Updated** | 2020-02-10 | ## Objective