# TensorFlow input pipeline: large datasets and data augmentation

## Introduction

The high-level Dataset API makes it quite easy to work with large datasets, with no need to manage input queues by hand anymore.

This notebook gives a short example of how to use the Dataset API for:

1. loading multiple files for each input example,
2. data augmentation.

Loading multiple files for each input example can be needed for several applications:

* dealing with image sequences,
* object detection with multiple cameras,
* etc.

Data augmentation is a classic need in deep learning. However, the Dataset API documentation gives no guidance on how to implement it. I am not sure that the method used below is the best for this task. If you find a better way, don't hesitate to tell me!

__Note on TensorFlow version:__

> In TensorFlow v1.4, *tf.contrib.data* was moved into *tf.data*. Use the import that matches your TensorFlow version; see the comments in the code below.

## Let's go!

In [1]:
import tensorflow as tf
import numpy as np

# For tensorflow 1.3
#from tensorflow.contrib import data as tfdata

# For tensorflow 1.4 and above
from tensorflow import data as tfdata

### Create some dummy data

We are going to create some dummy data files. In a real application, use your own data files. Here, we will create 4 files: two groups of two files. Each file will contain a 3x3 matrix.

In [2]:
import os

height = 3
width = 3

# Make sure the output directory exists before writing to it
os.makedirs('data', exist_ok=True)

# Create files, each containing a raw 3x3 float32 matrix
for i in range(2):
    for j in range(2):
        # Create matrix with fake data: every entry equals i + 10*j
        matrix = np.zeros((height, width)) + i + (j * 10)
        # Save matrix as raw float32 file
        matrix.astype('float32').tofile('data/file_{}_{}.raw'.format(i, j))

Let's have a look at one of our dummy data files.

In [3]:
# Print one of the generated files for checking
matrix = np.fromfile('data/file_1_1.raw', dtype=np.float32)
matrix = matrix.reshape((height,width))
print(matrix)

[[ 11.  11.  11.]
 [ 11.  11.  11.]
 [ 11.  11.  11.]]


OK, our dummy data file looks good.

### Create parser

We will need a parser to read our examples from the data files.

Here, each example will be created by stacking data coming from two different files and a label. Therefore, the parser arguments will be the two filenames and the label. It will return two tensors containing the example and the label.

In [4]:
# Create parser
# Args: the two filenames and the label of one example
# Returns: tensors (X, y), where X stacks the two decoded matrices and y is the label
def _parse_data(filename0, filename1, label):
    # Read and decode first file
    matrix0 = tf.read_file(filename0)
    matrix0 = tf.decode_raw(matrix0, out_type=tf.float32)
    matrix0 = tf.reshape(matrix0, [height,width])
    
    # Read and decode second file
    matrix1 = tf.read_file(filename1)
    matrix1 = tf.decode_raw(matrix1, out_type=tf.float32)
    matrix1 = tf.reshape(matrix1, [height,width])
    
    # Stack the two elements together
    X = tf.stack([matrix0, matrix1])
    
    # Get label (you could implement more complex logic here if needed)
    y = tf.reshape(label, [1])
    
    return X, y
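
As a quick sanity check outside the TensorFlow graph, the same parsing logic can be mirrored in plain NumPy. This is only a sketch: `parse_data_numpy` is a hypothetical helper, not part of the pipeline, and the files are written to a temporary directory rather than `data/`.

```python
import os
import tempfile

import numpy as np

height, width = 3, 3

def parse_data_numpy(filename0, filename1, label):
    """NumPy mirror of _parse_data (hypothetical helper, for checking shapes)."""
    matrix0 = np.fromfile(filename0, dtype=np.float32).reshape((height, width))
    matrix1 = np.fromfile(filename1, dtype=np.float32).reshape((height, width))
    X = np.stack([matrix0, matrix1])  # shape (2, height, width)
    y = np.reshape(label, [1])        # shape (1,)
    return X, y

with tempfile.TemporaryDirectory() as tmp_dir:
    # Write two small raw float32 files, as in the dummy data cell
    path0 = os.path.join(tmp_dir, 'file_0_1.raw')
    path1 = os.path.join(tmp_dir, 'file_1_1.raw')
    np.full((height, width), 10, dtype=np.float32).tofile(path0)
    np.full((height, width), 11, dtype=np.float32).tofile(path1)
    X, y = parse_data_numpy(path0, path1, 25)

print(X.shape, y.shape)
```

Each parsed example should therefore have shape (2, 3, 3), with a label of shape (1,).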

### Create data augmentation function

We will need a function to implement the data augmentation logic.

This function takes one example (X,y), and returns a dataset with two examples: the original example, and a second one generated on the fly.

In [5]:
# Data augmentation function: create several examples from one example
# Args: One example X,y
# Returns: Dataset containing several examples, after data augmentation
def _data_augment(X,y):
    # Generate new data from example X
    X0 = X
    X1 = -X # Dummy data augmentation, but we could use any transformation we need. We could also generate more examples.
    X_augmented = tf.stack([X0,X1])
    
    # Repeat y
    y_augmented = tf.stack([y,y])
    
    dataset = tfdata.Dataset.from_tensor_slices((X_augmented, y_augmented))
        
    return dataset
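
To make the effect of the augmentation function concrete, here is a NumPy sketch of what it produces for a single example. `data_augment_numpy` is a hypothetical stand-in: it returns a plain list where the real function returns a Dataset, but the slicing along the first axis is the same idea as `from_tensor_slices`.

```python
import numpy as np

def data_augment_numpy(X, y):
    """NumPy mirror of _data_augment (hypothetical helper)."""
    X_augmented = np.stack([X, -X])  # shape (2,) + X.shape
    y_augmented = np.stack([y, y])   # shape (2,) + y.shape
    # Slicing along the first axis gives one (X, y) pair per augmented example
    return list(zip(X_augmented, y_augmented))

X = np.ones((2, 3, 3), dtype=np.float32)
y = np.array([15])

augmented_examples = data_augment_numpy(X, y)
for X_i, y_i in augmented_examples:
    print(X_i.shape, y_i)
```

One input example yields two output examples with the same shape and the same label.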

### It's almost done

Now that we have implemented our parsing and data augmentation logic, we can create the dataset.

As you can see, with the TensorFlow Dataset API, it is really easy! 

In [6]:
####################################################################
# Create dataset from files
####################################################################

# Get the filenames lists
filenames0 = ['data/file_0_0.raw', 'data/file_0_1.raw']
filenames1 = ['data/file_1_0.raw', 'data/file_1_1.raw']

# Create tensorflow constant containing the filenames
tf_filenames0 = tf.constant(filenames0)
tf_filenames1 = tf.constant(filenames1)

# Create labels dataset
labels = tf.constant([15, 25])

# Create dataset containing filenames and labels
dataset = tfdata.Dataset.from_tensor_slices((tf_filenames0, tf_filenames1, labels))

# Use our _parse_data function to read files and decode data
dataset = dataset.map(_parse_data)

#####################################################################
# Data augmentation
#####################################################################

# Apply data augmentation
dataset = dataset.interleave(_data_augment, cycle_length=1)
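
To see why `cycle_length=1` keeps each augmented example right after its original, here is a simplified pure-Python model of the interleaving order. It is a sketch, not TensorFlow's implementation: the `interleave` and `augment` functions below are hypothetical, and the model assumes a block length of 1.

```python
def interleave(elements, map_func, cycle_length=1):
    """Simplified model of Dataset.interleave (assumes block_length=1)."""
    source = iter(elements)
    slots = [None] * cycle_length  # sub-iterators currently open
    source_exhausted = False
    index = 0
    while not source_exhausted or any(slot is not None for slot in slots):
        if slots[index] is not None:
            try:
                item = next(slots[index])
            except StopIteration:
                # Sub-iterator finished: free the slot and move on
                slots[index] = None
                index = (index + 1) % cycle_length
                continue
            yield item
            index = (index + 1) % cycle_length
        elif not source_exhausted:
            # Empty slot: open a sub-iterator on the next input element
            try:
                slots[index] = iter(map_func(next(source)))
            except StopIteration:
                source_exhausted = True
        else:
            index = (index + 1) % cycle_length

def augment(example):
    X, y = example
    return [(X, y), ('-' + X, y)]  # original, then the "negated" example

examples = [('X0', 15), ('X1', 25)]
sequential = list(interleave(examples, augment, cycle_length=1))
mixed = list(interleave(examples, augment, cycle_length=2))
print(sequential)
print(mixed)
```

With `cycle_length=1` the sub-datasets are consumed one after the other, so the order is original, augmented, original, augmented. With `cycle_length=2` in this model, the two originals come first, followed by the two augmented examples.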

### Print the dataset

We can now print the dataset to check that everything is working.

In [7]:
#####################################################################
# Print the dataset
#####################################################################

# create TensorFlow Iterator object
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

with tf.Session() as sess:

    # iterate over the dataset
    while True:
        try:
            elem = sess.run(next_element)
            print(elem)
        except tf.errors.OutOfRangeError:
            print("End of dataset")
            break

(array([[[ 0.,  0.,  0.],
        [ 0.,  0.,  0.],
        [ 0.,  0.,  0.]],

       [[ 1.,  1.,  1.],
        [ 1.,  1.,  1.],
        [ 1.,  1.,  1.]]], dtype=float32), array([15]))
(array([[[-0., -0., -0.],
        [-0., -0., -0.],
        [-0., -0., -0.]],

       [[-1., -1., -1.],
        [-1., -1., -1.],
        [-1., -1., -1.]]], dtype=float32), array([15]))
(array([[[ 10.,  10.,  10.],
        [ 10.,  10.,  10.],
        [ 10.,  10.,  10.]],

       [[ 11.,  11.,  11.],
        [ 11.,  11.,  11.],
        [ 11.,  11.,  11.]]], dtype=float32), array([25]))
(array([[[-10., -10., -10.],
        [-10., -10., -10.],
        [-10., -10., -10.]],

       [[-11., -11., -11.],
        [-11., -11., -11.],
        [-11., -11., -11.]]], dtype=float32), array([25]))
End of dataset


### It is working!

We can see that parsing and data augmentation work well:

* each example contains:
  * two matrices coming from two different files
  * a label
* after each example, there is an augmented example (-X, y)

## Let's train our model now

First we have to create a model.

In [20]:
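# Create a small dummy model
# Note: tf.layers.dense is called without a name or a reusing variable scope,
# so every call to model_function creates a new layer with its own variables.
def model_function(input_data):
    flattened = tf.contrib.layers.flatten(input_data)
    output = tf.layers.dense(flattened, 1)
    return output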

# Create a loss function    
def loss_function(example, label):
    output = model_function(example)
    return tf.losses.mean_squared_error(label, output)

t_input_data = tf.placeholder(tf.float32, shape=[None, 2, 3, 3])
t_label = tf.placeholder(tf.float32, shape=[None, 1])
t_output = model_function(t_input_data)
t_loss = tf.losses.mean_squared_error(t_label, t_output)

Now we will do the following:

* Prepare the dataset for training
  * Repeat the dataset for the number of epochs
  * Set the batch size
  * Create an iterator on the batched dataset
  * Compute the number of steps per epoch
* Create a global_step variable. This variable will count the number of training steps.
* Create the loss op and the train op.
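
The number of steps per epoch can be worked out with simple arithmetic. The sketch below assumes the toy dataset from this notebook (2 source examples, each turned into 2 by the augmentation) and the batch size and epoch count used in the next cell; the variable names are only illustrative.

```python
import math

num_source_examples = 2   # (file pair, label) triples in the dataset
augmentation_factor = 2   # _data_augment yields X and -X
batch_size = 2
number_of_epochs = 30

examples_per_epoch = num_source_examples * augmentation_factor
steps_per_epoch = math.ceil(examples_per_epoch / batch_size)

# repeat() is applied before batch(), so batches are cut from the
# concatenation of all epochs
total_steps = math.ceil(examples_per_epoch * number_of_epochs / batch_size)

print(steps_per_epoch, total_steps)
```

Under these assumptions there are 2 steps per epoch and 60 training steps in total before the iterator is exhausted.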

In [9]:
# Repeat dataset for number of epochs
number_of_epochs = 30
batch_dataset = dataset.repeat(number_of_epochs)

# Set batch size
batch_dataset = batch_dataset.batch(2)

# create TensorFlow Iterator object
iterator = batch_dataset.make_one_shot_iterator()

# Get example, compute loss and create optimizer
next_batch_examples, next_batch_labels = iterator.get_next()

# Create global_step variable, to store the global training step
global_step = tf.Variable(0, name='global_step', trainable=False)

loss = loss_function(next_batch_examples, next_batch_labels)
train_op = tf.train.GradientDescentOptimizer(1e-3).minimize(loss, global_step=global_step)

### That's all. We can train!

We will train the model with *tf.train.MonitoredTrainingSession* instead of *tf.Session*. The MonitoredTrainingSession has built-in hooks that automate some common tasks:

* Save the model periodically during training,
* Restore previous training before starting a new training session,
* Create logs for tensorboard.

In [10]:
# Train

# MonitoredTrainingSession example
with tf.train.MonitoredTrainingSession(checkpoint_dir='./saved_training',
                                       save_checkpoint_secs=10,
                                       save_summaries_steps=2) as sess:
    while not sess.should_stop():
        sess.run(train_op)
    

INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Restoring parameters from ./saved_training\model.ckpt-168
INFO:tensorflow:Saving checkpoints for 169 into ./saved_training\model.ckpt.
INFO:tensorflow:Saving checkpoints for 228 into ./saved_training\model.ckpt.


## Validation cost during training

Now, let's see how we can enhance the previous code to compute validation loss periodically during training.

### Create validation dataset

I will not comment on this part in detail. It just creates a fake validation dataset. You should replace it with your own real validation dataset.

In [14]:
# Create files, each containing a raw 3x3 float32 matrix
for i in range(2):
    for j in range(2):
        # Create matrix with fake data
        matrix = np.zeros((height,width)) + i + (j*10) + 2
        # Save matrix as raw float32 file
        matrix.astype('float32').tofile('data/validation_' + str(i) + '_' + str(j) + '.raw')
        
# Get the filenames lists
val_filenames0 = ['data/validation_0_0.raw', 'data/validation_0_1.raw']
val_filenames1 = ['data/validation_1_0.raw', 'data/validation_1_1.raw']

# Create tensorflow constant containing the filenames
tf_val_filenames0 = tf.constant(val_filenames0)
tf_val_filenames1 = tf.constant(val_filenames1)

# Create labels dataset
val_labels = tf.constant([19, 21])

# Create dataset containing the validation filenames and labels
val_dataset = tfdata.Dataset.from_tensor_slices((tf_val_filenames0, tf_val_filenames1, val_labels))

# Use our _parse_data function to read files and decode data
val_dataset = val_dataset.map(_parse_data)

# Set batch size
val_dataset = val_dataset.batch(2)

# Create TensorFlow Iterator object
val_iterator = val_dataset.make_one_shot_iterator()

# Get the next batch of validation examples and labels
next_val_examples, next_val_labels = val_iterator.get_next()

# Create validation loss op
val_loss = loss_function(next_val_examples, next_val_labels)

In [16]:
def run_validation(op, sess):  # Called from the training loop below
    # Iterate over the validation dataset until it is exhausted
    while True:
        try:
            loss_value = sess.run(op)
            print(loss_value)
        except tf.errors.OutOfRangeError:
            print("End of dataset")
            break

    return 0

In [18]:
# Train

validate_every_n_steps = 10

# MonitoredTrainingSession example
with tf.train.MonitoredTrainingSession(checkpoint_dir='./saved_training',
                                       save_checkpoint_secs=10,
                                       save_summaries_steps=2) as sess:
    while not sess.should_stop():
        _, step = sess.run([train_op, global_step])
        # Check if it is time to compute validation loss
        if step % validate_every_n_steps == 0:
            # Compute validation loss
            run_validation(val_loss, sess)
            

INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Restoring parameters from ./saved_training\model.ckpt-288
INFO:tensorflow:Saving checkpoints for 289 into ./saved_training\model.ckpt.


FailedPreconditionError: Attempting to use uninitialized value dense_3/bias
	 [[Node: dense_3/bias/read = Identity[T=DT_FLOAT, _class=["loc:@dense_3/bias"], _device="/job:localhost/replica:0/task:0/device:CPU:0"](dense_3/bias)]]

Caused by op 'dense_3/bias/read', defined at:
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\ipykernel_launcher.py", line 16, in <module>
    app.launch_new_instance()
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\traitlets\config\application.py", line 658, in launch_instance
    app.start()
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\ipykernel\kernelapp.py", line 477, in start
    ioloop.IOLoop.instance().start()
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\zmq\eventloop\ioloop.py", line 177, in start
    super(ZMQIOLoop, self).start()
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tornado\ioloop.py", line 888, in start
    handler_func(fd_obj, events)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tornado\stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\zmq\eventloop\zmqstream.py", line 440, in _handle_events
    self._handle_recv()
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\zmq\eventloop\zmqstream.py", line 472, in _handle_recv
    self._run_callback(callback, msg)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\zmq\eventloop\zmqstream.py", line 414, in _run_callback
    callback(*args, **kwargs)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tornado\stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\ipykernel\kernelbase.py", line 283, in dispatcher
    return self.dispatch_shell(stream, msg)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\ipykernel\kernelbase.py", line 235, in dispatch_shell
    handler(stream, idents, msg)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\ipykernel\kernelbase.py", line 399, in execute_request
    user_expressions, allow_stdin)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\ipykernel\ipkernel.py", line 196, in do_execute
    res = shell.run_cell(code, store_history=store_history, silent=silent)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\ipykernel\zmqshell.py", line 533, in run_cell
    return super(ZMQInteractiveShell, self).run_cell(*args, **kwargs)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\IPython\core\interactiveshell.py", line 2683, in run_cell
    interactivity=interactivity, compiler=compiler, result=result)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\IPython\core\interactiveshell.py", line 2787, in run_ast_nodes
    if self.run_code(code, result):
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\IPython\core\interactiveshell.py", line 2847, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-14-13748a28053d>", line 38, in <module>
    val_loss = loss_function(next_val_examples, next_val_labels)
  File "<ipython-input-8-15ff9ffa13e5>", line 9, in loss_function
    output = model_function(example)
  File "<ipython-input-8-15ff9ffa13e5>", line 4, in model_function
    output = tf.layers.dense(flattened, 1)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\layers\core.py", line 250, in dense
    return layer.apply(inputs)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\layers\base.py", line 671, in apply
    return self.__call__(inputs, *args, **kwargs)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\layers\base.py", line 559, in __call__
    self.build(input_shapes[0])
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\layers\core.py", line 145, in build
    trainable=True)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\layers\base.py", line 458, in add_variable
    trainable=trainable and self.trainable)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\ops\variable_scope.py", line 1203, in get_variable
    constraint=constraint)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\ops\variable_scope.py", line 1092, in get_variable
    constraint=constraint)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\ops\variable_scope.py", line 425, in get_variable
    constraint=constraint)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\ops\variable_scope.py", line 394, in _true_getter
    use_resource=use_resource, constraint=constraint)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\ops\variable_scope.py", line 805, in _get_single_variable
    constraint=constraint)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\ops\variables.py", line 213, in __init__
    constraint=constraint)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\ops\variables.py", line 356, in _init_from_args
    self._snapshot = array_ops.identity(self._variable, name="read")
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\ops\array_ops.py", line 125, in identity
    return gen_array_ops.identity(input, name=name)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\ops\gen_array_ops.py", line 2070, in identity
    "Identity", input=input, name=name)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\framework\op_def_library.py", line 787, in _apply_op_helper
    op_def=op_def)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\framework\ops.py", line 2956, in create_op
    op_def=op_def)
  File "c:\users\perezl\appdata\local\programs\python\python36\lib\site-packages\tensorflow\python\framework\ops.py", line 1470, in __init__
    self._traceback = self._graph._extract_stack()  # pylint: disable=protected-access

FailedPreconditionError (see above for traceback): Attempting to use uninitialized value dense_3/bias
	 [[Node: dense_3/bias/read = Identity[T=DT_FLOAT, _class=["loc:@dense_3/bias"], _device="/job:localhost/replica:0/task:0/device:CPU:0"](dense_3/bias)]]

### Why this fails

The traceback points at `dense_3/bias`. Every call to `model_function` calls `tf.layers.dense` without a name or a reusing variable scope, so each call builds a brand-new layer (`dense`, `dense_1`, ...) with its own variables. The validation loss therefore does not evaluate the trained weights. It uses a separate layer whose variables were apparently neither initialized nor restored from the checkpoint. A likely fix is to build the model inside `tf.variable_scope(..., reuse=tf.AUTO_REUSE)` (available from TensorFlow 1.4) and to give the dense layer an explicit name, so that the training and validation losses share the same variables. Note also that a one-shot iterator can only be consumed once. To validate repeatedly during training, an initializable iterator whose initializer is run before each validation pass would be needed.
