# Code for the CNN model

In [None]:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import numpy as np
from scipy import signal
from tqdm.auto import tqdm
import random
import tensorflow as tf
from tensorflow import keras
from keras.datasets import mnist
import joblib


# loss function

def multiclass_cross_entropy(y_true, y_pred, epsilon=1e-7):
    # clipping for stable log operation
    y_pred = np.log(np.clip(y_pred, epsilon, 1 - epsilon))
    return -np.sum(y_true * y_pred)


def multiclass_cross_entropy_prime(y_true, y_pred, epsilon=1e-7):
    #return -y_true / np.clip(y_pred, epsilon, 1 - epsilon)
    return y_pred - y_true


class Reshaper:

    def __init__(self, input_shape, output_shape):
        self.input_shape = input_shape
        self.output_shape = output_shape
        self.is_trainable = False

    def forward(self, input):
        return np.reshape(input, self.output_shape)

    def backward(
            self,
            output_gradient,
            timestep
    ):
        return np.reshape(output_gradient, self.input_shape)


class Softmax:
    def __init__(self):
        self.is_trainable = False

    def softmax(self, x):
        x_exp = np.exp(x - x.max())
        self.output = x_exp / np.sum(x_exp, axis=0)
        return self.output

    def softmax_prime(self, output_gradient):
        return np.dot((np.identity(np.size(self.output)) - self.output.T) * self.output,
                      output_gradient)

    def forward(self, input):
        self.input = input
        return self.softmax(self.input)

    def backward(
            self,
            output_gradient,
            timestep
    ):
        return self.softmax_prime(output_gradient)


class ReLU:
    def __init__(self):
        self.is_trainable = False

    @staticmethod
    def relu(x):
        return np.maximum(x, 0)

    @staticmethod
    def relu_prime(x):
        return 1. * (x > 0)

    def forward(self, input):
        self.input = input
        return self.relu(self.input)

    def backward(
            self,
            output_gradient,
            timestep
    ):
        return np.multiply(output_gradient, self.relu_prime(self.input))


class Dense:

    def __init__(self, input_size, output_size, name=None):
        # He normal initialization as advised by He et al., 2015

        self.weights = np.random.normal(0,
                                        np.sqrt(2 / input_size),
                                        size=(output_size, input_size))
        self.bias = np.zeros((output_size, 1))
        self.is_trainable = True
        self.name = name

        # gradient update params, necessary for Adam
        self.optim = Adam()

        self.m = (np.zeros(self.weights.shape),
                  np.zeros(self.bias.shape))

        self.v = (np.zeros(self.weights.shape),
                  np.zeros(self.bias.shape))

    def forward(self, input):
        self.input = input
        return np.dot(self.weights, self.input) + self.bias

    def backward(
            self,
            output_gradient,
            timestep
    ):
        # calculate gradients

        weights_gradient = np.dot(output_gradient, self.input.T)
        input_gradient = np.dot(self.weights.T, output_gradient)

        self.weights, self.bias, self.m, self.v = \
            self.optim.update_weights(
                self.weights,
                self.bias,
                weights_gradient,
                output_gradient,
                self.m,
                self.v,
                timestep
            )

        return input_gradient


class Adam:

    def __init__(
            self,
            beta1=0.9,
            beta2=0.999,
            epsilon=1e-7,
    ):
        """Adam optimizer implementation"""

        self.learning_rate = None
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon

    def get_bias_corr_denom(self, timestep):
        return (1 - self.beta1 ** (timestep + 1),
                1 - self.beta2 ** (timestep + 1))

    def update_weights(
            self,
            layer_weights,
            layer_bias,
            layer_weight_gradients,
            layer_bias_gradients,
            layer_m,
            layer_v,
            timestep
    ):
        """helper for updating weights based on Adam formulization"""

        denom1, denom2 = self.get_bias_corr_denom(timestep)

        # update for weight params
        new_weight_m = self.beta1 * layer_m[0] + (1 - self.beta1) * layer_weight_gradients
        new_weight_m_cor = new_weight_m / denom1

        new_weight_v = self.beta2 * layer_v[0] + (1 - self.beta2) * layer_weight_gradients ** 2
        new_weight_v_cor = new_weight_v / denom2

        layer_weights -= self.learning_rate * new_weight_m_cor / (np.sqrt(new_weight_v_cor) + self.epsilon)

        # update for bias params

        new_bias_m = self.beta1 * layer_m[1] + (1 - self.beta1) * layer_bias_gradients
        new_bias_m_cor = new_bias_m / denom1

        new_bias_v = self.beta2 * layer_v[1] + (1 - self.beta2) * layer_bias_gradients ** 2
        new_bias_v_cor = new_bias_v / denom2

        layer_bias -= self.learning_rate * new_bias_m_cor / (np.sqrt(new_bias_v_cor) + self.epsilon)

        return layer_weights, layer_bias, (new_weight_m, new_bias_m), (new_weight_v, new_bias_v)


def calculate_output_size(
        input_size,
        kernel_size,
        stride,
        pad_start=0,
        pad_end=0,
):
    return int((input_size - kernel_size + pad_start + pad_end)
               / stride + 1)


class Convolution:

    def __init__(
            self,
            channel,
            input_size,
            kernel_size,
            stride,
            name=None
    ):
        self.channel = channel
        (self.input_channel, input_width, input_height) = input_size
        self.kernel_size = kernel_size
        self.stride = stride
        self.is_trainable = True
        self.name = name

        # necessary values for shape calculation

        output_width = calculate_output_size(input_width, kernel_size,
                                             stride)
        output_height = calculate_output_size(input_height,
                                              kernel_size, stride)

        # weights & biases, initialized by He et al., 2015
        self.kernels = np.random.normal(0,
                                        np.sqrt(2 / np.prod(input_size)),
                                        size=(self.channel, self.input_channel,
                                              kernel_size, kernel_size))

        self.bias = np.zeros((self.channel, output_width,
                              output_height))

        # gradient update params, necessary for Adam
        self.optim = Adam()

        self.m = (np.zeros(self.kernels.shape),
                  np.zeros(self.bias.shape))

        self.v = (np.zeros(self.kernels.shape),
                  np.zeros(self.bias.shape))

    def forward(self, input):
        self.input = input
        self.output = np.copy(self.bias)

        for i in range(self.channel):
            for j in range(self.input_channel):
                self.output += signal.correlate(self.input[j],
                                                self.kernels[i, j], 'valid')

        return self.output

    def backward(
            self,
            output_gradient,
            timestep
    ):

        kernels_gradient = np.zeros(self.kernels.shape)
        input_gradient = np.zeros(self.input.shape)

        # calculate gradients

        for i in range(self.channel):
            for j in range(self.input_channel):
                kernels_gradient[i, j] = \
                    signal.correlate2d(self.input[j],
                                       output_gradient[i], 'valid')
                input_gradient[j] += \
                    signal.convolve2d(output_gradient[i],
                                      self.kernels[i, j], 'full')

        # update weights by Adam

        self.kernels, self.bias, self.m, self.v = \
            self.optim.update_weights(
                self.kernels,
                self.bias,
                kernels_gradient,
                output_gradient,
                self.m,
                self.v,
                timestep
            )

        # get input gradient

        return input_gradient


class MaxPool:

    def __init__(self, size):
        self.pool_size = size
        self.stride = size
        self.is_trainable = False

    def get_pools(self):
        """helper for creating pools from input array"""

        # pools & max value positions per channel

        pools = [[] for _ in range(self.input.shape[0])]
        self.max_positions = [[] for _ in range(self.input.shape[0])]

        for channel in range(self.input.shape[0]):

            # Iterate over all row blocks (single block has `stride` rows)

            for i in np.arange(self.input.shape[1], step=self.stride):

                # Iterate over all column blocks (single block has `stride` columns)

                for j in np.arange(self.input.shape[2],
                                   step=self.stride):

                    # Extract the current pool

                    mat = self.input[channel, i:i + self.pool_size, j:j
                                                                      + self.pool_size]

                    # Make sure it's rectangular - has the shape identical to the pool size

                    if mat.shape == (self.pool_size, self.pool_size):
                        # Append to the list of pools

                        pools[channel].append(mat)

                        # append the maximum position for max value

                        max_idx = np.unravel_index(np.argmax(mat,
                                                             axis=None), mat.shape)

                        self.max_positions[channel].append((i
                                                            + max_idx[0], j + max_idx[1]))

        return np.array(pools)

    def forward(self, input):
        self.input = input

        # get pools from input

        pools = self.get_pools()

        # pooling

        pooled = [[] for _ in range(self.input.shape[0])]
        output_size = calculate_output_size(self.input.shape[1],
                                            self.pool_size, self.pool_size)

        for i in range(pools.shape[0]):
            for j in range(pools.shape[1]):
                pooled[i].append(np.max(pools[i, j]))

        # reshaping

        pooled = np.expand_dims(np.array(pooled), -1)
        return pooled.reshape(self.input.shape[0], output_size,
                              output_size)

    def backward(
            self,
            output_gradient,
            timestep
    ):

        # masking based on max value position of layer input
        new_output_gradient = np.zeros(self.input.shape)

        # mapping from smaller input (output_gradinet) to larger output (input_gradient)
        for channel in range(new_output_gradient.shape[0]):
            channel_gradients = output_gradient[channel].ravel()

            for idx, (max_row, max_col) in enumerate(self.max_positions[channel]):
                new_output_gradient[channel, max_row, max_col] = channel_gradients[idx]

        return new_output_gradient


def save_weights(network, path):
    weights = {}

    for layer in network:
        if layer.is_trainable:
            if hasattr(layer, 'kernels'):
                weights[layer.name] = {"kernels": layer.kernels, "bias": layer.bias}
            else:
                weights[layer.name] = {"weights": layer.weights, "bias": layer.bias}

    joblib.dump(weights, path)


def evaluate(network, x_test, y_test):
    acc = 0
    loss = 0

    for i in tqdm(range(x_test.shape[0])):
        x = x_test[i]
        y = y_test[i].reshape(-1, 1)

        for layer in network:
            # forward prop

            x = layer.forward(x)

        loss += multiclass_cross_entropy(y, x)

        if np.argmax(x) == np.argmax(y):
            acc += 1

    print('Test Loss = {:.4f}, Test Accuracy = {:.4f}'.format(loss / len(x_test),
                                                              acc / len(x_test)))


# necessary hyperparameters
seed = 0
random.seed(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
(x_train, y_train), (x_test, y_test) = mnist.load_data()

randomize = np.arange(len(x_train))
np.random.shuffle(randomize)
x_train = x_train[randomize]
y_train = y_train[randomize]

# Scale images to the [0, 1] range
x_train = x_train.astype("float32") / 255
x_test = x_test.astype("float32") / 255

x_train = np.expand_dims(x_train, 1)
x_test = np.expand_dims(x_test, 1)
num_classes = 10

# convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

N_EPOCHS = 3
BATCH_SIZE = 1
LEARNING_RATE = 0.0001
INPUT_SIZE = (1, 28, 28)
N_CLASS = 10

# model architecture

network = [
    Convolution(4, INPUT_SIZE, 5, 1, name="conv_1"),
    ReLU(),
    MaxPool(2),
    Convolution(8, (4, 12, 12), 5, 1, name="conv_2"),
    ReLU(),
    MaxPool(2),
    Reshaper((8, 4, 4), (128, 1)),
    Dense(128, 128, name="fc_1"),
    ReLU(),
    Dense(128, N_CLASS, name="fc_2"),
    Softmax(),
]

for i, layer in enumerate(network):
    if layer.is_trainable:
        layer.optim.learning_rate = LEARNING_RATE

for e in tqdm(range(N_EPOCHS), desc="Training the model for {} epochs".format(N_EPOCHS)):
    loss = 0
    acc = 0

    for i in range(x_train.shape[0]):
        x = x_train[i]
        y = y_train[i].reshape(-1, 1)

        for layer in network:
            # forward prop

            x = layer.forward(x)

        # loss calculation
        loss += multiclass_cross_entropy(y, x)

        if np.argmax(x) == np.argmax(y):
            acc += 1

        # backward prop

        grad = multiclass_cross_entropy_prime(y, x)

        for layer in reversed(network):
            grad = layer.backward(grad, i)

    print('Epoch: {}, Avg. Train Loss = {:.4f}, Avg. Train Accuracy = {:.4f}'.format(e + 1,
                                                                                     loss / len(x_train),
                                                                                     acc / len(x_train)))

# evaluate model on test data
evaluate(network, x_test, y_test)

# save weights
save_weights(network, "trained_network_weights.joblib")

Training the model for 3 epochs:   0%|          | 0/3 [00:00<?, ?it/s]

Epoch: 1, Avg. Train Loss = 0.6084, Avg. Train Accuracy = 0.8295
Epoch: 2, Avg. Train Loss = 0.4020, Avg. Train Accuracy = 0.8923
Epoch: 3, Avg. Train Loss = 0.3829, Avg. Train Accuracy = 0.9048


  0%|          | 0/10000 [00:00<?, ?it/s]

Test Loss = 0.3659, Test Accuracy = 0.9147


# Train & evaluate the model

In [None]:
from tensorflow.keras import layers

# Model / data parameters
num_classes = 10
input_shape = (28, 28, 1)

# the data, split between train and test sets
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

randomize = np.arange(len(x_train))
np.random.shuffle(randomize)
x_train = x_train[randomize]
y_train = y_train[randomize]

# Scale images to the [0, 1] range
x_train = x_train.astype("float32") / 255
x_test = x_test.astype("float32") / 255

# convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

# Make sure images have shape (28, 28, 1)
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)


model = keras.Sequential(
    [
        keras.Input(shape=input_shape),
        layers.Conv2D(4, kernel_size=(5, 5), activation="relu", kernel_initializer="he_normal"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Conv2D(8, kernel_size=(5, 5), activation="relu", kernel_initializer="he_normal"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Flatten(),
        layers.Dense(128, activation="relu", kernel_initializer="he_normal"),
        layers.Dense(num_classes, activation="softmax", kernel_initializer="he_normal"),
    ]
)

#model.summary()
batch_size = 1
epochs = 3
opt = keras.optimizers.Adam(learning_rate=0.0001)
model.compile(loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"])
model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_split=0)

score = model.evaluate(x_test, y_test, verbose=0)
print("Test loss:", score[0])
print("Test accuracy:", score[1])

Epoch 1/3
Epoch 2/3
Epoch 3/3
Test loss: 0.07029560208320618
Test accuracy: 0.9757999777793884
