# This notebook contains all the necessary objects and functions to create and train a basic CNN model using the Keras API from Tensorflow. It also contains cells for datapreprocessing.

### The next cell will import all necessary libraries and API and allow for GPU memory growth (not recommended if not familiar with how memory is allocated in Tensorflow)

In [8]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1'    #To not print info mesages
import tensorflow as tf                     #import necessary Libraries
from tensorflow import keras
from tensorflow.keras import datasets, layers, models
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt
import numpy as np

tf.keras.backend.set_floatx('float32')
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:    #To allow GPU memory growth
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

### This cell creates the model as an object, using the Keras layers. Note that this version includes my final code with advanced techniques such as batch normalization and max pooling. 

In [9]:
#---Model--------------------------------------------------------------------------------------------------------------------
class MyModel(tf.keras.Model):                          #Make the thing
    def __init__(self, kernels, filters):               #Put smaller things in it
        super(MyModel, self).__init__(name="basicCNN")
        kernel1, kernel2, kernel3, kernel4 = kernels             #Decide the shape of the smaller things
        filter1, filter2, filter3, filter4 = filters             #Decide how many there are

        self.conv2a = tf.keras.layers.Conv2D(filter1, kernel1, padding='same')  #Now stack'em 
        self.bnorm1 = tf.keras.layers.BatchNormalization()
        self.act1 = tf.keras.layers.Activation(tf.nn.relu)                      #Non-lin otherwhise you just compute f(x)=W.x
        self.mxpool1 = tf.keras.layers.MaxPool2D(pool_size = (2,2), padding = 'valid')  #Now shrink the thingy

        self.conv2b = tf.keras.layers.Conv2D(filter2, kernel2, padding='same')  #And again
        self.bnorm2 = tf.keras.layers.BatchNormalization()
        self.act2 = tf.keras.layers.Activation(tf.nn.relu)
        self.mxpool2 = tf.keras.layers.MaxPool2D(pool_size = (2,2), padding = 'valid')

        self.conv2c = tf.keras.layers.Conv2D(filter3, kernel3, padding='same')  #And again
        self.bnorm3 = tf.keras.layers.BatchNormalization()
        self.act3 = tf.keras.layers.Activation(tf.nn.relu)
        self.mxpool3 = tf.keras.layers.MaxPool2D(pool_size = (2,2), padding = 'valid')

        self.conv2d = tf.keras.layers.Conv2D(filter4, kernel4, padding='same')  #And again
        self.bnorm4 = tf.keras.layers.BatchNormalization()
        self.act4 = tf.keras.layers.Activation(tf.nn.relu)
        self.mxpool4 = tf.keras.layers.MaxPool2D(pool_size = (2,2), padding = 'valid')

        self.flat = tf.keras.layers.Flatten()           #Now decide what it is you are actually looking at
        self.FC1 = tf.keras.layers.Dense(64)
        self.FCo = tf.keras.layers.Dense(10)

    def call(self, input, training=False):
        x = self.conv2a(input)
        x = self.bnorm1(x)
        x = self.act1(x)
        x = self.mxpool1(x)

        x = self.conv2b(x)
        x = self.bnorm2(x)
        x = self.act2(x)
        x = self.mxpool2(x)

        x = self.conv2c(x)
        x = self.bnorm3(x)
        x = self.act3(x)
        x = self.mxpool3(x)

        x = self.conv2d(x)
        x = self.bnorm4(x)
        x = self.act4(x)
        x = self.mxpool4(x)

        x = self.flat(x)
        x = self.FC1(x)
        x = self.FCo(x)
        return x


In [10]:
#---Variables----------------------------------------------------------------------------------------------------------------
batch_size = 32
num_epochs = 5
kernels = [(4,4), (4,4), (4,4), (4,4)]  #A list for the kernel sizes
filters = [16, 32, 64, 128]             #A list for the filteer sizes
base_learning_rate = 1e-3

### In order to make the data understandable for the network and to ensure a smoother training, we preprocess the data by normalizing it and making batches with sizes defined above.

In [11]:
#---Custom Functions --------------------------------------------------------------------------------------------------------
def create_set(limits, data, b_s):      #Function to create a set of images with batches. Data is an array containing the data. B_s is the batch size.
    i_min, i_max = limits               #limits is to be of shape (i_min, i_max)
    new_set = []
    for i in range(i_min, i_max, b_s):
        new_set.append(data[i : i+b_s])
    return new_set

In [28]:
#---Data Preprocessing-------------------------------------------------------------------------------------------------------
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data() #import dataset
x_train, x_test = x_train / 255.0, x_test / 255.0       #Normalization pixels

rng_state = np.random.get_state()
np.random.shuffle(x_train)                              #Shuffle it before making train, val and test sets
np.random.set_state(rng_state)
np.random.shuffle(y_train)

x_train = x_train[..., tf.newaxis].astype("float32")    #Add an axis for the single channel
x_test = x_test[..., tf.newaxis].astype("float32")      #Make test set

y_train = tf.keras.backend.one_hot(y_train, 10)
y_test = tf.keras.backend.one_hot(y_test, 10)      
#One-hot encodes a prediction vector containing integers from 0 to 9 (because there are 10 categories) into a vector with size 10 caintaining 1 at the place for the correct class and 0 elsewhere.

#Substract mean image. This is not indispensable but allows for a better generalization potential for the network
x_test -= np.mean(np.mean(x_train.reshape(60000,784), axis=1), axis=0)
x_train -= np.mean(np.mean(x_train.reshape(60000,784), axis=1), axis=0)

x_train_dataset = create_set((0, 60000), x_train, 32)   #Make the batches
y_train_dataset = create_set((0, 60000), y_train, 32)
train_dataset = []

for i in range(len(x_train_dataset)):                   #Make a dataset as a list with length=2 containing the images and the correct classifications
    train_dataset.append( (x_train_dataset[i], y_train_dataset[i]) )

x_test_dataset = create_set((0, 10000), x_test, 32)     #Repeat for the test set
y_test_dataset = create_set((0, 10000), y_test, 32)
test_dataset = []

for i in range(len(x_test_dataset)):
    test_dataset.append( (x_test_dataset[i], y_test_dataset[i]) )

### We now need to train our network

In [26]:
model = MyModel(kernels, filters)
optimizer = keras.optimizers.Adam(learning_rate=base_learning_rate)
loss_fn = keras.losses.CategoricalCrossentropy(from_logits=True)

train_loss_metric = keras.metrics.Mean()
accuracy_metric = keras.metrics.Mean()

train_loss_list = []
train_acc_list = []

for epoch in range(num_epochs):
    print("Start of Epoch {}".format(epoch + 1))
    train_loss_metric.reset_states()
    accuracy_metric.reset_states()
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            tape.watch(model.trainable_weights)
            predictions = model(x_batch_train, training=True)
            batch_loss_value = loss_fn(y_batch_train, predictions)
        grads = tape.gradient(batch_loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        train_loss_metric.update_state(batch_loss_value)
        num_positive_class ,= np.where(np.argmax(predictions, axis=1) == np.argmax(y_batch_train, axis=1))
        accuracy_metric.update_state(num_positive_class.size/batch_size)

        if int(step+1) % 200 == 0:
            train_loss_list.append(train_loss_metric.result())
            train_acc_list.append(accuracy_metric.result())
            print(
                "Step %d: Train loss: %.4f, Train Accuracy: %.4f"
                % (
                    step+1,
                    float(train_loss_metric.result()), 
                    float(accuracy_metric.result())
                )
                    )
            train_loss_metric.reset_states()
            accuracy_metric.reset_states()

Start of Epoch 1
Step 200: Train loss: 0.6649, Train Accuracy: 0.7692
Step 400: Train loss: 0.4634, Train Accuracy: 0.8355
Step 600: Train loss: 0.4353, Train Accuracy: 0.8427
Step 800: Train loss: 0.3816, Train Accuracy: 0.8647
Step 1000: Train loss: 0.3542, Train Accuracy: 0.8723
Step 1200: Train loss: 0.3295, Train Accuracy: 0.8797
Step 1400: Train loss: 0.3426, Train Accuracy: 0.8698
Step 1600: Train loss: 0.3314, Train Accuracy: 0.8792
Step 1800: Train loss: 0.3075, Train Accuracy: 0.8838
Start of Epoch 2
Step 200: Train loss: 0.2987, Train Accuracy: 0.8895
Step 400: Train loss: 0.2940, Train Accuracy: 0.8870
Step 600: Train loss: 0.2975, Train Accuracy: 0.8944
Step 800: Train loss: 0.2777, Train Accuracy: 0.9002
Step 1000: Train loss: 0.2629, Train Accuracy: 0.9041
Step 1200: Train loss: 0.2519, Train Accuracy: 0.9052
Step 1400: Train loss: 0.2703, Train Accuracy: 0.8989
Step 1600: Train loss: 0.2673, Train Accuracy: 0.9033
Step 1800: Train loss: 0.2493, Train Accuracy: 0.9034
St

### Now that our network is trained, we need to evaluate it on a test dataset. By comparing the accuracy results for training and testing, we can have insight on the generalization potential of our network on unseen data.

In [30]:
test_loss_metric = keras.metrics.Mean()
test_accuracy_metric = keras.metrics.Mean()

for step, (x_batch_test, y_batch_test) in enumerate(test_dataset):
    predictions = model(x_batch_test, training=True)
    batch_loss_value = loss_fn(y_batch_test, predictions)

    test_loss_metric.update_state(batch_loss_value)
    num_positive_class ,= np.where(np.argmax(predictions, axis=1) == np.argmax(y_batch_test, axis=1))
    test_accuracy_metric.update_state(num_positive_class.size/batch_size)

print("Average Test loss: %.4f" % (test_loss_metric.result()))
print("Average Test accuracy: %.4f" % (test_accuracy_metric.result()))

Average Test loss: 0.2827
Average Test accuracy: 0.9057
