# 기계학습 - 2022년 2학기
### 과제2. 다중계층 신경망을 이용한 얼굴 표정 분류기 작성

과제 문의: 전북대학교 컴퓨터공학부 시각 및 학습 연구실 (공과대학 7호관 7619)

Requirements
- Python >= 3.6
- numpy
- matplotlib
- jupyterplot

이번 과제에서는 사람 얼굴의 표정 데이터셋(Toronto Faces Dataset, [TFD](http://aclab.ca/users/josh/TFD.html))을 분류하는 다중계층 신경망(Multi-Layer Neural Net)을 구현하고 테스트 합니다.

In [None]:
%matplotlib inline
import numpy as np
from matplotlib import pyplot as plt

import imp
try:
    imp.find_module('jupyterplot')
except ImportError:
    %pip install jupyterplot
    pass

from jupyterplot import ProgressPlot

### Toronto Faces Dataset


TFD는 1-Anger, 2-Disgust, 3-Fear, 4-Happy, 5-Sad, 6-Surprise, 7-Neutral의 총 7개의 클래스를 가진 데이터셋입니다.

데이터셋은 학습, 검증, 테스트(training, validation, test)를 위해서 각각 3374, 419, 385장의 48 $\times$ 48 크기 grayscale 이미지를 제공합니다.

데이터셋의 예시를 확인하기 위해 아래 셀들을 실행해보시기 바랍니다.

In [None]:
#### Please DO NOT DELETE this cell. ###

def LoadData(fname):
    """Loads the dataset from an NPZ file.

    The archive is read inside a context manager so that the underlying
    file handle is released as soon as all arrays have been extracted.

    Args:
        fname: NPZ filename. The archive must provide the arrays
               'inputs_train', 'inputs_valid', 'inputs_test',
               'target_train', 'target_valid' and 'target_test'.

    Returns:
        data: Tuple (inputs_train, inputs_valid, inputs_test,
              target_train, target_valid, target_test).
              Inputs are the stored arrays divided by 255.0, in their
              stored orientation. Targets are one-hot encoded with one row
              per observation and one column per class.
    """
    splits = ('train', 'valid', 'test')

    # np.load returns a lazily-read NpzFile for .npz archives; pull out
    # everything that is needed before the archive is closed.
    with np.load(fname) as npzfile:
        inputs = [npzfile['inputs_' + split] / 255.0 for split in splits]
        labels = [npzfile['target_' + split].tolist() for split in splits]

    # The one-hot width is fixed by the largest class id over all splits,
    # so that the three target matrices share the same number of columns.
    num_class = max(labels[0] + labels[1] + labels[2]) + 1

    targets = []
    for split_labels in labels:
        one_hot = np.zeros([len(split_labels), num_class])
        for row, class_id in enumerate(split_labels):
            one_hot[row, class_id] = 1.0
        targets.append(one_hot)

    return (*inputs, *targets)


In [None]:
# Load the dataset once and index the six returned arrays by split name.
data = LoadData('./toronto_face.npz')
split_names = ['train', 'valid', 'test']
inputs = dict(zip(split_names, data[:3]))
targets = dict(zip(split_names, data[3:]))

# Report the array shapes of every split.
for heading, split in (('training dataset', 'train'),
                       ('validation dataset', 'valid'),
                       ('test dataset', 'test')):
    print(heading)
    print('inputs:', inputs[split].shape, 'targets:', targets[split].shape)

classes = ['anger', 'disgust', 'fear', 'happy', 'sad', 'suprise', 'neutral']
# Column index of the non-zero entry in each one-hot row, i.e. the class id.
_, labels = np.nonzero(targets['train'])

# Show one randomly drawn training image per class, side by side.
figs, axes = plt.subplots(nrows=1, ncols=7, figsize=(14, 7))
for idx, class_name in enumerate(classes):
    axis = axes[idx]
    members = np.nonzero(labels == idx)[0]
    rnd_idx = np.random.choice(members)
    axis.axis('off')
    axis.imshow(inputs['train'][rnd_idx].reshape(48, 48), cmap='gray')
    axis.set_title('{}: {}'.format(rnd_idx, class_name))
plt.show()

### Training Multi-layer Neural Networks


1. 기본적인 일반화 (basic generalization): 코드에 주어진 hyperparameter 들을 이용하여 신경망을 학습시킨다. 학습 오차(training error)와 일반화를 위한 검증 오차(validation error) 결과가 어떻게 다른지 설명한다. 두 가지 경우(학습과 일반화 검증)에 대해 오차 커브(error curve)를 그래프로 제시하시오.

2. 최적화 (optimization):  Learning rate, momentum, mini-batch size 세 가지 종류의 parameter 들을 아래와 같이 변화시키면서 다양한 조합들에 대해 신경망이 cross-entropy 관점에서 어떻게 수렴하는지 살펴본다. 가장 우수한 성능을 나타내는 hyperparameter 들의 조합이 어떤 것인지 제시하시오. (모든 경우의 수를 다 따지면 75 가지 신경망 모델을 테스트해야 하나 시간이 너무 많이 걸릴 수 있으므로 이 중에서 일부분의 경우들만 테스트해도 된다. 그러나 어떤 근거로 해당 조합들만 테스트했는지 적당한 설명이 있어야 함.)
    - Learning rate ( $\epsilon$ ): 0.001 에서 1.0 사이의 5 가지 경우
    - Momentum: 0.0 에서 0.9 사이의 3 가지 경우
    - Mini-batch size: 1 에서 1000 까지의 5 가지 경우

3. 신경망 모델 구조 변경: Momentum 을 0.9로 고정시킨 상태에서 신경망의 hidden unit 들의 갯수를 2 에서 100 사이의 3 가지 다른 경우에 대해 성능을 비교한다. 필요한 경우 learning rate 와 학습 기간(epochs)은 신경망 구조에 따라 적당하게 변경할 수 있다. Hidden unit 의 갯수들이 학습에서의 수렴과 신경망의 일반화 성능에 미치는 영향에 대한 데이터(표나 그래프)를 제시하고 경향을 분석하시오.

### Utility methods

In [None]:
def Save(fname, data):
    """Writes a dictionary of arrays to a compressed numpy archive.

    Args:
        fname: Destination filename.
        data:  Dictionary whose entries are stored under their own keys.
    """
    message = 'Writing to ' + fname
    print(message)
    np.savez_compressed(fname, **data)


def Load(fname):
    """Loads a model from a numpy NPZ archive.

    Args:
        fname: Filename of an archive written with np.savez /
               np.savez_compressed.

    Returns:
        Dictionary mapping every name stored in the archive to its array.
    """
    print('Loading from ' + fname)
    # dict() reads every array eagerly, so the archive can be closed right
    # away instead of keeping its file handle open.
    with np.load(fname) as npzfile:
        return dict(npzfile)

### Neural Networks
아래는 neural networks 의 초기화 및 forward pass를 구현한 코드 입니다.

In [None]:
def Affine(x, w, b):
    """Applies an affine map to the inputs.

    Args:
        x: Inputs; multiplied from the left onto the weights.
        w: Weights
        b: Bias, added to the product x.dot(w).

    Returns:
        y: Outputs, x.dot(w) + b.
    """
    projected = x.dot(w)
    return projected + b

def ReLU(x):
    """Applies the rectified linear unit element-wise.

    Args:
        x: Inputs

    Returns:
        y: Activation, the element-wise maximum of x and 0.0.
    """
    floor = 0.0
    return np.maximum(x, floor)

def Softmax(x):
    """Computes the softmax activation function along axis 1.

    The row-wise maximum is subtracted before exponentiating. This leaves
    the result mathematically unchanged but keeps np.exp from overflowing
    (which would produce inf / inf = nan) for large inputs.

    Args:
        x: Inputs, with one row per observation.

    Returns:
        y: Activation; every row is non-negative and sums to one.
    """
    shifted = x - np.max(x, axis=1, keepdims=True)
    exps = np.exp(shifted)
    return exps / exps.sum(axis=1, keepdims=True)

def InitNN(num_inputs, num_hiddens, num_outputs):
    """Creates the parameters of a network with two hidden layers.

    Weights are drawn from a zero-mean normal distribution, scaled by 0.1
    for the first two layers and by 0.01 for the output layer. Biases start
    at zero.

    Args:
        num_inputs:    Number of input units.
        num_hiddens:   List of two elements, hidden size for each layer.
        num_outputs:   Number of output units.

    Returns:
        model:         Dictionary with the entries 'W1', 'W2', 'W3', 'b1',
                       'b2' and 'b3'.
    """
    layer_sizes = [num_inputs, num_hiddens[0], num_hiddens[1], num_outputs]
    weight_scales = (0.1, 0.1, 0.01)

    model = {}
    # Weights first (in layer order, so the random stream is consumed as
    # W1, W2, W3), then the biases.
    for layer, scale in enumerate(weight_scales, start=1):
        fan_in, fan_out = layer_sizes[layer - 1], layer_sizes[layer]
        model['W%d' % layer] = scale * np.random.randn(fan_in, fan_out)
    for layer in range(1, len(layer_sizes)):
        model['b%d' % layer] = np.zeros((layer_sizes[layer]))
    return model

def NNForward(model, x):
    """Propagates the inputs through the three-layer network.

    The first two affine layers are each followed by a ReLU; the last
    affine layer is returned as is (no activation is applied to 'y').

    Args:
        model: Dictionary of all the weights.
        x:     Input to the network.

    Returns:
        var:   Dictionary of all intermediate variables: the input 'x',
               the pre-activations 'h1' and 'h2', their rectified versions
               'h1r' and 'h2r', and the network output 'y'.
    """
    var = {'x': x}
    var['h1'] = Affine(x, model['W1'], model['b1'])
    var['h1r'] = ReLU(var['h1'])
    var['h2'] = Affine(var['h1r'], model['W2'], model['b2'])
    var['h2r'] = ReLU(var['h2'])
    var['y'] = Affine(var['h2r'], model['W3'], model['b3'])
    return var

아래는 neural networks 의 backward pass를 구현하기 위한 코드들입니다.
아래 세 부분을 채워 코드를 완성시키기 바랍니다.

1. Affine layer 의 backward pass equations (linear transformation + bias).
2. RELU activation function 의 backward pass equations.
3. Momentum 이 포함된 weight update equations.

In [None]:
def AffineBackward(grad_y, x, w):
    """Back-propagates a gradient through y = x.dot(w) + b.

    Args:
        grad_y: gradient from last layer
        x: inputs
        w: weights

    Returns:
        grad_x: Gradients wrt. the inputs.
        grad_w: Gradients wrt. the weights.
        grad_b: Gradients wrt. the biases.
    """
    # The bias is shared by all rows, so its gradient is the sum of grad_y
    # over the first axis, written here as a product with a vector of ones.
    row_ones = np.ones(len(grad_y))
    grad_b = np.matmul(row_ones, grad_y)
    grad_w = np.matmul(x.T, grad_y)
    grad_x = np.matmul(grad_y, w.T)
    return grad_x, grad_w, grad_b

In [None]:
def ReLUBackward(grad_y, x, y):
    """Back-propagates a gradient through the ReLU activation.

    Args:
        grad_y: Gradient with respect to the ReLU output.
        x:      Input of the ReLU (not used by this implementation).
        y:      Output of the ReLU; the gradient passes only where y > 0.

    Returns:
        grad_x: Gradients wrt. the inputs.
    """
    # 1 where the unit was active in the forward pass, 0 elsewhere.
    active = (y > 0).astype(int)
    return active * grad_y

In [None]:
def NNBackward(model, err, var):
    """Back-propagates the output error through all three layers.

    The resulting parameter gradients are written into `model` under the
    keys 'dE_dW1', 'dE_dW2', 'dE_dW3', 'dE_db1', 'dE_db2' and 'dE_db3'.
    Nothing is returned.

    Args:
        model:    Dictionary of all the weights.
        err:      Gradients to the output of the network.
        var:      Intermediate variables from the forward pass.
    """
    # Output layer.
    grad_h2r, grad_W3, grad_b3 = AffineBackward(err, var['h2r'], model['W3'])

    # Second hidden layer: ReLU, then affine.
    grad_h2 = ReLUBackward(grad_h2r, var['h2'], var['h2r'])
    grad_h1r, grad_W2, grad_b2 = AffineBackward(grad_h2, var['h1r'], model['W2'])

    # First hidden layer; the gradient wrt. the network input is not needed.
    grad_h1 = ReLUBackward(grad_h1r, var['h1'], var['h1r'])
    _, grad_W1, grad_b1 = AffineBackward(grad_h1, var['x'], model['W1'])

    model.update({
        'dE_dW1': grad_W1,
        'dE_dW2': grad_W2,
        'dE_dW3': grad_W3,
        'dE_db1': grad_b1,
        'dE_db2': grad_b2,
        'dE_db3': grad_b3,
    })

In [None]:
def NNUpdate(model, eps, momentum):
    """Updates the NN weights with gradient descent plus momentum.

    For every parameter p in W1, W2, W3, b1, b2, b3:

        v <- momentum * v - eps * dE/dp
        p <- p + v

    The velocity v is stored in `model` under 'dE_d<p>_v'. A missing
    velocity is treated as zero, so the first call on a fresh model
    initializes it regardless of which other entries the dictionary holds.

    Args:
        model:    Dictionary of all the weights. It must already contain
                  the gradients 'dE_d<p>' written by the backward pass.
                  Updated in place; nothing is returned.
        eps:      Learning rate.
        momentum: Momentum.
    """
    for name in ('W1', 'W2', 'W3', 'b1', 'b2', 'b3'):
        grad_key = 'dE_d' + name
        velocity_key = grad_key + '_v'
        velocity = (momentum * model.get(velocity_key, 0.)
                    - eps * model[grad_key])
        model[velocity_key] = velocity
        # Rebind instead of updating in place so that arrays shared with
        # other references to the old parameters are left untouched.
        model[name] = model[name] + velocity

### 훈련

In [None]:
def Train(model, forward, backward, update, eps, momentum, num_epochs,
          batch_size):
    """Trains a simple MLP with mini-batch gradient descent.

    The dataset is loaded from './toronto_face.npz'. After every epoch the
    model is evaluated on the validation split, and after the last epoch on
    all three splits; the final cross entropies and accuracies are printed.
    Progress is plotted live with a ProgressPlot.

    Args:
        model:           Dictionary of model weights.
        forward:         Forward prop function.
        backward:        Backward prop function.
        update:          Update weights function.
        eps:             Learning rate.
        momentum:        Momentum.
        num_epochs:      Number of epochs to run training for.
        batch_size:      Mini-batch size, -1 for full batch.

    Returns:
        model:           The model dictionary that was passed in, after
                         training.
        stats:           Dictionary of training statistics. Every entry is
                         a list of (epoch, value) tuples, one per epoch.
            - train_ce:       Training cross entropy of the last mini-batch
                              of the epoch.
            - valid_ce:       Validation cross entropy.
            - train_acc:      Training accuracy of the last mini-batch of
                              the epoch.
            - valid_acc:      Validation accuracy.
    """
    inputs_train, inputs_valid, inputs_test, target_train, target_valid, \
        target_test = LoadData('./toronto_face.npz')
    rnd_idx = np.arange(inputs_train.shape[0])
    train_ce_list = []
    valid_ce_list = []
    train_acc_list = []
    valid_acc_list = []
    
    num_train_cases = inputs_train.shape[0]
    if batch_size == -1:
        batch_size = num_train_cases
    # The last mini-batch of an epoch may be smaller than batch_size.
    num_steps = int(np.ceil(num_train_cases / batch_size))

    # NOTE(review): pp.update is called once per step and once more per
    # epoch, i.e. num_epochs * (num_steps + 1) times in total, which is more
    # than the x_lim upper bound given here -- confirm how ProgressPlot
    # handles points beyond x_lim.
    pp = ProgressPlot(
        plot_names=['Cross entropy', 'Accuracy'],
        line_names=['Train', 'Validation'],
        x_label='Iteration',
        x_lim=[0, num_epochs*num_steps]
    )

    # Validation values shown in the plot until the first epoch has finished.
    valid_ce = 0
    valid_acc = 0
    for epoch in range(num_epochs):
        # Reorder the training set; the permutation is applied on top of the
        # ordering left by the previous epoch, inputs and targets alike.
        np.random.shuffle(rnd_idx)
        inputs_train = inputs_train[rnd_idx]
        target_train = target_train[rnd_idx]
        for step in range(num_steps):
            # Forward prop.
            start = step * batch_size
            end = min(num_train_cases, (step + 1) * batch_size)
            x = inputs_train[start: end]
            t = target_train[start: end]

            var = forward(model, x)
            prediction = Softmax(var['y'])

            # Mean cross entropy and accuracy over the current mini-batch.
            train_ce = -np.sum(t * np.log(prediction)) / x.shape[0]
            train_acc = (np.argmax(prediction, axis=1) ==
                         np.argmax(t, axis=1)).astype('float').mean()
            pp.update([[train_ce, valid_ce], [train_acc, valid_acc]])

            # Compute error: gradient of the mean cross entropy wrt. the
            # network output 'y' (assuming every target row sums to one).
            error = (prediction - t) / x.shape[0]

            # Backward prop.
            backward(model, error, var)

            # Update weights.
            update(model, eps, momentum)

        valid_ce, valid_acc = Evaluate(
            inputs_valid, target_valid, model, forward, batch_size=batch_size)
        
        # train_ce / train_acc still hold the values of the last mini-batch.
        pp.update([[train_ce, valid_ce], [train_acc, valid_acc]])
        train_ce_list.append((epoch, train_ce))
        train_acc_list.append((epoch, train_acc))
        valid_ce_list.append((epoch, valid_ce))
        valid_acc_list.append((epoch, valid_acc))

    # Final evaluation of the trained model on all three splits.
    train_ce, train_acc = Evaluate(
        inputs_train, target_train, model, forward, batch_size=batch_size)
    valid_ce, valid_acc = Evaluate(
        inputs_valid, target_valid, model, forward, batch_size=batch_size)
    test_ce, test_acc = Evaluate(
        inputs_test, target_test, model, forward, batch_size=batch_size)
    print('CE: Train %.5f Validation %.5f Test %.5f' %
          (train_ce, valid_ce, test_ce))
    print('Acc: Train {:.5f} Validation {:.5f} Test {:.5f}'.format(
        train_acc, valid_acc, test_acc))
    pp.finalize()
    stats = {
        'train_ce': train_ce_list,
        'valid_ce': valid_ce_list,
        'train_acc': train_acc_list,
        'valid_acc': valid_acc_list
    }

    return model, stats

def Evaluate(inputs, target, model, forward, batch_size=-1):
    """Computes cross entropy and accuracy of the model on a dataset.

    The data is processed in consecutive chunks of `batch_size` rows and
    the per-chunk sums are averaged over all cases at the end.

    Args:
        inputs:     Inputs to the network.
        target:     Target of the inputs.
        model:      Dictionary of network weights.
        forward:    Forward prop function; its result must provide the
                    network output under the key 'y'.
        batch_size: Number of rows per chunk, -1 to use all rows at once.

    Returns:
        ce:  Cross entropy averaged over all cases.
        acc: Fraction of cases whose arg-max prediction matches the target.
    """
    num_cases = inputs.shape[0]
    chunk = num_cases if batch_size == -1 else batch_size
    num_chunks = int(np.ceil(num_cases / chunk))

    total_ce = 0.0
    num_correct = 0.0
    for chunk_idx in range(num_chunks):
        lo = chunk_idx * chunk
        hi = min(num_cases, lo + chunk)
        chunk_target = target[lo: hi]
        probs = Softmax(forward(model, inputs[lo: hi])['y'])
        total_ce += -np.sum(chunk_target * np.log(probs))
        hits = np.argmax(probs, axis=1) == np.argmax(chunk_target, axis=1)
        num_correct += hits.astype('float').sum()

    total_ce /= num_cases
    num_correct /= num_cases
    return total_ce, num_correct

In [None]:
def CheckGrad(model, forward, backward, name, x):
    """Verifies an analytic gradient against central finite differences.

    The loss 0.5 * sum(y ** 2) is used, whose gradient wrt. the network
    output y is y itself. Up to 20 randomly chosen entries of the parameter
    are perturbed by +/- 1e-7 and the resulting numeric gradient must match
    the analytic one to 3 decimals, otherwise an AssertionError is raised.

    Note that the global numpy random seed is reset to 0 by this function.

    Args:
        model:    Dictionary of network weights.
        forward:  Forward prop function; its result provides 'y'.
        backward: Backward prop function; it writes 'dE_d' + name to model.
        name:     Weights name to check.
        x:        Fake input.
    """
    def loss(y):
        return 0.5 * (y ** 2).sum()

    np.random.seed(0)

    # Analytic gradient: feed dLoss/dy = y into the backward pass.
    forward_vars = forward(model, x)
    backward(model, forward_vars['y'], forward_vars)
    analytic = model['dE_d' + name].ravel()

    # Flat handle on the parameter; the perturbations below are written
    # through it into the parameter that the forward pass reads.
    weights = model[name].ravel()
    step = 1e-7
    numeric = np.zeros(weights.shape)

    # Randomly check 20 elements.
    candidates = np.arange(weights.size)
    np.random.shuffle(candidates)
    picked = candidates[:20]
    for pos in picked:
        weights[pos] += step
        loss_up = loss(forward(model, x)['y'])
        weights[pos] -= 2 * step
        loss_down = loss(forward(model, x)['y'])
        weights[pos] += step
        numeric[pos] = (loss_up - loss_down) / 2 / step

    np.testing.assert_almost_equal(analytic[picked], numeric[picked],
                                   decimal=3)


def main():
    """Checks the gradients, trains the network and plots its curves."""
    # File names used by the optional Save / Load lines below.
    model_fname = 'nn_model.npz'
    stats_fname = 'nn_stats.npz'

    # Hyper-parameters. Modify them if needed.
    num_hiddens = [16, 32]
    eps, momentum = 0.01, 0.0
    num_epochs, batch_size = 1000, 100

    # Input-output dimensions.
    num_inputs, num_outputs = 2304, 7

    # Initialize model.
    model = InitNN(num_inputs, num_hiddens, num_outputs)

    # Uncomment to reload trained model here.
    # model = Load(model_fname)

    # Check gradient implementation on a small random batch, from the
    # output layer back to the first layer.
    print('Checking gradients...')
    x = np.random.rand(10, 48 * 48) * 0.1
    for param_name in ('W3', 'b3', 'W2', 'b2', 'W1', 'b1'):
        CheckGrad(model, NNForward, NNBackward, param_name, x)

    # Train model.
    trained_model, stats = Train(
        model, NNForward, NNBackward, NNUpdate,
        eps=eps, momentum=momentum, num_epochs=num_epochs,
        batch_size=batch_size)

    # One figure per metric, each with a train and a validation curve.
    # Every stats entry is a list of (epoch, value) pairs.
    figures = (
        (0, 'train_ce', 'valid_ce', 'Cross Entropy'),
        (1, 'train_acc', 'valid_acc', 'Accuracy'),
    )
    for fig_id, train_key, valid_key, y_label in figures:
        plt.figure(fig_id)
        train_curve = np.array(stats[train_key])
        valid_curve = np.array(stats[valid_key])
        plt.plot(train_curve[:, 0], train_curve[:, 1], 'b', label='Train')
        plt.plot(valid_curve[:, 0], valid_curve[:, 1], 'orange',
                 label='Validation')
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()
    plt.show()

    # Uncomment if you wish to save the model.
    # Save(model_fname, model)

    # Uncomment if you wish to save the training statistics.
    # Save(stats_fname, stats)

In [None]:
# Run the gradient checks, the training and the plotting defined in main().
main()