### This notebook shows an example of input pipeline in tensorflow with CIFAR-10

In [1]:
import os
import numpy as np
import tensorflow as tf
from matplotlib import pyplot as plt

#### Steps involved

![Image](http://adventuresinmachinelearning.com/wp-content/uploads/2017/08/AnimatedFileQueues.gif)

    1) Read the file names and create a list
    
    2) Create a Queue to hold and randomly shuffle the filename list
    
    3) Dequeue files and extract images
    
    4) Perform image processing
    
    5) Enqueue new image to a new queue (random shuffle in this case)
        
    6) Dequeue in batches for training

In [2]:
data_path = './data/cifar/cifar-10-batches-bin/'
batch_size = 128
num_threads = 16

##### Step 1 : Read the file names and create a list

In [3]:
# Create a list of all filenames
filename_list = [data_path + 'data_batch_{}.bin'.format(i + 1) for i in range(5)]

##### Step 2 : Create a Queue to hold and randomly shuffle the filename list

In [4]:
fq = tf.FIFOQueue(capacity = 10, dtypes=tf.string)

# Queue holds tensor strings, hence converting filename_list to tensor
string_tensor = tf.convert_to_tensor(filename_list, dtype=tf.string)
tf.random_shuffle(string_tensor)  # shuffling just for fun

fq_enqueue_op = fq.enqueue_many([string_tensor])

# Create queue runner for file queue
fq_runner = tf.train.QueueRunner(fq, [fq_enqueue_op] * 1)
tf.train.add_queue_runner(fq_runner)

##### Step 3 : Dequeue files and extract images

Fixed Length Record Reader, a class provided by tensorflow, handles the dequeuing of the files and reads the data. It returns the processed image and label, with shapes ready for cnn

In [5]:
# parameters required to read a record from data_batch_1 file
label_bytes = 1 # 2 for CIFAR-100
height = 32
width = 32
depth = 3
image_bytes = height * width * depth  # All these info is avail  # See http://www.cs.toronto.edu/~kriz/cifar.html
                                     # for a description of the input format
    
# Every record consists of a label followed by the image, with a fixed number of bytes
record_bytes = image_bytes + label_bytes

# Read a record, getting filenames from the filename_queue. Since no header/footer, corresponding options are left default, 0
reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
key, value = reader.read(fq)
record_bytes = tf.decode_raw(value, tf.uint8)

# The first byte represent the label, which we convert from uint8 -> int32
#tf.strided_slice(record_bytes, [0], [label_bytes])
label = tf.cast(tf.strided_slice(record_bytes, [0], [label_bytes]), tf.int32)

# The remaining bytes are reshaped from [depth * height * width]  to [depth, height, width]
depth_major = tf.reshape(tf.strided_slice(record_bytes, [label_bytes], [label_bytes + image_bytes]), 
                        [depth, height, width])

# Convert from [depth, height, width] to [height, width, depth]
img_temp = tf.transpose(depth_major, [1, 2, 0])
reshaped_img = tf.cast(img_temp, tf.float32)

c_height = 24
c_width = 24

# Image processing for eval
# Crop the central [height, width] of the image
resized_img = tf.image.resize_image_with_crop_or_pad(reshaped_img, c_height, c_width)

resized_img.set_shape([c_height, c_width, 3])
label.set_shape([1])

In [7]:
# Adding <img, label> pair to random shuffle queue
tensor_list = [resized_img, label]
dtypes = [tf.float32, tf.int32]
shape = [resized_img.get_shape(), label.get_shape()]

min_after_dequeue = 10000
capacity = min_after_dequeue + (num_threads + 1) * batch_size

q = tf.RandomShuffleQueue(capacity=capacity, min_after_dequeue=min_after_dequeue, dtypes=dtypes, shapes=shape)

enqueue_op = q.enqueue(tensor_list)
# add to the queue runner collection
tf.train.add_queue_runner(tf.train.QueueRunner(q, [enqueue_op] * num_threads))

image_batch, label_batch = q.dequeue_many(batch_size)

In [8]:
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    
    
    image, lab= sess.run([image_batch, label_batch])
    print(image.shape, lab.shape)
    
    
    coord.request_stop()
    coord.join(threads)

((128, 24, 24, 3), (128, 1))


Tensorflow helps to replicate<br> `FIFOQueue`   ::  `string_input_producer` <br>
`RandomShuffleQueue` :: `shuffle_batch` <br>
<p>
    `string_input_producer` takes a list of filenames and creates a FIFOQueue with enqueuing implicitely provided.<br>
    `shuffle_batch` creates a RandomShuffleQueue with enqueuing and batch-sized dequeuing already provided.


In [9]:
filename_list = [data_path + 'data_batch_{}.bin'.format(i + 1) for i in range(5)]

In [10]:
file_q = tf.train.string_input_producer(filename_list)

In [11]:
# parameters required to read a record from data_batch_1 file
label_bytes = 1 # 2 for CIFAR-100
height = 32
width = 32
depth = 3
image_bytes = height * width * depth  # All these info is avail  # See http://www.cs.toronto.edu/~kriz/cifar.html
                                     # for a description of the input format
    
# Every record consists of a label followed by the image, with a fixed number of bytes
record_bytes = image_bytes + label_bytes

# Read a record, getting filenames from the filename_queue. Since no header/footer, corresponding options are left default, 0
reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
key, value = reader.read(fq)
record_bytes = tf.decode_raw(value, tf.uint8)

# The first byte represent the label, which we convert from uint8 -> int32
#tf.strided_slice(record_bytes, [0], [label_bytes])
label = tf.cast(tf.strided_slice(record_bytes, [0], [label_bytes]), tf.int32)

# The remaining bytes are reshaped from [depth * height * width]  to [depth, height, width]
depth_major = tf.reshape(tf.strided_slice(record_bytes, [label_bytes], [label_bytes + image_bytes]), 
                        [depth, height, width])

# Convert from [depth, height, width] to [height, width, depth]
img_temp = tf.transpose(depth_major, [1, 2, 0])
reshaped_img = tf.cast(img_temp, tf.float32)

c_height = 24
c_width = 24

# Image processing for eval
# Crop the central [height, width] of the image
resized_img = tf.image.resize_image_with_crop_or_pad(reshaped_img, c_height, c_width)

resized_img.set_shape([c_height, c_width, 3])
label.set_shape([1])

In [12]:
min_after_dequeue = 10000
capacity = min_after_dequeue + (num_threads + 1) * batch_size

In [13]:
image_b, label_b = tf.train.shuffle_batch([image, label],
                                          batch_size = batch_size,
                                          capacity = capacity,
                                          min_after_dequeue = min_after_dequeue,
                                          num_threads=num_threads)

In [14]:
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    
    for i in range(10):
        image, lab= sess.run([image_batch, label_batch])
        print(image.shape, lab.shape)
    
    
    coord.request_stop()
    coord.join(threads)

((128, 24, 24, 3), (128, 1))
((128, 24, 24, 3), (128, 1))
((128, 24, 24, 3), (128, 1))
((128, 24, 24, 3), (128, 1))
((128, 24, 24, 3), (128, 1))
((128, 24, 24, 3), (128, 1))
((128, 24, 24, 3), (128, 1))
((128, 24, 24, 3), (128, 1))
((128, 24, 24, 3), (128, 1))
((128, 24, 24, 3), (128, 1))
