In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import tensorflow as tf
import warnings
warnings.filterwarnings('ignore')
from tensorflow.python.framework import ops

# 1. Load the data

In [None]:
# Read the training and test tables from CSV files in the working directory.
# Later cells treat column 0 of `train` as the digit label and the remaining
# columns as pixels, and reshape every `test` row to 28x28, so `test` is
# expected to hold pixel columns only.
train = pd.read_csv('train.csv')
test = pd.read_csv('test.csv')

In [None]:
# Display (rows, columns) of both tables as a quick sanity check.
train.shape, test.shape

# 2. Visualize the digits

Let us look at some random digits from the training sample and their respective labels.

In [None]:
# Number of digits to preview; the plotting cell below lays them out on a 2 x 3 grid.
num_images = 6
# Number of training examples (reused by later cells).
m = train.shape[0]
# Row positions to preview. np.random.choice samples with replacement by default,
# so the same row can occasionally be drawn twice.
idx = np.random.choice(m, size=num_images)

In [None]:
# Confirm that `num_images` positions were drawn.
len(idx)

In [None]:
num_rows = 2
num_cols = 3

# squeeze=False keeps `ax` a 2-D array even for a single row or column, so the
# flat, row-major iteration below works for any grid size.
fig, ax = plt.subplots(nrows=num_rows, ncols=num_cols, squeeze=False)
fig.set_size_inches(12, 10)

# Pair each panel (row-major order) with one sampled row position. Samples beyond
# the number of panels are ignored; unused panels stay empty.
for panel, j in zip(ax.flat, idx):
    # Remove ticks
    panel.tick_params(
        which='both',
        left=False,
        right=False,
        bottom=False,
        top=False,
        labelleft=False,
        labelbottom=False)

    # `idx` holds row positions, so index positionally with .iloc instead of relying
    # on the deprecated positional fallback of Series[int]. Column 0 is the label and
    # the remaining columns are the flattened 28x28 image.
    pixels = np.array(train.iloc[j, 1:]).reshape(28, 28)
    digit = train.iloc[j, 0]

    # Draw the image and use the true digit as the x label
    panel.imshow(pixels, cmap=plt.get_cmap('gray'))
    panel.set_xlabel(str(digit), fontsize=50)

plt.show()

# 3. Convert data to the right shape for CNN

Convert the flattened arrays to image arrays, normalize by dividing by 255, and separate features (X) from labels (y).

In [None]:
# Summary statistics per column (label and every pixel column).
train.describe()

In [None]:
# Features: every column after the first one (the label), one flattened image per row.
X = np.array(train.iloc[:,1:])

In [None]:
# Should be (m, 784) so that each row can be reshaped to 28 x 28 below.
X.shape

In [None]:
# Reshape to (examples, height, width, channels) with a single channel, the layout
# the placeholders and conv layers below are built for.
X = X.reshape((m,28,28,1))

In [None]:
# Integer digit labels as a 1-D array; used below as an index array for one-hot encoding.
y = np.array(train.label)

def convert_to_one_hot(Y, C):
    """Return the one-hot encoding of integer class labels.

    Arguments:
    Y -- integer class indices, each in the range [0, C)
    C -- number of classes

    Returns:
    Float array of shape Y.shape + (C,): row Y[i] of the C x C identity matrix
    is the encoding of the i-th label.
    """
    identity = np.eye(C)
    one_hot = identity[Y]
    return one_hot

# One-hot encode the labels over the 10 digit classes.
y = convert_to_one_hot(y,10)

In [None]:
# First five one-hot rows; compare with the raw labels shown in the next cell.
y[0:5]

In [None]:
# Raw integer labels of the first five rows, to check the one-hot encoding above.
train.label.head()

In [None]:
# Set random seed so the train/dev split is reproducible

seed = 5
np.random.seed(seed)

# Draw 90% of the row positions, without replacement, for training

train_index = np.random.choice(m, round(m*0.9), replace=False)

# Hold out every remaining position for the dev set. np.setdiff1d is vectorised,
# keeps the ascending order of np.arange(m), and always returns an integer array
# (even when empty), so it is always valid as an index.

dev_index = np.setdiff1d(np.arange(m), train_index, assume_unique=True)

# Make training and dev

X_train = X[train_index]
X_dev = X[dev_index]

y_train = y[train_index]
y_dev = y[dev_index]

In [None]:
# Shape of the training split: (number of training examples, 28, 28, 1).
X_train.shape

In [None]:
m_test = test.shape[0]
# Select only the 28*28 pixel columns. The submission cells at the end of the
# notebook append `Label` and `ImageId` columns to `test`; taking the leading pixel
# columns keeps this cell re-runnable after that has happened.
n_pixels = 28 * 28
X_test = np.array(test.iloc[:, :n_pixels]).reshape((m_test,28,28,1))


In [None]:
# Shape of the test images: (m_test, 28, 28, 1).
X_test.shape

In [None]:
# Divide by 255 so that raw pixel intensities (assumed to be in 0-255) land in [0, 1].
# NOTE(review): this cell rescales the arrays in place, so running it a second time
# without re-running the cells that build X_train / X_dev / X_test divides by 255 twice.
X_train = X_train / 255.0
X_dev = X_dev / 255.0
# Only the test images are additionally cast to float32 here.
X_test = (X_test / 255.0).astype(np.float32)

In [None]:
# Summarise the split sizes and array shapes.
# The "(dev)" label is written without backslashes: "\(" is not a valid Python
# escape sequence, so the backslashes were printed literally.
print("number of training examples = " + str(X_train.shape[0]))
print("number of validation (dev) examples = " + str(X_dev.shape[0]))
print("number of test examples = " + str(X_test.shape[0]))
print("X_train shape: " + str(X_train.shape))
print("y_train shape: " + str(y_train.shape))
print("X_dev shape: " + str(X_dev.shape))
print("y_dev shape: " + str(y_dev.shape))
print("X_test shape: " + str(X_test.shape))

# 4. Apply LeNet 5 architecture

The LeNet architecture we will apply is as follows:

INPUT => CONV (28x28x20, f = 5, s = 1) => RELU => POOL (14x14x20, f = 2, s = 2) => CONV (14x14x50, f = 5, s = 1) => RELU => POOL (7x7x50, f = 2, s = 2) + flatten => FC (120) => RELU => FC (84) => RELU => FC (10) => softmax

Thus, there are 2 conv layers and 2 pooling layers, followed by 2 fully connected layers with ReLU activation and a final 10-unit fully connected layer with a softmax.

In [None]:
# Create Placeholders

def create_placeholders(n_H0,n_W0,n_C0,n_y):
    """Create the float32 placeholders that feed images and labels into the graph.

    Arguments:
    n_H0 -- image height
    n_W0 -- image width
    n_C0 -- number of image channels
    n_y -- number of classes (width of a one-hot label row)

    Returns:
    X -- placeholder of shape [None, n_H0, n_W0, n_C0]
    Y -- placeholder of shape [None, n_y]

    The leading None leaves the batch size open.
    """
    image_shape = [None, n_H0, n_W0, n_C0]
    label_shape = [None, n_y]
    images = tf.placeholder(tf.float32, shape=image_shape)
    labels = tf.placeholder(tf.float32, shape=label_shape)
    return images, labels

In [None]:
# Initialize parameters

def initialize_parameters():
    """Create the two convolution filter variables with Xavier initialisation.

    Returns:
    parameters -- dict with "W1" of shape [5, 5, 1, 20] and "W2" of shape [5, 5, 20, 50]

    Only the conv filters are created here; the fully connected layers are built
    inside forward_propagation.
    """
    tf.set_random_seed(1)
    xavier = tf.contrib.layers.xavier_initializer(seed=0)

    # (variable name, [filter height, filter width, input channels, output channels])
    filter_specs = [("W1", [5, 5, 1, 20]),
                    ("W2", [5, 5, 20, 50])]

    parameters = {}
    for var_name, var_shape in filter_specs:
        parameters[var_name] = tf.get_variable(name=var_name, shape=var_shape, initializer=xavier)

    return parameters

In [None]:
# Check parameters

# Start from an empty graph so the W1 / W2 variable names are free to create.
tf.reset_default_graph()
with tf.Session() as sess_test:
    parameters = initialize_parameters()
    init = tf.global_variables_initializer()
    sess_test.run(init)
    # Fetch both filters in one call and print one small slice of each.
    W1_val, W2_val = sess_test.run([parameters["W1"], parameters["W2"]])
    print("W1 = " + str(W1_val[1,1,0]))
    print("W2 = " + str(W2_val[1,1,1]))

In [None]:
# Build forward propagation computation graph

def forward_propagation(X, parameters):
    """Build the forward pass of the network.

    CONV -> RELU -> MAXPOOL -> CONV -> RELU -> MAXPOOL -> FLATTEN -> FC(120) -> FC(84) -> FC(10)

    Arguments:
    X -- input image tensor or placeholder, (batch, height, width, channels)
    parameters -- dict holding the conv filter variables "W1" and "W2"

    Returns:
    Z5 -- output of the last fully connected layer, 10 values per example. These are
          unnormalised scores (logits): no softmax is applied here, because
          tf.nn.softmax_cross_entropy_with_logits_v2 applies softmax itself and must
          not be given already-normalised probabilities. argmax over Z5 still gives
          the predicted class.
    """
    W1 = parameters['W1']
    W2 = parameters['W2']

    # CONV2D: stride of 1, padding 'SAME' (spatial size is preserved)
    Z1 = tf.nn.conv2d(X, W1, strides = [1,1,1,1], padding = 'SAME')
    # RELU
    A1 = tf.nn.relu(Z1)
    # MAXPOOL: window 2x2, stride 2, padding 'VALID' (halves height and width)
    P1 = tf.nn.max_pool(A1, ksize = [1,2,2,1], strides = [1,2,2,1], padding = 'VALID')
    # CONV2D: filters W2, stride 1, padding 'SAME'
    Z2 = tf.nn.conv2d(P1, W2, strides = [1,1,1,1], padding = 'SAME')
    # RELU
    A2 = tf.nn.relu(Z2)
    # MAXPOOL: window 2x2, stride 2, padding 'VALID'
    P2 = tf.nn.max_pool(A2, ksize = [1,2,2,1], strides = [1,2,2,1], padding = 'VALID')
    # FLATTEN: one feature vector per example
    P2 = tf.contrib.layers.flatten(P2)
    # FULLY-CONNECTED, 120 units (fully_connected applies ReLU by default)
    A3 = tf.contrib.layers.fully_connected(P2, 120)
    # FULLY-CONNECTED, 84 units (ReLU by default)
    A4 = tf.contrib.layers.fully_connected(A3, 84)
    # FULLY-CONNECTED output layer, 10 units, linear: returns logits, softmax is left to the loss
    Z5 = tf.contrib.layers.fully_connected(A4, 10, activation_fn = None)

    return Z5

In [None]:
# Check forward propagation

tf.reset_default_graph()

with tf.Session() as sess:
    np.random.seed(1)
    # Give the placeholders their own names. A `with` block does not open a new scope,
    # so binding them to `X` / `Y` would replace the notebook-level `X` array of
    # training images with a tensor.
    X_ph, Y_ph = create_placeholders(28, 28, 1, 10)
    parameters = initialize_parameters()
    Z5 = forward_propagation(X_ph, parameters)
    init = tf.global_variables_initializer()
    sess.run(init)
    # Smoke test on two random "images"; only the shape and finiteness of the output matter.
    a = sess.run(Z5, {X_ph: np.random.randn(2,28,28,1), Y_ph: np.random.randn(2,10)})
    print("Z5 = " + str(a))
    

In [None]:
# Compute Cost

def compute_cost(Z5, Y):
    """Return the mean softmax cross-entropy over the batch.

    Arguments:
    Z5 -- network output, passed to the loss op as `logits`
    Y -- labels tensor with the same shape as Z5

    Returns:
    cost -- scalar tensor, the per-example losses averaged over the batch
    """
    per_example_loss = tf.nn.softmax_cross_entropy_with_logits_v2(labels=Y, logits=Z5)
    return tf.reduce_mean(per_example_loss)

In [None]:
# Check cost function

tf.reset_default_graph()

with tf.Session() as sess:
    np.random.seed(1)
    # Give the placeholders their own names. A `with` block does not open a new scope,
    # so binding them to `X` / `Y` would replace the notebook-level `X` array of
    # training images with a tensor.
    X_ph, Y_ph = create_placeholders(28, 28, 1, 10)
    parameters = initialize_parameters()
    Z5 = forward_propagation(X_ph, parameters)
    cost = compute_cost(Z5, Y_ph)
    init = tf.global_variables_initializer()
    sess.run(init)
    # Smoke test only: the random "labels" are not one-hot, so the value itself
    # is not a meaningful loss.
    a = sess.run(cost, {X_ph: np.random.randn(4,28,28,1), Y_ph: np.random.randn(4,10)})
    print("cost = " + str(a))

In [None]:
# Set hyperparameters.
# These values become the default arguments of `model` at the moment `model` is
# defined, so the cell defining `model` must be re-run after changing them here.

learning_rate = 0.001  # step size passed to the Adam optimizer
num_epochs = 10  # number of passes over the training set
batch_size = 64  # examples per minibatch


In [None]:
def model(X_train, X_test, y_train, learning_rate = learning_rate, num_epochs = num_epochs, batch_size = batch_size, print_cost = True):
    """Build, train and evaluate the CNN, then predict the test set.

    Arguments:
    X_train -- training images, shape (m_train, n_H0, n_W0, n_C0)
    X_test -- test images with the same per-example shape
    y_train -- one-hot training labels, shape (m_train, n_y)
    learning_rate -- step size of the Adam optimizer
    num_epochs -- number of passes over the training set
    batch_size -- examples per minibatch
    print_cost -- when True, print the mean minibatch cost after every epoch

    Returns:
    train_accuracy -- accuracy on X_train / y_train
    dev_accuracy -- accuracy on the dev set
    parameters -- dict of the conv filter variables (graph variables, not evaluated arrays)
    test_preds -- predicted class index for every test image

    Note: the dev set is read from the notebook-level names `X_dev` and `y_dev`;
    they are not parameters of this function.
    """
    ops.reset_default_graph()                         # to be able to rerun the model without overwriting tf variables
    (m_train, n_H0, n_W0, n_C0) = X_train.shape
    n_y = y_train.shape[1]
    costs = []                                        # mean minibatch cost, one entry per epoch
    # Ceiling division: a trailing partial batch is counted, but an exact multiple of
    # batch_size does not add an extra, empty batch.
    num_batches = (m_train + batch_size - 1) // batch_size

    # Create Placeholders of the correct shape
    X, Y = create_placeholders(n_H0, n_W0, n_C0, n_y)

    # Initialize parameters
    parameters = initialize_parameters()

    # Forward propagation: Build the forward propagation in the tensorflow graph
    Z5 = forward_propagation(X, parameters)

    # Cost function: Add cost function to tensorflow graph
    cost = compute_cost(Z5, Y)

    # Backpropagation: Define the tensorflow optimizer. Use an AdamOptimizer that minimizes the cost.
    optimizer = tf.train.AdamOptimizer(learning_rate).minimize(cost)

    # Evaluation ops: predicted class, per-example correctness, and mean accuracy
    predict_op = tf.argmax(Z5, 1)
    correct_prediction = tf.equal(predict_op, tf.argmax(Y, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))

    # Initialize all the variables globally
    init = tf.global_variables_initializer()

    # Start the session to compute the tensorflow graph
    with tf.Session() as sess:

        # Run the initialization
        sess.run(init)

        for epoch in range(num_epochs):
            minibatch_cost = 0

            # Shuffle once per epoch; consecutive slices of the permutation are
            # disjoint minibatches that together cover every example exactly once.
            shuffled = np.random.permutation(m_train)

            for start in range(0, m_train, batch_size):
                batch_index = shuffled[start:start + batch_size]
                batch_train_X = X_train[batch_index]
                batch_train_y = y_train[batch_index]

                # One run performs the update and returns the cost of this batch,
                # instead of paying for a second forward pass just to read the cost.
                _, temp_cost = sess.run([optimizer, cost], feed_dict={X: batch_train_X, Y: batch_train_y})
                minibatch_cost += temp_cost / num_batches

            # Print the cost every epoch
            if print_cost:
                print ("Cost after epoch %i: %f" % (epoch+1, minibatch_cost))
            costs.append(minibatch_cost)

        # Plot the cost; `costs` holds one value per epoch
        plt.plot(np.squeeze(costs))
        plt.ylabel('cost')
        plt.xlabel('epochs')
        plt.title("Learning rate =" + str(learning_rate))
        plt.show()

        # Calculate accuracy on the train and dev sets
        train_accuracy = sess.run(accuracy, feed_dict={X: X_train, Y: y_train})
        dev_accuracy = sess.run(accuracy, feed_dict={X: X_dev, Y: y_dev})
        print("Train Accuracy:", train_accuracy)
        print("Dev Accuracy:", dev_accuracy)

        # Make predictions on test set
        test_preds = sess.run(predict_op, feed_dict={X: X_test})

        return train_accuracy, dev_accuracy, parameters, test_preds

In [None]:
# Train with the default hyperparameters. The dev split is not passed in: `model`
# reads the notebook-level X_dev / y_dev when it reports dev accuracy.
train_accuracy, dev_accuracy, parameters, test_preds = model(X_train, X_test, y_train)

# 5. Check random sample prediction from test set

In [None]:
# Pick one test image at random and show it together with the predicted digit.
i = np.random.choice(m_test)
print("Test sample no.: {}".format(i))
print('Prediction: {}'.format(test_preds[i]))

# Index 0 on the last axis drops the single channel so imshow receives a 2-D image.
plt.imshow(X_test[i][:, :, 0], cmap='gray')
plt.show()


# 6. Write submission file

In [None]:
# Attach the predicted digit to each test row.
# Note: this adds a non-pixel column to `test` in place.
test['Label'] = test_preds

In [None]:
# 1-based image ids, one per test row, in row order.
test['ImageId'] = np.arange(1, m_test + 1)

In [None]:
# Check that the Label and ImageId columns were appended after the pixel columns.
test.head()

In [None]:
# Write only the id and prediction columns, with a header row and without the row index.
test.to_csv('submission_lenet5.csv', columns=['ImageId', 'Label'], index=False)