# Designing input pipelines in TensorFlow using queues

A common bottleneck in machine learning applications is the input pipeline: loading inputs (e.g., images) from disk and transporting them to the device (e.g., the GPU). An important feature of TensorFlow is its support for efficient input pipelines, so that throughput is limited mainly by the computing power of your device.

In TensorFlow, this problem is usually solved with queues. A queue acts as an in-memory buffer that multiple threads fill concurrently, so that data is always available when the graph is executed, e.g., for training.
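The idea of a buffer that several threads fill concurrently can be pictured with Python's standard library alone. The following is only an analogy under stated assumptions, not TensorFlow's implementation: `fill` and `run_pipeline` are hypothetical names, and `queue.Queue` stands in for a TensorFlow queue with a fixed capacity.

```python
import queue
import threading


def fill(buffer, items):
    """Producer: push every item into the bounded buffer, blocking while it is full."""
    for item in items:
        buffer.put(item)


def run_pipeline(num_threads=4, items_per_thread=25, capacity=10):
    """Start several producer threads and consume everything they produce."""
    buffer = queue.Queue(maxsize=capacity)  # bounded in-memory buffer
    threads = [
        threading.Thread(
            target=fill,
            args=(buffer, range(t * items_per_thread, (t + 1) * items_per_thread)),
        )
        for t in range(num_threads)
    ]
    for t in threads:
        t.start()
    # Consumer: take items while the producers keep refilling in the background.
    consumed = [buffer.get() for _ in range(num_threads * items_per_thread)]
    for t in threads:
        t.join()
    return consumed


consumed = run_pipeline()
print(len(consumed))  # 100
```

The order in which items arrive depends on thread scheduling, which is why only the number of consumed items is printed.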

In [1]:
import numpy as np
np.random.seed(1234)
import tensorflow as tf


## Toy scenario

For this example, we simply write random numbers to a CSV file, which is then read into a queue using multiple threads.

In [2]:
# Write 10,000 lines, each holding two standard-normal samples.
with open("data.csv", "w") as f:
    for i in range(10000):
        f.write("{}, {}\n".format(np.random.randn(), np.random.randn()))

## The TensorFlow readers

Before data can be enqueued, it has to be read and decoded, which is the job of readers. TensorFlow ships several readers that can be used out of the box.

The input pipeline usually starts with a special queue, the filename queue, which holds only the names of the files to be loaded. A reader is then created and reads records from the files listed in this queue.

In [3]:
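# Queue of file names; the reader pulls the next file from here.
filename_queue = tf.train.string_input_producer(["data.csv"])
# Reads one line of text per call.
reader = tf.TextLineReader()
_, line = reader.read(filename_queue)
# Parse the two comma-separated fields as floats (the defaults also fix the dtype).
value_1, value_2 = tf.decode_csv(line, record_defaults=[[1.0], [1.0]])
# Combine both scalars into one tensor of shape [2].
values = tf.stack([value_1, value_2])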
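To make the decoding step more concrete, here is a rough pure-Python picture of what parsing one line with per-column defaults amounts to. It is a simplified sketch, not how `tf.decode_csv` is implemented: `decode_line` is a hypothetical helper, and it ignores quoting and other CSV details.

```python
def decode_line(line, record_defaults=(1.0, 1.0)):
    """Split one CSV line and convert each field to float.

    An empty field falls back to the default value of its column.
    """
    fields = line.strip().split(",")
    if len(fields) != len(record_defaults):
        raise ValueError(
            "expected {} fields, got {}".format(len(record_defaults), len(fields))
        )
    return [
        float(field) if field.strip() else default
        for field, default in zip(fields, record_defaults)
    ]


print(decode_line("0.5, -1.25\n"))  # [0.5, -1.25]
print(decode_line("0.5,\n"))        # [0.5, 1.0]
```

The second call shows the role of the defaults: the missing second field is replaced by `1.0`.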

## The queue

This example uses a simple first-in-first-out (FIFO) queue. Every queue provides an `enqueue` and a `dequeue` method, which are used below to fill the queue and to retrieve values from it.

In realistic settings, a `RandomShuffleQueue` is more common, since it returns elements in random order and can therefore produce the randomized batches typically used in neural network training.

In [4]:
fifo_queue = tf.FIFOQueue(capacity=100, dtypes=[tf.float32], shapes=[[2]])
enqueue = fifo_queue.enqueue(values)
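The difference between a FIFO queue and a shuffling queue can be sketched in a few lines of plain Python. This is a deliberately simplified illustration: `ShuffleBuffer` is a hypothetical class, and unlike a real `RandomShuffleQueue` it has no capacity limit, no blocking, and no thread safety.

```python
import random


class ShuffleBuffer:
    """Hypothetical stand-in: stores elements and returns them in random order."""

    def __init__(self, seed=None):
        self._items = []
        self._rng = random.Random(seed)

    def enqueue(self, item):
        self._items.append(item)

    def dequeue(self):
        # Pick a random position, move that element to the end, and pop it.
        index = self._rng.randrange(len(self._items))
        self._items[index], self._items[-1] = self._items[-1], self._items[index]
        return self._items.pop()

    def __len__(self):
        return len(self._items)


shuffle_buffer = ShuffleBuffer(seed=1234)
for i in range(10):
    shuffle_buffer.enqueue(i)

shuffled = [shuffle_buffer.dequeue() for _ in range(10)]
print(sorted(shuffled) == list(range(10)))  # True: same elements, different order
```

A FIFO queue would return `0, 1, 2, ...` in insertion order, whereas the buffer above returns every element exactly once in a random order.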

## Filling the queue in the background

To keep data available in memory at all times, we spawn multiple threads that load data from the defined inputs into the queue, concurrently with other executions of the graph. In TensorFlow, this task is performed by `QueueRunner` objects, which have to be created and registered.

In [5]:
num_threads = 4
queue_runner = tf.train.QueueRunner(fifo_queue, [enqueue] * num_threads)
tf.train.add_queue_runner(queue_runner)

## Returning batches from the queue

To use data from the queue, simply call its `dequeue` or `dequeue_many` method. The result is a tensor in the graph that can be fed into any other computation, e.g., as the input node of a neural network.

In [6]:
batch_size = 5
batch = fifo_queue.dequeue_many(batch_size)
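Conceptually, taking a batch means removing `batch_size` elements and stacking them along a new leading dimension. The sketch below mimics this with a standard-library queue; the `dequeue_many` function here is a hypothetical helper written for illustration, not the TensorFlow method.

```python
import queue


def dequeue_many(buffer, n):
    """Take n elements from the buffer, waiting for each one, and return them as one batch."""
    return [buffer.get() for _ in range(n)]


example_queue = queue.Queue()
for i in range(10):
    example_queue.put([float(i), float(i) + 0.5])  # each element has shape [2]

example_batch = dequeue_many(example_queue, 5)
print(len(example_batch), len(example_batch[0]))  # 5 2
```

With elements of shape `[2]` and a batch size of 5, the batch has shape `[5, 2]`.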

## Executing the input pipeline

As an example, we retrieve a few mini-batches from the queue. To do so, we need to create a `Coordinator`, which coordinates the threads of the `QueueRunners`. Finally, `start_queue_runners` starts the background threads, which then run independently of other `sess.run(...)` executions.

In [7]:
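with tf.Session() as sess:
    # The coordinator lets us stop and join the background threads.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for i in range(3):
        print("Mini-batch: {}".format(i+1))
        print(sess.run(batch))
    # Ask the threads to stop and wait until they have finished.
    coord.request_stop()
    coord.join(threads)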

Mini-batch: 1
[[ 0.47143516 -1.1909757 ]
 [ 1.432707   -0.3126519 ]
 [-0.72058874  0.8871629 ]
 [ 0.8595884  -0.6365235 ]
 [ 0.01569637 -2.2426848 ]]
Mini-batch: 2
[[ 1.1500357   0.99194604]
 [ 0.95332414 -2.0212548 ]
 [ 0.4054534   0.28909194]
 [-0.33407736  0.00211836]
 [ 1.3211582  -1.5469055 ]]
Mini-batch: 3
[[-0.20264633 -0.6559693 ]
 [ 0.19342138  0.5534389 ]
 [ 1.3181516  -0.46930528]
 [ 0.6755541  -1.8170272 ]
 [-0.18310854  1.0589691 ]]
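The start, request-stop, and join pattern used above can be imitated with the standard library to show what the coordinator is for. This is an analogy under stated assumptions, not TensorFlow's implementation: `enqueue_loop` is a hypothetical function, and a `threading.Event` plays the role of the stop request.

```python
import itertools
import queue
import threading


def enqueue_loop(buffer, stop_requested, source):
    """Keep filling the buffer from source until a stop is requested."""
    for item in source:
        while not stop_requested.is_set():
            try:
                # Use a timeout so the thread wakes up regularly to check for a stop.
                buffer.put(item, timeout=0.05)
                break
            except queue.Full:
                continue
        if stop_requested.is_set():
            return


buffer = queue.Queue(maxsize=100)
stop_requested = threading.Event()
workers = [
    threading.Thread(target=enqueue_loop, args=(buffer, stop_requested, itertools.count()))
    for _ in range(4)
]
for worker in workers:
    worker.start()

# Retrieve three mini-batches of size 5 while the workers refill the buffer.
batches = [[buffer.get() for _ in range(5)] for _ in range(3)]

stop_requested.set()   # ask the background threads to finish
for worker in workers:
    worker.join()      # wait until they have actually stopped

print(len(batches), all(not worker.is_alive() for worker in workers))  # 3 True
```

Without the stop request, the worker threads would block forever on the full buffer and the program would not terminate cleanly.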
