# Speed up your training with a proper input pipeline

**DISCLAIMER**: the input pipeline APIs are not very straightforward, and the TF team has already announced that they will be rewritten from scratch.



In [1]:
import os
import time
import shutil
import tensorflow as tf

def _prepare_graph():
    tf.reset_default_graph()
    tf.set_random_seed(23)
_prepare_graph()

There are three ways to push actual data into a TensorFlow graph:
1. ~~load all the data in memory (which works only for small datasets)~~
2. feeding data at each step
3. reading from files and using Queues  
  
We will compare the last two approaches using the same example we used in section 04, where a classifier emits 1 if the sum of all the input elements is positive and 0 otherwise.
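
As a quick illustration of the labeling rule just described, here is a minimal plain-Python sketch (no TF involved). The helper name `make_example` is hypothetical and only serves this example.

```python
import random

def make_example(num_features=1024, rng=random):
    """Draw one example: standard-normal features and a 0/1 label."""
    features = [rng.gauss(0.0, 1.0) for _ in range(num_features)]
    # The label is 1 when the sum of the features is positive, 0 otherwise.
    label = 1 if sum(features) > 0 else 0
    return features, label

features, label = make_example()
print(len(features), label)
```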

In [2]:
class Model(object):
    
    def __init__(self, inputs, targets):
        self._gs = tf.Variable(0, name='global_step', trainable=False, dtype=tf.int32)

        self._inputs = inputs
        self._targets = targets
        
        with tf.variable_scope('Layer1') as scope:
            w1 = tf.get_variable('w', shape=[1024, 1024])
            b1 = tf.get_variable('b', shape=[1024])
            z1 = tf.matmul(self._inputs, w1) + b1
            y1 = tf.nn.relu(z1, name='activation')

        with tf.variable_scope('Layer2') as scope:
            w2 = tf.get_variable('w', shape=[1024, 1])
            b2 = tf.get_variable('b', shape=[1])
            logits = tf.matmul(y1, w2) + b2
            self._predicts = tf.cast(logits > 0, tf.int32)

        with tf.variable_scope('Loss'):
            labels = tf.cast(self._targets, tf.float32)
            losses = tf.nn.sigmoid_cross_entropy_with_logits(
                labels=labels, logits=logits)
            self._loss_op = tf.reduce_mean(losses, name='loss')

        with tf.variable_scope('BackProp'):
            optimizer = tf.train.GradientDescentOptimizer(0.1)
            trainable_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES)
            grads_and_vars = optimizer.compute_gradients(
                loss=self._loss_op, var_list=trainable_vars)
            self._train_op = optimizer.apply_gradients(
                grads_and_vars=grads_and_vars, 
                global_step=self._gs,
                name="train_op")
            
        with tf.variable_scope('Accuracy'):
            self._accuracy_op = tf.reduce_mean(
                tf.cast(tf.equal(self._predicts, self._targets), tf.float32),
                name='accuracy')
            
            
    @property
    def global_step(self):
        return self._gs

    @property
    def inputs(self):
        return self._inputs
    
    @property
    def targets(self):
        return self._targets
    
    @property
    def predictions(self):
        return self._predicts
    
    @property
    def loss_op(self):
        return self._loss_op
    
    @property
    def train_op(self):
        return self._train_op
    
    @property
    def accuracy_op(self):
        return self._accuracy_op

## The naive way: feeding data step by step
The first and most straightforward way to push data into a computational graph is to feed actual values into placeholders step by step. This is achieved through the `feed_dict` argument of the `tf.Session.run()` method. A `feed_dict` is a dictionary where keys are TF placeholders and values are `numpy` arrays. So let's build our model with placeholders as inputs:

In [3]:
_prepare_graph()
with tf.variable_scope('Input') as scope:
    inputs = tf.placeholder(dtype=tf.float32, shape=[None, 1024])
    targets = tf.placeholder(dtype=tf.int32, shape=[None, 1])
model = Model(inputs, targets)

Then we run our training, generating random examples at each training step and feeding them into the graph via the `feed_dict` argument.

In [4]:
def get_batch_tensors(batch_size=128):
    data = tf.random_normal([batch_size, 1024], mean=0, stddev=1)
    labels = tf.cast(tf.reduce_sum(data, axis=1, keep_dims=True) > 0, tf.int32)
    return data, labels

STEPS = 100
CHECK_EVERY = 10

startTime = time.time()
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for i in range(STEPS):
        actual_inputs, actual_targets = sess.run(list(get_batch_tensors(256)))
        fetches = [model.train_op]
        if i % CHECK_EVERY == 0:
            fetches = fetches + [model.loss_op, model.accuracy_op]
        results = sess.run(
             fetches=fetches,
             feed_dict={
                 model.inputs: actual_inputs,
                 model.targets: actual_targets
             })
        if len(results) > 1:
            print('iter:%d - loss:%f - accuracy:%f' % (i, results[1], results[2]))
print("Time taken: %f" % (time.time() - startTime))

iter:0 - loss:0.883482 - accuracy:0.507812
iter:10 - loss:1.639438 - accuracy:0.503906
iter:20 - loss:1.340737 - accuracy:0.527344
iter:30 - loss:1.707782 - accuracy:0.472656
iter:40 - loss:0.821126 - accuracy:0.617188
iter:50 - loss:0.732072 - accuracy:0.667969
iter:60 - loss:0.714731 - accuracy:0.671875
iter:70 - loss:0.540966 - accuracy:0.722656
iter:80 - loss:0.320460 - accuracy:0.843750
iter:90 - loss:0.310457 - accuracy:0.855469
Time taken: 6.120873


In this scenario:
* everything is synchronous and single-threaded
* we keep moving back and forth between Python and the underlying C++ runtime, and between the CPU and the GPU.

(*Aside: if you run `nvidia-smi` while this example is running, you will see low GPU usage.*)

## Speeding up your training with Queues
Queues are a powerful mechanism for asynchronous computation in TF. Like everything in TF, a queue is a node in a graph. It's a stateful node, like a variable: other nodes can modify its content. In particular, nodes can enqueue new items into the queue, or dequeue existing items from the queue.  
  
To use a queue in our model, we have to rewrite the input creation. We will:
* create a `FIFOQueue` object
* enqueue a pair of batch tensors (inputs, targets) into it
* create a `QueueRunner`, which is in charge of running the enqueue operation across different threads
* add the queue runner to the `QUEUE_RUNNERS` graph collection
* dequeue the (inputs, targets) batches from the queue
* feed such tensors as inputs to our model
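
The steps above can be pictured with a plain-Python analogy built on the standard `queue` and `threading` modules. This is only a sketch of the producer/consumer idea, not how TF implements its queues. The names `make_batch` and `enqueue_loop`, and the tiny batch sizes, are assumptions made for the example.

```python
import queue
import random
import threading

def make_batch(batch_size=4, num_features=8):
    """Build one (inputs, targets) batch of random examples."""
    inputs = [[random.gauss(0.0, 1.0) for _ in range(num_features)]
              for _ in range(batch_size)]
    targets = [[1 if sum(row) > 0 else 0] for row in inputs]
    return inputs, targets

def enqueue_loop(q, stop_event):
    """Keep the queue filled until asked to stop (the 'queue runner' role)."""
    while not stop_event.is_set():
        batch = make_batch()
        try:
            q.put(batch, timeout=0.1)  # blocks while the queue is full
        except queue.Full:
            continue  # re-check the stop flag, then retry with a fresh batch

batch_queue = queue.Queue(maxsize=5)  # bounded FIFO, similar in spirit to capacity=5
stop_event = threading.Event()
workers = [threading.Thread(target=enqueue_loop, args=(batch_queue, stop_event))
           for _ in range(2)]
for w in workers:
    w.start()

# The "training loop" only dequeues; batch creation runs in the background.
consumed = [batch_queue.get() for _ in range(10)]

# Ask the producers to stop and wait for them to finish.
stop_event.set()
for w in workers:
    w.join()
print('consumed %d batches' % len(consumed))
```

The consumer never builds a batch itself: it only waits when the queue is empty, which is the behaviour the TF queue gives to the training loop.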

In [5]:
_prepare_graph()

NUM_THREADS = 2
FIFOQUEUE_NAME = 'FIFOQueue'

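with tf.variable_scope('Input') as scope:
    # Tensors that produce a fresh random batch every time they are evaluated.
    inputs, targets = get_batch_tensors(128)
    # A bounded queue holding up to 5 (inputs, targets) pairs.
    queue = tf.FIFOQueue(capacity=5, dtypes=[tf.float32, tf.int32], name=FIFOQUEUE_NAME)
    enqueue_op = queue.enqueue((inputs, targets))
    # One enqueue op per thread: NUM_THREADS threads fill the queue concurrently.
    queue_runner = tf.train.QueueRunner(queue, [enqueue_op] * NUM_THREADS)
    tf.train.add_queue_runner(queue_runner)
    # From here on, the model reads its batches from the queue.
    inputs, targets = queue.dequeue()
model = Model(inputs, targets)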

The instruction `tf.train.add_queue_runner(queue_runner)` adds the queue runner of our queue (named "FIFOQueue") to the `QUEUE_RUNNERS` collection of the graph. We can inspect this collection to verify it.  

In [6]:
assert tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)[0].name == 'Input/' + FIFOQUEUE_NAME

To run a graph with queues, we have to perform a few more operations than usual. We will go through them using a `tf.InteractiveSession`, which is just a regular session designed for interactive environments (such as notebooks or IDLE). First, as usual, we have to create the session and run the graph initialization:

In [7]:
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())
print('Session initialized. The global step is: %d' % sess.run(model.global_step))

Session initialized. The global step is: 0


The queue runners will start threads that run the input pipeline and fill the example queue, so that the dequeue operation that fetches the examples can succeed. To coordinate all the runners, we can use a `tf.train.Coordinator` object. We create a coordinator instance and pass it as an argument to the `tf.train.start_queue_runners` function, which returns a list of `Thread` objects. The list is typically longer than the number of enqueue threads we asked for, because extra threads are needed for coordination.
  
**NOTA BENE**: the queue runners must be started **before** any training/inference operation; otherwise, that operation will hang indefinitely. 

In [8]:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
print('Queue runners started, %d threads created.' % len(threads))

Queue runners started, 3 threads created.
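
The hang mentioned in the note above has a simple counterpart in plain Python: reading from a queue that nobody is filling blocks the caller. The sketch below uses the standard `queue` module as a stand-in for the TF queue (an analogy only), with a timeout so that the blocking becomes visible instead of freezing the notebook.

```python
import queue

empty_queue = queue.Queue(maxsize=5)  # nobody is enqueuing into this queue

try:
    # A plain get() would block forever here; the timeout makes that visible.
    empty_queue.get(timeout=0.2)
    would_block = False
except queue.Empty:
    would_block = True

print('dequeue would have blocked:', would_block)
```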


Now we can run the regular training (for just a few steps, to see it working).

In [9]:
STEPS = 5
CHECK_EVERY = 5
for i in range(STEPS):        
    fetches = [model.train_op]
    if i % CHECK_EVERY == 0:
        fetches = fetches + [model.loss_op, model.accuracy_op]
    results = sess.run(fetches)
    if len(results) > 1:
        print('iter:%d - loss:%f - accuracy:%f' % (i, results[1], results[2]))

iter:0 - loss:1.423948 - accuracy:0.468750


At the end of the training, we can use the coordinator to stop the queue runners and join the threads.  
P.S. don't forget to close the *interactive* session!

In [10]:
coord.request_stop()
coord.join(threads)
sess.close()

We can put everything together and run the same process in a more common way, measuring the speedup in terms of elapsed time:

In [11]:
STEPS = 100
CHECK_EVERY = 10

startTime = time.time()
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for i in range(STEPS):        
        fetches = [model.train_op]
        if i % CHECK_EVERY == 0:
            fetches = fetches + [model.loss_op, model.accuracy_op]
        results = sess.run(fetches)
        if len(results) > 1:
            print('iter:%d - loss:%f - accuracy:%f' % (i, results[1], results[2]))

    coord.request_stop()
    coord.join(threads)

    print("Time taken: %f" % (time.time() - startTime))

iter:0 - loss:1.423948 - accuracy:0.468750
iter:10 - loss:1.059269 - accuracy:0.546875
iter:20 - loss:1.995191 - accuracy:0.570312
iter:30 - loss:1.205318 - accuracy:0.460938
iter:40 - loss:1.159184 - accuracy:0.507812
iter:50 - loss:0.887080 - accuracy:0.609375
iter:60 - loss:0.444107 - accuracy:0.804688
iter:70 - loss:0.532430 - accuracy:0.718750
iter:80 - loss:0.485949 - accuracy:0.789062
iter:90 - loss:0.270457 - accuracy:0.859375
Time taken: 3.325673


## A proper input pipeline
To describe how to set up a proper input pipeline, we will simulate a more realistic scenario. Let's assume we are dealing with a large dataset stored in 20 `.CSV` files, each one with 100 examples. We start by generating it with the simple snippet below.  
  
Each example is represented as a line of 1025 float numbers separated by a `,`. The first 1024 elements represent the input features, while the last one represents the ground-truth label.
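
To make the line format concrete, here is a small plain-Python sketch that writes one example in this layout and reads it back. The helpers `format_example` and `parse_example` are hypothetical names introduced for illustration, and the feature values are made up.

```python
def format_example(features, label, sep=','):
    """Serialize one example as a text line: features first, label last."""
    return sep.join([str(item) for item in features] + [str(label)]) + '\n'

def parse_example(line, sep=',', num_features=1024):
    """Split one text line back into the input features and an integer label."""
    values = [float(item) for item in line.strip().split(sep)]
    if len(values) != num_features + 1:
        raise ValueError('expected %d columns, got %d'
                         % (num_features + 1, len(values)))
    return values[:-1], int(values[-1])

# Made-up feature values; their sum is positive, so the label is 1.
features = [0.25] * 1023 + [-1.5]
line = format_example(features, 1.0)
parsed_features, parsed_label = parse_example(line)
print(len(line.split(',')), parsed_label)  # 1025 1
```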

In [12]:
DATA_DIR = '/tmp/TF-102-05/data'
FILE_ROOT = 'examples-'
FILE_EXT = '.csv'
SEP = ','

def generate_data():
    os.makedirs(DATA_DIR)
    features = tf.random_normal([1024], mean=0.0, stddev=1.0)
    label = tf.cast(tf.reduce_sum(features) > 0, dtype=tf.float32)
    
    with tf.Session() as sess:
        for i in range(20):
            fname = os.path.join(DATA_DIR, FILE_ROOT + str(i).zfill(2) + FILE_EXT)
            with open(fname, 'w') as fio:
                for j in range(100):
                    f, l = sess.run([features, label])
                    line = SEP.join([str(item) for item in f] + [str(l)]) + '\n'
                    fio.write(line)
            
if not os.path.exists(DATA_DIR):
    generate_data()
    
_prepare_graph()

In [13]:
%%bash
cd /tmp/TF-102-05/data
ls | head -6
echo
ls | tail -5
echo
wc -l examples-00.csv

examples-00.csv
examples-01.csv
examples-02.csv
examples-03.csv
examples-04.csv
examples-05.csv

examples-15.csv
examples-16.csv
examples-17.csv
examples-18.csv
examples-19.csv

100 examples-00.csv


First, we create a queue that reads from a set of file names, shuffling them. We can set the number of epochs so that the training stops after the training set has been submitted to the model that many times.
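
The behaviour described above (every file name served once per epoch, in a new random order each time, for a fixed number of epochs) can be sketched with a plain-Python generator. This is an analogy under simplifying assumptions, not the TF implementation; `filename_producer` is a hypothetical name.

```python
import random

def filename_producer(filenames, num_epochs, shuffle=True, seed=None):
    """Yield every filename once per epoch, reshuffling at each epoch."""
    rng = random.Random(seed)
    for _ in range(num_epochs):
        epoch = list(filenames)
        if shuffle:
            rng.shuffle(epoch)
        for name in epoch:
            yield name
    # Falling off the end stops the iteration: the plain-Python counterpart
    # of a queue signalling that no more elements will arrive.

names = ['examples-%02d.csv' % i for i in range(20)]
produced = list(filename_producer(names, num_epochs=2, seed=23))
print(len(produced))  # 40
```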

In [14]:
NUM_EPOCHS = 2
PATTERN = DATA_DIR + '/' + FILE_ROOT + '*' + FILE_EXT
files = tf.train.match_filenames_once(PATTERN)
filename_queue = tf.train.string_input_producer(files, name='FilenameQueue', shuffle=True, num_epochs=NUM_EPOCHS)

This function already registers a proper queue runner in the `QUEUE_RUNNERS` collection.  
**NOTA BENE**: this process will create some local variables that **MUST** be initialized when running the session via the `tf.local_variables_initializer()` op.

In [15]:
print('QueueRunners:')
for qr in tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS):
    print(' ' + qr.name)
print('')

print('LocalVariables')
for var in tf.get_collection(tf.GraphKeys.LOCAL_VARIABLES):
    print('  ' + var.op.name)

QueueRunners:
 FilenameQueue

LocalVariables
  matching_filenames
  FilenameQueue/limit_epochs/epochs


We create a reader object that is fed with the queue and returns two tensors:
* a `key` tensor of dtype `tf.string` that identifies the particular element in the queue;
* a `value` tensor that holds the actual value of the example.

The `value` tensor must then be fed into a decoder function that turns it into a set of tensors that can be fed into the model.  
  
Since we are dealing with `.CSV` files, we can use off-the-shelf components that are already in the TF library, namely the `tf.TextLineReader` and the `tf.decode_csv` function.

In [16]:
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
columns = tf.decode_csv(value, record_defaults=[[0.0]] * 1025)
inputs, targets = tf.stack(columns[:-1]), tf.cast(tf.expand_dims(input=columns[-1], dim=-1), tf.int32)

So, let's have a brief recap:
1. a set of filenames is enqueued and fed into a reader;
2. the reader reads (a key and) a value, i.e. a tensor representing a single example;
3. such tensor is fed into a decoder that returns some other tensors (depending on the parsing).
  
Keep in mind: `filename queue -> reader -> value into the decoder.`
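
The recap above can be mirrored by a chain of plain-Python functions. The sketch below is an analogy only: `read_lines` plays the reader and `decode` plays the decoder, both hypothetical names, and the two tiny files (3 features per example instead of 1024) are made up for the example.

```python
import os
import shutil
import tempfile

def read_lines(filenames):
    """Reader: yield (key, value) pairs, one per text line."""
    for fname in filenames:
        with open(fname) as fio:
            for lineno, line in enumerate(fio, start=1):
                yield '%s:%d' % (fname, lineno), line.rstrip('\n')

def decode(value, sep=','):
    """Decoder: turn one text line into (features, label)."""
    columns = [float(item) for item in value.split(sep)]
    return columns[:-1], int(columns[-1])

# Two tiny made-up files, one example each.
tmp_dir = tempfile.mkdtemp()
paths = []
for i, row in enumerate(['1.0,2.0,-0.5,1.0', '-1.0,-2.0,0.5,0.0']):
    path = os.path.join(tmp_dir, 'examples-%02d.csv' % i)
    with open(path, 'w') as fio:
        fio.write(row + '\n')
    paths.append(path)

# filenames -> reader -> value into the decoder
examples = [decode(value) for _key, value in read_lines(paths)]
shutil.rmtree(tmp_dir)
print(examples)
```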

Now, we can build another queue to read pairs of `inputs` and `targets`, then batch and shuffle them. We can use the `tf.train.shuffle_batch` function, which accepts some interesting parameters:
* `capacity`: An integer. The maximum number of elements in the queue.
* `min_after_dequeue`: Minimum number of elements in the queue after a dequeue, used to ensure a level of mixing of elements.
* `num_threads`: The number of threads enqueuing the input tensors.
* `allow_smaller_final_batch`: (Optional) Boolean. If True, allow the final batch to be smaller if there are insufficient items left in the queue.  
  
The value of `capacity` must be larger than `min_after_dequeue`, and the difference between the two determines the maximum number of elements that will be prefetched. A rule of thumb for `capacity` is:  
  
    min_after_dequeue + (num_threads + a_small_safety_margin) * batch_size
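
As a worked instance of this rule of thumb, the sketch below plugs in the values used in the next cell (`min_after_dequeue=10`, `num_threads=2`, `batch_size=23`). The helper name `suggested_capacity` and the safety margin of 1 are assumptions made for the example.

```python
def suggested_capacity(min_after_dequeue, num_threads, batch_size, safety_margin=1):
    """Apply the rule of thumb; safety_margin=1 is an assumed small margin."""
    return min_after_dequeue + (num_threads + safety_margin) * batch_size

capacity = suggested_capacity(min_after_dequeue=10, num_threads=2, batch_size=23)
print(capacity)  # 10 + (2 + 1) * 23 = 79
```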

In [17]:
BATCH_SIZE = 23
MIN_AFTER_DEQUEUE = 10
NUM_THREADS = 2
CAPACITY = MIN_AFTER_DEQUEUE + (NUM_THREADS + 1) * BATCH_SIZE

input_batch, target_batch = tf.train.shuffle_batch(
    tensors=[inputs, targets],
    batch_size=BATCH_SIZE,
    num_threads=NUM_THREADS,
    capacity=CAPACITY,
    min_after_dequeue=MIN_AFTER_DEQUEUE,
    allow_smaller_final_batch=True)
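
To build some intuition for `min_after_dequeue` and `allow_smaller_final_batch`, here is a single-threaded, plain-Python sketch of a shuffle buffer followed by batching. It is a simplification under stated assumptions (no threads, no `capacity` limit), and `shuffled` and `batched` are hypothetical names, not TF functions.

```python
import random

def shuffled(examples, min_after_dequeue, seed=None):
    """Yield examples in random order while keeping a buffer for mixing."""
    rng = random.Random(seed)
    buffer = []
    for example in examples:
        buffer.append(example)
        # Only dequeue when min_after_dequeue elements would remain afterwards.
        if len(buffer) > min_after_dequeue:
            yield buffer.pop(rng.randrange(len(buffer)))
    # Input exhausted: drain what is left, still in random order.
    while buffer:
        yield buffer.pop(rng.randrange(len(buffer)))

def batched(items, batch_size, allow_smaller_final_batch=True):
    """Group items into lists of batch_size; optionally keep the remainder."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch and allow_smaller_final_batch:
        yield batch

batches = list(batched(shuffled(range(100), min_after_dequeue=10, seed=23),
                       batch_size=23))
print([len(b) for b in batches])  # [23, 23, 23, 23, 8]
```

A larger `min_after_dequeue` means each drawn example is picked among more candidates, so the mixing is better, at the price of more memory and a longer warm-up.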

Again, the `tf.train.shuffle_batch` will create some other queue runners for us:

In [18]:
print('QueueRunners:')
for qr in tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS):
    print(' ' + qr.name)
print('')

QueueRunners:
 FilenameQueue
 shuffle_batch/random_shuffle_queue



Now we can build and train the model. Since we are iterating over the examples for a fixed number of epochs, the queue will raise a `tf.errors.OutOfRangeError` once the examples are exhausted. We can catch it and use it to trigger the end of the training process. Again, the `tf.train.Coordinator` class has everything we need:

In [19]:
model = Model(input_batch, target_batch)
with tf.Session() as sess:
    sess.run(tf.local_variables_initializer())  # again: keep in mind!
    sess.run(tf.global_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    try:
        while not coord.should_stop():
            train_op, global_step, predictions = sess.run(
                [model.train_op, model.global_step, model.predictions])
            if global_step % 20 == 0:
                print('global_step: %d' % global_step)
    except tf.errors.OutOfRangeError:
        print('Done training, epoch limit reached.')
        print('last global step: %d' % global_step)
        print('last batch of size: %d' % len(predictions))
    finally:
        coord.request_stop()
    coord.join(threads)

global_step: 20
global_step: 40
global_step: 60
global_step: 80
global_step: 100
global_step: 120
global_step: 140
global_step: 160
Done training, epoch limit reached.
last global step: 174
last batch of size: 21
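
The last two numbers in the output above are consistent with simple arithmetic on the dataset size: 20 files of 100 examples, read for 2 epochs, in batches of 23 with a smaller final batch allowed. A minimal check in plain Python:

```python
import math

num_files, examples_per_file, num_epochs, batch_size = 20, 100, 2, 23

total_examples = num_files * examples_per_file * num_epochs  # 4000
# Number of batches when the final one may be smaller than batch_size.
num_steps = int(math.ceil(total_examples / float(batch_size)))
# Whatever is left after all the full batches goes into the final batch.
last_batch = total_examples - (num_steps - 1) * batch_size

print(num_steps, last_batch)  # 174 21
```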
