# Convolutional Neural Network - Feature extraction


<img src="https://cdn-images-1.medium.com/max/800/0*V1vb9SDnsU1eZQUy.jpg",width="800" height="600" align="center">
<center><span>Figure 1: Lenet 5 </span></center>


This LeNet architecture accepts a 32x32xC image as input, where C is the number of color channels. Since MNIST images are grayscale, C is 1 in this case.

--------------------------
**Layer 1: Convolutional.** The output shape should be 28x28x6 **Activation.** ReLU **Pooling.** The output shape should be 14x14x6.

**Layer 2: Convolutional.** The output shape should be 10x10x16. **Activation.** ReLU **Pooling.** The output shape should be 5x5x16.

**Flatten.** Flatten the output shape of the final pooling layer such that it's 1D instead of 3D.

**Layer 3: Fully Connected.** This should have 120 outputs. **Activation.** ReLU

**Layer 4: Fully Connected.** This should have 84 outputs. **Activation.** ReLU

**Layer 5: Fully Connected.** This should have 10 outputs. **Activation.** softmax


In [8]:
import tensorflow as tf
from tensorflow.contrib.layers import flatten
from sklearn.utils import shuffle
from tensorflow.examples.tutorials.mnist import input_data
import numpy as np
import time

mnist = input_data.read_data_sets("mnist/", one_hot=True)
X_train, y_train           = mnist.train.images, mnist.train.labels
X_validation, y_validation = mnist.validation.images, mnist.validation.labels
X_test, y_test             = mnist.test.images, mnist.test.labels

print("Image Shape: {}".format(X_train[0].shape))
print("Training Set:   {} samples".format(len(X_train)))
print("Validation Set: {} samples".format(len(X_validation)))
print("Test Set:       {} samples".format(len(X_test)))

Extracting mnist/train-images-idx3-ubyte.gz
Extracting mnist/train-labels-idx1-ubyte.gz
Extracting mnist/t10k-images-idx3-ubyte.gz
Extracting mnist/t10k-labels-idx1-ubyte.gz
Image Shape: (784,)
Training Set:   55000 samples
Validation Set: 5000 samples
Test Set:       10000 samples


In [9]:
def weight_variable(shape):
    initial = tf.truncated_normal(shape, mean=0, stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape):
    initial = tf.constant(0.01, shape=shape)
    return tf.Variable(initial)

def conv2d(x, W):
    return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='VALID')

def max_pool_2x2(x):
    return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                        strides=[1, 2, 2, 1], padding='VALID')



In [46]:
import time
x = tf.placeholder(tf.float32, [None, 28, 28, 1], name='InputData')
y = tf.placeholder(tf.float32, [None, 10], name='LabelData')
keep_prob = tf.placeholder(tf.float32)

class NeuralNetworkMNIST:
    def __init__(self, LR=0.1, Batchsize=128, Iter=100, display_step = 1,
                 Optimizer=tf.train.GradientDescentOptimizer, IsDropOut=False, Activation=tf.nn.sigmoid):
        self.display_step = display_step 
        self.logs_path = "log_files/"
        self.LR = LR
        self.Iter = Iter
        self.Batchsize = Batchsize
        self.Optimizer = Optimizer
        self.IsDropOut = IsDropOut
        self.Activation = Activation

            # Model
        self.model = self.define_model(x, y)

            # Minimize error using cross entropy
        self.cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=self.model, labels=y)
        self.cost = tf.reduce_mean(self.cross_entropy)

        # Gradient Descent
        self.tfoptimizer = self.Optimizer(self.LR).minimize(self.cost)

        # Accuracy
        self.correct_prediction = tf.equal(tf.argmax(self.model, 1), tf.argmax(y, 1))
        self.accuracy_operation = tf.reduce_mean(tf.cast(self.correct_prediction, tf.float32))
        self.saver = tf.train.Saver()

        self.init = tf.global_variables_initializer()
    
    def define_model(self, x, y):
        mu = 0
        sigma = 0.1
        with tf.name_scope("Layer_1"):
        # resized image input 28x28x1 => 32x32x1
            x_new = tf.map_fn(lambda x1: tf.image.pad_to_bounding_box(x1, 2, 2, 32, 32), x)

            # SOLUTION: Layer 1: Convolutional. Input = 32x32x1. Output = 28x28x6.
            conv1_W = tf.Variable(tf.truncated_normal(shape=(5, 5, 1, 6), mean = mu, stddev = sigma))
            conv1_b = bias_variable([6])
            conv1_pre   = tf.nn.conv2d(x_new, conv1_W, strides=[1, 1, 1, 1], padding='VALID') + conv1_b

            # SOLUTION: Activation.
            conv1_ac = self.Activation(conv1_pre)

            # SOLUTION: Pooling. Input = 28x28x6. Output = 14x14x6.
            conv1 = tf.nn.max_pool(conv1_ac, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')

        # SOLUTION: Layer 2: Convolutional. Output = 10x10x16.
        with tf.name_scope("Layer_2"):
            conv2_W = tf.Variable(tf.truncated_normal(shape=(5, 5, 6, 16), mean = mu, stddev = sigma))
            conv2_b = bias_variable([16])
            conv2_pre   = tf.nn.conv2d(conv1, conv2_W, strides=[1, 1, 1, 1], padding='VALID') + conv2_b

            # SOLUTION: Activation.
            conv2_ac = self.Activation(conv2_pre)

            # SOLUTION: Pooling. Input = 10x10x16. Output = 5x5x16.
            conv2 = tf.nn.max_pool(conv2_ac, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')

        # SOLUTION: Flatten. Input = 5x5x16. Output = 400.t multiple values for keyword argument 'LR
        with tf.name_scope("Flatten"):
            fc0   = flatten(conv2)
        
        # SOLUTION: Layer 3: Fully Connected. Input = 400. Output = 120.
        with tf.name_scope("Layer_3"):
            fc1_W = tf.Variable(tf.truncated_normal(shape=(400, 120), mean = mu, stddev = sigma))
            fc1_b = tf.Variable(tf.zeros(120))
            fc1_pre   = tf.matmul(fc0, fc1_W) + fc1_b

            # SOLUTION: Activation.
            fc1   = self.Activation(fc1_pre)
        
        fc1_out = None
        # keep probability number to be input in training
        # drop out
        if (self.IsDropOut):
            with tf.name_scope("Dropout_1"):
                fc1_out = tf.nn.dropout(fc1, keep_prob)
        else:
            fc1_out = fc1

        # SOLUTION: Layer 4: Fully Connected. Input = 120. Output = 84.
        with tf.name_scope("Layer_4"):
            fc2_W  = tf.Variable(tf.truncated_normal(shape=(120, 84), mean = mu, stddev = sigma))
            fc2_b  = bias_variable([84])
            fc2_pre = tf.matmul(fc1_out, fc2_W) + fc2_b

        # SOLUTION: Activation.
            fc2    = self.Activation(fc2_pre)
        
        self.fc2_out = None
        if (self.IsDropOut):
            with tf.name_scope("Dropout_2"):
                self.fc2_out = tf.nn.dropout(fc2, keep_prob)
        else:
            self.fc2_out = fc2
            
        # SOLUTION: Layer 5: Fully Connected. Input = 84. Output = 10.
        with tf.name_scope("Layer_5"):
            fc3_W  = tf.Variable(tf.truncated_normal(shape=(84, 10), mean = mu, stddev = sigma))
            fc3_b  = bias_variable([10])
            fc3_pre = tf.matmul(self.fc2_out, fc3_W) + fc3_b

            logits = fc3_pre
        return logits
      
    def extractFeature(self,X_val):
        X_val_reshaped = X_val.reshape([-1, 28, 28, 1])
        if (self.IsDropOut):
            return self.sess.run([self.fc2_out], feed_dict={x: X_val_reshaped, keep_prob: 1.0})
        else:
            return self.sess.run([self.fc2_out], feed_dict={x: X_val_reshaped})
    
    def evaluation(self, X_val, y_val, sess):
        X_val_reshaped = X_val.reshape([-1, 28, 28, 1])
        accuracy = 0
        if (self.IsDropOut):
            accuracy = sess.run([self.accuracy_operation], feed_dict={x: X_val_reshaped, y: y_val, keep_prob: 1.0})
        else:
            accuracy = sess.run([self.accuracy_operation], feed_dict={x: X_val_reshaped, y: y_val})
        return accuracy
        
    def train(self, X_train, y_train, X_validation, y_validation):
        # Initializing the session 
        print ("Start Training!")
        self.sess = tf.Session()
        sess = self.sess
        sess.close()
        sess.run(self.init)
        # op to write logs to Tensorboard
        summary_writer = tf.summary.FileWriter(self.logs_path, graph=tf.get_default_graph())
        # Training cycle
        start = time.time()
        for epoch in np.arange(self.Iter):
            avg_cost = 0.
            total_batch = int(mnist.train.num_examples/self.Batchsize)
            X_train, y_train = shuffle(X_train, y_train)

            # Loop over all batches
            for offset in range(0, mnist.train.num_examples, self.Batchsize):
                end = offset + self.Batchsize
                batch_x, batch_y = X_train[offset:end], y_train[offset:end]
                batch_x = batch_x.reshape([-1,28,28,1])

                c = None
                if (self.IsDropOut == False):
                    _, c = sess.run([self.tfoptimizer, self.cost], 
                                             feed_dict={x: batch_x, y: batch_y})
                else:
                    _, c = sess.run([self.tfoptimizer, self.cost], 
                                             feed_dict={x: batch_x, y: batch_y, keep_prob: 0.75})
                avg_cost += c / total_batch

            if (epoch+1) % self.display_step == 0:
                accu = self.evaluation(X_validation, y_validation, sess)
                print("Epoch: ", '%02d' % (epoch+1), 
                      ", Loss=", "{:.9f}".format(avg_cost), 
                      ", Validation accuracy=", accu
                     )
            else:
                print("Epoch: ", '%02d' % (epoch+1), 
                      ", Loss=", "{:.9f}".format(avg_cost))
                    
        print("Optimization Finished! in %f seconds"%(time.time()-start))
        # Save model
        self.saver.save(sess, "mnist_nn_" + str(self.Iter) + "_" + str(time.time()), global_step=100)
        # Test model - Calculate accuracy
        print("Accuracy on test data: ", self.evaluation(mnist.test.images, mnist.test.labels, sess))


In [47]:
mnist_nn = NeuralNetworkMNIST(LR=0.001, Iter=100, display_step=5,
                         Optimizer=tf.train.AdamOptimizer, IsDropOut=True, Activation=tf.nn.relu)
mnist_nn.train(X_train, y_train, X_validation, y_validation)

Start Training!
Epoch:  01 , Loss= 0.459923559
Epoch:  02 , Loss= 0.134445831
Epoch:  03 , Loss= 0.096902787
Epoch:  04 , Loss= 0.074125227
Epoch:  05 , Loss= 0.065016546 , Validation accuracy= [0.98559999]
Epoch:  06 , Loss= 0.058340095
Epoch:  07 , Loss= 0.050417239
Epoch:  08 , Loss= 0.046235921
Epoch:  09 , Loss= 0.041521387
Epoch:  10 , Loss= 0.039693081 , Validation accuracy= [0.99040002]
Epoch:  11 , Loss= 0.034952509
Epoch:  12 , Loss= 0.032905150
Epoch:  13 , Loss= 0.029796152
Epoch:  14 , Loss= 0.027261079
Epoch:  15 , Loss= 0.027337105 , Validation accuracy= [0.99000001]
Epoch:  16 , Loss= 0.023096184
Epoch:  17 , Loss= 0.021861705
Epoch:  18 , Loss= 0.020957493
Epoch:  19 , Loss= 0.019923194
Epoch:  20 , Loss= 0.018759270 , Validation accuracy= [0.99299997]
Epoch:  21 , Loss= 0.018844684
Epoch:  22 , Loss= 0.016890804
Epoch:  23 , Loss= 0.018270153
Epoch:  24 , Loss= 0.016669049
Epoch:  25 , Loss= 0.015644376 , Validation accuracy= [0.99000001]
Epoch:  26 , Loss= 0.01576204

In [48]:
X_full = np.concatenate([X_train, X_validation], axis=0)
y_full = np.concatenate([y_train, y_validation])

In [49]:
X_trans_train = mnist_nn.extractFeature(X_full)
X_trans_test = mnist_nn.extractFeature(X_test)

In [50]:
np.save('minist_x_train.npy', X_trans_train[0])
np.save('minist_y_train.npy', y_full)
np.save('minist_x_test.npy', X_trans_test[0])
np.save('minist_y_test.npy', y_test)

In [None]:
import time
x = tf.placeholder(tf.float32, [None, 32, 32, 1], name='InputData')
y = tf.placeholder(tf.float32, [None, 10], name='LabelData')
keep_prob = tf.placeholder(tf.float32)

class NeuralNetworkCIFAR:
    def __init__(self, LR=0.1, Batchsize=128, Iter=100, display_step = 1,
                 Optimizer=tf.train.GradientDescentOptimizer, IsDropOut=False, Activation=tf.nn.sigmoid):
        self.display_step = display_step 
        self.logs_path = "log_files/"
        self.LR = LR
        self.Iter = Iter
        self.Batchsize = Batchsize
        self.Optimizer = Optimizer
        self.IsDropOut = IsDropOut
        self.Activation = Activation

            # Model
        self.model = self.define_model(x, y)

            # Minimize error using cross entropy
        self.cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=self.model, labels=y)
        self.cost = tf.reduce_mean(self.cross_entropy)

        # Gradient Descent
        self.tfoptimizer = self.Optimizer(self.LR).minimize(self.cost)

        # Accuracy
        self.correct_prediction = tf.equal(tf.argmax(self.model, 1), tf.argmax(y, 1))
        self.accuracy_operation = tf.reduce_mean(tf.cast(self.correct_prediction, tf.float32))
        self.saver = tf.train.Saver()

        self.init = tf.global_variables_initializer()
    
    def define_model(self, x, y):
        mu = 0
        sigma = 0.1
        with tf.name_scope("Layer_1"):
        # resized image input 28x28x1 => 32x32x1
            x_new = tf.map_fn(lambda x1: tf.image.pad_to_bounding_box(x1, 2, 2, 32, 32), x)

            # SOLUTION: Layer 1: Convolutional. Input = 32x32x1. Output = 28x28x6.
            conv1_W = tf.Variable(tf.truncated_normal(shape=(5, 5, 1, 6), mean = mu, stddev = sigma))
            conv1_b = bias_variable([6])
            conv1_pre   = tf.nn.conv2d(x_new, conv1_W, strides=[1, 1, 1, 1], padding='VALID') + conv1_b

            # SOLUTION: Activation.
            conv1_ac = self.Activation(conv1_pre)

            # SOLUTION: Pooling. Input = 28x28x6. Output = 14x14x6.
            conv1 = tf.nn.max_pool(conv1_ac, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')

        # SOLUTION: Layer 2: Convolutional. Output = 10x10x16.
        with tf.name_scope("Layer_2"):
            conv2_W = tf.Variable(tf.truncated_normal(shape=(5, 5, 6, 16), mean = mu, stddev = sigma))
            conv2_b = bias_variable([16])
            conv2_pre   = tf.nn.conv2d(conv1, conv2_W, strides=[1, 1, 1, 1], padding='VALID') + conv2_b

            # SOLUTION: Activation.
            conv2_ac = self.Activation(conv2_pre)

            # SOLUTION: Pooling. Input = 10x10x16. Output = 5x5x16.
            conv2 = tf.nn.max_pool(conv2_ac, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')

        # SOLUTION: Flatten. Input = 5x5x16. Output = 400.t multiple values for keyword argument 'LR
        with tf.name_scope("Flatten"):
            fc0   = flatten(conv2)
        
        # SOLUTION: Layer 3: Fully Connected. Input = 400. Output = 120.
        with tf.name_scope("Layer_3"):
            fc1_W = tf.Variable(tf.truncated_normal(shape=(400, 120), mean = mu, stddev = sigma))
            fc1_b = tf.Variable(tf.zeros(120))
            fc1_pre   = tf.matmul(fc0, fc1_W) + fc1_b

            # SOLUTION: Activation.
            fc1   = self.Activation(fc1_pre)
        
        fc1_out = None
        # keep probability number to be input in training
        # drop out
        if (self.IsDropOut):
            with tf.name_scope("Dropout_1"):
                fc1_out = tf.nn.dropout(fc1, keep_prob)
        else:
            fc1_out = fc1

        # SOLUTION: Layer 4: Fully Connected. Input = 120. Output = 84.
        with tf.name_scope("Layer_4"):
            fc2_W  = tf.Variable(tf.truncated_normal(shape=(120, 84), mean = mu, stddev = sigma))
            fc2_b  = bias_variable([84])
            fc2_pre = tf.matmul(fc1_out, fc2_W) + fc2_b

        # SOLUTION: Activation.
            fc2    = self.Activation(fc2_pre)
        
        self.fc2_out = None
        if (self.IsDropOut):
            with tf.name_scope("Dropout_2"):
                self.fc2_out = tf.nn.dropout(fc2, keep_prob)
        else:
            self.fc2_out = fc2
            
        # SOLUTION: Layer 5: Fully Connected. Input = 84. Output = 10.
        with tf.name_scope("Layer_5"):
            fc3_W  = tf.Variable(tf.truncated_normal(shape=(84, 10), mean = mu, stddev = sigma))
            fc3_b  = bias_variable([10])
            fc3_pre = tf.matmul(self.fc2_out, fc3_W) + fc3_b

            logits = fc3_pre
        return logits
      
    def extractFeature(self,X_val):
        X_val_reshaped = X_val.reshape([-1, 28, 28, 1])
        if (self.IsDropOut):
            return self.sess.run([self.fc2_out], feed_dict={x: X_val_reshaped, keep_prob: 1.0})
        else:
            return self.sess.run([self.fc2_out], feed_dict={x: X_val_reshaped})
    
    def evaluation(self, X_val, y_val, sess):
        X_val_reshaped = X_val.reshape([-1, 28, 28, 1])
        accuracy = 0
        if (self.IsDropOut):
            accuracy = sess.run([self.accuracy_operation], feed_dict={x: X_val_reshaped, y: y_val, keep_prob: 1.0})
        else:
            accuracy = sess.run([self.accuracy_operation], feed_dict={x: X_val_reshaped, y: y_val})
        return accuracy
        
    def train(self, X_train, y_train, X_validation, y_validation):
        # Initializing the session 
        print ("Start Training!")
        self.sess = tf.Session()
        sess = self.sess
        sess.close()
        sess.run(self.init)
        # op to write logs to Tensorboard
        summary_writer = tf.summary.FileWriter(self.logs_path, graph=tf.get_default_graph())
        # Training cycle
        start = time.time()
        for epoch in np.arange(self.Iter):
            avg_cost = 0.
            total_batch = int(mnist.train.num_examples/self.Batchsize)
            X_train, y_train = shuffle(X_train, y_train)

            # Loop over all batches
            for offset in range(0, mnist.train.num_examples, self.Batchsize):
                end = offset + self.Batchsize
                batch_x, batch_y = X_train[offset:end], y_train[offset:end]
                batch_x = batch_x.reshape([-1,28,28,1])

                c = None
                if (self.IsDropOut == False):
                    _, c = sess.run([self.tfoptimizer, self.cost], 
                                             feed_dict={x: batch_x, y: batch_y})
                else:
                    _, c = sess.run([self.tfoptimizer, self.cost], 
                                             feed_dict={x: batch_x, y: batch_y, keep_prob: 0.75})
                avg_cost += c / total_batch

            if (epoch+1) % self.display_step == 0:
                accu = self.evaluation(X_validation, y_validation, sess)
                print("Epoch: ", '%02d' % (epoch+1), 
                      ", Loss=", "{:.9f}".format(avg_cost), 
                      ", Validation accuracy=", accu
                     )
            else:
                print("Epoch: ", '%02d' % (epoch+1), 
                      ", Loss=", "{:.9f}".format(avg_cost))
                    
        print("Optimization Finished! in %f seconds"%(time.time()-start))
        # Save model
        self.saver.save(sess, "mnist_nn_" + str(self.Iter) + "_" + str(time.time()), global_step=100)
        # Test model - Calculate accuracy
        print("Accuracy on test data: ", self.evaluation(mnist.test.images, mnist.test.labels, sess))
