In [2]:
import tensorflow as tf
import numpy as np

In [3]:
# References: 
# https://github.com/fchollet/deep-learning-models/blob/master/resnet50.py
# Andrew Ng Deep Learning Specialization
# https://arxiv.org/abs/1512.03385

In [4]:
tf.set_random_seed(0)

In [5]:
class ResNet50(object):
    
    def __init__(self,
            sess,
            input_dim=(64, 64, 3),
            n_classes=10,
            n_epochs=2,
            batch_size=32,
            initializer=tf.glorot_uniform_initializer(seed=0)):
        self._sess = sess
        self._input_dim = input_dim
        self._n_classes = n_classes
        self._n_epochs = n_epochs
        self._batch_size = batch_size
    
        self._init = initializer
        self._build_graph()
        
    def _identity_block(self, X, f, filters, stage, block):
        """ Identity block where skip connection skips over 3 layers.
        args:
            X - Input tensor of shape (m, n_H_prev, n_W_prev, n_C_prev).
            f - Integer, filter dimension used in 2nd component of main path.
            filters - List of integers, defining # of filters in the CONV layer.
            stage - Integer used to name the layers.
            block - String used in naming layers.
        rets:
            X - Output of the identity block with shape (n_H, n_W, n_C)
        """
        # defining name basis
        conv_base = 'res' + str(stage) + block + '_branch'
        bn_base = 'bn' + str(stage) + block + '_branch'
        
        # Retrieve Filters
        F1, F2, F3 = filters
        
        # Save the input value.
        X_input = X
 
        # First component of main path
        out = tf.layers.conv2d(X, F1, [1,1], (1,1), "valid", name=conv_base+'2a', kernel_initializer=self._init)
        out = tf.layers.batch_normalization(out, axis=3, name=bn_base+'2a')
        out = tf.nn.relu(out)
        
        # Second component of main path
        out = tf.layers.conv2d(out, F2, [f,f], (1,1), "same", name=conv_base+'2b', kernel_initializer=self._init)
        out = tf.layers.batch_normalization(out, axis=3, name=bn_base+'2b')
        out = tf.nn.relu(out)
        
        # Third component of main path
        out = tf.layers.conv2d(out, F3, [1,1], (1,1), "valid", name=conv_base+'2c', kernel_initializer=self._init)
        out = tf.layers.batch_normalization(out, axis=3, name=bn_base+'2c')
        
        # Final step: Add the X_input to out
        out = tf.keras.layers.add([out, X_input])
        out = tf.nn.relu(out)
        return out
    
    def _conv_block(self, X, f, filters, stage, block, s=2):
        """ Convolution block.
        args:
            X - input tensor of shape (m, n_H_prev, n_W_prev, n_C_prev)
            f - Integer, filter dimension used in 2nd component of main path.
            filters - List of integers, defining # of filters in the CONV layer.
            stage - Integer used to name the layers.
            block - String used in naming layers.
        rets:
            X -- output of the convolutional block, tensor of shape (n_H, n_W, n_C)
        """
        # defining name basis
        conv_base = 'res' + str(stage) + block + '_branch'
        bn_base = 'bn' + str(stage) + block + '_branch'
        
        # Retrieve Filters
        F1, F2, F3 = filters
        
        # Save the input value.
        X_input = X
        
        # First component of main path
        out = tf.layers.conv2d(X, F1, [1,1], (s,s), "valid", name=conv_base+'2a', kernel_initializer=self._init)
        out = tf.layers.batch_normalization(out, axis=3, name=bn_base+'2a')
        out = tf.nn.relu(out)
        
        # Second component of main path
        out = tf.layers.conv2d(out, F2, [f,f], (1,1), "same", name=conv_base+'2b', kernel_initializer=self._init)
        out = tf.layers.batch_normalization(out, axis=3, name=bn_base+'2b')
        out = tf.nn.relu(out)
        
        # Third component of main path
        out = tf.layers.conv2d(out, F3, [1,1], (1,1), "valid", name=conv_base+'2c', kernel_initializer=self._init)
        out = tf.layers.batch_normalization(out, axis=3, name=bn_base+'2c')
        
        # Skip path
        out_skip = tf.layers.conv2d(X_input, F3, [1,1], (s,s), "valid", name=conv_base+'1', kernel_initializer=self._init)
        out_skip = tf.layers.batch_normalization(out_skip, axis=3, name=bn_base+'1')
        
        out = tf.keras.layers.add([out, out_skip])
        out = tf.nn.relu(out)
        return out
        
    def _model(self, X_input):
        """ ResNet-50.
        args:
            X - input tensor of shape (m, n_H_prev, n_W_prev, n_C_prev)
        rets:
            out - logits w/ out softmax
        """
        out = tf.keras.layers.ZeroPadding2D(padding=(3,3)).call(X_input)
    
        initializer = tf.glorot_uniform_initializer(seed=0)
        
        # Stage 1
        out = tf.layers.conv2d(out, 64, [7,7], (2,2), "valid", name='conv1', kernel_initializer=initializer)
        out = tf.layers.batch_normalization(out, axis=3, name='bn_conv1')
        out = tf.nn.relu(out)
        
        # Stage 2
        out = self._conv_block(out, f = 3, filters = [64, 64, 256], stage = 2, block='a', s = 1)
        out = self._identity_block(out, 3, [64, 64, 256], stage=2, block='b')
        out = self._identity_block(out, 3, [64, 64, 256], stage=2, block='c')
        
        # Stage 3 
        out = self._conv_block(out, f = 3, filters = [128, 128, 512], stage = 3, block='a', s = 2)
        out = self._identity_block(out, 3, [128, 128, 512], stage=3, block='b')
        out = self._identity_block(out, 3, [128, 128, 512], stage=3, block='c')
        out = self._identity_block(out, 3, [128, 128, 512], stage=3, block='d')
        
        # Stage 4
        out = self._conv_block(out, f = 3, filters = [256, 256, 1024], stage = 4, block='a', s = 2)
        out = self._identity_block(out, 3, [256, 256, 1024], stage=4, block='b')
        out = self._identity_block(out, 3, [256, 256, 1024], stage=4, block='c')
        out = self._identity_block(out, 3, [256, 256, 1024], stage=4, block='d')
        out = self._identity_block(out, 3, [256, 256, 1024], stage=4, block='e')
        out = self._identity_block(out, 3, [256, 256, 1024], stage=4, block='f')

        # Stage 5
        out = self._conv_block(out, f = 3, filters = [512, 512, 2048], stage = 5, block='a', s = 2)
        out = self._identity_block(out, 3, [512, 512, 2048], stage=5, block='b')
        out = self._identity_block(out, 3, [512, 512, 2048], stage=5, block='c')
        
        out = tf.layers.AveragePooling2D(pool_size=(2, 2), strides=(1,1), padding='valid', name='avg_pool').call(out)
        out = tf.layers.flatten(out)
        # Note: softmax applied by tf.nn.softmax_cross_entropy_with_logits
        out = tf.layers.dense(out, self._n_classes, name='fc' + str(self._n_classes), kernel_initializer=self._init)
        return out
        
    def _build_graph(self):
        """ Build graph and define placeholders and variables. """
        n_h, n_w, n_c = self._input_dim
        self._inputs = tf.placeholder("float", [None, n_h, n_w, n_c])
        self._labels = tf.placeholder("float", [None, self._n_classes])
        self._global_step = tf.Variable(0, dtype=tf.int32, trainable=False, name='global_step')

        self._logits = self._model(self._inputs)
        self._loss = tf.nn.softmax_cross_entropy_with_logits(labels=self._labels, logits=self._logits)
        self._train = tf.train.AdamOptimizer().minimize(self._loss, global_step=self._global_step)

        
    def _train_input_fn(self, X_train, y_train):
        """ Utility to set up iterator for feed of batched train data.
        args:
            X_train - train input of dim (m, n_H_prev, n_W_prev, n_C_prev)
            y_train - train labels of dim (m, n_classes)
        rets:
            iterator - iterator object
            next_element - operation to get next from iterator
        """
        def _one_hot(label):
            return tf.gather(tf.one_hot(label, self._n_classes),0)
    
        images = tf.data.Dataset.from_tensor_slices(X_train)
        labels = tf.data.Dataset.from_tensor_slices(y_train)
        labels = labels.map(_one_hot)
        dataset = tf.data.Dataset.zip((images, labels))
        dataset = dataset.batch(self._batch_size)
        iterator = dataset.make_initializable_iterator()
        next_element = iterator.get_next()
        return iterator, next_element
    
    def train(self, X_train, y_train):
        """ Train
        args:
            X_train - train input of dim (m, n_H_prev, n_W_prev, n_C_prev)
            y_train - train labels of dim (m, n_classes)
        """
        tf.global_variables_initializer().run()
        
        train_iterator, train_next = self._train_input_fn(X_train, y_train)
            
        for epoch in range(self._n_epochs):
            self._sess.run(train_iterator.initializer)
            epoch_loss = np.array([])
            while True:
                try:
                    img_batch, label_batch = self._sess.run(train_next)
                    loss, _ = self._sess.run([self._loss, self._train],
                                             feed_dict={
                            self._inputs: img_batch,
                            self._labels: label_batch}
                    )
                    epoch_loss = np.concatenate([epoch_loss,loss])
                except tf.errors.OutOfRangeError:
                    break
                    
            print("Epoch: {} Train Loss: {}".format(
                epoch, np.mean(np.array(epoch_loss)))
            )
    
    def evaluate(self, X, y):
        # Set up ops.
        predict_op = tf.argmax(tf.nn.softmax(self._logits), 1)
        correct_prediction = tf.equal(predict_op, tf.argmax(self._labels, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
        
        # One hot encode y.
        one_hot = lambda y_: np.eye(self._n_classes)[y_].flatten()
        # Reshape to match dimensions of self._labels
        y_one_hot = list(map(one_hot,y))
        # Evaluate ops.
        result = accuracy.eval({self._inputs: X, self._labels: y_one_hot})
        return result
        
    

In [6]:
# Test identity block

tf.reset_default_graph()

with tf.Session() as identity_sess:
    np.random.seed(1)
    A_prev = tf.placeholder("float", [3, 4, 4, 6])
    X = np.random.randn(3, 4, 4, 6)
    model = ResNet50(identity_sess)
    A = model._identity_block(A_prev, f = 2, filters = [2, 4, 6], stage = 1, block = 'a')
    tf.global_variables_initializer().run()
    result = identity_sess.run([A], feed_dict={A_prev: X })
    print("out = " + str(result[0][1][1][0]))
    
# out = [ 0.94822985  0.          1.16101444  2.747859    0.          1.36677003]

out = [ 0.94822985  0.          1.16101444  2.747859    0.          1.36677003]


In [7]:
# Test convolution block

tf.reset_default_graph()

with tf.Session() as conv_sess:
    np.random.seed(1)
    A_prev = tf.placeholder("float", [3, 4, 4, 6])
    X = np.random.randn(3, 4, 4, 6)
    model = ResNet50(conv_sess)
    A = model._conv_block(A_prev, f = 2, filters = [2, 4, 6], stage = 1, block = 'a')
    conv_sess.run(tf.global_variables_initializer())
    out = conv_sess.run([A], feed_dict={A_prev: X})
    print("out = " + str(out[0][1][1][0]))
# out = [ 0.09018463  1.23489773  0.46822017  0.0367176   0.          0.65516603]

out = [ 0.09018463  1.23489773  0.46822017  0.0367176   0.          0.65516603]


In [8]:
from keras.datasets import cifar10

(X_train, y_train), (X_test, y_test) = cifar10.load_data()

Using TensorFlow backend.


In [9]:
print("number of training examples = " + str(X_train.shape[0]))
print("number of test examples = " + str(X_test.shape[0]))
print("X_train shape: " + str(X_train.shape))
print("Y_train shape: " + str(y_train.shape))
print("X_test shape: " + str(X_test.shape))
print("Y_test shape: " + str(y_test.shape))

number of training examples = 50000
number of test examples = 10000
X_train shape: (50000, 32, 32, 3)
Y_train shape: (50000, 1)
X_test shape: (10000, 32, 32, 3)
Y_test shape: (10000, 1)


In [10]:
# sample smaller set if testing on CPU
n_cpu = 1000
X_cpu = X_train[:n_cpu]
y_cpu = y_train[:n_cpu]
X_test_cpu = X_test[:n_cpu]
y_test_cpu = y_test[:n_cpu]

In [11]:
tf.reset_default_graph()
with tf.Session() as sess:
    model = ResNet50(sess, input_dim=(32,32,3), n_epochs=5)
    model.train(X_cpu, y_cpu)
    result = model.evaluate(X_test_cpu, y_test_cpu)
    print("Accuracy: {}".format(result))

Epoch: 0 Train Loss: 35.99907213047441
Epoch: 1 Train Loss: 2.2643281412124634
Epoch: 2 Train Loss: 2.118570439577103
Epoch: 3 Train Loss: 1.989766313880682
Epoch: 4 Train Loss: 1.888827444076538
Accuracy: 0.28999999165534973
