### Input Pipeline
1. The list of filenames
2. Optional filename shuffling
3. Optional epoch limit
4. Filename queue
5. A Reader for the file format
6. A decoder for a record read by the reader
7. Optional preprocessing
8. Example queue

In [1]:
import os
import sys
from six.moves import urllib
import tarfile
import tensorflow as tf
import numpy as np

  return f(*args, **kwds)


### Loading Data Manually  
To see how it works under the hood, let's load CIFAR-10 on our own (not using Keras). According to the description, the dataset is divided into five training batches and one test batch, each with 10000 images. The test batch contains exactly 1000 randomly-selected images from each class. We define some constants based on the above:

In [2]:
# Remote location of the binary release of CIFAR-10 and the local folder it is
# stored in. Format and details: http://www.cs.toronto.edu/~kriz/cifar.html
DATA_URL = 'http://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz'
DEST_DIRECTORY = 'dataset/cifar10'

# Geometry of a raw image (rows, columns, colour channels), the side length
# kept after cropping, and the number of examples per batch.
IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH = 32, 32, 3
IMAGE_SIZE_CROPPED = 24
BATCH_SIZE = 128

# Dataset-wide figures: number of classes and size of each split.
NUM_CLASSES = 10
NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 50000
NUM_EXAMPLES_PER_EPOCH_FOR_EVAL = 10000

In [4]:
def _checked_members(archive, dest_directory):
    """Yield the members of `archive`, rejecting names that resolve outside `dest_directory`.

    Only the member *names* are checked (absolute paths and '..' components);
    link targets inside the archive are not inspected.

    Raises:
        ValueError: if a member name would be written outside `dest_directory`.
    """
    root = os.path.realpath(dest_directory)
    for member in archive.getmembers():
        target = os.path.realpath(os.path.join(root, member.name))
        if target != root and not target.startswith(root + os.sep):
            raise ValueError('Unsafe path in archive: ' + member.name)
        yield member


def maybe_download_and_extract(dest_directory, url):
    """Download the CIFAR-10 archive into `dest_directory` and unpack it, skipping steps already done.

    Args:
        dest_directory:
            folder that receives 'cifar-10-binary.tar.gz' and the extracted
            'cifar-10-batches-bin' directory; created when missing.
        url:
            location the archive is fetched from when it is not on disk yet.
    Raises:
        ValueError: if the archive contains a member whose name points
            outside `dest_directory`.
    """
    if not os.path.exists(dest_directory):
        os.makedirs(dest_directory)
    file_name = 'cifar-10-binary.tar.gz'
    file_path = os.path.join(dest_directory, file_name)
    # if have not downloaded yet
    if not os.path.exists(file_path):
        def _progress(count, block_size, total_size):
            transferred = count * block_size
            if total_size > 0:
                # the last block usually overshoots, so cap the display at 100%
                percent = min(100.0, float(transferred) / float(total_size) * 100.0)
                sys.stdout.write('\r%.1f%%' % percent)
            else:
                # size not reported: show a byte counter instead of dividing by it
                sys.stdout.write('\r%d bytes' % transferred)
            sys.stdout.flush()  # flush the buffer

        print('>> Downloading %s ...' % file_name)
        # Download under a temporary name so that an interrupted transfer never
        # leaves a truncated file that the existence check above would accept.
        partial_path = file_path + '.part'
        try:
            urllib.request.urlretrieve(url, partial_path, _progress)
            os.rename(partial_path, file_path)
        finally:
            if os.path.exists(partial_path):
                os.remove(partial_path)
        file_size = os.stat(file_path).st_size
        print('\r>> Total %d bytes' % file_size)
    extracted_dir_path = os.path.join(dest_directory, 'cifar-10-batches-bin')
    if not os.path.exists(extracted_dir_path):
        # Open for reading with gzip compression, then extract all; the
        # context manager closes the archive even when extraction fails.
        with tarfile.open(file_path, 'r:gz') as archive:
            archive.extractall(
                dest_directory,
                members=_checked_members(archive, dest_directory))
    print('>> Done')

# Fetch the archive into DEST_DIRECTORY and unpack it; the function skips the
# download and/or the extraction when their results already exist on disk.
maybe_download_and_extract(DEST_DIRECTORY, DATA_URL)

>> Downloading cifar-10-binary.tar.gz ...
>> Total 170052171 bytes
>> Done


#### After downloading the dataset, we create functions
1. distort_input(training_files, batch_size) to get a training example queue.
2. eval_input(testing_files, batch_size) to get a testing example queue.
3. read_cifar10(filename_queue) to read a record from dataset with a filename queue.

In [None]:
# The folder that stores the extracted dataset. Built with os.path.join so it
# is composed the same way as the path maybe_download_and_extract() checks.
DATA_DIRECTORY = os.path.join(DEST_DIRECTORY, 'cifar-10-batches-bin')
# (1) the lists of training/testing filenames
training_files = [os.path.join(DATA_DIRECTORY, 'data_batch_%d.bin' % i) for i in range(1, 6)]
testing_files = [os.path.join(DATA_DIRECTORY, 'test_batch.bin')]

In [None]:
# (5) + (6)
def read_cifar10(filename_queue):
    """ Read and decode one CIFAR-10 record from the files in a filename queue.
        Each record is one label byte followed by the pixel bytes, which are
        laid out as [depth, height, width] and transposed here.
        -----
        Args:
            filename_queue:
                A queue of strings with the filenames to read from.
        Returns:
            A plain attribute holder for a single example, with the fields:
            height:
                number of rows in the image (IMAGE_HEIGHT)
            width:
                number of columns in the image (IMAGE_WIDTH)
            depth:
                number of color channels in the image (IMAGE_DEPTH)
            key:
                the key the reader returns alongside the record
            label:
                a one-element int32 Tensor holding the label byte
            image:
                a [height, width, depth] uint8 Tensor with the image data
    """

    class CIFAR10Record(object):
        pass

    record = CIFAR10Record()
    record.height, record.width, record.depth = (
        IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH)

    # layout of one record on disk: <label byte><height * width * depth bytes>
    n_label_bytes = 1
    n_image_bytes = record.height * record.width * record.depth
    n_record_bytes = n_label_bytes + n_image_bytes

    # (5) reader producing fixed-size records
    record_reader = tf.FixedLengthRecordReader(record_bytes=n_record_bytes)
    record.key, raw_record = record_reader.read(filename_queue)

    # (6) decoder: raw string -> vector of uint8
    raw_bytes = tf.decode_raw(raw_record, tf.uint8)

    # leading byte(s) are the label, cast to int32
    label_slice = tf.strided_slice(raw_bytes, [0], [n_label_bytes])
    record.label = tf.cast(label_slice, tf.int32)

    # the remaining bytes are the image, stored as [depth, height, width]
    pixel_slice = tf.strided_slice(
        raw_bytes, [n_label_bytes], [n_label_bytes + n_image_bytes])
    channels_first = tf.reshape(
        pixel_slice, [record.depth, record.height, record.width])
    # reorder to [height, width, depth]
    record.image = tf.transpose(channels_first, [1, 2, 0])
    return record

In [None]:
def distort_input(training_files, batch_size):
    """ Build the training input pipeline: read, randomly distort, shuffle, batch.
        -----
        Args:
            training_files:
                an array of paths of the training files.
            batch_size:
                Number of images per batch.
        Returns:
            images: Images.
                4D tensor of [batch_size, IMAGE_SIZE_CROPPED, IMAGE_SIZE_CROPPED, 3] size.
            labels: Labels.
                1D tensor of [batch_size] size.
        Raises:
            ValueError: if one of the training files does not exist.
    """
    for path in training_files:
        if not tf.gfile.Exists(path):
            raise ValueError('Failed to find file: ' + path)

    # (4) filename queue feeding the reader
    filename_queue = tf.train.string_input_producer(training_files)

    # (5) + (6) one decoded record
    record = read_cifar10(filename_queue)

    # (7) random distortions applied to the float image
    crop_shape = [IMAGE_SIZE_CROPPED, IMAGE_SIZE_CROPPED, 3]
    augmented = tf.cast(record.image, tf.float32)
    augmented = tf.random_crop(augmented, crop_shape)
    augmented = tf.image.random_flip_left_right(augmented)
    augmented = tf.image.random_brightness(augmented, max_delta=63)
    augmented = tf.image.random_contrast(augmented, lower=0.2, upper=1.8)

    # standardization: rescale each image to zero mean and unit variance
    augmented = tf.image.per_image_standardization(augmented)

    # pin the static shapes, which the batching op needs
    augmented.set_shape(crop_shape)
    record.label.set_shape([1])

    # keep a sizeable fraction of an epoch buffered so the shuffle mixes well
    min_buffered = int(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN * 0.4)

    # (8) example queue; it fills up to min_buffered examples before
    # the first batch can be dequeued
    batched_images, batched_labels = tf.train.shuffle_batch(
        [augmented, record.label],
        batch_size=batch_size,
        num_threads=16,
        capacity=min_buffered + 3 * batch_size,
        min_after_dequeue=min_buffered)
    flat_labels = tf.reshape(batched_labels, [batch_size])
    return batched_images, flat_labels

In [None]:
def eval_input(testing_files, batch_size):
    """ Build the evaluation input pipeline: read, center crop, standardize, batch.
        Unlike distort_input, no random distortion is applied and the batches
        are produced with tf.train.batch (no shuffling queue).
        -----
        Args:
            testing_files:
                an array of paths of the testing files.
            batch_size:
                Number of images per batch.
        Returns:
            images: Images.
                4D tensor of [batch_size, IMAGE_SIZE_CROPPED, IMAGE_SIZE_CROPPED, 3] size.
            labels: Labels.
                1D tensor of [batch_size] size.
        Raises:
            ValueError: if one of the testing files does not exist.
    """
    for path in testing_files:
        if not tf.gfile.Exists(path):
            raise ValueError('Failed to find file: ' + path)

    # filename queue + one decoded record
    filename_queue = tf.train.string_input_producer(testing_files)
    record = read_cifar10(filename_queue)

    # image preprocessing for evaluation: crop/pad to the target size, then
    # standardize each image
    side = IMAGE_SIZE_CROPPED
    as_float = tf.cast(record.image, tf.float32)
    cropped = tf.image.resize_image_with_crop_or_pad(as_float, side, side)
    standardized = tf.image.per_image_standardization(cropped)
    standardized.set_shape([side, side, 3])
    record.label.set_shape([1])

    # size the example queue from a fraction of the evaluation set
    min_buffered = int(NUM_EXAMPLES_PER_EPOCH_FOR_EVAL * 0.4)
    batched_images, batched_labels = tf.train.batch(
        [standardized, record.label],
        batch_size=batch_size,
        num_threads=16,
        capacity=min_buffered + 3 * batch_size)
    flat_labels = tf.reshape(batched_labels, [batch_size])
    return batched_images, flat_labels

In [None]:
# Smoke test for distort_input: pull one batch through the queues and print
# the shapes of the arrays that were actually fetched.
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    image, label = distort_input(training_files, BATCH_SIZE)
    # --- Note ---
    # If you forget to call start_queue_runners(), it will hang
    # indefinitely and deadlock the user program.
    # ------------
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    try:
        image_batch, label_batch = sess.run([image, label])
    finally:
        # stop the queue-runner threads even when fetching the batch fails
        coord.request_stop()
        coord.join(threads)
    image_batch_np = np.asarray(image_batch)
    label_batch_np = np.asarray(label_batch)
    print('Shape of cropped image:', image_batch_np.shape)
    print('Shape of label:', label_batch_np.shape)

### CNN Model

In [None]:
class CNN_Model(object):
    """ CIFAR-10 convolutional network: graph construction, loss and train op.
        inference() stacks two 5x5 convolution layers (each combined with
        max-pooling and local response normalization), two fully-connected
        ReLU layers and a final linear layer that produces the logits.
    """

    def __init__(self, batch_size, num_classes, num_training_example,
                 num_epoch_per_decay, init_lr, moving_average_decay):
        """ Store the hyper-parameters; no graph ops are created here.
            -----
            Args:
                batch_size:
                    number of examples per batch; used to flatten the
                    features in inference() and to derive the decay schedule
                num_classes:
                    number of output classes (width of the logits)
                num_training_example:
                    number of training examples per epoch
                num_epoch_per_decay:
                    number of epochs between two learning rate decays
                init_lr:
                    initial learning rate
                moving_average_decay:
                    decay passed to the ExponentialMovingAverage in train()
        """
        self.batch_size = batch_size
        self.num_classes = num_classes
        self.num_training_example = num_training_example
        self.num_epoch_per_decay = num_epoch_per_decay
        self.init_lr = init_lr  # initial learn rate
        self.moving_average_decay = moving_average_decay
        # Set by train(); defined here so the attribute always exists and
        # callers can test it for None before train() has been called.
        self.ema = None

    def _variable_on_cpu(self, name, shape, initializer):
        """ Create (or fetch, when the scope reuses) a float32 variable placed on the CPU.
            -----
            Args:
                name:
                    name of the variable
                shape:
                    a list of ints
                initializer:
                    initializer for the variable
            Returns:
                Variable Tensor
        """
        with tf.device('/cpu:0'):
            var = tf.get_variable(
                name, shape, initializer=initializer, dtype=tf.float32)
        return var

    def _variable_with_weight_decay(self, name, shape, stddev, wd=0.0):
        """ Helper to create an initialized Variable with weight decay.
            Note that the Variable is initialized with a truncated normal
            distribution. A weight decay is added only if one is specified.
            -----
            Args:
                name:
                    name of the variable
                shape:
                    a list of ints
                stddev:
                    standard deviation of a truncated Gaussian
                wd:
                    add L2Loss weight decay multiplied by this float. If None, weight
                    decay is not added for this Variable.
            Returns:
                Variable Tensor
        """
        initializer = tf.truncated_normal_initializer(
            stddev=stddev, dtype=tf.float32)
        var = self._variable_on_cpu(name, shape, initializer)
        # deal with weight decay: wd=None skips the term, as documented above
        # (the default 0.0 still registers a zero-valued term).
        if wd is not None:
            weight_decay = tf.multiply(tf.nn.l2_loss(var), wd, name='weight_loss')
            tf.add_to_collection('losses', weight_decay)
        return var

    def inference(self, images):
        """ build the model
            -----
            Args:
                images with shape [batch_size,24,24,3]
            Return:
                logits with shape [batch_size,10]
        """
        with tf.variable_scope('conv_1') as scope:
            kernel = self._variable_with_weight_decay(
                'weights', [5, 5, 3, 64], 5e-2)
            conv = tf.nn.conv2d(images, kernel, strides=[1, 1, 1, 1], padding="SAME")
            # NOTE: this variable is called 'bias' while every other layer uses
            # 'biases'; the name is kept so variable names stay unchanged.
            biases = self._variable_on_cpu('bias', [64], tf.constant_initializer(0.0))
            pre_activation = tf.nn.bias_add(conv, biases)
            conv_1 = tf.nn.relu(pre_activation, name=scope.name)
        # pool_1
        pool_1 = tf.nn.max_pool(
            conv_1,
            ksize=[1, 3, 3, 1],
            strides=[1, 2, 2, 1],
            padding='SAME',
            name='pool_1')
        # norm_1 (local_response_normalization)
        norm_1 = tf.nn.lrn(
            pool_1, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name='norm_1')
        # conv2
        with tf.variable_scope('conv_2') as scope:
            kernel = self._variable_with_weight_decay(
                'weights', [5, 5, 64, 64], 5e-2)
            conv = tf.nn.conv2d(norm_1, kernel, [1, 1, 1, 1], padding='SAME')
            biases = self._variable_on_cpu('biases', [64],
                                           tf.constant_initializer(0.1))
            pre_activation = tf.nn.bias_add(conv, biases)
            conv_2 = tf.nn.relu(pre_activation, name=scope.name)
        # norm2
        norm_2 = tf.nn.lrn(
            conv_2, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name='norm_2')
        # pool2
        pool_2 = tf.nn.max_pool(
            norm_2,
            ksize=[1, 3, 3, 1],
            strides=[1, 2, 2, 1],
            padding='SAME',
            name='pool_2')
        # FC_1 (fully-connected layer)
        with tf.variable_scope('FC_1') as scope:
            # flatten every example to a vector; dim is the flattened length
            flat_features = tf.reshape(pool_2, [self.batch_size, -1])
            dim = flat_features.get_shape()[1].value
            weights = self._variable_with_weight_decay(
                'weights', [dim, 384], 0.04, 0.004)
            biases = self._variable_on_cpu('biases', [384],
                                           tf.constant_initializer(0.1))
            # built inside the scope, like the activation of every other layer
            FC_1 = tf.nn.relu(
                tf.matmul(flat_features, weights) + biases, name=scope.name)
        # FC_2
        with tf.variable_scope('FC_2') as scope:
            weights = self._variable_with_weight_decay(
                'weights', [384, 192], 0.04, 0.004)
            biases = self._variable_on_cpu('biases', [192],
                                           tf.constant_initializer(0.1))
            FC_2 = tf.nn.relu(tf.matmul(FC_1, weights) + biases, name=scope.name)
        # softmax_linear: plain linear layer, the softmax itself is applied in loss()
        with tf.variable_scope('softmax_linear') as scope:
            weights = self._variable_with_weight_decay(
                'weights', [192, self.num_classes], 1 / 192.0)
            biases = self._variable_on_cpu('biases', [self.num_classes],
                                           tf.constant_initializer(0.0))
            logits = tf.add(tf.matmul(FC_2, weights), biases, name=scope.name)
        return logits

    def loss(self, logits, labels):
        """ calculate the loss
            -----
            Args:
                logits:
                    output of inference()
                labels:
                    integer class labels, one per example
            Returns:
                scalar Tensor: mean cross entropy plus every term found in
                the 'losses' collection
        """
        labels = tf.cast(labels, tf.int64)
        cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
            labels=labels, logits=logits, name='cross_entropy_per_example')
        cross_entropy_mean = tf.reduce_mean(cross_entropy, name='cross_entropy')
        tf.add_to_collection('losses', cross_entropy_mean)
        # The total loss is defined as the cross entropy loss plus all of the weight decay terms (L2 loss).
        return tf.add_n(tf.get_collection('losses'), name='total_loss')

    def train(self, total_loss, global_step):
        """ train a step
            -----
            Args:
                total_loss:
                    scalar loss to minimize (see loss())
                global_step:
                    variable counting the training steps; incremented by
                    the returned op
            Returns:
                an op that applies the gradients and updates the moving
                averages of the trainable variables
        """
        num_batches_per_epoch = self.num_training_example / self.batch_size
        decay_steps = int(num_batches_per_epoch * self.num_epoch_per_decay)
        # Decay the learning rate exponentially based on the number of steps.
        lr = tf.train.exponential_decay(
            self.init_lr, global_step, decay_steps, decay_rate=0.1, staircase=True)
        opt = tf.train.GradientDescentOptimizer(lr)
        grads = opt.compute_gradients(total_loss)
        apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)
        # Track the moving averages of all trainable variables.
        # This step just records the moving average weights but not uses them
        ema = tf.train.ExponentialMovingAverage(self.moving_average_decay,
                                                global_step)
        # kept on the instance so the averages can be looked up later
        self.ema = ema
        variables_averages_op = ema.apply(tf.trainable_variables())
        with tf.control_dependencies([apply_gradient_op, variables_averages_op]):
            train_op = tf.no_op(name='train')
        return train_op

In [None]:
# Start from an empty default graph, then describe the network and its
# training schedule.
tf.reset_default_graph()
model = CNN_Model(
    batch_size=BATCH_SIZE,
    num_classes=NUM_CLASSES,
    num_training_example=NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN,
    num_epoch_per_decay=350.0,
    init_lr=0.1,
    moving_average_decay=0.9999,
)

In [None]:
# Build the training graph: step counter, input queue, network, loss, train op.
global_step = tf.train.get_or_create_global_step()
# the input pipeline ops are placed on the CPU
with tf.device('/cpu:0'):
    images, labels = distort_input(training_files, BATCH_SIZE)
# the network variables are created under the 'model' variable scope
with tf.variable_scope('model'):
    logits = model.inference(images)

total_loss = model.loss(logits, labels)
train_op = model.train(total_loss, global_step)

In [None]:
NUM_EPOCH = 3
NUM_BATCH_PER_EPOCH = NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN // BATCH_SIZE
ckpt_dir = './model/'

# Create the checkpoint directory up front rather than relying on the saver.
if not os.path.exists(ckpt_dir):
    os.makedirs(ckpt_dir)

# train: resume from the latest checkpoint when there is one, run NUM_EPOCH
# epochs and write a checkpoint after each of them
saver = tf.train.Saver()
with tf.Session() as sess:
    ckpt = tf.train.get_checkpoint_state(ckpt_dir)
    if ckpt and ckpt.model_checkpoint_path:
        saver.restore(sess, ckpt.model_checkpoint_path)
        # assume the name of checkpoint is like '.../model.ckpt-1000'
        gs = int(ckpt.model_checkpoint_path.split('/')[-1].split('-')[-1])
        sess.run(tf.assign(global_step, gs))
    else:
        # no checkpoint found
        sess.run(tf.global_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    loss = []  # summed batch losses, one entry per epoch
    try:
        for _epoch in range(NUM_EPOCH):
            batch_losses = []
            for _step in range(NUM_BATCH_PER_EPOCH):
                # train_op is run for its side effect only; keep the loss value
                batch_loss = sess.run([total_loss, train_op])[0]
                batch_losses.append(batch_loss)
            loss_this_epoch = np.sum(batch_losses)
            gs = global_step.eval()
            print('loss of epoch %d: %f' % (gs / NUM_BATCH_PER_EPOCH, loss_this_epoch))
            loss.append(loss_this_epoch)
            saver.save(sess, os.path.join(ckpt_dir, 'model.ckpt'), global_step=gs)
    finally:
        # stop the queue-runner threads even when a training step fails
        coord.request_stop()
        coord.join(threads)

print('Done')

In [None]:
# Build the evaluation graph on top of the variables created for training.
with tf.device('/cpu:0'):
    # build testing example queue
    images, labels = eval_input(testing_files, BATCH_SIZE)
# reuse=True: fetch the existing variables of the 'model' scope instead of
# creating new ones
with tf.variable_scope('model', reuse=True):
    logits = model.inference(images)

# used to calculate the top-1 accuracy: for each example, whether the true
# label is the highest-scoring class
top_k_op = tf.nn.in_top_k(logits, labels, 1)

In [None]:
# Evaluate with the moving-average values tracked by model.ema: the saver is
# built from the name -> variable mapping returned by variables_to_restore().
variables_to_restore = model.ema.variables_to_restore()
saver = tf.train.Saver(variables_to_restore)
with tf.Session() as sess:
    # Restore variables from disk.
    ckpt = tf.train.get_checkpoint_state(ckpt_dir)
    if ckpt and ckpt.model_checkpoint_path:
        saver.restore(sess, ckpt.model_checkpoint_path)
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        try:
            # sweep the test set in whole batches (the remainder that does
            # not fill a batch is left out of the count)
            num_iter = NUM_EXAMPLES_PER_EPOCH_FOR_EVAL // BATCH_SIZE
            total_sample_count = num_iter * BATCH_SIZE
            true_count = 0
            for _ in range(num_iter):
                predictions = sess.run(top_k_op)
                true_count += np.sum(predictions)
        finally:
            # stop the queue-runner threads even when evaluation fails
            coord.request_stop()
            coord.join(threads)
        # float() keeps the ratio a true division for integer counts
        print('Accuracy: %d/%d = %f' % (true_count, total_sample_count,
                                        float(true_count) / total_sample_count))
    else:
        print('train first')