<a href="https://colab.research.google.com/github/samibahig/CNN_Sami/blob/master/Untitled7.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
"""
Created on Wed Feb 12 18:28:35 2020

@author: samib
"""

import pickle
import numpy as np
import gzip
import tqdm
np.set_printoptions(precision=3)

# Module-level tqdm handle whose bar_format shows only the description text;
# total=0 and position=4 are passed through to tqdm unchanged.
frame_log = tqdm.tqdm(bar_format='{desc}', position=4, total=0)

def one_hot(y, n_classes=10):
    """Return one-hot encodings for the class index (or indices) ``y``.

    Each index selects the matching row of an ``n_classes`` x ``n_classes``
    identity matrix, so the result has the shape of ``y`` with one extra
    trailing axis of length ``n_classes``.
    """
    identity = np.eye(n_classes)
    encoded = identity[y]
    return encoded


def load_mnist(path="mnist.pkl.gz"):
    """Load the gzip-compressed MNIST pickle and shape it for the network.

    Args:
        path: Location of the gzip-compressed pickle holding a
            ``(train, validation, test)`` triple, each split being an
            ``(images, labels)`` pair. Defaults to ``"mnist.pkl.gz"`` in the
            current working directory.

    Returns:
        A ``(train_data, val_data, test_data)`` tuple where

        * ``train_data`` and ``val_data`` are ``(X, Y)`` tuples with ``X``
          reshaped to ``(-1, 784)`` and ``Y`` one-hot encoded with shape
          ``(-1, 10)``;
        * ``test_data`` is a list of ``(image, label)`` pairs, each image
          reshaped to ``(784, 1)`` and each label left as stored in the
          pickle (i.e. NOT one-hot encoded).
    """
    # NOTE(security): pickle.load can execute arbitrary code; only point
    # ``path`` at a file obtained from a trusted source.
    # The context manager closes the file even if unpickling fails.
    with gzip.open(path, "rb") as data_file:
        train_data, val_data, test_data = pickle.load(data_file, encoding="latin1")

    def _as_arrays(split):
        # Flatten every image to 784 features and one-hot encode the labels.
        inputs = np.asarray(split[0]).reshape(-1, 784)
        targets = one_hot(np.asarray(split[1]), 10).reshape(-1, 10)
        return inputs, targets

    train_data = _as_arrays(train_data)
    val_data = _as_arrays(val_data)

    # The test split keeps its historical format: column-vector images
    # paired with raw labels.
    test_inputs = [np.reshape(x, (784, 1)) for x in test_data[0]]
    test_data = list(zip(test_inputs, test_data[1]))

    return train_data, val_data, test_data


#train_data_, val_data_, test_data_ = load_mnist()

class NN(object):
    """Fully connected softmax classifier trained with mini-batch gradient descent.

    Data enters as ``(n_samples, n_features)`` arrays with one-hot labels of
    shape ``(n_samples, n_classes)``. Internally every activation is stored
    transposed, i.e. as ``(n_units, n_samples)``; weights ``W{i}`` have shape
    ``(fan_in, fan_out)`` and biases ``b{i}`` have shape ``(1, fan_out)``.
    """

    def __init__(self,
                 hidden_dims=(784, 256),
                 epsilon=1e-6,
                 lr=7e-4,
                 batch_size=64,
                 seed=None,
                 activation="relu",
                 data=None
                 ):
        """Store the hyper-parameters and the data splits.

        Args:
            hidden_dims: Number of units of each hidden layer.
            epsilon: Lower bound applied to probabilities before taking the
                log in :meth:`loss`.
            lr: Learning rate used by :meth:`update`.
            batch_size: Number of samples per mini-batch.
            seed: Optional seed passed to ``np.random.seed`` when the weights
                are initialised.
            activation: ``"relu"``, ``"sigmoid"`` or ``"tanh"``.
            data: ``(train, valid, test)`` splits; random data is generated
                when ``None``.
        """
        self.hidden_dims = hidden_dims
        self.n_hidden = len(hidden_dims)
        self.lr = lr
        self.batch_size = batch_size
        self.init_method = 'Glorot'
        self.seed = seed
        self.activation_str = activation
        self.epsilon = epsilon

        self.train_logs = {'train_accuracy': [], 'validation_accuracy': [], 'train_loss': [], 'validation_loss': []}

        if data is None:
            # for testing, do NOT remove or modify
            self.train, self.valid, self.test = (
                (np.random.rand(400, 784), one_hot(np.random.randint(0, 10, 400))),
                (np.random.rand(400, 784), one_hot(np.random.randint(0, 10, 400))),
                (np.random.rand(400, 784), one_hot(np.random.randint(0, 10, 400)))
            )
        else:
            self.train, self.valid, self.test = data

    def initialize_weights(self, dims):
        """Create ``self.weights`` for every layer.

        Args:
            dims: ``[n_features, n_classes]``.

        Raises:
            ValueError: If ``self.init_method`` is neither ``"Glorot"`` nor
                ``"Normal"``.
        """
        if self.seed is not None:
            np.random.seed(self.seed)

        # self.weights is a dictionary with keys W1, b1, W2, b2, ..., Wm, bm
        # where m - 1 is the number of hidden layers.
        self.weights = {}
        all_dims = [dims[0]] + list(self.hidden_dims) + [dims[1]]
        for layer_n in range(1, self.n_hidden + 2):
            fan_in, fan_out = all_dims[layer_n - 1], all_dims[layer_n]
            if self.init_method == "Glorot":
                # Glorot/Xavier uniform: U(-d, d) with d = sqrt(6 / (fan_in + fan_out)).
                d = np.sqrt(6.0 / (fan_in + fan_out))
                W = np.random.uniform(-d, d, (fan_in, fan_out))
            elif self.init_method == "Normal":
                W = np.random.normal(0, 1, size=(fan_in, fan_out))
            else:
                raise ValueError(f"unknown init_method: {self.init_method!r}")
            self.weights[f"W{layer_n}"] = W
            # forward()/update() require a bias for every layer.
            self.weights[f"b{layer_n}"] = np.zeros((1, fan_out))

    def relu(self, x, grad=False):
        """Return ``max(x, 0)``, or the boolean mask ``x > 0`` when ``grad`` is set."""
        if grad:
            return x > 0
        # No ``out=`` argument: the caller's array must stay untouched because
        # forward() caches it as the pre-activation.
        return np.maximum(x, 0)

    def sigmoid(self, x, grad=False):
        """Return the logistic function of ``x``, or its derivative when ``grad`` is set."""
        # exp(-log(1 + exp(-x))) equals 1 / (1 + exp(-x)) without overflowing
        # for large negative x.
        s = np.exp(-np.logaddexp(0, -np.asarray(x)))
        if grad:
            return s * (1 - s)
        return s

    def tanh(self, x, grad=False):
        """Return ``tanh(x)``, or its derivative ``1 - tanh(x)**2`` when ``grad`` is set."""
        if grad:
            return 1.0 - np.tanh(x) ** 2
        return np.tanh(x)

    def activation(self, x, grad=False):
        """Apply the activation selected by ``self.activation_str``.

        Raises:
            Exception: If ``self.activation_str`` is not a supported name.
        """
        if self.activation_str == "relu":
            return self.relu(x, grad)
        if self.activation_str == "sigmoid":
            return self.sigmoid(x, grad)
        if self.activation_str == "tanh":
            return self.tanh(x, grad)
        raise Exception("invalid")

    def softmax(self, x):
        """Return the softmax of ``x`` computed along axis 0 (the class axis)."""
        x = np.asarray(x)
        # softmax(x - C) == softmax(x); subtracting the max keeps exp() finite.
        exps = np.exp(x - np.max(x, axis=0, keepdims=True))
        return exps / np.sum(exps, axis=0, keepdims=True)

    def forward(self, x):
        """Run the network on ``x`` of shape ``(n_samples, n_features)``.

        Returns:
            A dictionary with keys Z0, A1, Z1, ..., Am, Zm where m - 1 is the
            number of hidden layers. Ai is the pre-activation of layer i and
            Zi its activation (Z0 is the transposed input, Zm the softmax
            output); every entry has shape ``(n_units, n_samples)``.
        """
        Z = x.T
        cache = {"Z0": Z}
        last = self.n_hidden + 1
        for layer_n in range(1, last + 1):
            W = self.weights[f"W{layer_n}"]
            b = self.weights[f"b{layer_n}"]
            A = np.dot(W.T, Z) + b.T
            # Hidden layers use the configured activation, the output layer softmax.
            Z = self.softmax(A) if layer_n == last else self.activation(A)
            cache[f"A{layer_n}"] = A
            cache[f"Z{layer_n}"] = Z
        return cache

    def backward(self, cache, labels):
        """Back-propagate the mean cross-entropy loss.

        Args:
            cache: Output of :meth:`forward`.
            labels: One-hot labels of shape ``(n_samples, n_classes)``.

        Returns:
            A dictionary with ``dW{i}`` of shape ``(fan_out, fan_in)`` and
            ``db{i}`` of shape ``(fan_out, 1)`` for every layer i.
        """
        last = self.n_hidden + 1
        n = len(labels)
        grads = {}
        # Gradient of softmax + cross-entropy w.r.t. the output pre-activation.
        dA = cache[f"Z{last}"] - labels.T
        dZ = None
        for layer_n in range(last, 0, -1):
            if layer_n < last:
                dA = dZ * self.activation(cache[f"A{layer_n}"], True)
            grads[f"dW{layer_n}"] = (1. / n) * np.dot(dA, cache[f"Z{layer_n - 1}"].T)
            grads[f"db{layer_n}"] = (1. / n) * np.sum(dA, axis=1, keepdims=True)
            if layer_n > 1:
                # Gradient w.r.t. the previous layer's activation; not needed
                # for the input layer.
                dZ = np.dot(self.weights[f"W{layer_n}"], dA)
        return grads

    def update(self, grads):
        """Apply one gradient-descent step with learning rate ``self.lr``."""
        for layer in range(1, self.n_hidden + 2):
            # Gradients are stored transposed relative to the parameters.
            self.weights[f"W{layer}"] = self.weights[f"W{layer}"] - self.lr * grads[f"dW{layer}"].T
            self.weights[f"b{layer}"] = self.weights[f"b{layer}"] - self.lr * grads[f"db{layer}"].T

    def loss(self, prediction, labels):
        """Return the mean cross-entropy.

        Args:
            prediction: Class probabilities of shape ``(n_classes, n_samples)``.
            labels: One-hot labels with the same shape as ``prediction``.
        """
        # Probability assigned to the true class of every sample: reduce over
        # the class axis (axis 0), leaving one value per sample.
        true_class_prob = np.sum(np.multiply(prediction, labels), axis=0)
        # Bound away from zero so a zero probability yields a large finite loss.
        true_class_prob = np.maximum(true_class_prob, self.epsilon)
        return -np.mean(np.log(true_class_prob))

    def compute_loss_and_accuracy(self, X, y):
        """Return ``(loss, accuracy, predicted class indices)`` for ``X`` and one-hot ``y``."""
        output = self.forward(X)[f"Z{self.n_hidden + 1}"]
        predictions = np.argmax(output, axis=0)
        accuracy = np.mean(np.argmax(y, axis=1) == predictions)
        loss = self.loss(output, y.T)
        return loss, accuracy, predictions

    def train_loop(self, n_epochs):
        """Re-initialise the weights, train for ``n_epochs`` and return ``self.train_logs``."""
        X_train, y_train = self.train
        self.initialize_weights([X_train.shape[1], y_train.shape[1]])

        n_batches = int(np.ceil(X_train.shape[0] / self.batch_size))

        for epoch in range(n_epochs):
            for batch in range(n_batches):
                start = self.batch_size * batch
                stop = self.batch_size * (batch + 1)
                minibatchX = X_train[start:stop, :]
                minibatchY = y_train[start:stop, :]
                cache = self.forward(minibatchX)
                grads = self.backward(cache, minibatchY)
                self.update(grads)

            train_loss, train_accuracy, _ = self.compute_loss_and_accuracy(X_train, y_train)
            X_valid, y_valid = self.valid
            valid_loss, valid_accuracy, _ = self.compute_loss_and_accuracy(X_valid, y_valid)

            self.train_logs['train_accuracy'].append(train_accuracy)
            self.train_logs['validation_accuracy'].append(valid_accuracy)
            self.train_logs['train_loss'].append(train_loss)
            self.train_logs['validation_loss'].append(valid_loss)

        # Report the loss, as the message says (it used to print the accuracy).
        print("The avg training loss for the {} method as init_method is {}".format(self.init_method,np.mean( self.train_logs['train_loss'])))
        return self.train_logs

    def _test_arrays(self):
        """Return the test split as ``(X, one-hot y)`` arrays."""
        test = self.test
        if isinstance(test, list):
            # List of (input, integer label) pairs: flatten each input to a
            # row and one-hot encode the labels like the training targets.
            n_classes = self.train[1].shape[1]
            X = np.array([np.ravel(x) for x, _ in test])
            labels = np.array([label for _, label in test], dtype=int)
            return X, np.eye(n_classes)[labels]
        return test

    def evaluate(self):
        """Return ``(loss, accuracy)`` on the test split."""
        X_test, y_test = self._test_arrays()
        test_loss, test_accuracy, _ = self.compute_loss_and_accuracy(X_test, y_test)
        return test_loss, test_accuracy


class NeuralMLP(NN):
    """``NN`` variant whose weight initialisation scheme is selectable.

    Supported ``init_method`` values are ``"glorot"``, ``"normal"`` and
    ``"zero"``; biases are always initialised to zero.
    """

    def __init__(self,
                 hidden_dims=(784, 256),
                 epsilon=1e-6,
                 lr=7e-4,
                 batch_size=64,
                 seed=None,
                 activation="relu",
                 data=None,
                 init_method="zero"
                 ):
        """Forward the shared hyper-parameters to ``NN`` and record ``init_method``."""
        NN.__init__(self, hidden_dims, epsilon, lr, batch_size, seed, activation, data)
        # Set after the base constructor, which assigns its own default.
        self.init_method = init_method

    def initialize_weights(self, dims):
        """Create ``self.weights`` and print the total number of parameters.

        Args:
            dims: ``[n_features, n_classes]``.

        Raises:
            ValueError: If ``self.init_method`` is not one of ``"glorot"``,
                ``"normal"`` or ``"zero"``.
        """
        if self.init_method not in ("glorot", "normal", "zero"):
            # Fail here instead of leaving an empty weight dictionary that
            # only breaks later, inside forward().
            raise ValueError(f"unknown init_method: {self.init_method!r}")

        if self.seed is not None:
            np.random.seed(self.seed)

        # self.weights is a dictionary with keys W1, b1, W2, b2, ..., Wm, bm
        # where m - 1 is the number of hidden layers.
        self.weights = {}
        nb_param = 0
        all_dims = [dims[0]] + list(self.hidden_dims) + [dims[1]]

        for layer_n in range(1, self.n_hidden + 2):
            fan_in, fan_out = all_dims[layer_n - 1], all_dims[layer_n]
            if self.init_method == "glorot":
                # Glorot/Xavier uniform: U(-d, d) with d = sqrt(6 / (fan_in + fan_out)).
                d = np.sqrt(6.0 / (fan_in + fan_out))
                W = np.random.uniform(-d, d, (fan_in, fan_out))
            elif self.init_method == "normal":
                W = np.random.normal(loc=0.0, scale=1.0, size=(fan_in, fan_out))
            else:  # "zero"
                W = np.zeros((fan_in, fan_out))
            b = np.zeros((1, fan_out))
            self.weights[f"W{layer_n}"] = W
            self.weights[f"b{layer_n}"] = b
            nb_param = nb_param + W.size + b.size

        print("The number of parameter is {}".format(nb_param))




# Driver: load MNIST, build an MLP with hidden layers of 300 and 400 units,
# train it for 10 epochs and print the returned training logs.
data = load_mnist()

nn = NeuralMLP(
    hidden_dims=(300, 400),
    activation="relu",
    init_method="zero",
    lr=0.0007,
    data=data,
)
print(nn.train_loop(10))