# Lab 12 (Solution)
## Convolutional Neural Network

Your task is to implement the convolutional neural network below according to the architecture described in the PDF. Feel free to define the model as it suits you. Use the following TensorFlow tutorials as a source of inspiration:

- https://www.tensorflow.org/guide/keras/sequential_model
- https://www.tensorflow.org/guide/keras/functional
- https://www.tensorflow.org/guide/keras/custom_layers_and_models

If you implement the CNN correctly, you should be able to achieve an accuracy of approximately 99.5% on the test dataset. Hint: Keep in mind that dropout behaves differently during training than at test time.

In [None]:
# Load the TensorBoard notebook extension, which registers the %tensorboard
# line magic used further below to display the dashboard inline.
%load_ext tensorboard

In [None]:
import os

import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import (
    Dense,
    Softmax,
    Conv2D,
    MaxPool2D,
    ReLU,
    Flatten,
    Dropout,
)

## Data Loading
The following class handles the data loading / preprocessing part.

In [None]:
class DataLoaderMNIST:
    """
    Load MNIST and expose it as batched TensorFlow datasets split into
    train, valid, and test subsets.

    Parameters
    ----------
    flatten : bool, optional
        If True, every image is reshaped to a one-dimensional vector.
        If False, a trailing channel axis of size 1 is appended instead.
        The default is True.
    validation_size : int, optional
        Number of samples which are randomly removed from the training data
        and used as validation subset. The default is 5000.
    batch_size : int, optional
        Mini-batch size of all three datasets. The default is 32.
    seed : int, optional
        Seed of the random validation split. If None, the global NumPy
        random state is used. The default is None.

    Raises
    ------
    ValueError
        If batch_size is not positive, or if validation_size is negative or
        not smaller than the number of training samples.

    """

    def __init__(self, flatten=True, validation_size=5000, batch_size=32, seed=None):
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer.")

        # Load MNIST data
        mnist = tf.keras.datasets.mnist
        (x_train, y_train), (x_test, y_test) = mnist.load_data()

        # At least one sample has to remain in the training subset
        if not 0 <= validation_size < len(y_train):
            raise ValueError(
                "validation_size must be in [0, {}).".format(len(y_train))
            )

        # Scale images
        x_train = x_train / 255.0
        x_test = x_test / 255.0

        # Flatten images or append a channel axis
        if flatten:
            x_train = x_train.reshape([len(x_train), -1])
            x_test = x_test.reshape([len(x_test), -1])
        else:
            x_train = x_train.reshape(x_train.shape + (1,))
            x_test = x_test.reshape(x_test.shape + (1,))

        # Cast dtype
        x_train = x_train.astype("float32")
        x_test = x_test.astype("float32")

        # Split off validation dataset from training dataset. Without a seed
        # the global NumPy random state is used; with a seed the split is
        # reproducible and independent of the global state.
        rng = np.random if seed is None else np.random.RandomState(seed)
        indices = rng.choice(len(y_train), validation_size, replace=False)
        x_valid = x_train[indices, :]
        y_valid = y_train[indices]
        x_train = np.delete(x_train, indices, axis=0)
        y_train = np.delete(y_train, indices, axis=0)

        # Convert labels to one-hot tensor
        y_train = tf.one_hot(y_train, 10)
        y_test = tf.one_hot(y_test, 10)
        y_valid = tf.one_hot(y_valid, 10)

        # Create datasets; only the training dataset is shuffled
        self._train_dataset = (
            tf.data.Dataset.from_tensor_slices((x_train, y_train))
            .shuffle(len(y_train))
            .batch(batch_size)
        )
        self._valid_dataset = tf.data.Dataset.from_tensor_slices(
            (x_valid, y_valid)
        ).batch(batch_size)
        self._test_dataset = tf.data.Dataset.from_tensor_slices(
            (x_test, y_test)
        ).batch(batch_size)

    @property
    def train_dataset(self):
        """Shuffled and batched training dataset of (image, one-hot label)."""
        return self._train_dataset

    @property
    def valid_dataset(self):
        """Batched validation dataset of (image, one-hot label)."""
        return self._valid_dataset

    @property
    def test_dataset(self):
        """Batched test dataset of (image, one-hot label)."""
        return self._test_dataset

## Model
This class defines the neural network as a class derived from tf.keras.Model.

In [None]:
class MyModel(Model):
    """
    Convolutional MNIST classifier implemented as tf.keras.Model subclass.

    The forward pass consists of two convolution / ReLU / max-pooling
    stages, a flatten step, and two dense stages which are each preceded by
    dropout. The output of the last dense layer is passed through a softmax.

    Parameters
    ----------
    name : string, optional
        Name of model. The default is None.
    **kwargs :
        Forwarded to tf.keras.Model.

    """

    def __init__(self, name=None, **kwargs):
        super().__init__(name=name, **kwargs)

        # Convolutions (5x5 kernels, "same" padding)
        self.conv_1 = Conv2D(filters=32, kernel_size=5, padding="same")
        self.conv_2 = Conv2D(filters=64, kernel_size=5, padding="same")

        # One activation layer per convolution and one for the hidden dense layer
        self.relu_1 = ReLU()
        self.relu_2 = ReLU()
        self.relu_3 = ReLU()

        # 2x2 max pooling after each convolutional stage
        self.pool_1 = MaxPool2D(pool_size=(2, 2))
        self.pool_2 = MaxPool2D(pool_size=(2, 2))

        self.flat_1 = Flatten()

        # Dropout in front of each dense layer
        self.drop_1 = Dropout(rate=0.5)
        self.drop_2 = Dropout(rate=0.5)

        # Hidden layer and 10-unit output layer
        self.dense_1 = Dense(units=1024)
        self.dense_2 = Dense(units=10)

        self.softm_1 = Softmax()

    def call(self, x, training=False):
        """
        Compute the output of MyModel for the input x.

        Parameters
        ----------
        x : Tensor float32 (None, 28, 28, 1)
            Input to MyModel.
        training : bool, optional
            Forwarded to the dropout layers, which behave differently
            during training than during inference. The default is False.

        Returns
        -------
        out : tensor float32 (None, 10)
            Softmax output of MyModel.

        """
        # Two conv -> ReLU -> pool stages
        features = self.pool_1(self.relu_1(self.conv_1(x)))
        features = self.pool_2(self.relu_2(self.conv_2(features)))

        flat = self.flat_1(features)

        # Dropout -> dense -> ReLU
        hidden = self.drop_1(flat, training=training)
        hidden = self.relu_3(self.dense_1(hidden))

        # Dropout -> dense -> softmax
        hidden = self.drop_2(hidden, training=training)
        return self.softm_1(self.dense_2(hidden))

## Early Stopping Counter

In [None]:
class EarlyStoppingCounter:
    """
    Count consecutive updates without sufficient improvement of a score
    for which larger values are better.

    Parameters
    ----------
    patience : int, optional
        The stopping criteria is reached once the number of consecutive
        updates without improvement exceeds this value. The default is 3.
    improvement_margin : float, optional
        An update only counts as improvement if the score is at least the
        best score so far plus this margin. The default is 0.0002.

    """

    def __init__(self, patience=3, improvement_margin=0.0002):
        self._patience = patience
        self._improvement_margin = improvement_margin

        self.reset()

    def reset(self):
        """Forget the best score and set the counter back to zero."""
        self._best, self._count = 0.0, 0

    def update(self, current):
        """Register the score current and adapt the counter accordingly."""
        improved = not (current < self._best + self._improvement_margin)
        if improved:
            # New best score: restart counting
            self._best = current
            self._count = 0
            return

        self._count += 1

    def is_stopping_criteria_reached(self):
        """Return True if the counter exceeds the patience."""
        return self._patience < self._count

## Training
This class executes the training, validation and testing process.

In [None]:
class Training:
    """
    Custom training loop for a model which logs its metrics to TensorBoard.

    The model is optimized with Adam on the categorical cross-entropy loss.
    Loss and categorical accuracy are tracked separately for the train,
    valid, and test datasets.

    Parameters
    ----------
    model : tf.keras.Model
        TensorFlow model to be trained.
    tensorboard_dir : string
        Path to tensorboard directory.

    """

    def __init__(self, model, tensorboard_dir):
        self._model = model
        self._optimizer = tf.keras.optimizers.Adam()
        self._loss = tf.keras.losses.CategoricalCrossentropy()
        self._init_metrics()
        self._init_tensorboard(tensorboard_dir)
        self._early_stopping_counter = EarlyStoppingCounter()

    def __call__(
        self,
        train_dataset,
        valid_dataset,
        test_dataset,
        max_epochs=50,
        evaluate_every=5,
    ):
        """
        Run the training loop on train_dataset and evaluate the model on
        valid_dataset every evaluate_every epochs. After each evaluation
        the early stopping counter is updated with the validation accuracy
        and the loop is left if its stopping criteria is reached. Finally,
        the model is tested on test_dataset.

        Parameters
        ----------
        train_dataset : TF Dataset
            Training dataset which consists of pairs of input and labels.
        valid_dataset : TF Dataset
            Validation dataset which consists of pairs of input and labels.
        test_dataset : TF Dataset
            Test dataset which consists of pairs of input and labels.
        max_epochs : int, optional
            Maximum number of times train_dataset is iterated over.
            The default is 50.
        evaluate_every : int, optional
            Every evaluate_every'th epoch the model is evaluated based on
            valid_dataset. The default is 5.

        Returns
        -------
        None.

        """
        stopper = self._early_stopping_counter

        for epoch in range(1, max_epochs + 1):
            self._reset_train_metrics()
            for batch_images, batch_labels in train_dataset:
                self._train_step(batch_images, batch_labels)

            # Validation, logging, and early stopping only happen on
            # evaluation epochs
            if epoch % evaluate_every != 0:
                continue

            self._reset_valid_metrics()
            for batch_images, batch_labels in valid_dataset:
                self._valid_step(batch_images, batch_labels)

            self._print_train_and_valid_results(epoch, max_epochs)
            self._update_tensorboard(epoch)

            stopper.update(self._valid_accuracy.result())
            if stopper.is_stopping_criteria_reached():
                print("Early stopping at epoch {:2d}.".format(epoch))
                stopper.reset()
                break

        # Final evaluation on the test dataset
        self._reset_test_metrics()
        for batch_images, batch_labels in test_dataset:
            self._test_step(batch_images, batch_labels)

        self._print_test_results()

    @tf.function
    def _train_step(self, images, labels):
        """Apply one optimizer step for a mini-batch and track its metrics."""
        with tf.GradientTape() as tape:
            predictions = self._model(images, training=True)
            loss = self._loss(labels, predictions)

        # Read the variables after the forward pass has been executed
        weights = self._model.trainable_variables
        gradients = tape.gradient(loss, weights)
        self._optimizer.apply_gradients(zip(gradients, weights))
        self._update_train_metrics(loss, labels, predictions)

    @tf.function
    def _valid_step(self, images, labels):
        """Evaluate a validation mini-batch in inference mode."""
        predictions = self._model(images, training=False)
        self._update_valid_metrics(
            self._loss(labels, predictions), labels, predictions
        )

    @tf.function
    def _test_step(self, images, labels):
        """Evaluate a test mini-batch in inference mode."""
        predictions = self._model(images, training=False)
        self._update_test_metrics(
            self._loss(labels, predictions), labels, predictions
        )

    def _init_metrics(self):
        """Create the loss and accuracy metrics of all three subsets."""
        self._init_train_metrics()
        self._init_valid_metrics()
        self._init_test_metrics()

    def _init_train_metrics(self):
        self._train_loss = tf.keras.metrics.Mean(name="train_loss")
        self._train_accuracy = tf.keras.metrics.CategoricalAccuracy(
            name="train_accuracy"
        )

    def _init_valid_metrics(self):
        self._valid_loss = tf.keras.metrics.Mean(name="valid_loss")
        self._valid_accuracy = tf.keras.metrics.CategoricalAccuracy(
            name="valid_accuracy"
        )

    def _init_test_metrics(self):
        self._test_loss = tf.keras.metrics.Mean(name="test_loss")
        self._test_accuracy = tf.keras.metrics.CategoricalAccuracy(
            name="test_accuracy"
        )

    def _reset_train_metrics(self):
        for metric in (self._train_loss, self._train_accuracy):
            metric.reset_state()

    def _reset_valid_metrics(self):
        for metric in (self._valid_loss, self._valid_accuracy):
            metric.reset_state()

    def _reset_test_metrics(self):
        for metric in (self._test_loss, self._test_accuracy):
            metric.reset_state()

    def _update_train_metrics(self, loss, labels, predictions):
        self._train_loss.update_state(loss)
        self._train_accuracy.update_state(labels, predictions)

    def _update_valid_metrics(self, loss, labels, predictions):
        self._valid_loss.update_state(loss)
        self._valid_accuracy.update_state(labels, predictions)

    def _update_test_metrics(self, loss, labels, predictions):
        self._test_loss.update_state(loss)
        self._test_accuracy.update_state(labels, predictions)

    def _print_train_and_valid_results(self, epoch, max_epochs):
        """Print loss and accuracy (in percent) of train and valid subset."""
        train_loss = self._train_loss.result()
        train_accuracy_percent = self._train_accuracy.result() * 100
        valid_loss = self._valid_loss.result()
        valid_accuracy_percent = self._valid_accuracy.result() * 100

        print(
            "Epoch {:3d} of {:3d}, ".format(epoch, max_epochs),
            "Train Loss: {:3.3f}, ".format(train_loss),
            "Train Accuracy: {:3.3f}%, ".format(train_accuracy_percent),
            "Valid Loss: {:3.3f}, ".format(valid_loss),
            "Valid Accuracy: {:3.3f}%".format(valid_accuracy_percent),
        )

    def _print_test_results(self):
        """Print loss and accuracy (in percent) of the test subset."""
        test_loss = self._test_loss.result()
        test_accuracy_percent = self._test_accuracy.result() * 100

        print(
            "\nTest Loss: {:3.3f}, ".format(test_loss),
            "Test Accuracy: {:3.3f}%".format(test_accuracy_percent),
        )

    def _init_tensorboard(self, tensorboard_dir):
        """Create summary writers for the train and valid sub-directories."""
        train_dir = os.path.join(tensorboard_dir, "train")
        self._train_summary_writer = tf.summary.create_file_writer(train_dir)

        valid_dir = os.path.join(tensorboard_dir, "valid")
        self._valid_summary_writer = tf.summary.create_file_writer(valid_dir)

    def _update_tensorboard(self, epoch):
        """Write the current train and valid metrics with step epoch."""
        scalars_per_writer = (
            (
                self._train_summary_writer,
                (
                    ("Train loss", self._train_loss),
                    ("Train accuracy", self._train_accuracy),
                ),
            ),
            (
                self._valid_summary_writer,
                (
                    ("Valid loss", self._valid_loss),
                    ("Valid accuracy", self._valid_accuracy),
                ),
            ),
        )

        for writer, scalars in scalars_per_writer:
            with writer.as_default():
                for tag, metric in scalars:
                    tf.summary.scalar(tag, metric.result(), step=epoch)

## Main
In the following cells, TensorBoard is started, all objects are created, and the training process is launched.

In [None]:
# Show the TensorBoard dashboard inline; it reads the event files from ./logs.
%tensorboard --logdir ./logs

In [None]:
TENSORBOARD_PATH = "./logs"

# Images keep their two-dimensional layout (plus channel axis) for the CNN
data_loader = DataLoaderMNIST(flatten=False)
model = MyModel(name="MNISTClassifier")
train = Training(model=model, tensorboard_dir=TENSORBOARD_PATH)

train_dataset, valid_dataset, test_dataset = (
    data_loader.train_dataset,
    data_loader.valid_dataset,
    data_loader.test_dataset,
)

# Train with validation and early stopping, then evaluate on the test dataset
train(
    train_dataset=train_dataset,
    valid_dataset=valid_dataset,
    test_dataset=test_dataset,
)