# Building High-Performance Data Pipelines with tf.data and Google Cloud Storage

This article presents some recipes for building a high-performance input pipeline with TensorFlow, specifically tf.data, and Google Cloud Storage.
The techniques are presented in order from the slowest to the fastest, measured by the throughput from Google Cloud Storage to the training VM.

This article uses the Stanford Dogs Dataset with ~20000 images and 120 classes.

## Benchmark function

The benchmark is the number of images ingested (read) per second from Cloud Storage into the virtual machine. There are several ways to compute this; here a simple function iterates through the dataset and measures the elapsed time.

The following code snippet (the 'timeit' function) from the TensorFlow documentation [1] (as of 03/18/2020, version 2.1) is used. Since tf.data.Dataset implements __iter__, it is possible to iterate over the dataset and print the progress along the way.


[1] https://www.tensorflow.org/tutorials/load_data/images#performance

In [1]:
# Benchmark function for dataset
import time
default_timeit_steps = 1000

def timeit(ds, steps=default_timeit_steps):
    # Uses the global BATCH_SIZE, which is defined in the global variables cell below
    start = time.time()
    it = iter(ds)
    
    for i in range(steps):
        batch = next(it)
        
        if i%10 == 0:
            print('.',end='')
    print()
    end = time.time()

    duration = end-start
    print("{} batches: {} s".format(steps, duration))
    print("{:0.5f} Images/s".format(BATCH_SIZE*steps/duration))
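
The function above reads BATCH_SIZE from the notebook's globals and only prints its result. As a minimal sketch (not part of the TensorFlow tutorial), the same measurement can take the batch size as an argument and return the rate, which makes runs easier to compare. The name `measure_throughput` and its parameters are hypothetical, and a plain Python iterable stands in for the dataset here.

```python
import itertools
import time


def measure_throughput(batches, steps, batch_size):
    """Consume `steps` batches from an iterable and return images per second."""
    it = iter(batches)
    start = time.perf_counter()
    for _ in range(steps):
        next(it)
    # Guard against a zero duration on very coarse clocks.
    duration = max(time.perf_counter() - start, 1e-9)
    return steps * batch_size / duration


# Stand-in for a tf.data.Dataset: an endless stream of fake batches of 32 items.
fake_batches = itertools.repeat([0] * 32)
rate = measure_throughput(fake_batches, steps=100, batch_size=32)
print("{:0.1f} images/s".format(rate))
```

Because the function only relies on the iterator protocol, a batched tf.data.Dataset can be passed in place of `fake_batches`.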

## Let's create the Dataset using tf.data

In this case, all the data is located in a Cloud Storage bucket (gs://).

The label is inferred from the path of each object, and the images are downloaded from the bucket.
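
As an illustration of how a label can be read off an object path, the sketch below splits a path on '/' and takes the second-to-last component, which is the folder name. The paths are hypothetical examples in the layout the bucket is assumed to have (one folder per class); they are not listed from the real bucket, and the helper names are made up for this sketch.

```python
# Hypothetical object paths: gs://<bucket>/<class folder>/<image file>
paths = [
    'gs://tf-data-pipeline/chihuahua/img_001.jpg',
    'gs://tf-data-pipeline/chihuahua/img_002.jpg',
    'gs://tf-data-pipeline/beagle/img_001.jpg',
]


def label_from_path(path):
    """Return the class folder name, i.e. the second-to-last path component."""
    return path.split('/')[-2]


# Map each class name to an integer index (sorted to get a stable order).
class_names = sorted({label_from_path(p) for p in paths})
label_map = {name: i for i, name in enumerate(class_names)}


def one_hot(path):
    """One-hot encode the label inferred from the path."""
    encoding = [0] * len(label_map)
    encoding[label_map[label_from_path(path)]] = 1
    return encoding


for p in paths:
    print(label_from_path(p), one_hot(p))
```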

In [2]:
# First, let's import TensorFlow
import tensorflow as tf

In [3]:
# Now import some additional libraries
from numpy import zeros
import numpy as np
from datetime import datetime

In [4]:
# Global variables
FILENAMES = 'gs://tf-data-pipeline/*/*'
FOLDERS = 'gs://tf-data-pipeline/*'

RESOLUTION = (224,224)
NUM_TOTAL_IMAGES = 20583
IMG_SHAPE=(224,224,3)
BATCH_SIZE = 32

AUTOTUNE = tf.data.experimental.AUTOTUNE

In [5]:
# Get labels from folder names
def get_label_map(path):
    # List the folders in this path
    folders_name = tf.io.gfile.glob(path)

    labels = []
    for folder in folders_name:
        labels.append(folder.split(sep='/')[-1])

    # Generate a Label Map
    label_map = {labels[i]:i for i in range(len(labels))}
    inv_label_map = {i:labels[i] for i in range(len(labels))}
    
    return label_map, inv_label_map

In [6]:
# Function to one-hot encode the label inferred from each file path
def one_hot_encode(label_map, filepath):
    dataset = dict()
    
    for i in range(len(filepath)):
        encoding = zeros(len(label_map), dtype='uint8')
        encoding[label_map[filepath[i].split(sep='/')[-2]]] = 1
        
        dataset.update({filepath[i]:list(encoding)})
    
    return dataset

In [7]:
label_map, inv_label_map = get_label_map(FOLDERS)

In [8]:
# List all files in bucket
filepath = tf.io.gfile.glob(FILENAMES)
NUM_TOTAL_IMAGES = len(filepath)

In [9]:
dataset = one_hot_encode(label_map, filepath)
dataset = [[k,v] for k,v in dataset.items()]

features = [i[0] for i in dataset]
labels = [i[1] for i in dataset]

In [10]:
# Create Dataset from Features and Labels
dataset = tf.data.Dataset.from_tensor_slices((features, labels))

In [11]:
# Function to download bytes from Cloud Storage
def get_bytes_label(filepath, label):
    raw_bytes = tf.io.read_file(filepath)
    return raw_bytes, label

In [23]:
# Preprocess Image
def process_image(raw_bytes, label):
    image = tf.io.decode_jpeg(raw_bytes, channels=3)
    image = tf.image.convert_image_dtype(image, dtype=tf.float32)
    image = tf.image.resize(image, (224,224))
    
    return image, label

In [24]:
def build_dataset(dataset, batch_size=BATCH_SIZE, cache=False):
    
    if cache:
        if isinstance(cache, str):
            dataset = dataset.cache(cache)
        else:
            dataset = dataset.cache()
    
    dataset = dataset.shuffle(NUM_TOTAL_IMAGES)
    
    # Extraction: IO Intensive
    dataset = dataset.map(get_bytes_label, num_parallel_calls=AUTOTUNE)

    # Transformation: CPU Intensive
    dataset = dataset.map(process_image, num_parallel_calls=AUTOTUNE)
    dataset = dataset.repeat()
    dataset = dataset.batch(batch_size=batch_size)
    
    # Pipeline next iteration
    dataset = dataset.prefetch(buffer_size=AUTOTUNE)
    
    return dataset

In [25]:
train_ds = build_dataset(dataset)

## First Attempt: No cache, no tricks

In this first attempt no cache is used and the images are read one by one from the bucket.

The biggest problem here is reading thousands of files one by one, which can really slow down the process.
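
A back-of-the-envelope model shows why per-object reads hurt: if each request pays a fixed latency, throughput is bounded by the number of requests in flight divided by that latency. The 50 ms latency below is an assumed, illustrative figure, not a measurement from this bucket, and the function name is hypothetical.

```python
def max_images_per_second(latency_s, parallel_requests=1):
    """Upper bound on throughput when every image costs one request of fixed latency."""
    return parallel_requests / latency_s


ASSUMED_LATENCY_S = 0.05  # hypothetical per-object latency, for illustration only

for n in (1, 8, 32):
    bound = max_images_per_second(ASSUMED_LATENCY_S, n)
    print("{} parallel requests: at most {:.0f} images/s".format(n, bound))
```

Under this simple model, raising the number of parallel reads or reducing the number of objects (by bundling many images per file) are the two levers that move the bound.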

In [None]:
timeit(train_ds, steps=100)

## Ok, let's put some local cache in action

tf.data.Dataset implements a cache function.

If no parameter is passed to cache, it uses the memory of the host to cache all the data. The problem arises when your dataset is bigger than the host memory, so that a full epoch cannot be cached in memory. In that case the in-memory cache won't help and we still have a bottleneck.
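
To get a feeling for whether an in-memory cache is feasible, the arithmetic below estimates the size of the decoded dataset. It assumes every image is cached after preprocessing as a 224x224x3 float32 tensor (the shape and dtype produced by process_image) and uses the image count from the global variables cell. Labels and per-element overhead are ignored, so treat the result as a rough lower bound.

```python
NUM_TOTAL_IMAGES = 20583      # from the global variables cell
IMG_SHAPE = (224, 224, 3)     # height, width, channels
BYTES_PER_FLOAT32 = 4

bytes_per_image = IMG_SHAPE[0] * IMG_SHAPE[1] * IMG_SHAPE[2] * BYTES_PER_FLOAT32
total_bytes = NUM_TOTAL_IMAGES * bytes_per_image

print("{} bytes per image".format(bytes_per_image))
print("{:.1f} GiB for the whole dataset".format(total_bytes / 2**30))
```

This works out to roughly 11.5 GiB, which has to fit alongside everything else in the host's memory.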

First, let's test the throughput with the cache in memory and then with the cache as a local file.

In [None]:
# Memory
train_cache_ds = build_dataset(dataset, cache=True)
timeit(train_cache_ds)

In [None]:
train_local_cache_ds = build_dataset(dataset, cache='./dog.tfcache')
timeit(train_local_cache_ds)

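### Hmm ...

Ok, but no difference?

In this case, even with the in-memory and local-file caches, the host VM is not able to fetch data any faster, mainly because of the large number of small files. Note also that build_dataset calls cache() before the two map() steps, so only the file paths and labels are cached; the images themselves are still read from Cloud Storage on every pass.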
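
The position of cache() in a pipeline determines what is stored. The minimal sketch below, on a synthetic dataset, counts how often a mapped step runs over two epochs when cache() comes before or after map(). The names `expensive_step` and `count_calls` are hypothetical stand-ins, and tf.py_function is used only to make the calls observable from Python.

```python
import tensorflow as tf

calls = []  # records every execution of the "expensive" step


def _record(x):
    calls.append(int(x))
    return x


def expensive_step(x):
    # Stand-in for reading and decoding an image.
    return tf.py_function(_record, [x], tf.int64)


def count_calls(ds):
    """Iterate over the whole dataset and return how many times the step ran."""
    calls.clear()
    for _ in ds:
        pass
    return len(calls)


source = tf.data.Dataset.range(4)

# cache() before map(): only the inputs are cached, so the step runs every epoch.
cache_before = source.cache().map(expensive_step).repeat(2)
# cache() after map(): the step runs once and its results are replayed afterwards.
cache_after = source.map(expensive_step).cache().repeat(2)

calls_before = count_calls(cache_before)
calls_after = count_calls(cache_after)
print(calls_before, calls_after)
```

Caching after the map steps trades memory (or local disk) for speed, so it only pays off when the processed data fits in the chosen cache.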

To solve this problem we can follow some best practices for designing performant TensorFlow input pipelines (from the TensorFlow documentation [2]):

 - Use the prefetch transformation to overlap the work of a producer and consumer.
 - Parallelize the data reading transformation using the interleave transformation.
 - Parallelize the map transformation by setting the num_parallel_calls argument.
 - Use the cache transformation to cache data in memory during the first epoch.
 - Vectorize user-defined functions passed in to the map transformation.
 - Reduce memory usage when applying the interleave, prefetch, and shuffle transformations.
 
But before we continue, let's do some tracing to understand what is going on.

[2] https://www.tensorflow.org/guide/data_performance


In [None]:
tf.summary.trace_off()
tf.summary.trace_on(graph=False, profiler=True)

train_ds = build_dataset(dataset)
timeit(train_ds, steps=100)

tf.summary.trace_export('Data Pipeline', profiler_outdir='/home/jupyter/tensorflow-data-pipeline/logs/')

In [None]:
# Load the TensorBoard notebook extension.
%load_ext tensorboard

In [None]:
%tensorboard --logdir=/home/jupyter/tensorflow-data-pipeline/logs

From this graph we can infer that the number of threads created is equal to the number of CPUs of my host VM.

Even when 'num_parallel_calls' is increased, the performance stays the same.

The next step is to bundle all the images together in a TFRecord file, so let's do it.

## Using TFRecord to speed up the reading process

This test covers two parts:
 - Store images without pre-processing in TFRecord
 - Store images with cached pre-processing in TFRecord
 
The helper functions below convert values (float, int, etc.) to tf.train.Feature.

In [83]:
def tf_serialize_example(image, label):
    
    def _bytes_feature(value):
        """Returns a bytes_list from a string / byte."""
        if isinstance(value, type(tf.constant(0))):
            value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

    def _float_feature(value):
        """Returns a float_list from a float / double."""
        return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

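    def _int64_feature(value):
        """Returns an int64_list from a one-hot label vector (a batch of labels is flattened)."""
        # Int64List needs a flat sequence of ints; keep every label instead of only the first row.
        value = np.asarray(value).ravel().tolist()
        return tf.train.Feature(int64_list=tf.train.Int64List(value=value))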
    
    def serialize_example(image, label):
        
        feature = {
            'image': _bytes_feature(tf.io.serialize_tensor(image)),
            'label': _int64_feature(label)
        }

        example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
        
        return example_proto.SerializeToString()
    
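    # To call this function from Dataset.map (graph mode), serialize_example
    # would have to be wrapped in tf.py_function so that it runs eagerly:
#     tf_string = tf.py_function(
#         serialize_example,
#         (image, label),  # pass these args to the above function.
#         tf.string)      # the return type is `tf.string`.

    # Direct call: works only on eager tensors or NumPy values.
    tf_string = serialize_example(image, label)

    return tf_string #tf.reshape(tf_string, ()) # The result is a scalar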

In [84]:
# Create a description of the features.
feature_description = {
    'image': tf.io.FixedLenFeature([], tf.string),
    'label': tf.io.FixedLenSequenceFeature([], tf.int64, allow_missing=True)
}

def _parse_function(example_proto):
  # Parse the input `tf.Example` proto using the dictionary above.
  return tf.io.parse_single_example(example_proto, feature_description)
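
As a sanity check of the feature description, the sketch below serializes one small example by hand and parses it back. The tiny 2x2 image and the three-class label are made-up values; tf.io.parse_tensor is used to recover the image because it was stored with tf.io.serialize_tensor.

```python
import tensorflow as tf

feature_description = {
    'image': tf.io.FixedLenFeature([], tf.string),
    'label': tf.io.FixedLenSequenceFeature([], tf.int64, allow_missing=True),
}

# Made-up example: a 2x2 RGB image and a one-hot label with three classes.
image = tf.ones([2, 2, 3], dtype=tf.float32)
label = [0, 1, 0]

example = tf.train.Example(features=tf.train.Features(feature={
    'image': tf.train.Feature(bytes_list=tf.train.BytesList(
        value=[tf.io.serialize_tensor(image).numpy()])),
    'label': tf.train.Feature(int64_list=tf.train.Int64List(value=label)),
}))
serialized = example.SerializeToString()

# Parse the record back and decode the image bytes into a tensor again.
parsed = tf.io.parse_single_example(serialized, feature_description)
decoded_image = tf.io.parse_tensor(parsed['image'], out_type=tf.float32)

print(decoded_image.shape, parsed['label'].numpy())
```

The image comes back as a tensor only after the explicit parse_tensor step; the parsed feature itself is still a scalar string.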

In [76]:
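# This call fails (see the error below): Dataset.map traces the function in
# graph mode, where tensors are symbolic and have no .numpy() method.
serialized_ds = train_ds.map(tf_serialize_example)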

AttributeError: in converted code:

    <ipython-input-74-e3046c9900fe>:6 _bytes_feature  *
        value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
    /tmp/tmpaj7i72pb.py:65 serialize_example
        feature = {'image': ag__.converted_call(_bytes_feature, (ag__.converted_call(tf.io.serialize_tensor, (image,), None, fscope_4),), None, fscope_4), 'label': ag__.converted_call(_int64_feature, (label,), None, fscope_4)}
    /tmp/tmpaj7i72pb.py:32 _bytes_feature
        value = ag__.if_stmt(cond, if_true, if_false, get_state, set_state, ('value',), ())
    /opt/conda/lib/python3.7/site-packages/tensorflow_core/python/autograph/operators/control_flow.py:920 if_stmt
        return _py_if_stmt(cond, body, orelse)
    /opt/conda/lib/python3.7/site-packages/tensorflow_core/python/autograph/operators/control_flow.py:1029 _py_if_stmt
        return body() if cond else orelse()
    /tmp/tmpaj7i72pb.py:26 if_true
        value_1 = ag__.converted_call(value_1.numpy, (), None, fscope_1)

    AttributeError: 'Tensor' object has no attribute 'numpy'
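
The error arises because Dataset.map runs the function in graph mode, where .numpy() is not available. One way around it, sketched below on made-up data, is to wrap the Python serialization code in tf.py_function so that it executes eagerly; this mirrors the commented-out lines in tf_serialize_example. The toy dataset and its shapes are assumptions for illustration only.

```python
import tensorflow as tf


def serialize_example(image, label):
    # Runs eagerly inside tf.py_function, so .numpy() is available here.
    feature = {
        'image': tf.train.Feature(bytes_list=tf.train.BytesList(
            value=[tf.io.serialize_tensor(image).numpy()])),
        'label': tf.train.Feature(int64_list=tf.train.Int64List(
            value=label.numpy().tolist())),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    return example.SerializeToString()


def tf_serialize_example(image, label):
    tf_string = tf.py_function(serialize_example, (image, label), tf.string)
    return tf.reshape(tf_string, ())  # the result is a scalar string


# Made-up data: two 4x4 RGB images with two-class one-hot labels.
images = tf.zeros([2, 4, 4, 3], dtype=tf.float32)
labels = tf.constant([[1, 0], [0, 1]], dtype=tf.int64)
toy_ds = tf.data.Dataset.from_tensor_slices((images, labels))

serialized_ds = toy_ds.map(tf_serialize_example)
records = [record.numpy() for record in serialized_ds]
print(len(records), type(records[0]))
```

Note that tf.py_function holds the Python interpreter lock, so this step does not benefit much from num_parallel_calls.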


In [36]:
# Create TFRecord with ~9000
def create_tfrecord(ds, n_shards):
    # Upper bound on the number of elements written per shard
    shard_range = round(NUM_TOTAL_IMAGES/n_shards)
    
    for i in range(n_shards):
        # Shard i receives every n_shards-th element, starting at offset i
        shard = ds.shard(n_shards, i).take(shard_range)
        
        with tf.io.TFRecordWriter(f'output_file-part-{i}.tfrecord') as writer:
            # as_numpy_iterator() yields NumPy values, which can be serialized eagerly
            for image, label in shard.as_numpy_iterator():
                writer.write(tf_serialize_example(image, label))

In [85]:
renato = map(lambda x: tf_serialize_example(x[0],x[1]), list(train_ds.take(5).shard(5,1).as_numpy_iterator()))

In [37]:
create_tfrecord(train_ds.take(5), 1)
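
Once records are split over several files, they can be read back concurrently with the interleave transformation, one of the best practices listed earlier. The sketch below is self-contained and uses made-up data: it writes twelve integers into three shard files in a temporary directory and reads them back. The file names `part-*.tfrecord` and the feature name `value` are hypothetical.

```python
import os
import tempfile

import tensorflow as tf

AUTOTUNE = tf.data.experimental.AUTOTUNE


def make_example(value):
    """Serialize a single integer as a tf.train.Example."""
    feature = {'value': tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))}
    return tf.train.Example(features=tf.train.Features(feature=feature)).SerializeToString()


out_dir = tempfile.mkdtemp()
n_shards = 3
source = tf.data.Dataset.range(12)

# Write: shard i receives every n_shards-th element, starting at offset i.
for i in range(n_shards):
    path = os.path.join(out_dir, 'part-{}.tfrecord'.format(i))
    with tf.io.TFRecordWriter(path) as writer:
        for value in source.shard(n_shards, i).as_numpy_iterator():
            writer.write(make_example(int(value)))

# Read: open the shard files concurrently and interleave their records.
files = tf.data.Dataset.list_files(os.path.join(out_dir, 'part-*.tfrecord'), shuffle=False)
records = files.interleave(tf.data.TFRecordDataset,
                           cycle_length=n_shards,
                           num_parallel_calls=AUTOTUNE)

description = {'value': tf.io.FixedLenFeature([], tf.int64)}
parsed = records.map(lambda r: tf.io.parse_single_example(r, description)['value'])

values = sorted(int(v) for v in parsed.as_numpy_iterator())
print(values)
```

The same reading pattern should apply to shards stored under a gs:// prefix, since both list_files and TFRecordDataset accept Cloud Storage paths.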

In [32]:
renato

(<tf.Tensor: shape=(32, 224, 224, 3), dtype=float32, numpy=
 array([[[[1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
          [1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
          [1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
          ...,
          [1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
          [1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
          [1.00000000e+00, 1.00000000e+00, 1.00000000e+00]],
 
         [[1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
          [1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
          [1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
          ...,
          [1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
          [1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
          [1.00000000e+00, 1.00000000e+00, 1.00000000e+00]],
 
         [[1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
          [1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
          [1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
    
