In [1]:
import os
import tensorflow as tf
import numpy as np
import math
import matplotlib.pyplot as plt
from tqdm import tnrange, tqdm_notebook, tqdm

%matplotlib inline

Import the data

In [2]:
def load_cifar10(num_training=48000, num_validation=2000, num_test=10000):
    """
    Download CIFAR-10 via tf.keras and return preprocessed train/val/test splits.

    The first `num_training` training images form the training split, the
    following `num_validation` images form the validation split, and the first
    `num_test` test images form the test split.  Every split is standardised
    with the per-channel mean and std of the training split, then zero-padded
    by 4 pixels on each side of the height and width axes.

    Returns: (X_train, y_train, X_val, y_val, X_test, y_test) with float32
    images and flat int32 label vectors.
    """
    (raw_train_x, raw_train_y), (raw_test_x, raw_test_y) = tf.keras.datasets.cifar10.load_data()

    # Cast to the dtypes used by the rest of the notebook; labels become 1-D.
    raw_train_x = np.asarray(raw_train_x, dtype=np.float32)
    raw_train_y = np.asarray(raw_train_y, dtype=np.int32).flatten()
    raw_test_x = np.asarray(raw_test_x, dtype=np.float32)
    raw_test_y = np.asarray(raw_test_y, dtype=np.int32).flatten()

    # Carve the validation split out of the tail of the requested training range.
    val_rows = range(num_training, num_training + num_validation)
    train_rows = range(num_training)
    test_rows = range(num_test)

    X_val, y_val = raw_train_x[val_rows], raw_train_y[val_rows]
    X_train, y_train = raw_train_x[train_rows], raw_train_y[train_rows]
    X_test, y_test = raw_test_x[test_rows], raw_test_y[test_rows]

    # Statistics come from the training split only and are reused for val/test.
    channel_axes = (0, 1, 2)
    mean_pixel = X_train.mean(axis=channel_axes, keepdims=True)
    std_pixel = X_train.std(axis=channel_axes, keepdims=True)

    # No padding on the batch and channel axes, 4 pixels on each spatial side.
    paddings = [[0, 0], [4, 4], [4, 4], [0, 0]]

    def standardise_and_pad(images):
        return np.pad((images - mean_pixel) / std_pixel, paddings, 'constant')

    X_train = standardise_and_pad(X_train)
    X_val = standardise_and_pad(X_val)
    X_test = standardise_and_pad(X_test)

    return X_train, y_train, X_val, y_val, X_test, y_test


# Load the preprocessed splits once and print their shapes as a sanity check.
X_train, y_train, X_val, y_val, X_test, y_test = load_cifar10()

# Each entry is (label, values printed after the label).
_shape_report = (
    ('Train data shape: ', (X_train.shape,)),
    ('Train labels shape: ', (y_train.shape, y_train.dtype)),
    ('Validation data shape: ', (X_val.shape,)),
    ('Validation labels shape: ', (y_val.shape,)),
    ('Test data shape: ', (X_test.shape,)),
    ('Test labels shape: ', (y_test.shape,)),
)
for _label, _values in _shape_report:
    print(_label, *_values)

Train data shape:  (48000, 40, 40, 3)
Train labels shape:  (48000,) int32
Validation data shape:  (2000, 40, 40, 3)
Validation labels shape:  (2000,)
Test data shape:  (10000, 40, 40, 3)
Test labels shape:  (10000,)


Create a basic dataset object

In [3]:
class Dataset(object):
    """Minibatch iterator over a pair of aligned numpy arrays (data and labels)."""

    def __init__(self, X, y, batch_size, shuffle=False):
        """
        Construct a Dataset object to iterate over data X and labels y

        Inputs:
        - X: Numpy array of data, of any shape
        - y: Numpy array of labels, of any shape but with y.shape[0] == X.shape[0]
        - batch_size: Positive integer giving number of elements per minibatch
        - shuffle: (optional) Boolean, whether to shuffle the data on each epoch

        Raises: ValueError if batch_size is not positive.
        """
        assert X.shape[0] == y.shape[0], 'Got different numbers of data and labels'
        if batch_size <= 0:
            raise ValueError('batch_size must be a positive integer, got %r' % (batch_size,))
        self.X, self.y = X, y
        self.batch_size, self.shuffle = batch_size, shuffle

    def __iter__(self):
        """
        Yield (X_batch, y_batch) tuples covering the data once.

        With shuffle=True a fresh random permutation (from numpy's global RNG)
        is drawn on every call, and the same permuted indices select both the
        data and the labels so they stay aligned.  The last batch may be
        smaller than batch_size.
        """
        N, B = self.X.shape[0], self.batch_size

        if not self.shuffle:
            # Plain slices: no copy of the underlying arrays.
            return iter((self.X[i:i+B], self.y[i:i+B]) for i in range(0, N, B))

        # The permutation must actually be used to index the arrays; shuffling
        # it and then slicing X/y directly would leave the order unchanged.
        idxs = np.arange(N)
        np.random.shuffle(idxs)
        return iter((self.X[idxs[i:i+B]], self.y[idxs[i:i+B]]) for i in range(0, N, B))

    def __len__(self):
        """Number of batches produced per pass, counting a final partial batch."""
        N, B = len(self.y), self.batch_size
        return (N + B - 1) // B
    

batch_size = 64

# One iterator per split; only the training split asks for shuffling.
train_dset, val_dset, test_dset = (
    Dataset(images, labels, batch_size=batch_size, shuffle=reshuffle)
    for images, labels, reshuffle in (
        (X_train, y_train, True),
        (X_val, y_val, False),
        (X_test, y_test, False),
    )
)

Create our neural network

In [7]:
# Kernel initializer shared by the conv and dense layers defined below
# (variance-scaling initializer constructed with its default arguments).
initializer = tf.variance_scaling_initializer()

#Helper layer function
def batch_norm_relu_conv2d_drop(inputs, filters, is_training, dropout, stride=1, reg=1e-4):
    """
    Pre-activation layer: batch norm -> ReLU -> conv (-> optional dropout).

    Thin composition of batch_norm_relu and conv2d_drop; all arguments are
    forwarded unchanged to those helpers.
    """
    activated = batch_norm_relu(inputs, is_training)
    return conv2d_drop(activated, filters, is_training, dropout, stride=stride, reg=reg)

def batch_norm_relu(inputs, is_training):
    """Apply batch normalisation (in training or inference mode per `is_training`), then ReLU."""
    normalised = tf.layers.batch_normalization(inputs, training=is_training)
    return tf.nn.relu(normalised)

def conv2d_drop(inputs, filters, is_training, dropout, stride=1, reg=1e-4):
    """
    3x3 'same'-padded convolution with L2-regularised kernel and bias, followed
    by optional dropout.

    Inputs:
    - inputs: Input feature-map tensor
    - filters: Number of output channels
    - is_training: Python bool / 0-1 int, or a scalar boolean tensor.  Dropout
      is only active when this is true; at inference the activations pass
      through unchanged.
    - dropout: KEEP probability (fraction of activations kept), or None to
      disable dropout entirely
    - stride: Convolution stride
    - reg: L2 regularisation scale for kernel and bias

    Returns: The output tensor.
    """
    l2 = tf.contrib.layers.l2_regularizer(scale=reg)
    inputs = tf.layers.conv2d(inputs, filters, 3, strides=stride, padding="same",
                              kernel_initializer=initializer,
                              kernel_regularizer=l2,
                              bias_regularizer=l2)
    if dropout is not None:
        # Normalise 0/1 ints to a real bool; tensors are passed through as-is.
        if isinstance(is_training, int):
            is_training = bool(is_training)
        # tf.nn.dropout would drop units at evaluation time as well.  The layer
        # version takes a DROP rate (1 - keep probability) and is gated by the
        # training flag.
        inputs = tf.layers.dropout(inputs, rate=1.0 - dropout, training=is_training)
    return inputs

#Resnet unit
def ResNet_unit(inputs, filters, is_training, dropout, i, j, subsample=False, reg=1e-4, final_unit=False):
    """
    One pre-activation residual unit (two conv layers plus a shortcut).

    Inputs:
    - inputs: Input feature-map tensor
    - filters: Number of output channels of both convolutions
    - is_training, dropout, reg: Forwarded to the layer helpers
    - i, j: Stage and unit indices; they name the variable scope
      (conv{i+2}_{j+1}) and identify the very first unit (i == 0 and j == 0)
    - subsample: If True the first conv uses stride 2 and the shortcut is
      channel-padded and spatially strided to match
    - final_unit: If True a closing batch norm + ReLU is applied to the sum

    Returns: The output tensor of the unit.
    """
    with tf.variable_scope(f"conv{i+2}_{j+1}"):
        stride = 2 if subsample else 1
        is_first_unit = (i == 0 and j == 0)

        if is_first_unit:
            # The first unit activates once and shares the activated tensor
            # between the shortcut and the residual branch.
            shortcut = batch_norm_relu(inputs, is_training)
            residual = conv2d_drop(shortcut, filters, is_training, dropout, stride=stride, reg=reg)
        else:
            shortcut = inputs
            residual = batch_norm_relu_conv2d_drop(inputs, filters, is_training, dropout,
                                                   stride=stride, reg=reg)
        residual = batch_norm_relu_conv2d_drop(residual, filters, is_training, dropout, reg=reg)

        if subsample:
            # Parameter-free shortcut: append filters // 2 zero channels, then
            # keep every second row and column to match the strided branch.
            channel_padding = tf.constant([[0, 0], [0, 0], [0, 0], [0, filters // 2]])
            shortcut = tf.pad(shortcut, channel_padding)
            shortcut = shortcut[:, ::2, ::2, :]

        outputs = shortcut + residual

        if final_unit:
            outputs = batch_norm_relu(outputs, is_training)

        return outputs
    
def model_ResNetv2(inputs, is_training, total_layers=20, dropout=None, num_classes=10, reg=1e-4):
    """
    Build the forward pass of a pre-activation ResNet for 40x40x3 padded inputs.

    Inputs:
    - inputs: Image batch of shape (batch, 40, 40, 3); the batch size may be
      unknown at graph-construction time
    - is_training: Python bool / 0-1 int, or a scalar boolean tensor (e.g. a
      placeholder).  Selects random flip + random crop (training) versus a
      central crop (inference), and is forwarded to every layer helper.
    - total_layers: Network depth; (total_layers - 2) // 6 units per stage
    - dropout: Keep probability forwarded to the conv helpers; None or 1
      disables dropout
    - num_classes: Size of the final dense layer
    - reg: L2 regularisation scale

    Returns: Tensor of class scores with shape (batch, num_classes).
    """
    num_layers = (total_layers - 2) // 6
    filters = [16, 32, 64]
    if dropout == 1: dropout = None

    padded = inputs

    def _augment():
        flipped = tf.image.random_flip_left_right(padded)
        # Random crop back to 32x32 (training data padded when preprocessed).
        # tf.shape gives the run-time batch size, so this also works when the
        # static batch dimension is None.  NOTE: a single crop offset is drawn
        # for the whole batch, as in the original code.
        return tf.random_crop(flipped, [tf.shape(flipped)[0], 32, 32, 3])

    def _central_crop():
        # Central crop back to 32x32.
        return padded[:, 4:36, 4:36, :]

    with tf.variable_scope("data_augmentation"):
        if isinstance(is_training, tf.Tensor):
            # A tensor flag cannot be tested with Python `==` / `if` at graph
            # construction time; the branch has to be chosen inside the graph.
            inputs = tf.cond(is_training, _augment, _central_crop)
        elif is_training:
            inputs = _augment()
        else:
            inputs = _central_crop()
        # Make the static spatial/channel dims explicit for the conv layers.
        inputs.set_shape([None, 32, 32, 3])

    # First do a single convolution with no residual addition.
    with tf.variable_scope("conv1"):
        inputs = conv2d_drop(inputs, filters[0], is_training, dropout, reg=reg)

    # Three stages of residual units.
    for i in range(3):
        for j in range(num_layers):
            # Downsample at the start of every stage except the first.
            subsample = i > 0 and j == 0
            final = i == 2 and j == num_layers-1
            inputs = ResNet_unit(inputs, filters[i], is_training,
                                 dropout, i, j, subsample=subsample, reg=reg, final_unit=final)

    # Global average pooling (a mean over the spatial axes), then a dense
    # layer that outputs the class scores.
    inputs = tf.reduce_mean(inputs, axis=[1,2])
    inputs = tf.layers.flatten(inputs)
    l2 = tf.contrib.layers.l2_regularizer(scale=reg)
    scores = tf.layers.dense(inputs, num_classes, kernel_initializer=initializer,
                             kernel_regularizer=l2,
                             bias_regularizer=l2)
    return scores

A small test to check that our neural network works correctly

In [8]:
def test_model_ResNet_fc():
    """Smoke test for model_ResNetv2: push a zero batch through and print the score shape."""
    tf.reset_default_graph()

    # 50 padded 40x40 RGB images; the flag 1 selects the training branch.
    dummy_batch = tf.zeros((50, 40, 40, 3))
    logits = model_ResNetv2(dummy_batch, 1)

    with tf.Session() as session:
        session.run(tf.global_variables_initializer())
        logits_np = session.run(logits)
        print(logits_np.shape)

test_model_ResNet_fc()

(50, 10)


In [9]:
def check_acc_tb(sess, dset, x, scores, is_training, FLAG_print=True):
    """
    Check accuracy of a classification model over a whole dataset.

    Inputs:
    - sess: A TensorFlow Session that will be used to run the graph
    - dset: An iterable of (x_batch, y_batch) pairs, e.g. a Dataset object
    - x: A TensorFlow placeholder Tensor where input images should be fed
    - scores: A TensorFlow Tensor representing the scores output from the
      model; this is the Tensor we will ask TensorFlow to evaluate.
    - is_training: The placeholder for the training flag; it is always fed
      False here so the model is evaluated in inference mode
    - FLAG_print: If true, print the number of correct predictions

    Returns: Accuracy of the model as a float in [0, 1]; 0.0 if `dset`
    contains no samples.
    """
    num_correct, num_samples = 0, 0
    for x_batch, y_batch in dset:
        scores_np = sess.run(scores, feed_dict={x: x_batch, is_training: False})
        y_pred = scores_np.argmax(axis=1)
        num_samples += x_batch.shape[0]
        num_correct += int((y_pred == y_batch).sum())

    # Guard the division so an empty dataset does not raise ZeroDivisionError.
    acc = float(num_correct) / num_samples if num_samples else 0.0
    if FLAG_print:
        print('Got %d / %d correct (%.2f%%)' % (num_correct, num_samples, 100 * acc))
    return acc

In [10]:
def check_acc_train(sess, x_batch, y_batch, x, scores, is_training, FLAG_print=True):
    """
    Check accuracy of a classification model on a single batch of data.

    Inputs:
    - sess: A TensorFlow Session that will be used to run the graph
    - x_batch: Numpy array of input images for the batch
    - y_batch: Numpy array of integer labels aligned with x_batch
    - x: A TensorFlow placeholder Tensor where input images should be fed
    - scores: A TensorFlow Tensor representing the scores output from the
      model; this is the Tensor we will ask TensorFlow to evaluate.
    - is_training: The placeholder for the training flag; it is always fed
      False here so the model is evaluated in inference mode
    - FLAG_print: If true, print the number of correct predictions

    Returns: Accuracy of the model on the batch as a float in [0, 1]; 0.0 for
    an empty batch.
    """
    num_samples = x_batch.shape[0]
    if num_samples == 0:
        num_correct, acc = 0, 0.0
    else:
        scores_np = sess.run(scores, feed_dict={x: x_batch, is_training: False})
        y_pred = scores_np.argmax(axis=1)
        num_correct = int((y_pred == y_batch).sum())
        acc = float(num_correct) / num_samples

    if FLAG_print:
        # Report the real counts; they used to be left at 0 / 0.
        print('Got %d / %d correct (%.2f%%)' % (num_correct, num_samples, 100 * acc))
    return acc

In [11]:
def _build_optimizer(optimizer_init_fn):
    """
    Construct the optimizer, wiring in a feedable learning rate when possible.

    If `optimizer_init_fn` declares a `learning_rate` parameter with a numeric
    default, it is called as optimizer_init_fn(learning_rate=<scalar tensor>),
    where the tensor is a placeholder that defaults to that numeric value and
    can be overridden through feed_dict.  Otherwise the factory is called with
    no arguments and the rate cannot be changed after construction.

    Returns: (optimizer, lr_ph, initial_lr).  lr_ph and initial_lr are None
    when the learning rate is not controllable.
    """
    import inspect  # local import: only this helper needs it

    try:
        lr_param = inspect.signature(optimizer_init_fn).parameters.get('learning_rate')
    except (TypeError, ValueError):
        lr_param = None

    if lr_param is None or not isinstance(lr_param.default, (int, float)):
        return optimizer_init_fn(), None, None

    initial_lr = float(lr_param.default)
    lr_ph = tf.placeholder_with_default(tf.constant(initial_lr, dtype=tf.float32),
                                        shape=[], name='learning_rate')
    return optimizer_init_fn(learning_rate=lr_ph), lr_ph, initial_lr


def train_part5(model_init_fn, optimizer_init_fn, num_epochs=1, decay_at=None, decay_to=None,
                experiment_name="", restore=False, epoch_num=1, print_every=None,
                eval_every=400, log_path="C:/tmp/logs", save_dir="C:/tmp/save"):
    """
    Graph-mode training loop.  Trains on the module-level `train_dset`,
    periodically logs accuracy on `val_dset` to TensorBoard, checkpoints after
    every epoch and finally evaluates on `test_dset`.

    Inputs:
    - model_init_fn: Called as model_init_fn(x, is_training) with the image
      placeholder and the boolean training-flag placeholder; returns the
      scores tensor.
    - optimizer_init_fn: Factory for the optimizer.  It may take no arguments,
      but then the learning rate is fixed.  If it declares a `learning_rate`
      parameter with a numeric default, it is called with a feedable scalar
      tensor so the decay schedule can take effect.
    - num_epochs: The last epoch number to train (epochs are 1-based)
    - decay_at: Sorted list of epochs at which the learning rate changes
    - decay_to: Learning rates to switch to, aligned with decay_at
    - experiment_name: Used in the TensorBoard and checkpoint paths
    - restore: If True, restore the checkpoint written after epoch
      `epoch_num` and continue with the following epoch
    - epoch_num: First epoch to run, or the epoch to restore when restore=True
    - print_every: Print loss and validation accuracy every this many steps.
      If None, falls back to a module-level `print_every` (the way existing
      cells configure it), then to 1000.
    - eval_every: Log accuracy to TensorBoard every this many steps
    - log_path: Root directory for the TensorBoard event files
    - save_dir: Directory for checkpoints (created if missing)

    Returns: Accuracy on the test dataset.  Progress is printed during training.
    """
    if decay_at is not None:
        if decay_to is None:
            raise ValueError("decay_to must be provided together with decay_at")
        decay_at, decay_to = list(decay_at), list(decay_to)
        if len(decay_at) != len(decay_to):
            raise ValueError("decay_at and decay_to must have the same length")
    if print_every is None:
        # Backward compatibility: older cells set `print_every` as a global.
        print_every = globals().get('print_every', 1000)

    tf.reset_default_graph()

    # declare placeholders
    x = tf.placeholder(tf.float32, [None, 40, 40, 3])
    y = tf.placeholder(tf.int32, [None])
    is_training = tf.placeholder(tf.bool, name='is_training')

    # Accuracy is computed in Python and fed back through this placeholder so
    # it can be written as a scalar summary.
    with tf.name_scope('acc'):
        tf_acc_ph = tf.placeholder(tf.float32, shape=None, name='acc_summary')
        tf.summary.scalar('accuracy', tf_acc_ph)

    # Use the model function to build the forward pass.
    scores = model_init_fn(x, is_training)

    # Compute the loss
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=scores)
    cross_entropy = tf.reduce_mean(cross_entropy)
    loss_reg = tf.losses.get_regularization_loss()
    loss = cross_entropy + loss_reg

    tf.summary.scalar('cross_entropy', cross_entropy)
    tf.summary.scalar('loss_reg', loss_reg)
    tf.summary.scalar('loss', loss)

    # Assigning `optimizer.learning_rate` after minimize() only sets a Python
    # attribute and never reaches the graph, so the rate is routed through a
    # feedable tensor instead.
    optimizer, lr_ph, current_lr = _build_optimizer(optimizer_init_fn)
    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
    with tf.control_dependencies(update_ops):
        with tf.name_scope('train'):
            train_op = optimizer.minimize(loss)

    def apply_decay(new_lr):
        """Return the rate to feed from now on, warning if it cannot be changed."""
        if lr_ph is None:
            tqdm.write(f"WARNING: optimizer_init_fn has no numeric 'learning_rate' default, "
                       f"so the decay to {new_lr} cannot be applied; the optimizer keeps "
                       f"its original learning rate.")
            return current_lr
        tqdm.write(f"Learning rate has changed to: {new_lr}")
        return new_lr

    os.makedirs(save_dir, exist_ok=True)

    with tf.Session() as sess:

        # Tensorboard: merge all summaries except the accuracy ones, which
        # are written separately because they need tf_acc_ph to be fed.
        merged = tf.summary.merge_all(scope="(?!acc)")
        merged_acc = tf.summary.merge_all(scope="(acc)")

        train_writer = tf.summary.FileWriter(log_path + '/train/' + experiment_name, sess.graph)
        test_writer = tf.summary.FileWriter(log_path + '/test/' + experiment_name)

        def log_accuracy(x_np, y_np, step, verbose):
            """Write validation and current-batch accuracy summaries at `step`."""
            acc_val = check_acc_tb(sess, val_dset, x, scores,
                                   is_training, FLAG_print=verbose)
            acc_train = check_acc_train(sess, x_np, y_np, x, scores,
                                        is_training, FLAG_print=False)
            test_writer.add_summary(sess.run(merged_acc, feed_dict={tf_acc_ph: acc_val}), step)
            train_writer.add_summary(sess.run(merged_acc, feed_dict={tf_acc_ph: acc_train}), step)

        try:
            sess.run(tf.global_variables_initializer())
            saver = tf.train.Saver()

            t = 0
            if restore:
                saver.restore(sess, f"{save_dir}/{experiment_name}_epoch{epoch_num}.ckpt")
                tqdm.write(f"Model restored at epoch {epoch_num}.")

                # Bring the learning rate in line with the restored epoch.
                if decay_at is not None and len(decay_at) > 0 and epoch_num >= decay_at[0]:
                    idx = np.searchsorted(decay_at, epoch_num, side='right') - 1
                    current_lr = apply_decay(decay_to[idx])

                # `epoch_num` epochs are already done, so the step counter
                # resumes there before moving on to the next epoch.
                t = len(train_dset) * epoch_num
                epoch_num += 1

            loss_np = None
            last_batch = None
            for epoch in tnrange(epoch_num, num_epochs+1, leave=False, desc='Epoch'):
                # decay learning rate
                if decay_at is not None and epoch in decay_at:
                    current_lr = apply_decay(decay_to[decay_at.index(epoch)])

                for x_np, y_np in tqdm_notebook(train_dset, leave=False):
                    feed_dict = {x: x_np, y: y_np, is_training: True}
                    if lr_ph is not None:
                        feed_dict[lr_ph] = current_lr
                    summary, loss_np, _ = sess.run([merged, loss, train_op], feed_dict=feed_dict)
                    train_writer.add_summary(summary, t)
                    last_batch = (x_np, y_np)

                    # Log accuracy every eval_every steps; also print it
                    # every print_every steps.
                    should_print = t % print_every == 0
                    if should_print or t % eval_every == 0:
                        if should_print:
                            tqdm.write('Iteration %d, loss = %.4f' % (t, loss_np))
                        log_accuracy(x_np, y_np, t, should_print)
                    t += 1

                # Save every epoch
                save_path = saver.save(sess, f"{save_dir}/{experiment_name}_epoch{epoch}.ckpt")
                tqdm.write("Model saved in path: %s" % save_path)

            # The loop may not have run (e.g. restoring the final epoch), in
            # which case there is no last batch or loss to report.
            if last_batch is not None:
                tqdm.write('\nEnd of training, loss = %.4f' % (loss_np))
                log_accuracy(last_batch[0], last_batch[1], t, True)

            # Finally evaluate on the testing dataset.
            acc_test = check_acc_tb(sess, test_dset, x, scores,
                                    is_training, FLAG_print=False)
            tqdm.write(f"Accuracy on the test dataset is {acc_test}")

            # Save the variables to disk.
            saver.save(sess, f"{save_dir}/{experiment_name}.ckpt")
            return acc_test
        finally:
            # Flush and release the event files even if training is interrupted.
            train_writer.close()
            test_writer.close()

In [None]:
# Experiment: ResNet-20 trained with momentum SGD and a stepwise
# learning-rate schedule.
num_epochs = 182
total_layers = 20
dropout_prob = 0.5
learning_rate = 0.005
reg = 2e-4
decay_at=[2, 91, 136]
decay_to=[0.05, 0.005, 0.0005]
print_every = 5000  # read as a global by train_part5


name = (f"CIFAR_ResNet{total_layers}_lr{learning_rate}"
            f"_decay{decay_at}_{decay_to}_reg{reg}_momentum")

def model_init_fn(inputs, is_training, total_layers=total_layers, reg=reg):
    """Build the ResNet forward pass with this cell's depth, dropout and L2 settings."""
    return model_ResNetv2(inputs, is_training, total_layers=total_layers,
                           dropout=dropout_prob, reg=reg)

def optimizer_init_fn(learning_rate=learning_rate):
    """
    Build the momentum optimizer.

    `learning_rate` defaults to this cell's setting (captured when the cell
    runs) and may be overridden with a float or a scalar tensor, so a
    training loop can supply a rate it is able to change over time.
    """
    return tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.9)

train_part5(model_init_fn, optimizer_init_fn, num_epochs, experiment_name=name, decay_at=decay_at, decay_to=decay_to)

In [None]:
# Experiment: ResNet-20 trained with Adam and a single late learning-rate drop.
num_epochs = 100
total_layers = 20
dropout_prob = 0.5
learning_rate = 0.001
reg = 2e-4
decay_at=[91]
decay_to=[0.0001]
print_every = 1000  # read as a global by train_part5

name = (f"CIFAR_ResNet{total_layers}_lr{learning_rate}"
            f"_dp{dropout_prob}_reg{reg}_decay{decay_at}_{decay_to}_adam")  

def model_init_fn(inputs, is_training, total_layers=total_layers, reg=reg):
    """Build the ResNet forward pass with this cell's depth, dropout and L2 settings."""
    return model_ResNetv2(inputs, is_training, total_layers=total_layers,
                           dropout=dropout_prob, reg=reg)

def optimizer_init_fn(learning_rate=learning_rate):
    """
    Build the Adam optimizer.

    `learning_rate` defaults to this cell's setting (captured when the cell
    runs) and may be overridden with a float or a scalar tensor, so a
    training loop can supply a rate it is able to change over time.
    """
    return tf.train.AdamOptimizer(learning_rate=learning_rate)

train_part5(model_init_fn, optimizer_init_fn, num_epochs, experiment_name=name, decay_at=decay_at, decay_to=decay_to)