import tensorflow as tf

import time

Start by defining a class called ArtificialDataset that inherits from tf.data.Dataset. This dataset:

- Generates num_samples samples (default is 3)
- Sleeps for some time before the first item to simulate opening a file
- Sleeps for some time before producing each item to simulate reading data from a file



In [2]:
class ArtificialDataset(tf.data.Dataset):
    def _generator(num_samples):
        # Opening the file
        time.sleep(0.03)

        for sample_idx in range(num_samples):
            # Reading data (line, record) from the file
            time.sleep(0.015)

            yield (sample_idx,)

    def __new__(cls, num_samples=3):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
            args=(num_samples,)
        )

Next, write a dummy training loop that measures how long it takes to iterate over a dataset. Training time is simulated.

In [3]:
def benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for epoch_num in range(num_epochs):
        for sample in dataset:
            # Performing a training step
            time.sleep(0.01)
    print("Execution time:", time.perf_counter() - start_time)

Start with a naive pipeline using no tricks, iterating over the dataset as-is.

In [5]:
benchmark(ArtificialDataset())

Execution time: 0.27612568624317646


In a naive synchronous implementation like this one, your model sits idle while the pipeline is fetching data and, conversely, the input pipeline sits idle while the model is training. The total time is thus the sum of the file-opening, reading, and training times.
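As a rough check, you can predict the naive run time from the sleep durations used in ArtificialDataset and benchmark. This back-of-the-envelope calculation assumes nothing overlaps and ignores tf.data and Python overhead, which likely explains why the measured time above is somewhat higher.

```python
# Sleep durations taken from ArtificialDataset and benchmark above (seconds).
OPEN_TIME = 0.03    # once per epoch, before the first sample
READ_TIME = 0.015   # per sample
TRAIN_TIME = 0.01   # per training step


def sequential_time(num_samples: int = 3, num_epochs: int = 2) -> float:
    """Total time when opening, reading and training never overlap."""
    per_epoch = OPEN_TIME + num_samples * (READ_TIME + TRAIN_TIME)
    return num_epochs * per_epoch


# 2 epochs * (0.03 + 3 * 0.025) s
print(f"{sequential_time():.3f} s")  # prints 0.210 s
```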

__Prefetching__

Prefetching overlaps the preprocessing and model execution of a training step. While the model is executing training step s, the input pipeline is reading the data for step s+1. Doing so reduces the step time to the maximum (as opposed to the sum) of the training time and the time it takes to extract the data.

The tf.data API provides the tf.data.Dataset.prefetch transformation, which decouples the time when data is produced from the time when data is consumed. In particular, the transformation uses a background thread and an internal buffer to prefetch elements from the input dataset ahead of the time they are requested. The number of elements to prefetch should be equal to (or possibly greater than) the number of batches consumed by a single training step. You can either tune this value manually or set it to tf.data.AUTOTUNE, which lets the tf.data runtime tune the value dynamically.
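To see how a background thread and a bounded buffer let the producer run ahead of the consumer, here is a plain-Python toy model. The prefetch helper is hypothetical and only illustrates the idea; it is not how tf.data implements prefetching, and it assumes the source can safely be iterated from another thread.

```python
import queue
import threading
import time
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")
_END = object()  # sentinel placed in the buffer after the last item


def prefetch(source: Iterable[T], buffer_size: int) -> Iterator[T]:
    """Yield items from `source` while a background thread reads up to
    `buffer_size` items ahead (a toy model of Dataset.prefetch)."""
    buffer: queue.Queue = queue.Queue(maxsize=buffer_size)

    def producer() -> None:
        for item in source:
            buffer.put(item)  # blocks while the buffer is full
        buffer.put(_END)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buffer.get()  # blocks only while the producer is behind
        if item is _END:
            return
        yield item


def slow_reader(num_items: int) -> Iterator[int]:
    for i in range(num_items):
        time.sleep(0.015)  # simulated read, as in ArtificialDataset
        yield i


start = time.perf_counter()
for item in prefetch(slow_reader(10), buffer_size=2):
    time.sleep(0.01)  # simulated training step, as in benchmark
# Expect roughly 10 * max(0.015, 0.01) s rather than 10 * (0.015 + 0.01) s.
print(f"Execution time: {time.perf_counter() - start:.3f}")
```

With prefetching, each step costs roughly the slower of reading and training (0.015 s here) instead of their sum (0.025 s). A larger buffer helps absorb fluctuations in the producer's speed, but it cannot make a consistently slower producer keep up.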

Note that the prefetch transformation provides benefits any time there is an opportunity to overlap the work of a "producer" with the work of a "consumer."

In [6]:
# Try prefetch
benchmark(
    ArtificialDataset()
    .prefetch(tf.data.AUTOTUNE)
)

Execution time: 0.23265091236680746


__Interleave__<br>
To mitigate the impact of the various data extraction overheads, the tf.data.Dataset.interleave transformation can be used to parallelize the data loading step, interleaving the contents of other datasets (such as data file readers). The number of datasets to overlap can be specified by the cycle_length argument, while the level of parallelism can be specified by the num_parallel_calls argument. Similar to the prefetch transformation, the interleave transformation supports tf.data.AUTOTUNE, which will delegate the decision about what level of parallelism to use to the tf.data runtime.
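The roles of cycle_length and block_length can be illustrated with a sequential, plain-Python model of interleave. The interleave and records functions below are hypothetical sketches: interleave keeps cycle_length sub-iterators open, takes block_length items from each in turn, and opens the next input when one runs out. tf.data's exact ordering when a sub-iterator is replaced may differ from this sketch.

```python
from collections import deque
from itertools import islice
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")
_END = object()  # sentinel: no more inputs to open


def interleave(inputs: Iterable[T], map_func: Callable[[T], Iterable[U]],
               cycle_length: int, block_length: int = 1) -> Iterator[U]:
    """Sequential toy model of Dataset.interleave."""
    pending = iter(inputs)
    # Open the first `cycle_length` sub-iterators (for example, one per file).
    active = deque(iter(map_func(x)) for x in islice(pending, cycle_length))
    while active:
        current = active.popleft()
        block = list(islice(current, block_length))
        yield from block
        if len(block) == block_length:
            active.append(current)  # may have more items: rejoin the rotation
            continue
        # Sub-iterator exhausted: open the next input in its place, if any.
        nxt = next(pending, _END)
        if nxt is not _END:
            active.append(iter(map_func(nxt)))


def records(file_index: int) -> List[int]:
    # Hypothetical "file" holding three records.
    return [file_index * 10 + i for i in range(3)]


print(list(interleave(range(2), records, cycle_length=2)))                  # [0, 10, 1, 11, 2, 12]
print(list(interleave(range(2), records, cycle_length=2, block_length=2)))  # [0, 1, 10, 11, 2, 12]
print(list(interleave(range(2), records, cycle_length=1)))                  # [0, 1, 2, 10, 11, 12]
```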

__Sequential interleave__<br>
The default arguments of the tf.data.Dataset.interleave transformation make it interleave single samples from two datasets sequentially.

In [7]:
# Interleave: Sequential

benchmark(
    tf.data.Dataset.range(2)
    .interleave(lambda _: ArtificialDataset())
)

Execution time: 0.5298015046864748


__Parallel interleave__<br>
Now, use the num_parallel_calls argument of the interleave transformation. This loads multiple datasets in parallel, reducing the time spent waiting for the files to be opened.

In [8]:
# Interleave: Parallelizing data extraction

benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        lambda _: ArtificialDataset(),
        num_parallel_calls=tf.data.AUTOTUNE
    )
)

Execution time: 0.3163652131333947
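The speed-up above comes from overlapping the simulated file openings. A rough plain-Python model of this parallel version starts every sub-iterator on its own thread and merges the results round-robin in a fixed order. The parallel_interleave and slow_file functions are hypothetical; unlike tf.data, this sketch uses one thread per input and unbounded queues, so it only suits a handful of small inputs.

```python
import queue
import threading
import time
from collections import deque
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")
_END = object()  # sentinel: a sub-iterator is exhausted


def parallel_interleave(inputs: Iterable[T],
                        map_func: Callable[[T], Iterable[U]]) -> Iterator[U]:
    """Drain every sub-iterator on its own thread and merge the results
    one item at a time in round-robin order."""
    def drain(x: T, out: queue.Queue) -> None:
        for item in map_func(x):
            out.put(item)
        out.put(_END)

    active: deque = deque()
    for x in inputs:
        out: queue.Queue = queue.Queue()
        threading.Thread(target=drain, args=(x, out), daemon=True).start()
        active.append(out)

    while active:
        out = active.popleft()
        item = out.get()  # blocks until this sub-iterator has produced something
        if item is _END:
            continue  # exhausted: drop it from the rotation
        yield item
        active.append(out)


def slow_file(index: int) -> Iterator[int]:
    time.sleep(0.03)       # simulated file open, as in ArtificialDataset
    for i in range(3):
        time.sleep(0.015)  # simulated record read
        yield index * 10 + i


start = time.perf_counter()
print(list(parallel_interleave(range(2), slow_file)))  # [0, 10, 1, 11, 2, 12]
print(f"Execution time: {time.perf_counter() - start:.3f}")  # both files open concurrently
```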


__Parallelizing data transformation__<br>
When preparing data, input elements may need to be pre-processed. To this end, the tf.data API offers the tf.data.Dataset.map transformation, which applies a user-defined function to each element of the input dataset. Because input elements are independent of one another, the pre-processing can be parallelized across multiple CPU cores. To make this possible, similarly to the prefetch and interleave transformations, the map transformation provides the num_parallel_calls argument to specify the level of parallelism.

Choosing the best value for the num_parallel_calls argument depends on your hardware, the characteristics of your training data (such as its size and shape), the cost of your map function, and what other processing is happening on the CPU at the same time. A simple heuristic is to use the number of available CPU cores. Alternatively, as with the prefetch and interleave transformations, the map transformation supports tf.data.AUTOTUNE, which delegates the decision about what level of parallelism to use to the tf.data runtime.
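The mechanism can be sketched in plain Python with a thread pool that keeps at most num_parallel_calls calls in flight and returns results in input order, mirroring tf.data's default deterministic ordering. parallel_map and hard_preprocessing are hypothetical names, and the worker count follows the CPU-core heuristic above.

```python
import os
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def parallel_map(func: Callable[[T], U], source: Iterable[T],
                 num_parallel_calls: int) -> Iterator[U]:
    """Apply `func` to up to `num_parallel_calls` elements concurrently,
    yielding results in input order."""
    items = iter(source)
    with ThreadPoolExecutor(max_workers=num_parallel_calls) as pool:
        # Fill the window of in-flight calls.
        in_flight = deque(pool.submit(func, x) for x in islice(items, num_parallel_calls))
        while in_flight:
            result = in_flight.popleft().result()  # oldest call first keeps order
            for x in islice(items, 1):             # top the window back up
                in_flight.append(pool.submit(func, x))
            yield result


def hard_preprocessing(x: int) -> int:
    time.sleep(0.03)  # same simulated cost as mapped_function
    return x


# Simple heuristic: one call in flight per available CPU core.
workers = os.cpu_count() or 1
start = time.perf_counter()
print(list(parallel_map(hard_preprocessing, range(6), workers)))  # [0, 1, 2, 3, 4, 5]
print(f"Execution time: {time.perf_counter() - start:.3f}")
```

Threads give a speed-up in this sketch because time.sleep releases Python's global interpreter lock; CPU-bound pure-Python work would not run faster this way.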

In [9]:
def mapped_function(s):
    # Do some hard pre-processing
    tf.py_function(lambda: time.sleep(0.03), [], ())
    return s

__Sequential mapping__<br>
Start by using the map transformation without parallelism as a baseline example.

In [13]:
# Sequential data transformation
benchmark(
    ArtificialDataset()
    .map(mapped_function)
)

Execution time: 0.45774794556200504


__Parallel mapping__<br>
Now, use the same pre-processing function but apply it in parallel on multiple samples.

In [11]:
# Parallel data transformation
benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.AUTOTUNE
    )
)

Execution time: 0.29934894014149904


__Caching__<br>
The tf.data.Dataset.cache transformation can cache a dataset, either in memory or on local storage. Operations upstream of the cache (such as file opening and data reading) then run only during the first epoch; later epochs read the elements from the cache.
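A minimal in-memory model of this behavior: the first full pass runs the upstream pipeline and records its elements, and every later pass replays them. CachedDataset and artificial_source are hypothetical helpers written for illustration, not part of the tf.data API.

```python
from typing import Callable, Generic, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


class CachedDataset(Generic[T]):
    """Toy in-memory model of the cache transformation."""

    def __init__(self, make_source: Callable[[], Iterable[T]]) -> None:
        self._make_source = make_source  # rebuilds the upstream pipeline
        self._cache: Optional[List[T]] = None

    def __iter__(self) -> Iterator[T]:
        if self._cache is not None:  # later epochs: replay from memory
            yield from self._cache
            return
        items: List[T] = []
        for item in self._make_source():  # first epoch: run the pipeline
            items.append(item)
            yield item
        self._cache = items  # stored only after a complete pass


opened = 0


def artificial_source() -> Iterator[int]:
    """Counts how often the expensive upstream work (e.g. opening a file) runs."""
    global opened
    opened += 1
    yield from range(3)


dataset = CachedDataset(artificial_source)
epochs = [list(dataset) for _ in range(5)]
print(epochs[-1], opened)  # [0, 1, 2] 1
```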

In [12]:
# Caching
benchmark(
    ArtificialDataset()
    .map(mapped_function)  # Apply time-consuming operations before cache
    .cache(),
    num_epochs=5
)

Execution time: 0.37716342508792877


If the user-defined function passed into the map transformation is expensive, apply the cache transformation after the map transformation as long as the resulting dataset can still fit into memory or local storage. If the user-defined function increases the space required to store the dataset beyond the cache capacity, either apply it after the cache transformation or consider pre-processing your data before your training job to reduce resource usage.

__Vectorizing mapping__<br>
Invoking a user-defined function passed into the map transformation has overhead related to scheduling and executing the user-defined function. Vectorize the user-defined function (that is, have it operate over a batch of inputs at once) and apply the batch transformation before the map transformation.

The artificial dataset is not suitable for illustrating this practice: the scheduling delay is around 10 microseconds (10e-6 seconds), far less than the tens of milliseconds of simulated work in ArtificialDataset, so its impact would be hard to see.

For this example, use the base tf.data.Dataset.range function and reduce the training loop to its simplest form.

In [14]:
fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
    start_time = time.perf_counter()
    for _ in tf.data.Dataset.range(num_epochs):
        for _ in dataset:
            pass
    tf.print("Execution time:", time.perf_counter() - start_time)

def increment(x):
    return x+1

In [15]:
# Scalar mapping
fast_benchmark(
    fast_dataset
    # Apply function one item at a time
    .map(increment)
    # Batch
    .batch(256)
)

Execution time: 0.25147498585283756


In [16]:
# Vectorized mapping
fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handles batches
    .map(increment)
)

Execution time: 0.03382870554924011
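The gap between the two timings above can be roughly estimated by counting calls to the user-defined function. This sketch assumes that the roughly 10 microsecond scheduling delay quoted earlier is the dominant per-call cost and ignores the cost of the addition itself.

```python
import math

NUM_ELEMENTS = 10_000   # fast_dataset = tf.data.Dataset.range(10000)
BATCH_SIZE = 256
NUM_EPOCHS = 2
CALL_OVERHEAD = 10e-6   # approximate per-call scheduling delay (seconds)

scalar_calls = NUM_ELEMENTS                          # map before batch: one call per element
vector_calls = math.ceil(NUM_ELEMENTS / BATCH_SIZE)  # map after batch: one call per batch

for name, calls in [("scalar", scalar_calls), ("vectorized", vector_calls)]:
    overhead = NUM_EPOCHS * calls * CALL_OVERHEAD
    print(f"{name}: {calls} calls/epoch, ~{overhead:.4f} s of call overhead")
```

Under these assumptions, scalar mapping spends about 0.2 s on call overhead over two epochs while vectorized mapping spends less than a millisecond, which is consistent with the roughly 0.2 s difference measured above.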


__Reducing memory footprint__<br>
A number of transformations, including interleave, prefetch, and shuffle, maintain an internal buffer of elements. If the user-defined function passed into the map transformation changes the size of the elements, then the ordering of the map transformation and the transformations that buffer elements affects the memory usage. In general, choose the order that results in lower memory footprint, unless different ordering is desirable for performance.
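For example, consider a shuffle buffer combined with a map that decodes images. The sizes below are hypothetical, chosen only to show how much the placement of a buffering transformation relative to a size-increasing map can matter.

```python
# Hypothetical sizes, chosen only for illustration.
BUFFER_SIZE = 1_000                # e.g. a shuffle buffer of 1,000 elements
ENCODED_BYTES = 50_000             # ~50 KB compressed image before the map
DECODED_BYTES = 224 * 224 * 3 * 4  # 224x224 RGB float32 image after the map


def buffer_bytes(element_bytes: int, buffer_size: int = BUFFER_SIZE) -> int:
    """Memory held by a transformation that buffers `buffer_size` elements."""
    return element_bytes * buffer_size


before = buffer_bytes(ENCODED_BYTES)  # shuffle placed before the map
after = buffer_bytes(DECODED_BYTES)   # shuffle placed after the map
print(f"buffer before map: {before / 1e6:.0f} MB")  # 50 MB
print(f"buffer after map:  {after / 1e6:.0f} MB")   # 602 MB
```

Here, buffering the encoded elements and decoding after the shuffle needs roughly twelve times less memory.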

__Caching partial computations__<br>
It is recommended to cache the dataset after the map transformation unless that transformation makes the data too big to fit in memory. If your mapped function can be split into two parts, a time-consuming part and a memory-consuming part, you can reach a trade-off: apply the time-consuming part before the cache transformation and the memory-consuming part after it.
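In tf.data this split is typically written as a chain such as dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping), where both function names are hypothetical placeholders. The plain-Python sketch below models the same idea: the slow step runs once and its compact result is stored, while the memory-hungry step is recomputed every epoch and never stored.

```python
import time
from typing import Iterable, List, Optional

calls = {"time_consuming": 0, "memory_consuming": 0}


def time_consuming_mapping(x: int) -> int:
    # Hypothetical slow step whose output stays small (e.g. decoding).
    calls["time_consuming"] += 1
    time.sleep(0.01)
    return x


def memory_consuming_mapping(x: int) -> List[int]:
    # Hypothetical cheap step that inflates each element (e.g. augmentation).
    calls["memory_consuming"] += 1
    return [x] * 1_000


def run_epochs(source: Iterable[int], num_epochs: int) -> None:
    cache: Optional[List[int]] = None
    for _ in range(num_epochs):
        if cache is None:
            # First epoch only: run the slow part and keep its compact output.
            # (For brevity this sketch fills the cache eagerly.)
            cache = [time_consuming_mapping(x) for x in source]
        for x in cache:
            element = memory_consuming_mapping(x)  # rebuilt every epoch, never cached
            # ... a training step would consume `element` here


run_epochs(range(5), num_epochs=3)
print(calls)  # {'time_consuming': 5, 'memory_consuming': 15}
```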

__Best practice summary__<br>
Here is a summary of the best practices for designing performant TensorFlow input pipelines:

- Use the prefetch transformation to overlap the work of a producer and consumer
- Parallelize the data reading transformation using the interleave transformation
- Parallelize the map transformation by setting the num_parallel_calls argument
- Use the cache transformation to cache data in memory during the first epoch
- Vectorize user-defined functions passed in to the map transformation
- Reduce memory usage when applying the interleave, prefetch, and shuffle transformations
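A minimal sketch combining several of these practices on a toy pipeline follows. read_file and scale are hypothetical stand-ins for real file reading and preprocessing, the batch size and cycle_length are arbitrary, and shuffling is omitted.

```python
import tensorflow as tf


def read_file(file_index):
    # Hypothetical stand-in for a file reader: each "file" holds 100 int64 values.
    return tf.data.Dataset.range(file_index * 100, (file_index + 1) * 100)


def scale(batch):
    # Vectorized: one call handles a whole batch tensor.
    return batch * 2


dataset = (
    tf.data.Dataset.range(4)
    .interleave(read_file, cycle_length=4,
                num_parallel_calls=tf.data.AUTOTUNE)  # parallel data reading
    .batch(32)                                        # batch before map
    .map(scale, num_parallel_calls=tf.data.AUTOTUNE)  # parallel, vectorized map
    .cache()                                          # cache after the map
    .prefetch(tf.data.AUTOTUNE)                       # overlap with the consumer
)

for epoch in range(2):
    num_elements = sum(int(tf.size(batch)) for batch in dataset)
    print(f"epoch {epoch}: {num_elements} elements")  # 400 each epoch
```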