In [0]:
%tensorflow_version 2.x
import tensorflow as tf
import time

Making reproducible performance benchmarks can be difficult because several factors affect the results:
- the current CPU load,
- the network traffic,
- complex mechanisms like cache, etc.
Hence, to provide a reproducible benchmark, build an artificial example.


Define a class inheriting from `tf.data.Dataset` called `ArtificialDataset`.
This dataset:
- Generates `num_samples` samples (default is 3)
- Sleeps for some time before the first item to simulate opening a file
- Sleeps for some time before producing each item to simulate reading data from a file

In [0]:
class ArtificialDataset(tf.data.Dataset):
  def _generator(num_samples):
    # Opening the file
    time.sleep(0.03)

    for sample_idx in range(num_samples):
      # Reading data (line, record) from the file
      time.sleep(0.015)

      yield(sample_idx, )
  
  def __new__(cls, num_samples=3):
    return tf.data.Dataset.from_generator(
        cls._generator,
        output_types=tf.dtypes.int64,
        output_shapes=(1, ),
        args=(num_samples, ))

In [0]:
def benchmark(dataset, num_epochs=2):
  """Run a dummy training loop and print how long it takes to iterate over `dataset`.
  The training step is simulated with a short sleep."""
  start_time = time.perf_counter()
  for epoch_num in range(num_epochs):
    for sample in dataset:
      # Performing a training step
      time.sleep(0.01)
  tf.print('Execution time:', time.perf_counter() - start_time)

## Optimize performance
To exhibit how performance can be optimized, you will improve the performance of the `ArtificialDataset`.

### The naive approach
Start with a naive pipeline using no tricks, iterating over the dataset as-is.

In [0]:
benchmark(ArtificialDataset())

### Prefetching
Prefetching overlaps the data preprocessing and the model execution of a training step. While the model is executing training step `s`, the input pipeline is reading the data for step `s+1`. Doing so reduces the step time to the maximum (as opposed to the sum) of the training time and the time it takes to extract the data.
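
As a rough illustration of the maximum-versus-sum claim, the sketch below estimates the iteration time for the delays used in this guide (0.03 s to open the file, 0.015 s per read, 0.01 s per training step, 3 samples, 2 epochs). It is an idealized model that ignores all `tf.data` overhead and assumes at least one sample per epoch. The function names `naive_epoch_time` and `prefetched_epoch_time` are made up for this example.

```python
def naive_epoch_time(open_s, read_s, train_s, num_samples):
    """Everything runs back to back: each step costs read + train."""
    return open_s + num_samples * (read_s + train_s)


def prefetched_epoch_time(open_s, read_s, train_s, num_samples):
    """Reading sample s+1 overlaps training on sample s.

    Each step costs max(read, train). The shorter of the two is paid
    once without overlap (the first read or the last training step).
    """
    return (open_s
            + num_samples * max(read_s, train_s)
            + min(read_s, train_s))


# Delays taken from ArtificialDataset and benchmark; two epochs.
naive = 2 * naive_epoch_time(0.03, 0.015, 0.01, 3)
prefetched = 2 * prefetched_epoch_time(0.03, 0.015, 0.01, 3)
print(f"naive: {naive:.3f} s, prefetched: {prefetched:.3f} s")
# naive: 0.210 s, prefetched: 0.170 s
```

Measured times will be somewhat higher than these estimates because of scheduling and framework overhead.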

The `tf.data` API provides the `tf.data.Dataset.prefetch` transformation. It can be used to decouple the time when data is produced from the time when data is consumed. In particular, the transformation uses a background thread and an internal buffer to prefetch elements from the input dataset ahead of the time they are requested. The number of elements to prefetch should be equal to (or possibly greater than) the number of batches consumed by a single training step. You can either tune this value manually or set it to `tf.data.experimental.AUTOTUNE`, which prompts the `tf.data` runtime to tune the value dynamically.

Note that the prefetch transformation provides benefits any time there is an opportunity to overlap the work of a "producer" with the work of a "consumer".
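
The producer/consumer idea can be sketched in plain Python with a background thread and a bounded queue. This is only a conceptual model, not how `tf.data` is implemented. The `prefetch` function below is a hypothetical helper, and error handling and early termination are deliberately left out.

```python
import queue
import threading


def prefetch(iterable, buffer_size=1):
    """Yield items from `iterable`, produced ahead of time by a background thread."""
    buffer = queue.Queue(maxsize=buffer_size)
    end_marker = object()  # Sentinel signalling that the producer is done.

    def producer():
        for item in iterable:
            buffer.put(item)  # Blocks while the buffer is full.
        buffer.put(end_marker)

    # Daemon thread, so an abandoned producer does not keep the process alive.
    threading.Thread(target=producer, daemon=True).start()

    while True:
        item = buffer.get()  # Blocks until the producer has an item ready.
        if item is end_marker:
            return
        yield item


print(list(prefetch(range(5), buffer_size=2)))  # [0, 1, 2, 3, 4]
```

While the consumer works on one item, the producer thread is free to prepare the next ones, up to `buffer_size` items ahead.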

In [0]:
benchmark(
    ArtificialDataset()
    .prefetch(tf.data.experimental.AUTOTUNE))

### Parallelizing data extraction
In a real-world setting, the input data may be stored remotely (for example, GCS or HDFS). A dataset pipeline that works well when reading data locally might become bottlenecked on I/O when reading data remotely because of the following differences between local and remote storage:
- **Time-to-first-byte**: Reading the first byte of a file from remote storage can take orders of magnitude longer than from local storage.
- **Read throughput**: While remote storage typically offers large aggregate bandwidth, reading a single file might only be able to utilize a small fraction of this bandwidth.


### Sequential interleave
The default arguments of the `tf.data.Dataset.interleave` transformation make it interleave single samples from two datasets sequentially.

In [0]:
# Fetching samples alternately from the two available datasets.
# No performance improvement is involved here.
benchmark(
    tf.data.Dataset.range(2)
    .interleave(ArtificialDataset)
)
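
To make the resulting element order concrete, here is a plain-Python model of a sequential interleave. It is a simplified sketch: it assumes every input is part of the cycle from the start, and the names `sequential_interleave` and `fake_file` are invented for this illustration.

```python
def sequential_interleave(make_dataset, inputs, block_length=1):
    """Round-robin over one iterator per input, taking `block_length` items at a time."""
    iterators = [iter(make_dataset(x)) for x in inputs]
    while iterators:
        still_active = []
        for it in iterators:
            exhausted = False
            for _ in range(block_length):
                try:
                    yield next(it)
                except StopIteration:
                    exhausted = True
                    break
            if not exhausted:
                still_active.append(it)
        iterators = still_active


def fake_file(file_idx, num_samples=3):
    """Stand-in for one dataset: yields (file index, sample index) pairs."""
    for sample_idx in range(num_samples):
        yield (file_idx, sample_idx)


print(list(sequential_interleave(fake_file, range(2))))
# [(0, 0), (1, 0), (0, 1), (1, 1), (0, 2), (1, 2)]
```

Samples alternate between the two sources, but only one source is read at any moment, which is why no time is saved.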

### Parallel interleave

In [0]:
# Now use the `num_parallel_calls` argument of the `interleave` transformation. This loads
# multiple datasets in parallel, reducing the time waiting for the files to be opened.
benchmark(
    tf.data.Dataset.range(2)
    .interleave(
        ArtificialDataset,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
)
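
The benefit of reading several sources at once can be imitated in plain Python with a thread pool. The sketch below is an assumption-laden model rather than the `tf.data` mechanism: it reads each simulated file completely before interleaving, whereas `tf.data` streams elements. `read_fake_file` and `parallel_interleave` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def read_fake_file(file_idx, num_samples=3):
    """Simulate opening a file and reading its records (same delays as ArtificialDataset)."""
    time.sleep(0.03)  # Opening the file
    records = []
    for sample_idx in range(num_samples):
        time.sleep(0.015)  # Reading one record
        records.append((file_idx, sample_idx))
    return records


def parallel_interleave(file_indices, num_parallel_calls=2):
    """Read all files concurrently, then emit their records round-robin."""
    with ThreadPoolExecutor(max_workers=num_parallel_calls) as pool:
        per_file = list(pool.map(read_fake_file, file_indices))
    # zip stops at the shortest file; all fake files have the same length here.
    for records in zip(*per_file):
        yield from records


start = time.perf_counter()
samples = list(parallel_interleave(range(2)))
elapsed = time.perf_counter() - start
print(samples)
print(f"Elapsed: {elapsed:.3f} s")
```

With two workers, the open and read delays of both files overlap, so the elapsed time should be close to that of reading a single file. The element order is the same as in the sequential case.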

### Parallelizing data transformation
When preparing data, input elements may need to be pre-processed. To this end, the `tf.data` API offers the `tf.data.Dataset.map` transformation, which applies a user-defined function to each element of the input dataset. Because input elements are independent of one another, the pre-processing can be parallelized across multiple CPU cores. To make this possible, similarly to the `interleave` transformation, the `map` transformation provides the `num_parallel_calls` argument to specify the level of parallelism.

Choosing the best value for the `num_parallel_calls` argument depends on your hardware, the characteristics of your training data (such as its size and shape), the cost of your map function, and what other processing is happening on the CPU at the same time. A simple heuristic is to use the number of available CPU cores. However, as with the `prefetch` and `interleave` transformations, the `map` transformation supports `tf.data.experimental.AUTOTUNE`, which delegates the decision about what level of parallelism to use to the `tf.data` runtime.

In [0]:
def mapped_function(s):
  # Do some hard pre-processing
  tf.py_function(lambda: time.sleep(0.03), [], ())
  return s

In [0]:
# Sequential mapping
# Start by using the `map` transformation without parallelism as a baseline example
benchmark(
    ArtificialDataset()
    .map(mapped_function)
)

In [0]:
# Parallel mapping
# Now, use the same pre-processing function but apply it in parallel on multiple samples.
benchmark(
    ArtificialDataset()
    .map(
        mapped_function,
        num_parallel_calls=tf.data.experimental.AUTOTUNE)
    )
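
The "number of available CPU cores" heuristic can be tried out in plain Python with a thread pool. This is a minimal sketch, not the `tf.data` implementation. `slow_preprocess` and `parallel_map` are hypothetical helpers, and threads only help here because `time.sleep` releases the interpreter lock.

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor


def slow_preprocess(x):
    """Stand-in for an expensive, independent per-element transformation."""
    time.sleep(0.03)
    return x * 2


def parallel_map(fn, items, num_parallel_calls=None):
    """Apply `fn` to every item using a thread pool; output order matches input order."""
    if num_parallel_calls is None:
        # Heuristic from the text: one worker per available CPU core.
        num_parallel_calls = os.cpu_count() or 1
    with ThreadPoolExecutor(max_workers=num_parallel_calls) as pool:
        return list(pool.map(fn, items))


print(parallel_map(slow_preprocess, range(3)))  # [0, 2, 4]
```

Because the elements are independent, the results are the same as those of a sequential map, but the waiting time of the individual calls overlaps.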

### Caching
The `tf.data.Dataset.cache` transformation can cache a dataset, either in memory or on local storage. This saves some operations (like file opening and data reading) from being executed during each epoch.

In [0]:
benchmark(
    ArtificialDataset()
    .map(  # Apply time-consuming operations before the cache
        mapped_function
    ).cache(),
    num_epochs=5
)

When you cache a dataset, the transformations before the `cache` one (like the file opening and data reading) are executed only during the first epoch. The next epochs will reuse the data cached by the `cache` transformation.

If the user-defined function passed into the `map` transformation is expensive, apply the `cache` transformation after the `map` transformation as long as the resulting dataset can still fit into memory or local storage. If the user-defined function increases the space required to store the dataset beyond the cache capacity, either apply it after the `cache` transformation or consider pre-processing your data before your training job to reduce resource usage.
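
The "run upstream work once, then replay" behavior can be modeled in a few lines of plain Python. The sketch below is an in-memory illustration only. `CachedDataset`, `expensive_source`, and the `calls` counter are hypothetical names, and the real `cache` transformation is more involved.

```python
class CachedDataset:
    """Replay the items of a re-creatable iterator after one full pass."""

    def __init__(self, make_iterator):
        self._make_iterator = make_iterator  # Called once per uncached pass.
        self._cache = None

    def __iter__(self):
        if self._cache is not None:
            # Later epochs: serve stored items and skip the upstream work.
            yield from self._cache
            return
        items = []
        for item in self._make_iterator():
            items.append(item)
            yield item
        # Only a complete pass fills the cache.
        self._cache = items


calls = {"count": 0}  # Counts how often the upstream pipeline runs.


def expensive_source():
    calls["count"] += 1
    for i in range(3):
        yield i * i


dataset = CachedDataset(expensive_source)
epochs = [list(dataset) for _ in range(5)]
print(calls["count"])  # 1
```

All five epochs see the same elements, yet the upstream generator runs only during the first one.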

### Vectorizing mapping
Invoking a user-defined function passed into the `map` transformation has overhead related to scheduling and executing the user-defined function. We recommend vectorizing the user-defined function (that is, having it operate over a batch of inputs at once) and applying the `batch` transformation before the `map` transformation.

Your artificial dataset is not suitable for illustrating this good practice. The scheduling delay is around 10 microseconds, far less than the tens of milliseconds used in `ArtificialDataset`, and thus its impact is hard to see.

In [0]:
# For this example, use the base `tf.data.Dataset.range` function
# and reduce the training loop to its simplest form.
fast_dataset = tf.data.Dataset.range(10000)

def fast_benchmark(dataset, num_epochs=2):
  start_time = time.perf_counter()
  for _ in tf.data.Dataset.range(num_epochs):
    for _ in dataset:
      pass
  tf.print('Execution time:', time.perf_counter() - start_time)

def increment(x):
  return x + 1

In [0]:
# Scalar mapping
fast_benchmark(
    fast_dataset
    .map(increment)  # Apply function one item at a time
    .batch(256)  # Batch
)

In [0]:
# Vectorized mapping
fast_benchmark(
    fast_dataset
    .batch(256)
    # Apply function on a batch of items
    # The tf.Tensor.__add__ method already handles batches
    .map(increment)
)
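
One way to see where the savings come from is to count how often the user-defined function is invoked in each ordering. The plain-Python sketch below does only that. `batch` and `CountingFunction` are hypothetical helpers, and the per-batch function still loops in Python, whereas in TensorFlow the addition would be a single vectorized operation.

```python
def batch(items, batch_size):
    """Group items into lists of at most `batch_size` elements."""
    items = list(items)
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


class CountingFunction:
    """Wrap a function and count how many times it is invoked."""

    def __init__(self, fn):
        self.fn = fn
        self.calls = 0

    def __call__(self, x):
        self.calls += 1
        return self.fn(x)


# Scalar version: one call per element, then batch.
scalar_increment = CountingFunction(lambda x: x + 1)
scalar_result = batch([scalar_increment(x) for x in range(10000)], 256)

# Vectorized version: batch first, then one call per batch.
batched_increment = CountingFunction(lambda xs: [x + 1 for x in xs])
vectorized_result = [batched_increment(b) for b in batch(range(10000), 256)]

print(scalar_increment.calls, batched_increment.calls)  # 10000 40
```

Both orderings produce the same batches, but the vectorized one pays the per-call overhead 40 times instead of 10000 times.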

### Reducing memory footprint
A number of transformations, including `interleave`, `prefetch`, and `shuffle`, maintain an internal buffer of elements. If the user-defined function passed into the `map` transformation changes the size of elements, then the ordering of the map transformation and the transformations that buffer elements affects the memory usage. In general, we recommend choosing the order that results in a lower memory footprint, unless a different ordering is desirable for performance.
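
A back-of-the-envelope calculation shows how much the ordering can matter. All numbers below are hypothetical and chosen only for illustration, and `buffered_bytes` is a made-up helper that ignores any per-element bookkeeping overhead.

```python
def buffered_bytes(buffer_size, element_bytes):
    """Approximate memory held by a buffer of `buffer_size` elements."""
    return buffer_size * element_bytes


# Hypothetical numbers chosen only for illustration.
RAW_BYTES = 100          # Size of one element before `map`
MAPPED_BYTES = 10_000    # Size of one element after `map`
SHUFFLE_BUFFER = 1_000   # Elements held by the buffering transformation

map_then_shuffle = buffered_bytes(SHUFFLE_BUFFER, MAPPED_BYTES)
shuffle_then_map = buffered_bytes(SHUFFLE_BUFFER, RAW_BYTES)
print(map_then_shuffle, shuffle_then_map)  # 10000000 100000
```

Under these assumptions, buffering the raw elements and mapping afterwards holds about 100 times less data in the buffer.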

### Caching partial computations
It is recommended to cache the dataset after the `map` transformation, unless this transformation makes the data too big to fit in memory. A trade-off can be achieved if your mapped function can be split into two parts: a time-consuming one and a memory-consuming one. In this case, you can chain your transformations as shown below:

In [0]:
# dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

This way, the time consuming part is only executed during the first epoch, and you avoid using too much cache space.

## Best practice summary
- Use the `prefetch` transformation to overlap the work of a producer and consumer.
- Parallelize the data reading transformation using the `interleave` transformation.
- Parallelize the `map` transformation by setting the `num_parallel_calls` argument.
- Use the `cache` transformation to cache data in memory during the first epoch.
- Vectorize user-defined functions passed into the `map` transformation.
- Reduce memory usage when applying the `interleave`, `prefetch`, and `shuffle` transformations.