# Data

In [None]:
import os
import pickle
import copy
import numpy as np
import yaml
from tqdm import tqdm


def one_hot_encoding(labels, num_classes=10):
    """
    Turn integer class labels into one-hot rows.

    Args:
        labels: integer class indices; each one selects a row of an
            identity matrix of size ``num_classes``
        num_classes: number of classes, i.e. the width of each encoded row

    Returns:
        Array in which every label has been replaced by its one-hot row.
    """
    # Row i of the identity matrix is exactly the one-hot vector for class i.
    identity = np.eye(num_classes)
    return identity[labels]


def onehot_decode(labels):
    """
    Recover class indices from one-hot encoded rows.

    Parameters
    ----------
    labels : np.array
        2d array (shape n*k) where each row is the one-hot encoding
        of one original label.

    Returns
    -------
        1d array (length n) holding, for each row, the column index of
        its largest entry.
    """
    # The position of the maximum in each row is the encoded class.
    encoded = np.asarray(labels)
    return encoded.argmax(axis=1)


def write_to_file(path, data):
    """
    Pickle ``data`` and store it in the file at ``path``.

    Args:
        path: location of the output file (opened in binary write mode,
            so an existing file is overwritten)
        data: object to serialize with the highest available pickle protocol
    """
    with open(path, 'wb') as sink:
        pickler = pickle.Pickler(sink, protocol=pickle.HIGHEST_PROTOCOL)
        pickler.dump(data)


def load_data(train=True):
    """
    Load the images and labels of one dataset split from disk.

    Reads ``./data/<split>/images.npz`` and ``./data/<split>/labels.npz``
    (``<split>`` is ``train`` or ``test``) and takes the ``arr_0`` array
    from each archive.

    Args:
        train: Load training data if true, else load test data

    Returns:
        Tuple:
            Images, flattened to one row per image
            Labels
    """
    directory = 'train' if train else 'test'
    base = os.path.join('./data/', directory)

    # np.load on an .npz returns a lazily-read archive that keeps the file
    # open; the context manager closes it once the array has been extracted.
    with np.load(os.path.join(base, 'images.npz')) as archive:
        patterns = archive['arr_0']
    with np.load(os.path.join(base, 'labels.npz')) as archive:
        labels = archive['arr_0']

    return patterns.reshape(len(patterns), -1), labels


def load_config(path):
    """
    Load the configuration from a YAML file (e.g. config.yaml).

    Args:
        path: A relative path to the config.yaml file

    Returns:
        The parsed content of the file; for the config files used here this
        is a dict of the parameters specified in the file.
    """
    # Open through a context manager so the handle is closed deterministically;
    # SafeLoader restricts parsing to plain YAML types.
    with open(path, 'r') as handle:
        return yaml.load(handle, Loader=yaml.SafeLoader)


def generate_k_fold_set(dataset, k=5):
    """
    Generator producing the train/validation splits for k-fold cross validation.

    The samples are visited in one random order; fold ``i`` holds out the
    ``i``-th consecutive slice of ``len(X) // k`` samples for validation and
    trains on everything else. Samples beyond ``k * (len(X) // k)`` are never
    held out and therefore appear in every training split.

    Args:
        dataset: Tuple ``(X, y)`` of patterns and labels
        k: The number of folds; ``k == 1`` yields the whole dataset as the
            training split together with an empty validation split

    Returns:
        Yields ``(train, validation)`` once per fold, each being an
        ``(X, y)`` tuple.
    """
    X, y = dataset
    n_samples = len(X)

    if k == 1:
        # Slicing past the end gives empty arrays of the right trailing shape.
        yield (X, y), (X[n_samples:], y[len(y):])
        return

    order = np.random.permutation(n_samples)
    fold_width = n_samples // k

    for fold in range(k):
        start = fold * fold_width
        stop = start + fold_width
        held_out = order[start:stop]
        kept = np.concatenate([order[:start], order[stop:]])
        yield (X[kept], y[kept]), (X[held_out], y[held_out])


def z_score_normalize(X, u=None, sd=None):
    """
    Performs z-score normalization on X.
    f(x) = (x - μ) / σ
        where
            μ = mean of x
            σ = standard deviation of x

    Features whose standard deviation is exactly zero (constant columns) are
    divided by 1 instead, so they map to ``x - μ`` rather than NaN/inf.

    Args:
        X: the data to z-score normalize
        u: the mean to normalize X with; computed per column (axis 0) of X
            when omitted
        sd: the standard deviation to normalize X with; computed per column
            (axis 0) of X when omitted

    Returns:
        Tuple:
            Transformed dataset
            Statistics (mean, stdev) that were supplied or computed; pass
            them back in to normalize another split the same way.

    """
    if u is None:
        u = np.mean(X, axis=0)
    if sd is None:
        sd = np.std(X, axis=0)

    # Guard against division by zero for constant features. The returned
    # ``sd`` is left untouched so callers still see the true statistic.
    safe_sd = np.where(sd == 0, 1.0, sd)
    return ((X - u) / safe_sd), (u, sd)


def shuffle(dataset):
    """
    Return the dataset in a random order.

    The same random permutation is applied to images and labels, so every
    image stays paired with its label.

    Parameters
    ----------
    dataset
        Tuple containing
            Images (X)
            Labels (y)

    Returns
    -------
        Tuple containing
            Images (X)
            Labels (y)
    """
    images, targets = dataset[0], dataset[1]
    # One index array drives both lookups (NumPy advanced indexing).
    order = np.arange(len(images))
    np.random.shuffle(order)
    return images[order], targets[order]


def generate_minibatches(dataset, batch_size=128):
    """
    Helper method to generate minibatches

    Consecutive slices of ``batch_size`` samples are yielded in order; the
    final batch holds the remaining samples and may be smaller. An empty
    dataset yields no batches at all.

    Parameters
    ----------
    dataset
        Tuple containing
            Images (X)
            Labels (y)
    batch_size
        int with default of 128; must be positive

    Returns
    -------
        smaller Tuple:
            Images (X)
            Labels (y)

    Raises
    ------
    ValueError
        If ``batch_size`` is not positive (raised when iteration starts).
    """
    if batch_size <= 0:
        # A non-positive step would never advance through the data.
        raise ValueError("batch_size must be a positive integer, got %r" % (batch_size,))

    X, y = dataset
    for start in range(0, len(X), batch_size):
        stop = start + batch_size
        yield X[start:stop], y[start:stop]

# Neural Network

In [None]:
import numpy as np
import math


class Activation:
    """
    Element-wise non-linearity used between the fully connected layers.

    The forward pass remembers its input so that the backward pass can
    evaluate the derivative at the same point.

    Example (for sigmoid):
        >>> sigmoid_layer = Activation("sigmoid")
        >>> z = sigmoid_layer(a)
        >>> gradient = sigmoid_layer.backward(delta=1.0)
    """

    # Names accepted by the constructor.
    _SUPPORTED = ("sigmoid", "tanh", "ReLU")

    def __init__(self, activation_type="sigmoid"):
        """
        Store the activation name and create the input placeholder.

        Raises NotImplementedError for any name other than
        "sigmoid", "tanh" or "ReLU".
        """
        if activation_type not in self._SUPPORTED:
            raise NotImplementedError("%s is not implemented." % activation_type)

        # Which non-linearity this instance applies.
        self.activation_type = activation_type
        # Input of the most recent forward pass; needed for the gradient.
        self.x = None

    def __call__(self, a):
        """
        Calling the instance is the same as calling forward().
        """
        return self.forward(a)

    def forward(self, a):
        """
        Remember the input and return the activation applied to it.
        """
        self.x = a
        # The constructor guarantees the name is one of these keys.
        transforms = {
            "sigmoid": self.sigmoid,
            "tanh": self.tanh,
            "ReLU": self.ReLU,
        }
        return transforms[self.activation_type](a)

    def backward(self, delta):
        """
        Scale the incoming delta by the activation's derivative at the
        input stored by the last forward pass.
        """
        derivatives = {
            "sigmoid": self.grad_sigmoid,
            "tanh": self.grad_tanh,
            "ReLU": self.grad_ReLU,
        }
        local_grad = derivatives[self.activation_type]()
        return local_grad * delta

    def sigmoid(self, x):
        """
        Logistic sigmoid: 1 / (1 + e^(-x)).
        """
        denominator = 1 + np.exp(-1 * x)
        return 1 / denominator

    def tanh(self, x):
        """
        Hyperbolic tangent.
        """
        return np.tanh(x)

    def ReLU(self, x):
        """
        Rectified linear unit: max(0, x) element-wise.
        """
        return np.maximum(0, x)

    def grad_sigmoid(self):
        """
        Derivative of the sigmoid at the stored input: s * (1 - s).
        """
        s = self.sigmoid(self.x)
        return s * (1 - s)

    def grad_tanh(self):
        """
        Derivative of tanh at the stored input: 1 - tanh(x)^2.
        """
        t = self.tanh(self.x)
        return 1 - t ** 2

    def grad_ReLU(self):
        """
        Derivative of ReLU at the stored input: 1 where x > 0, else 0.
        """
        positive = self.x > 0
        return positive * 1


class Layer:
    """
    This class implements Fully Connected layers for your neural network.

    The layer both computes its gradients and applies the parameter update
    inside backward().

    Example:
        >>> fully_connected_layer = Layer(1024, 100, config)
        >>> output = fully_connected_layer(input)
        >>> gradient = fully_connected_layer.backward(delta, gamma)
    """

    def __init__(self, in_units, out_units, config):
        """
        Define the architecture and create placeholder.

        Args:
            in_units: number of inputs per sample
            out_units: number of outputs per sample
            config: mapping read during backward(); the keys used are
                'momentum' and 'learning_rate'
        """
        # NOTE: this reseeds NumPy's *global* RNG on every construction, so
        # initial weights are reproducible but any random state set by the
        # caller beforehand is discarded.
        np.random.seed(42)
        self.config = config
        # Gaussian weights scaled by sqrt(2 / fan_in).
        self.w = math.sqrt(2 / in_units) * np.random.randn(in_units, out_units)
        self.b = np.zeros((1, out_units))  # Bias, one row broadcast over the batch
        self.x = None  # Input of the last forward pass
        self.a = None  # Output of the last forward pass (without activation)

        self.d_x = None  # Delta handed to the previous layer
        self.d_w = None  # Gradient w.r.t. w, summed over the batch
        self.d_b = None  # Gradient w.r.t. b, summed over the batch
        self.m_w = 0  # Running (momentum) average of d_w
        self.m_b = 0  # Running (momentum) average of d_b

    def __call__(self, x):
        """
        Make layer callable.
        """
        return self.forward(x)

    def forward(self, x):
        """
        Compute the forward pass through the layer (no activation applied).

        Stores the input in self.x and returns self.a = x @ w + b.
        """
        self.x = x
        self.a = self.x @ self.w + self.b
        return self.a

    def backward(self, delta, gamma):
        """
        Backward pass: compute gradients, update the parameters, and return
        the delta for the previous layer.

        Args:
            delta: error signal from the next layer, one row per sample.
                The sign convention is that of NeuralNetwork.backward, i.e.
                (targets - outputs); the gradients are therefore negated.
            gamma: momentum coefficient used for the running averages

        Returns:
            self.d_x, computed with the weights as they were before the update.
        """
        # Must be computed before self.w is modified below.
        self.d_x = delta @ self.w.T
        self.d_w = -(self.x.T @ delta)
        self.d_b = -np.sum(delta, axis=0)
        self.m_w = gamma * self.m_w + (1 - gamma) * self.d_w
        self.m_b = gamma * self.m_b + (1 - gamma) * self.d_b

        if self.config['momentum']:
            step_w, step_b = self.m_w, self.m_b
        else:
            step_w, step_b = self.d_w, self.d_b

        # Average over the samples actually present in this batch: the last
        # mini-batch of an epoch can be smaller than the configured size.
        # max(..., 1) keeps an empty batch from dividing by zero.
        n_samples = max(len(delta), 1)
        learning_rate = self.config['learning_rate']
        self.w = self.w - learning_rate * step_w / n_samples
        self.b = self.b - learning_rate * step_b / n_samples
        return self.d_x


class NeuralNetwork:
    """
    Create a Neural Network specified by the input configuration.

    Fully connected layers are separated by activations; the output of the
    last layer is passed through a softmax.

    Example:
        >>> net = NeuralNetwork(config)
        >>> output, loss = net(input, targets)
        >>> net.backward()
    """

    def __init__(self, config):
        """
        Create the Neural Network using config.

        Keys read here: 'layer_specs' (unit count of every layer, input
        first) and 'activation'. backward() additionally reads
        'momentum_gamma'.
        """
        self.layers = []  # Layer and Activation objects in forward order
        self.x = None  # Running activation; after forward() the last layer's output
        self.y = None  # Softmax output of the last forward pass
        self.targets = None  # Targets passed to the last forward pass
        self.deltas = []  # Deltas recorded by the last backward pass
        self.config = config

        specs = self.config['layer_specs']
        last_hidden = len(specs) - 2
        for i, (n_in, n_out) in enumerate(zip(specs, specs[1:])):
            self.layers.append(Layer(n_in, n_out, self.config))
            # No activation after the output layer; softmax is applied instead.
            if i < last_hidden:
                self.layers.append(Activation(self.config['activation']))

    def __call__(self, x, targets=None):
        """
        Make NeuralNetwork callable.
        """
        return self.forward(x, targets)

    def forward(self, x, targets=None):
        """
        Run the input through every layer and the final softmax.

        Returns:
            (probabilities, loss); loss is None when no targets are given.
        """
        self.x = x
        self.targets = targets
        for layer in self.layers:
            self.x = layer(self.x)
        self.y = self.softmax(self.x)
        if targets is None:
            return self.y, None
        return self.y, self.loss(self.y, self.targets)

    def backward(self):
        """
        Back-propagate through the layers in reverse order.

        Requires a preceding forward() call that was given targets. Each
        Layer also applies its own weight update while being traversed.

        Returns:
            The delta that comes out of the first layer.
        """
        delta = self.targets - self.y
        self.deltas = [delta]
        for layer in reversed(self.layers):
            if isinstance(layer, Layer):
                delta = layer.backward(delta, self.config['momentum_gamma'])
            else:
                delta = layer.backward(delta)
            self.deltas.append(delta)
        return delta

    def softmax(self, x):
        """
        Row-wise softmax.

        The row maximum is subtracted before exponentiating so that large
        inputs cannot overflow; this does not change the result.
        """
        exps = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exps / np.sum(exps, axis=1, keepdims=True)

    def loss(self, logits, targets):
        """
        Categorical cross-entropy averaged over the rows of ``targets``.

        Despite its name, ``logits`` receives the softmax probabilities
        (see forward()). A tiny constant is added before the logarithm so
        that a probability of exactly zero does not yield -inf.
        """
        return -np.sum(targets * np.log(logits + 1e-20)) / len(targets)


# Train

In [None]:
from data import write_to_file
#from neuralnet import *

def train(x_train, y_train, x_val, y_val, config):
    """
    Train a model with mini-batch stochastic gradient descent and early stopping.

    Each epoch shuffles the data, runs one forward/backward pass per
    mini-batch (the layers update their own weights during backward), then
    records loss and accuracy on the full training and validation sets.

    Args:
        x_train: The train patterns
        y_train: The train labels
        x_val: The validation set patterns
        y_val: The validation set labels
        config: The configs as specified in config.yaml; keys read here are
            'epochs', 'batch_size' and 'early_stop'

    Returns:
        5 things:
            training accuracy, validation accuracy, training loss,
            validation loss - lists with one value per completed epoch.
            best model - a deep copy of the model at the epoch with the
            lowest validation loss (the final model if no epoch qualified).
    """
    train_acc, val_acc = [], []
    train_loss, val_loss = [], []
    best_model = None
    best_model_loss = float("inf")
    prev_loss = float("inf")

    # Stop after this many consecutive epochs of rising validation loss.
    max_patience = 5
    patience = 0

    model = NeuralNetwork(config=config)

    for _ in tqdm(range(config['epochs'])):
        x_train, y_train = shuffle((x_train, y_train))
        # Shuffling the validation set does not affect its metrics; it is
        # kept so the sequence of random draws stays as before.
        x_val, y_val = shuffle((x_val, y_val))

        for x_batch, y_batch in generate_minibatches((x_train, y_train), config['batch_size']):
            model.forward(x_batch, y_batch)
            model.backward()

        train_pred, epoch_train_loss = model.forward(x_train, y_train)
        train_loss.append(epoch_train_loss)
        train_acc.append(accuracy(train_pred, y_train))

        val_pred, epoch_val_loss = model.forward(x_val, y_val)
        val_loss.append(epoch_val_loss)
        val_acc.append(accuracy(val_pred, y_val))

        if epoch_val_loss < best_model_loss:
            best_model_loss = epoch_val_loss
            best_model = copy.deepcopy(model)

        if config['early_stop']:
            if epoch_val_loss > prev_loss:
                patience += 1
                if patience == max_patience:
                    break
            else:
                patience = 0

        prev_loss = epoch_val_loss

    if best_model is None:
        # No epoch ran, or no validation loss ever compared below infinity
        # (e.g. NaN): hand back the current model instead of None.
        best_model = copy.deepcopy(model)

    return train_acc, val_acc, train_loss, val_loss, best_model
    


def test(model, x_test, y_test):
    """
    Does a forward pass on the model and returns loss and accuracy on the test set.

    Args:
        model: The trained model to run a forward pass on.
        x_test: The test patterns.
        y_test: The test labels (one-hot encoded).

    Returns:
        Loss, Test accuracy
    """
    test_pred, test_loss = model.forward(x_test, y_test)
    # Use the shared helper so train and test score predictions identically.
    return test_loss, accuracy(test_pred, y_test)

def accuracy(pred, target):
    """
    Fraction of rows whose highest-scoring prediction matches the target.

    Args:
        pred: per-class scores, one row per sample
        target: one-hot encoded targets, one row per sample

    Returns:
        Mean of the per-row "predicted class == target class" comparison.
    """
    predicted_class = np.argmax(pred, axis=1)
    true_class = np.argmax(target, axis=1)
    return np.mean(predicted_class == true_class)


def train_mlp(x_train, y_train, x_val, y_val, x_test, y_test, config):
    """
    Train one multi-layer perceptron, evaluate it on the test set and print
    a summary.

    The printed train/validation figures are those of the last epoch that
    ran; the test figures come from the best model returned by train().

    NOTE: For this function and any of the experiments, feel free to come up with your own ways of saving data
            (i.e. plots, performances, etc.). A recommendation is to save this function's data and each experiment's
            data into separate folders, but this part is up to you.
    """
    history = train(x_train, y_train, x_val, y_val, config)
    train_acc, valid_acc, train_loss, valid_loss, best_model = history

    test_loss, test_acc = test(best_model, x_test, y_test)

    print(f"Config: {config!r}")
    summary = (
        ("Train Loss: ", train_loss[-1]),
        ("Validation Loss: ", valid_loss[-1]),
        ("Validation Accuracy: ", valid_acc[-1]),
        ("Test Loss: ", test_loss),
        ("Test Accuracy: ", test_acc),
    )
    for label, value in summary:
        print(label, value)

    # DO NOT modify the code below.
    data = {'train_loss': train_loss, 'val_loss': valid_loss, 'train_acc': train_acc, 'val_acc': valid_acc,
            'best_model': best_model, 'test_loss': test_loss, 'test_acc': test_acc}

    # Persisting the results is currently disabled:
    #write_to_file('./results.pkl', data)


def activation_experiment(x_train, y_train, x_val, y_val, x_test, y_test, config):
    """
    Train and evaluate one network per available activation function.

    The caller's ``config`` is left untouched: every run gets its own copy
    with only the 'activation' entry replaced.
    """
    for activation in ('sigmoid', 'tanh', 'ReLU'):
        # dict(...) makes a new top-level mapping, so overriding a key here
        # cannot leak into the caller's configuration.
        run_config = dict(config, activation=activation)
        train_mlp(x_train, y_train, x_val, y_val, x_test, y_test, run_config)

def topology_experiment(x_train, y_train, x_val, y_val, x_test, y_test, config):
    """
    This function tests performance of various network topologies, i.e. making
    the graph narrower and wider by halving and doubling the number of hidden units.

    Then, we change number of hidden layers to 2 of equal size instead of 1, and keep
    number of parameters roughly equal to the number of parameters of the best performing
    model previously.

    The caller's ``config`` (including its 'layer_specs' list) is not modified;
    each run works on its own copy.
    """
    for unit in (64, 128, 256):
        # Copy the list before editing it: a plain dict copy would still
        # share the nested 'layer_specs' list with the caller.
        layer_specs = list(config['layer_specs'])
        layer_specs[1] = unit
        run_config = dict(config, layer_specs=layer_specs)
        train_mlp(x_train, y_train, x_val, y_val, x_test, y_test, run_config)

    # Two equally sized hidden layers.
    run_config = dict(config, layer_specs=[784, 112, 112, 10])
    train_mlp(x_train, y_train, x_val, y_val, x_test, y_test, run_config)


def regularization_experiment(x_train, y_train, x_val, y_val, x_test, y_test, config):
    """
    Placeholder for the regularization experiment.

    Not implemented yet: calling it always raises NotImplementedError.
    """
    message = 'Regularization Experiment not implemented'
    raise NotImplementedError(message)


def check_gradients(x_train, y_train, adjust, config):
    """
    Check the network gradients computed by back propagation by comparing with the gradients computed using numerical
    approximation.

    Only the single weight ``w[0, 0]`` of the first layer of a freshly
    constructed network is checked.

    Args:
        x_train: patterns to evaluate the loss on
        y_train: matching one-hot targets
        adjust: epsilon of the central difference
            (loss(w + eps) - loss(w - eps)) / (2 * eps)
        config: network configuration; when None, 'config.yaml' is loaded

    Returns:
        Absolute difference between the back-propagated and the numerical
        gradient of the mean loss with respect to that weight.
    """
    if config is None:
        config = load_config('config.yaml')
    model = NeuralNetwork(config)
    layer = model.layers[0]
    original = float(layer.w[0, 0])

    layer.w[0, 0] = original + adjust
    loss_plus = model(x_train, y_train)[1]

    layer.w[0, 0] = original - adjust
    loss_minus = model(x_train, y_train)[1]

    numeric = (loss_plus - loss_minus) / (2 * adjust)

    # Restore the weight before running the analytic pass.
    layer.w[0, 0] = original
    model(x_train, y_train)
    model.backward()
    # d_w is summed over the samples whereas the loss is a mean, so bring
    # both to the same scale (a no-op for a single sample).
    backward_result = layer.d_w[0, 0] / len(x_train)

    return abs(backward_result - numeric)


# Training and Testing datasets

In [None]:
# Read each split from disk once.
train_images, train_labels = load_data(True)
test_images, test_labels = load_data(False)

# Normalize the test set with the statistics of the training set so that
# both splits go through the same transformation.
X_train, (train_mean, train_std) = z_score_normalize(train_images)
X_test, _ = z_score_normalize(test_images, u=train_mean, sd=train_std)

y_train = one_hot_encoding(train_labels, num_classes=10)
y_test = one_hot_encoding(test_labels, num_classes=10)

X_train, y_train = shuffle((X_train, y_train))
X_test, y_test = shuffle((X_test, y_test))

# First 80% of the shuffled training data is used for training,
# the remainder for validation.
ind = int(0.8 * len(X_train))

# Activation_experiment

In [None]:
# Compare the activation functions on the 80/20 train/validation split.
activation_experiment(
    X_train[:ind], y_train[:ind],
    X_train[ind:], y_train[ind:],
    X_test, y_test,
    load_config('config.yaml'),
)

# Topology_experiment

In [None]:
# Compare the network topologies on the 80/20 train/validation split.
topology_experiment(
    X_train[:ind], y_train[:ind],
    X_train[ind:], y_train[ind:],
    X_test, y_test,
    load_config('config.yaml'),
)

# Check_gradients

In [None]:
# Run the gradient check on individual training samples and count how many
# stay below the tolerance.
gradient_config = load_config('config.yaml')  # parse the file once, not per sample
num_checks = min(10000, len(X_train))
min_lst = []  # differences of the samples that passed
for i in tqdm(range(num_checks)):
    # Slicing keeps the leading batch dimension (a batch of one sample).
    diff = check_gradients(X_train[i:i + 1], y_train[i:i + 1], 1e-2, gradient_config)
    if diff < 1e-3:
        min_lst.append(diff)
count = len(min_lst)
print(str(count) + " out of " + str(num_checks) + " satisfied the boundry!")
