<a href="https://colab.research.google.com/github/venomouscyanide/dl_sain/blob/master/week1/mlp_week1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# The gdown bundled with the environment is outdated. Install a newer, pinned release
!pip install gdown==3.13.0



### Load all essential libraries

In [None]:
import gzip
import shutil
from typing import Tuple, List
# third party
import gdown # helps download gdrive files
import numpy as np
import struct


### Define the class for loading MNIST data. The files being downloaded are a mirror of the [original MNIST dataset](http://yann.lecun.com/exdb/mnist/)

Responsibilities:

1.   Download the original compressed datasets
2.   Uncompress and write the uncompressed files
3.   Read the uncompressed idx files and convert them to an ndarray

---





In [None]:
class MNISTDataLoader:
    """Download gzipped MNIST idx files and turn them into per-sample tuples.

    Each archive is fetched with ``gdown``, decompressed to a file next to the
    downloaded archive, and parsed into NumPy arrays.
    """

    # TODO: Do all this in memory.
    TRAINING_DATA_URL: str = 'https://drive.google.com/uc?id=1pmI9wAdNtJkOvkJpdTqM9bmIAwPkGyMU'
    TRAINING_DATA_LABELS_URL: str = 'https://drive.google.com/uc?id=1R8BZL67U1N0GUGnf6AQIBZNVDCWO9QLS'
    TESTING_DATA_URL: str = 'https://drive.google.com/uc?id=10FdcUHw3BcQAU6keKaUwtDwJm4sC00Hu'
    TESTING_DATA_LABELS_URL: str = 'https://drive.google.com/uc?id=1GvsacEnI1eQ1vYZM-oYdERvaE2SPh0Lj'

    # Suffix removed from the archive name to obtain the decompressed file name.
    _GZIP_SUFFIX: str = '.gz'

    def load_data_wrapper(self):
        """Return ``(training_samples, testing_samples)``.

        Training labels are one-hot ``(10, 1)`` column vectors; testing labels
        are plain integers.
        """
        # The testing set is fetched before the training set.
        testing_data_tuple = self.load_data_as_ndarray(self.TESTING_DATA_URL, self.TESTING_DATA_LABELS_URL, False)
        training_data_tuple = self.load_data_as_ndarray(self.TRAINING_DATA_URL, self.TRAINING_DATA_LABELS_URL, True)
        return training_data_tuple, testing_data_tuple

    def load_data_as_ndarray(self, data_file_url: str, data_labels_file_url: str, train: bool) -> List[
        Tuple[np.ndarray, int]]:
        """Download one image/label file pair and return a list of ``(x, y)``.

        Args:
            data_file_url: URL of the gzipped image file.
            data_labels_file_url: URL of the gzipped label file.
            train: When true, every label is one-hot encoded as a ``(10, 1)``
                array; otherwise the label is returned as an ``int``.

        Returns:
            One ``(pixels, label)`` tuple per sample, where ``pixels`` is a
            column vector of pixel intensities scaled to ``[0, 1]``.

        Raises:
            RuntimeError: If either download does not produce a file.
        """
        uncompressed_dataset = self._download_and_uncompressed_file(data_file_url)
        uncompressed_labels = self._download_and_uncompressed_file(data_labels_file_url)
        pixel_data = self._get_pixel_data(uncompressed_dataset)
        label_data = self._get_labels(uncompressed_labels)
        return self._zip_samples(pixel_data, label_data, train)

    def _zip_samples(self, pixel_data: np.ndarray, label_data: np.ndarray, train: bool) -> list:
        """Pair each flattened image row with its (optionally one-hot) label."""
        return [
            # reshape(-1, 1) yields a column vector for any image size
            # (784 rows for 28x28 images); int() makes the test label a real
            # Python int instead of a numpy uint8.
            (x.reshape(-1, 1), self._one_hot_enc(y) if train else int(y[0]))
            for x, y in zip(pixel_data, label_data)
        ]

    def _one_hot_enc(self, y: np.ndarray):
        """Return a ``(10, 1)`` zero vector with a 1 at row ``y[0]``."""
        one_hot_vector = np.zeros((10, 1))
        one_hot_vector[y[0]][0] = 1
        return one_hot_vector

    def _download_and_uncompressed_file(self, url: str) -> str:
        """Download ``url`` with gdown and return the decompressed file's path.

        Raises:
            RuntimeError: If gdown does not return a path for the download.
        """
        downloaded_gzip = gdown.download(url, quiet=True)
        if downloaded_gzip is None:
            # Fail here with a clear message rather than later inside gzip.open.
            raise RuntimeError(f"Failed to download {url}")
        return self._write_decompressed_data(downloaded_gzip)

    def _write_decompressed_data(self, downloaded_gzip: str) -> str:
        """Decompress ``downloaded_gzip`` to a sibling file and return its path.

        Only a trailing ``.gz`` is removed from the name. When the archive name
        has no such suffix, ``.decompressed`` is appended so the output never
        overwrites the archive that is being read.
        """
        if downloaded_gzip.endswith(self._GZIP_SUFFIX):
            uncompressed_dataset = downloaded_gzip[:-len(self._GZIP_SUFFIX)]
        else:
            uncompressed_dataset = downloaded_gzip + '.decompressed'
        with gzip.open(downloaded_gzip, 'rb') as compressed:
            with open(uncompressed_dataset, 'wb') as decompressed:
                shutil.copyfileobj(compressed, decompressed)
        return uncompressed_dataset

    def _get_pixel_data(self, data_file: str) -> np.ndarray:
        """Read an image file into a ``(num_images, rows * cols)`` float array.

        The 16-byte header holds four big-endian unsigned 32-bit integers; the
        first one is discarded, the rest are the image count, row count and
        column count. The remaining bytes are pixel values, scaled to ``[0, 1]``.
        """
        with open(data_file, "rb") as dataset:
            _, num_data = struct.unpack(">II", dataset.read(8))
            num_rows, num_colums = struct.unpack(">II", dataset.read(8))
            pixel_data = np.fromfile(dataset, dtype=np.uint8) / 255
            pixel_data = pixel_data.reshape((num_data, num_rows * num_colums))
        return pixel_data

    def _get_labels(self, data_labels_file: str) -> np.ndarray:
        """Read a label file into a ``(num_labels, 1)`` uint8 array.

        The 8-byte header holds two big-endian unsigned 32-bit integers; the
        first one is discarded and the second is the number of labels.
        """
        with open(data_labels_file, "rb") as labels:
            _, num_data = struct.unpack(">II", labels.read(8))
            label_data = np.fromfile(labels, dtype=np.uint8)
            label_data = label_data.reshape((num_data, -1))
        return label_data

### Utility methods needed for building the neural net


In [None]:
class NetworkUtils:
    """Activation helpers used by the network."""

    @staticmethod
    def sigmoid(z: np.ndarray) -> np.ndarray:
        """Return the element-wise logistic function ``1 / (1 + exp(-z))``.

        The exponential is only ever taken of a non-positive number, so large
        negative inputs do not overflow ``np.exp``. The result is returned as
        an ndarray with the same shape as ``z``.
        """
        z = np.asarray(z)
        # exp(-|z|) lies in (0, 1], so it cannot overflow.
        decay = np.exp(-np.abs(z))
        # For z >= 0 this is 1 / (1 + exp(-z)); for z < 0 it is the
        # algebraically equal exp(z) / (1 + exp(z)).
        return np.where(z >= 0, 1 / (1 + decay), decay / (1 + decay))

    @staticmethod
    def sigmoid_prime(z: np.ndarray) -> np.ndarray:
        """Return the element-wise derivative ``sigmoid(z) * (1 - sigmoid(z))``."""
        # Evaluate the sigmoid once and reuse it.
        activation = NetworkUtils.sigmoid(z)
        return activation * (1 - activation)

### The neural network itself



In [None]:
class Network:
    """Fully connected sigmoid network trained with mini-batch gradient descent.

    ``size`` lists the number of neurons per layer, input layer first. Weights
    and biases are drawn from a standard normal distribution on construction.
    """

    def __init__(self, training_data: List[Tuple[np.ndarray, np.ndarray]],
                 testing_data: List[Tuple[np.ndarray, int]],
                 size: List[int], learning_rate: float, epochs: int,
                 mini_batch_size: int):
        """Store the data and hyperparameters and initialise the parameters.

        Args:
            training_data: ``(x, y)`` pairs where ``y`` is the target output
                column vector.
            testing_data: ``(x, label)`` pairs where ``label`` is the index of
                the expected output neuron.
            size: Neurons per layer, input layer first.
            learning_rate: Step size for each parameter update.
            epochs: Number of passes over ``training_data`` made by ``train``.
            mini_batch_size: Number of samples per parameter update.
        """
        self.training_data = training_data
        self.testing_data = testing_data
        self.size = size
        self.num_layers = len(size)
        self.learning_rate = learning_rate
        self.biases = []
        self.weights = []
        self._init_biases()
        self._init_weights()
        self.epochs = epochs
        self.mini_batch_size = mini_batch_size

    def _init_biases(self):
        """Append one ``(n, 1)`` random bias vector per non-input layer."""
        for i in range(1, self.num_layers):
            self.biases.append(np.random.randn(self.size[i], 1))

    def _init_weights(self):
        """Append one ``(n_out, n_in)`` random weight matrix per layer pair."""
        weight_matrix_sizes = [(self.size[x + 1], self.size[x]) for x in range(self.num_layers - 1)]
        for n_out, n_in in weight_matrix_sizes:
            self.weights.append(np.random.randn(n_out, n_in))

    def train(self, eval_every: int = 10):
        """Run ``self.epochs`` epochs of mini-batch gradient descent.

        Args:
            eval_every: Print progress and test accuracy every this many
                epochs. A value of zero or less disables reporting.
        """
        for epoch in range(1, self.epochs + 1):
            # Shuffles the list that was handed to the constructor, in place.
            np.random.shuffle(self.training_data)

            for mini_batch in self._create_mini_batches():
                self._update_b_w(mini_batch)

            if eval_every > 0 and epoch % eval_every == 0:
                print(f"Finish training for epoch: {epoch} of {self.epochs}")
                self._calc_accuracy()

    def _create_mini_batches(self) -> List[List[Tuple[np.ndarray, np.ndarray]]]:
        """Split the training data into consecutive slices of ``mini_batch_size``.

        The last slice is shorter when the data does not divide evenly.
        """
        mini_batches = [
            self.training_data[multiple:multiple + self.mini_batch_size] for multiple in
            range(0, len(self.training_data), self.mini_batch_size)
        ]
        return mini_batches

    def _update_b_w(self, mini_batch: List[Tuple[np.ndarray, np.ndarray]]):
        """Apply one gradient step using the mean gradient over ``mini_batch``."""
        if not mini_batch:
            # Nothing to learn from; also avoids dividing by a zero batch length.
            return

        nabla_bias = self._get_nabla_bias_zeroes()
        nabla_wt = self._get_nabla_wt_zeroes()

        for x, y in mini_batch:
            del_bias, del_wt = self._run_back_propagation(x, y)

            nabla_bias = [curr_b + del_b for curr_b, del_b in zip(nabla_bias, del_bias)]
            nabla_wt = [curr_wt + del_w for curr_wt, del_w in zip(nabla_wt, del_wt)]

        # Average over the samples actually present so that a short final
        # batch is not scaled by the configured mini_batch_size.
        step = self.learning_rate / len(mini_batch)
        self.biases = [b - (step * nb) for b, nb in zip(self.biases, nabla_bias)]
        self.weights = [w - (step * nw) for w, nw in zip(self.weights, nabla_wt)]

    def _get_nabla_bias_zeroes(self) -> List[np.ndarray]:
        """Return zero arrays shaped like each bias vector."""
        return [np.zeros(np.shape(bias)) for bias in self.biases]

    def _get_nabla_wt_zeroes(self) -> List[np.ndarray]:
        """Return zero arrays shaped like each weight matrix."""
        return [np.zeros(np.shape(wt)) for wt in self.weights]

    def _run_back_propagation(self, x: np.ndarray, y: np.ndarray) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """Return the per-layer bias and weight gradients for one sample.

        Args:
            x: Input column vector.
            y: Target output column vector.

        Returns:
            ``(nabla_bias, nabla_wt)``, lists shaped like ``self.biases`` and
            ``self.weights``.
        """
        nabla_bias = self._get_nabla_bias_zeroes()
        nabla_wt = self._get_nabla_wt_zeroes()

        # Forward pass, recording every weighted input z and activation a.
        activations = [x]
        z_list = []
        a = x
        for i in range(self.num_layers - 1):
            z = np.dot(self.weights[i], a) + self.biases[i]
            z_list.append(z)

            a = NetworkUtils.sigmoid(z)
            activations.append(a)

        # Error of the output layer.
        error_l = np.multiply(self._nabla_a(activations[-1], y), NetworkUtils.sigmoid_prime(z_list[-1]))
        nabla_bias[-1] = error_l
        nabla_wt[-1] = np.dot(error_l, np.transpose(activations[-2]))

        # Walk backwards: weights[layer] carries the error from the layer
        # after it back onto the layer whose parameters sit at index layer - 1.
        for layer in range(self.num_layers - 2, 0, -1):
            error_l = np.multiply(
                np.dot(np.transpose(self.weights[layer]), error_l), NetworkUtils.sigmoid_prime(z_list[layer - 1])
            )

            nabla_bias[layer - 1] = error_l
            nabla_wt[layer - 1] = np.dot(error_l, activations[layer - 1].transpose())

        return nabla_bias, nabla_wt

    def _nabla_a(self, a_l: np.ndarray, y: np.ndarray) -> np.ndarray:
        """Return ``a_l - y``, the cost gradient w.r.t. the output activations."""
        return a_l - y

    def _calc_accuracy(self):
        """Print the percentage of testing samples whose arg-max output matches the label."""
        total_results = len(self.testing_data)
        if total_results == 0:
            # Avoid a ZeroDivisionError when no testing data was supplied.
            print("Total accuracy on testing data: n/a (no testing data)")
            return

        correct_results = 0
        for x, y in self.testing_data:
            logit = self.feedforward(x)
            if np.argmax(logit) == y:
                correct_results += 1
        print(f"Total accuracy on testing data: {round((correct_results / total_results) * 100, 2)}%")

    def feedforward(self, x: np.ndarray) -> np.ndarray:
        """Return the output-layer activations for input column vector ``x``."""
        a = x
        for layer in range(self.num_layers - 1):
            a = NetworkUtils.sigmoid(np.dot(self.weights[layer], a) + self.biases[layer])
        return a

### Initialize all the hyperparameters 

In [None]:
class Hyperparameters:
    """Constants that configure the network and its training run."""

    # Neurons per layer: 784 inputs (one per pixel of a 28x28 image),
    # 30 hidden neurons and 10 outputs.
    SIZE: List[int] = [784, 30, 10]
    # Step size applied to each gradient update.
    LEARNING_RATE: float = 5
    # Number of passes over the training data.
    EPOCHS: int = 50
    # Number of samples per gradient update.
    MINI_BATCH_SIZE: int = 10

### Driver function that trains the network and reports test accuracy every 10 epochs

In [None]:
def train_and_eval():
    """Load the MNIST data, build the network from ``Hyperparameters`` and train it."""
    print("Loading data")
    train_set, test_set = MNISTDataLoader().load_data_wrapper()

    print("Training NN and testing accuracy")
    network = Network(
        training_data=train_set,
        testing_data=test_set,
        size=Hyperparameters.SIZE,
        learning_rate=Hyperparameters.LEARNING_RATE,
        epochs=Hyperparameters.EPOCHS,
        mini_batch_size=Hyperparameters.MINI_BATCH_SIZE,
    )
    network.train()

### Call the driver method

In [None]:
# Run the whole pipeline: load the data, train the network and print the
# periodic test-accuracy reports.
train_and_eval()


Loading data
Training NN and testing accuracy
Finish training for epoch: 10 of 50
Total accuracy on testing data: 94.38%
Finish training for epoch: 20 of 50
Total accuracy on testing data: 94.85%
Finish training for epoch: 30 of 50
Total accuracy on testing data: 95.23%
Finish training for epoch: 40 of 50
Total accuracy on testing data: 95.38%
Finish training for epoch: 50 of 50
Total accuracy on testing data: 95.24%
