# Importing

In [None]:
import numpy as np
from keras.datasets import cifar10
from keras.utils import np_utils
import seaborn as sns
from sklearn.metrics import confusion_matrix, accuracy_score, hamming_loss

# Module-level record of the mean training error per epoch;
# Network.fit appends one value to this list at the end of every epoch.
e = []

# Base Class

In [None]:
class Layer:
    """Abstract base for every layer in the network.

    Subclasses are expected to override ``fwd_prog`` and ``back_prog``;
    the versions here only signal that no implementation exists.
    """

    def __init__(self):
        # Placeholders for the layer's input and output values.
        self.input, self.output = None, None

    def fwd_prog(self, input):
        """Return the layer output Y for the input X (abstract)."""
        raise NotImplementedError

    def back_prog(self, output_error, learning_rate):
        """Return dE/dX given dE/dY and update any parameters (abstract)."""
        raise NotImplementedError

# Fully Connected Layer

In [None]:
class FCLayer(Layer):
    """Fully connected layer computing ``Y = X . W + B``.

    ``weights`` has shape ``(input_size, output_size)`` and ``bias`` has
    shape ``(1, output_size)``; both are drawn uniformly from [-0.5, 0.5).
    """

    def __init__(self, input_size, output_size):
        """Create the layer.

        input_size  -- number of input neurons
        output_size -- number of output neurons
        """
        # Make sure the input/output placeholders of the base class exist
        # even before the first forward pass.
        super().__init__()
        self.weights = np.random.rand(input_size, output_size) - 0.5
        self.bias = np.random.rand(1, output_size) - 0.5

    def fwd_prog(self, input_data):
        """Store ``input_data`` and return ``input_data . W + B``."""
        self.input = input_data
        self.output = np.dot(self.input, self.weights) + self.bias
        return self.output

    def back_prog(self, output_error, learning_rate):
        """Apply one gradient-descent step and return dE/dX.

        output_error  -- dE/dY for the output of the last forward pass
        learning_rate -- step size applied to the weight and bias gradients
        """
        # dE/dX must be computed with the weights *before* they are updated.
        input_error = np.dot(output_error, self.weights.T)
        weights_error = np.dot(self.input.T, output_error)
        # dE/dB is the error summed over the rows (samples) of the input.
        # For a single-row input this equals output_error itself, while a
        # multi-row input no longer fails to broadcast into the (1, n) bias.
        bias_error = np.atleast_2d(output_error).sum(axis=0, keepdims=True)

        # update parameters
        self.weights -= learning_rate * weights_error
        self.bias -= learning_rate * bias_error
        return input_error

# Activation Layer

In [None]:
# inherit from base class Layer
class ActivationLayer(Layer):
    """Parameter-free layer that applies an activation function.

    ``activation`` is applied in the forward pass; ``activation_prime``
    (its derivative) is used in the backward pass.
    """

    def __init__(self, activation, activation_prime):
        # Initialise the input/output placeholders defined by the base class.
        super().__init__()
        self.activation = activation
        self.activation_prime = activation_prime

    def fwd_prog(self, input_data):
        """Store ``input_data`` and return ``activation(input_data)``."""
        self.input = input_data
        self.output = self.activation(self.input)
        return self.output

    def back_prog(self, output_error, learning_rate):
        """Return dE/dX = activation_prime(input) * dE/dY.

        ``learning_rate`` is accepted for interface compatibility only:
        this layer has no learnable parameters to update.
        """
        return self.activation_prime(self.input) * output_error

# Activation Function And Its Derivative

In [None]:
def tanh(x):
    """Hyperbolic-tangent activation, evaluated with NumPy."""
    activated = np.tanh(x)
    return activated

def dtanh(x):
    """Derivative of tanh: 1 - tanh(x)^2."""
    hyperbolic = np.tanh(x)
    return 1 - hyperbolic ** 2

# Loss Functions

In [None]:
def ms_error(y_true, y_pred):
    """Mean squared error between the target and the prediction."""
    residual = y_true - y_pred
    squared = np.power(residual, 2)
    return np.mean(squared)

def dms_error(y_true, y_pred):
    """Gradient of the mean squared error with respect to ``y_pred``."""
    doubled_gap = 2 * (y_pred - y_true)
    return doubled_gap / y_true.size

# Network Class

In [None]:
class Network:
    """Sequential stack of layers trained one sample at a time.

    Layers are applied in the order they were added. The loss function and
    its derivative must be registered with ``use`` before calling ``fit``.
    """

    def __init__(self):
        self.layers = []
        self.loss = None
        self.loss_prime = None
        # Mean training error per epoch of the most recent fit() call.
        self.loss_history = []

    def add(self, layer):
        """Append ``layer`` to the end of the network."""
        self.layers.append(layer)

    def use(self, loss, loss_prime):
        """Set the loss function and its derivative used by ``fit``."""
        self.loss = loss
        self.loss_prime = loss_prime

    def _forward(self, sample):
        """Run one sample through every layer and return the final output."""
        output = sample
        for layer in self.layers:
            output = layer.fwd_prog(output)
        return output

    def predict(self, input_data):
        """Return a list with the network output for each sample.

        ``input_data`` is iterated along its first dimension, one sample
        at a time.
        """
        return [self._forward(sample) for sample in input_data]

    def fit(self, x_train, y_train, epochs, learning_rate):
        """Train the network, updating the layers after every sample.

        x_train, y_train -- samples and targets, sample dimension first
        epochs           -- number of passes over the training samples
        learning_rate    -- step size handed to every layer's back_prog

        Raises RuntimeError if no loss was registered with ``use`` and
        ValueError if ``x_train`` is empty. The mean error of every epoch
        is printed, stored in ``self.loss_history`` and appended to the
        module-level list ``e``.
        """
        if self.loss is None or self.loss_prime is None:
            raise RuntimeError("call use(loss, loss_prime) before fit()")

        # sample dimension first
        samples = len(x_train)
        if samples == 0:
            # Averaging the epoch error would otherwise divide by zero.
            raise ValueError("x_train must contain at least one sample")

        self.loss_history = []
        for epoch in range(epochs):
            err = 0
            for j in range(samples):
                output = self._forward(x_train[j])

                # compute loss (for display purpose only)
                err += self.loss(y_train[j], output)

                # backward propagation, last layer first
                error = self.loss_prime(y_train[j], output)
                for layer in reversed(self.layers):
                    error = layer.back_prog(error, learning_rate)

            # calculate average error on all samples
            err /= samples
            print('epoch %d/%d   error=%f' % (epoch + 1, epochs, err))
            self.loss_history.append(err)
            # Kept for backward compatibility: other cells read the
            # module-level list ``e``.
            e.append(err)

# Solve CIFAR-10

In [None]:
# Problem and training constants
NUM_CLASSES = 10             # CIFAR-10 labels are integers in range [0, 9]
INPUT_SIZE = 32 * 32 * 3     # one flattened 32x32 RGB image = 3072 values
TRAIN_SUBSET = 10000         # number of training samples actually used
EPOCHS = 500
LEARNING_RATE = 0.1

# load CIFAR10 from server
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# training data : 50000 samples
# flatten every image to a (1, 3072) row vector and scale pixels to [0, 1]
x_train = x_train.reshape(x_train.shape[0], 1, INPUT_SIZE)
x_train = x_train.astype('float32')
x_train /= 255
# encode output which is a number in range [0,9] into a vector of size 10
# e.g. number 3 will become [0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
# num_classes is passed explicitly so the vector length never depends on
# which labels happen to be present in the data
y_train = np_utils.to_categorical(y_train, num_classes=NUM_CLASSES)

# same for test data : 10000 samples
x_test = x_test.reshape(x_test.shape[0], 1, INPUT_SIZE)
x_test = x_test.astype('float32')
x_test /= 255
y_test = np_utils.to_categorical(y_test, num_classes=NUM_CLASSES)

# Network
net = Network()
net.add(FCLayer(INPUT_SIZE, 100))           # input_shape=(1, 3072)     ;   output_shape=(1, 100)
net.add(ActivationLayer(tanh, dtanh))
net.add(FCLayer(100, 50))                   # input_shape=(1, 100)      ;   output_shape=(1, 50)
net.add(ActivationLayer(tanh, dtanh))
net.add(FCLayer(50, NUM_CLASSES))           # input_shape=(1, 50)       ;   output_shape=(1, 10)
net.add(ActivationLayer(tanh, dtanh))

# train on the first 10000 samples only
# mini-batch GD is not implemented: the weights are updated after every
# single sample, so training on all 50000 samples would be very slow
net.use(ms_error, dms_error)
net.fit(x_train[0:TRAIN_SUBSET], y_train[0:TRAIN_SUBSET],
        epochs=EPOCHS, learning_rate=LEARNING_RATE)

# Testing The Model

In [None]:
import matplotlib.pyplot as plt

# Training-error curve: one point per recorded epoch. The x axis is derived
# from the length of ``e`` so it always matches the number of epochs run.
plt.plot(range(len(e)), e)
# Reset the history so that a later training run starts from an empty list.
e = []

# Evaluate on the first 1000 test samples.
out = net.predict(x_test[0:1000])

# a: true class indices, b: predicted class indices.
a = np.argmax(y_test[0:1000], axis=1)
# Each network output has shape (1, 10); take the arg-max of its single row.
b = [np.argmax(sample_out, axis=1)[0] for sample_out in out]

cf_matrix = confusion_matrix(a, b)
# Draw the heatmap on a new figure; otherwise seaborn reuses the current
# axes and paints over the training-error curve plotted above.
plt.figure()
# fmt="d" prints the counts as plain integers instead of e.g. "1.2e+02".
sns.heatmap(cf_matrix, annot=True, fmt="d")

# Accuracy Score
acc = accuracy_score(a, b)
print(acc)
# Hamming Loss
hl = hamming_loss(a, b)
print(hl)