## Multi-Layer Perceptron

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


np.random.seed(1)
%matplotlib inline

## 1. Define functions

In [2]:
def one_hot_encode(x, n_class):
    """One-hot encode integer class labels.
    
    parameter
    ---------
    x: array_like [n_sample] integer class indices; each value is used
       directly as a column index into the encoding matrix
    
    n_class: number of classes (number of columns of the result)
    
    return
    ------
    en_1hot: array_like [n_sample, n_class] one-hot encoding matrix
             (float zeros with a single 1 per row)
    """
    
    labels = np.asarray(x)
    en_1hot = np.zeros([len(labels), n_class])
    
    # An empty label array gets a float dtype and cannot be used as an index,
    # so only index when there is something to encode.
    if labels.size:
        # one vectorized assignment instead of a per-sample Python loop
        en_1hot[np.arange(len(labels)), labels] = 1

    return en_1hot


def sigmoid(x, derivative=False):
    """Logistic sigmoid, or its derivative written in terms of the output.
    
    parameter
    ---------
    x: scalar or array_like
    derivative: when True, return x * (1 - x). This equals the sigmoid
                derivative only if x is already a sigmoid output, not the
                raw pre-activation.
    
    return
    ------
    1 / (1 + exp(-x)) when derivative is False, x * (1 - x) otherwise
    """
    if not derivative:
        return 1. / (1. + np.exp(-x))

    return x * (1. - x)

def relu(x, derivative=False):
    """Rectified linear unit, or its derivative.
    
    parameter
    ---------
    x: scalar or array_like
    derivative: when True, return the 0/1 slope (1.0 where x > 0, else 0.0)
    
    return
    ------
    x with non-positive entries zeroed, or the 0/1 slope mask
    """
    positive = x > 0

    if derivative:
        return 1.0 * positive

    # multiply by the mask so the caller's array is not modified in place
    return x * positive


def softmax(x):
    """Softmax over the last axis.
    
    parameter
    ---------
    x: array_like [..., n_class] scores
    
    return
    ------
    array of the same shape whose last axis sums to 1
    """
    x = np.asarray(x)
    # Subtracting the per-row maximum leaves the result mathematically
    # unchanged but keeps np.exp from overflowing to inf (and the ratio from
    # becoming nan) for large scores.
    shifted = x - np.max(x, axis=-1, keepdims=True)
    exp_x = np.exp(shifted)
    return exp_x / np.sum(exp_x, axis=-1, keepdims=True)


def foward(X, params, is_training=True):
    """Forward pass through the network.
    
    Every layer computes Z = A_prev . W + b. Hidden layers apply relu and
    the last layer applies sigmoid. When is_training is True and a layer's
    keep_prob is not None, an inverted-dropout mask (Bernoulli(keep_prob)
    divided by keep_prob) is multiplied into that layer's activation.
    
    parameter
    ----------
    X: array_like [n_sample, n_input]
    params: dictionary 'layer_<k>' -> (W, b, keep_prob), k = 1..n_layers
    is_training: when False no dropout mask is drawn
    
    return
    ------
    output: final layer output
    caches: dictionary 'layer_<k>' -> (Z, A, D) where D is the dropout mask
            or None; 'layer_0' holds (None, X, None)
    """

    name_of = 'layer_{}'.format
    depth = len(params)
    caches = {}

    activation = X
    for layer_no in range(1, depth + 1):
        weight, bias, keep_prob = params[name_of(layer_no)]
        pre_act = np.matmul(activation, weight) + bias

        # sigmoid on the output layer, relu everywhere else
        if layer_no == depth:
            activation = sigmoid(pre_act)
        else:
            activation = relu(pre_act)

        mask = None
        if is_training and keep_prob is not None:
            mask = np.random.binomial(1, keep_prob, size=activation.shape) / (keep_prob)
            activation = activation * mask

        # the cached activation is the post-dropout one
        caches[name_of(layer_no)] = (pre_act, activation, mask)

    # the raw input is stored as "layer 0" so the backward pass can read it
    # the same way as any other layer's activation
    caches[name_of(0)] = (None, X, None)

    return activation, caches
    

def backward(X, y, params, caches, is_traning=True):
    """Backward pass: gradients of every layer's weights and biases.
    
    The output-layer error is taken as (A_last - y). Errors are propagated
    to earlier layers through relu's derivative and, where the forward pass
    cached one, the layer's dropout mask.
    
    parameter
    ----------
    X: array_like [n_sample, n_input]
    y: array_like [n_sample, n_class]
    params: dictionary of layer weights
    caches: dictionary of foward layer output
    is_traning: unused; kept so existing callers keep working
    
    return
    ------
    grads: dictionary 'layer_<k>' -> (dW, db, dW_reg), where dW_reg = W / m
           is the regularization term before scaling by alpha
    gnorm: sum over layers of norm(dW) + norm(dW_reg) + norm(db)
    
    """
    
    m = X.shape[0]
    n_layers = len(params)
    layer_fmt = 'layer_{}'
    
    # last layer gradient
    _, A, _ = caches[layer_fmt.format(n_layers)]
    dZ = A - y
    
    grads = {}
    gnorm = 0
    # range (not xrange) so the function also runs on Python 3
    for idx in reversed(range(n_layers)):
        W, _, _ = params[layer_fmt.format(idx + 1)]
        # activation (and dropout mask) of the layer feeding layer idx + 1
        _, A, D = caches[layer_fmt.format(idx)]
                
        dW = np.dot(A.T, dZ) / m 
        dW_reg = W / m
        db = np.sum(dZ, axis=0) / m
        
        gnorm += np.linalg.norm(dW) + np.linalg.norm(dW_reg) + np.linalg.norm(db)
        grads[layer_fmt.format(idx + 1)] = (dW, db, dW_reg)
        
        # Propagate the error to the previous layer. For idx == 0 the
        # "previous layer" is the raw input, whose gradient is never used,
        # so the computation is skipped.
        if idx > 0:
            dZ = np.dot(dZ, W.T) * relu(A, derivative=True)
            
            if D is not None:
                dZ = dZ * D
    
    return grads, gnorm


def update_parameters(params, grads, alpha, learning_rate):
    """Apply one gradient step to every layer, in place.
    
    parameters
    ----------
    params: dictionary of layer weights 'layer_<k>' -> (W, b, keep_prob)
    grads: dictionary of layer weights gradient 'layer_<k>' -> (dW, db, dW_reg)
    alpha: factor applied to the regularization gradient dW_reg
    learning_rate: learning rate to update weights
    
    return
    ------
    params: the same dictionary that was passed in; the W and b arrays it
            holds have been overwritten in place
    """

    n_layers = len(grads)
    layer_fmt = 'layer_{}'
    
    # range (not xrange) so the function also runs on Python 3
    for idx in range(n_layers):
        W, b, _ = params[layer_fmt.format(idx + 1)]
        dW, db, dW_reg = grads[layer_fmt.format(idx + 1)]
        # slice assignment writes into the existing arrays, so the tuples
        # stored in params see the new values without being rebuilt
        W[:] = W - learning_rate * (dW + alpha * dW_reg)
        b[:] = b - learning_rate * db
    
    return params


def compute_cost(output, y):
    """Compute the mean binary cross-entropy loss.
    
    The per-sample loss is the sum over classes of
    -(y * log(output) + (1 - y) * log(1 - output)); the result is the mean
    over samples.
    
    parameter
    ---------
    output: array_like [n_sample, n_class] nn predict output
    y: array_like [n_sample, n_class]
    
    return
    ------
    loss: scalar mean loss
    """
    output = np.asarray(output, dtype=np.float64)
    # Keep probabilities strictly inside (0, 1): a saturated output of exactly
    # 0 or 1 would otherwise produce log(0) = -inf and 0 * -inf = nan.
    eps = np.finfo(np.float64).eps
    clipped = np.clip(output, eps, 1. - eps)

    loss = y * np.log(clipped) + (1 - y) * np.log(1 - clipped)
    loss = -np.mean(np.sum(loss, axis=-1, keepdims=True))
    
    return loss

def comput_reg_cost(params):
    """Sum the norms of every layer's weight matrix.
    
    parameter
    ---------
    params: dictionary of layer weights 'layer_<k>' -> (W, b, keep_prob)
    
    return
    ------
    reg_cost: sum over layers of np.linalg.norm(W); biases are not included
    """
    # NOTE(review): this adds the plain norm, while the regularization
    # gradient used in backward (W / m) corresponds to a squared norm.
    # Confirm which is intended before relying on the reported cost.
    
    reg_cost = 0
    layer_fmt = 'layer_{}'
    # range (not xrange) so the function also runs on Python 3
    for idx in range(len(params)):
        W, _, _ = params[layer_fmt.format(idx + 1)]
        reg_cost += np.linalg.norm(W)
    
    return reg_cost


def gradient_descent(X, y, params, alpha=1e-3, learning_rate=1e-2,
                     max_iter=None, gtol=1e-5,
                     show_cost=None):
    """Gradient descent 
    
    Runs up to max_iter forward/backward/update steps on the given samples
    and stops early once the gradient norm falls below gtol.
    
    parameters
    ----------
    X: array_like [n_samples, n_features]
    y: array_like [n_samples, n_class]
    params: dictionary of layer weights (updated in place)
    alpha: l2 regularization
    learning_rate: step size of each update
    max_iter: maximum number of iterations; None means 1000
    gtol: stop when the gradient norm returned by backward is below this
    show_cost: if not None, print the cost every show_cost iterations

    return
    ------
    params: dictionary of layer weights
    costs: cost of each iteration that was run
    """
    
    if max_iter is None:
        max_iter = 1000
    
    n_sample = X.shape[0]
    costs = np.zeros(max_iter)
    
    # range (not xrange) so the function also runs on Python 3
    for n_iter in range(max_iter):
        
        output, caches = foward(X, params)
        costs[n_iter] = compute_cost(output, y) + (comput_reg_cost(params) * (alpha / (2. * n_sample)))
        grads, gnorm = backward(X, y, params, caches)
        params = update_parameters(params, grads, alpha, learning_rate)
        
        if show_cost is not None and (n_iter % show_cost) == 0:
            print('costs[%4d]: %.4e' % (n_iter, costs[n_iter]))
            
        if gnorm < gtol:
            # keep the cost recorded for this iteration as well; slicing at
            # n_iter would drop the value that was just computed
            costs = costs[:n_iter + 1]
            break
    
    return params, costs

## 2. Split data

In [3]:
def mnist_split_data(csv_file):
    """Read a csv file and separate feature columns from the label column.
    
    The first column is dropped, the last column is taken as the label and
    everything in between is returned as the feature matrix.
    
    parameter
    ---------
    csv_file: path or file-like object accepted by pandas.read_csv
    
    return
    ------
    X: array_like [n_sample, n_feature] mnist flat image
    y: array_like [n_sample] mnist image label
    """
    
    frame = pd.read_csv(csv_file)
    
    features = frame.iloc[:, 1:-1]
    labels = frame.iloc[:, -1]
    
    return features.values, labels.values

In [4]:
# features / labels of the train and test csv files
X_train, y_train = mnist_split_data('exam1_train.csv')
X_test, y_test = mnist_split_data('exam1_test.csv')

# the number of classes is the number of distinct labels in the training set
n_class = np.unique(y_train).size
y_train_en = one_hot_encode(y_train, n_class)

## 3. Initialize parameters

In [5]:
def weights_init(layer_dims):
    """weights initialize for layers
    
    parameter
    ---------
    layer_dims: list of (n_input, n_output, keep_prob) tuples, one per layer;
                keep_prob is stored unchanged next to the weights (None means
                no dropout for that layer)
    
    return
    ------
    model: dictionary 'layer_<k>' -> (weight, bias, keep_prob), k starting at 1
    """
    
    model = {}
    
    for idx, (n_input, n_output, keep_prob) in enumerate(layer_dims):
        layer_name = 'layer_{}'.format(idx + 1)
        
        # He initializer: standard normal scaled by sqrt(2 / n_input).
        # The float literal keeps this a true division on Python 2 as well,
        # where n_input / 2 would truncate for odd sizes and be 0 for
        # n_input == 1.
        weight = np.random.randn(n_input, n_output) / np.sqrt(n_input / 2.)
        # small positive bias instead of zeros
        bias = np.ones([n_output]) * 0.01
    
        model[layer_name] = (weight, bias, keep_prob)
        
    return model

## 4. Neural Network model with 2 hidden layers

In [6]:
# Baseline network: two hidden layers, mini-batch gradient descent.
layer_dims = [(400, 256,   1.),      # (n_input, n_output, keep_prob)
              (256,  32,   1.),
              ( 32,  10, None)]

model = weights_init(layer_dims)

n_sample = X_train.shape[0]
epochs = 30
batch_size = 128
# Ceiling division: one extra batch only when a partial batch remains.
# (Testing n_sample % n_batch instead of n_sample % batch_size can drop the
# final partial batch and divides by zero when n_sample < batch_size.)
n_batch = (n_sample + batch_size - 1) // batch_size
idx = np.arange(n_sample)

# range (not xrange) so the cell also runs on Python 3
for epoch in range(epochs):
    #print('epoch:[{:2d}/{:2d}]\r'.format(epoch + 1, epochs))
    # visit the samples in a new random order every epoch
    np.random.shuffle(idx)
    for b_idx in range(n_batch):
        start, end = b_idx * batch_size, (b_idx + 1) * batch_size
        # slicing past the end is clamped, so the last batch is simply shorter
        sample_idx = idx[start:end]
        
        # one gradient step per mini-batch
        model, costs = gradient_descent(X_train[sample_idx,:], y_train_en[sample_idx, :], model,
                                        learning_rate=0.1, max_iter=1)

# full-batch alternative:
# model, costs = gradient_descent(X_train, y_train_en, model, alpha=1e-2,
#                                 learning_rate=0.1, max_iter=500)

## 5. Predictions

In [7]:
def predict(X, model):
    """Predict the class label of every sample.
    
    Runs the network with is_training=False, passes the output through
    softmax and returns the index of the largest value on the last axis.
    
    parameter
    ---------
    X: array_like [n_sample, n_feature]
    model: dictionary of layer weights
    
    return
    ------
    array [n_sample] of predicted class indices
    """
    
    net_output, _ = foward(X, model, is_training=False)
    class_scores = softmax(net_output)
    
    return np.argmax(class_scores, axis=-1)

def accuracy(y_pred, y_true):
    """Fraction of positions where y_pred equals y_true."""
    
    matches = np.equal(y_pred, y_true)
    
    return np.mean(matches)
    

In [8]:
# accuracy on the training split, reported as a percentage
y_train_pred = predict(X_train, model)
train_acc = accuracy(y_train_pred, y_train)
train_msg = "train acc: {:6.4f}".format(train_acc * 100)
print(train_msg)

# accuracy on the test split, reported as a percentage
y_test_pred = predict(X_test, model)
test_acc = accuracy(y_test_pred, y_test)
test_msg = "test acc: {:6.4f}".format(test_acc * 100)
print(test_msg)

train acc: 98.0571
test acc: 91.6667


## 6. Optimization

In [9]:
def copy_weights(dst, src):
    """Copy every layer's weights and biases from src into dst, in place.
    
    parameter
    ---------
    dst: dictionary of layer weights to overwrite; it needs the same layer
         names as src, and the values are written into its existing arrays
    src: dictionary of layer weights to read from
    
    The third tuple element (keep_prob) of each layer is not copied.
    """

    layer_fmt = 'layer_{}'
    # range (not xrange) so the function also runs on Python 3
    for idx in range(len(src)):
        W_src, b_src, _ = src[layer_fmt.format(idx + 1)]
        W_dst, b_dst, _ = dst[layer_fmt.format(idx + 1)]
        # slice assignment copies values, so dst never aliases src's arrays
        W_dst[:] = W_src
        b_dst[:] = b_src

In [10]:
# Earlier configurations, kept for reference:
# layer_dims = [(400, 512, 0.5),      # (n_input, n_output, keep_prob)
#               (512, 256, 0.5),      # lr = 0.3
#               (256,  64, 0.5),      # epoch = 150
#               ( 64,  32, 0.5),      # batch = 256
#               ( 32,  10, None)]

# layer_dims = [(400, 256, 0.8),      # (n_input, n_output, keep_prob)
#               (256, 256, 0.8),      # lr = 0.5
#               (256,  64, 0.8),      # alpha = 1e-3
#               ( 64,  32, 0.8),      # epoch = 150 
#               ( 32,  10, None)]     # batch = 128

layer_dims = [(400, 256, 0.5),       # (n_input, n_output, keep_prob)
              (256, 256,None),
              (256, 128, 0.5),
              (128,  10,None)]

#n_iter = 1000
learning_rate = 0.25 #0.15
alpha = 1e-2

n_sample = X_train.shape[0]
epochs = 150
batch_size = 128
# Ceiling division: one extra batch only when a partial batch remains.
# (Testing n_sample % n_batch instead of n_sample % batch_size can drop the
# final partial batch and divides by zero when n_sample < batch_size.)
n_batch = (n_sample + batch_size - 1) // batch_size
idx = np.arange(n_sample)

model = weights_init(layer_dims)
# best_model / best_acc are placeholders for checkpointing the best epoch
# with copy_weights(best_model, model); the loop below does not use them yet.
best_model = weights_init(layer_dims)
best_acc = 0.

# range (not xrange) so the cell also runs on Python 3
for epoch in range(epochs):
    # visit the samples in a new random order every epoch
    np.random.shuffle(idx)
    for b_idx in range(n_batch):
        start, end = b_idx * batch_size, (b_idx + 1) * batch_size
        # slicing past the end is clamped, so the last batch is simply shorter
        sample_idx = idx[start:end]
        
        # one gradient step per mini-batch
        model, costs = gradient_descent(X_train[sample_idx,:], y_train_en[sample_idx, :], model, alpha=alpha,
                                        learning_rate=learning_rate, max_iter=1)
    
    # NOTE(review): early stopping is decided on the test split, so the
    # reported test accuracy is optimistically biased.
    acc = accuracy(predict(X_test, model), y_test)
    #print('epoch:[{:3d}/{:3d}] acc: {:4.2f}'.format(epoch + 1, epochs, acc * 100))
    if acc > 0.95:
        break
        #best_acc = acc
        #copy_weights(best_model, model)

y_train_pred = predict(X_train, model)
train_acc = accuracy(y_train_pred, y_train)   
    
y_test_pred = predict(X_test, model)
test_acc = accuracy(y_test_pred, y_test)

# `epoch` is the zero-based index of the last epoch that ran
print('learning rate {:.4f} alpha {:.4f} epochs {:d}'.format(learning_rate, alpha, epoch))
print('Train acc: {:4.2f}'.format(train_acc * 100))
print('Test acc: {:4.2f}'.format(test_acc * 100))



learning rate 0.2500 alpha 0.0100 epochs 104
Train acc: 100.00
Test acc: 95.13
