# Import packages and set options

In [None]:
import numpy as np
#from tqdm import tqdm
import matplotlib.pyplot as plt

%matplotlib inline
plt.style.use('dark_background')
plt.rcParams['figure.figsize'] = (10.0, 8.0)
plt.rcParams['image.interpolation'] = 'nearest'
plt.rcParams['image.cmap'] = 'gray'

np.set_printoptions(precision=4)

# Activation and loss functions

In [None]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_prime(x):
    return x * (1 - x)

def tanh(x):
    return np.tanh(x)

def tanh_prime(x):
    return 1-np.tanh(x)**2

def relu(x):
    return np.maximum(0, x)

def relu_prime(x):
    return (x > 0) * 1

def softmax(x):
    tmp = np.exp(x)
    return tmp / np.sum(tmp, axis=0, keepdims=True)

def softmax_prime(x):                     # X(n,m) : matrix of softmax probabilities, #n classes, #m samples 
    i = np.eye(x.shape[0])                # Identity matrix with n dimensions
    tmp1 = np.einsum('ij,ik->jik', x, i)  # Matrix of diagolized softmax values (per m)
    tmp2 = np.einsum('ij,kj->jik', x, x)  # Matrix of cross product of softmax values (per m)
    return tmp1 - tmp2 

def centropy(x, y):
    eps = 1e-8
    x = x.clip(min=eps, max=None)
    logp = np.where(y==1, -np.log(x), 0)
    return logp.sum(axis=0)

def centropy_prime(x, y):
    # Averaging is made by calling function
    eps = 1e-8
    x = x.clip(min=eps, max=None)
    return np.where(y==1, -1/x, 0)

def mse(y_pred, y_obs):
    return np.power(y_pred - y_obs, 2)

def mse_prime(y_pred, y_obs):
    # Averaging is made by calling function
    return 2*(y_pred - y_obs)

def one_hot(y): 
    return np.eye(len(set(y)))[y]

# Layer(s) class

In [None]:
class Layer:
    def __init__(self) -> None:
        self.params = {}
        self.grads = {}
        self.lr = 0.0
        self.lr_type = None
        self.rg = 0.0
        self.rg_type = None
           
    def forward(self, input_data):
        raise NotImplementedError
    
    def backward(self, grad, alpha):
        raise NotImplementedError
        
class ZLayer(Layer):
    def __init__(self, inSize, outSize, eps_W=1e-2, eps_B=0.0, wInit='Xavier', seed=None, lr=.5, lr_type=None, rg=0.0, rg_type='L2'):
        super().__init__()
        if seed != None:
            np.random.seed(seed)
        if wInit.upper() == 'HE':
            sigma = np.sqrt(2 / inSize)
        else :
            sigma = np.sqrt(2 / (inSize + outSize))
        self.params['W'] = sigma * np.random.randn(outSize, inSize)
        self.params['B'] = np.zeros((outSize, 1))
        #print('W:\n', self.params['W'][0][0:10], self.params['W'].shape)
        self.lr = lr
        self.lr_initial = lr
        self.lr_type = lr_type
        self.rg = rg
        self.rg_type = rg_type
        
    def forward(self, inData):
        self.input = inData
        self.output = np.dot(self.params['W'], self.input) + self.params['B']
        return self.output
        
    def backward(self, outGrad, alpha=1e-0):
        inGrad = np.dot(self.params['W'].T, outGrad)
        self.grads['W'] = np.dot(outGrad, self.input.T) + self.rg / outGrad.shape[1] * self.params['W']
        self.grads['B'] = np.sum(outGrad, axis=1, keepdims=True)
        return inGrad        
    
class ALayer(Layer):
    def __init__(self, g, gPrime):
        super().__init__()
        self.g = g
        self.gPrime = gPrime

    def forward(self, inData):
        self.input = inData
        self.output = self.g(self.input)
        return self.output

    def backward(self, outGrad):
        return self.gPrime(self.input) * outGrad

class SLayer(ALayer):
    def backward(self, outGrad):
        return np.einsum('jik,kj->ij', self.gPrime(self.output), outGrad)
    
class LLayer(Layer):
    def __init__(self, g, gPrime, eps=1e-8):
        self.g = g
        self.gPrime = gPrime
        self.eps = eps

    def forward(self, pred, obs):
        self.input = pred
        self.output = self.g(pred, obs)
        return self.output

    def backward(self, pred, obs):
        return self.gPrime(pred, obs)    


# Optimiser class

In [None]:
class Optimiser():
    def step(self, layers):
        raise NotImplementedError
        
class SGD(Optimiser):
    def lr(self, layer, epoch, epochs):
        if len(layer.params) == 0 or layer.lr_type == None:
            return
        elif layer.lr_type.upper() == 'EXP':
            k = 10 / epochs
            layer.lr = layer.lr_initial * np.exp(-k * epoch)
        elif layer.lr_type.upper() == 'DECAY':
            decay = lr_initial / epochs
            layer.lr = layer.lr_initial * 1 / (1 + decay * epoch)
        elif layer.lr_type.upper() == 'STEP':
            drop_rate = .8
            epochs_drop = .1 * epochs
            layer.lr = layer.lr_initial * np.power(drop_rate, np.floor(epoch / epochs_drop))
            
    def step(self, layer):
        if len(layer.params) > 0:
            for name, param in layer.params.items():
                param -= layer.lr * layer.grads[name]
                
    def rg(self, layer):
        if len(layer.params) == 0 or layer.rg == 0 or layer.rg_type == None:
            return 0
        elif layer.rg_type.upper() == 'L2':
            W = layer.params['W']
            return layer.rg * np.sum(np.square(W))
        return 0

# Batch class

In [None]:
from collections import namedtuple

Batch = namedtuple("Batch", ["x", "y"])

class DataIterator:
    def __call__(self, x, y):
        raise NotImplementedError

class BatchIterator(DataIterator):
    def __init__(self, size=None, shuffle=True):
        self.size = size
        self.shuffle = shuffle
        
    def __call__(self, x, y):
        lenX = x.shape[1]
        if self.size == None or self.size > lenX:
            yield Batch(x, y)
        else :
            if self.shuffle:
                idx = np.random.permutation((lenX))
            for start in range(0, lenX - self.size + 1, self.size):
                if self.shuffle:
                    batch = idx[start:start + self.size]
                else:
                    batch = slice(start, start + self.size)
                yield Batch(x[:, batch], y[:, batch])
            
    def shuffle(self, x, y):
        starts = np.arange(0, x.shape[1], self.size)
        if self.shuffle:
            np.random.shuffle(starts)
        for start in starts:
            end = start + self.size
            yield Batch(x[:, start:end], y[:, start:end])

# Network class

In [None]:
class Network:
    def __init__(self, loss = LLayer(centropy, centropy_prime), optimiser = SGD(), iterator = BatchIterator()):
        self.layers = []
        self.loss = loss
        self.optimiser = optimiser
        self.iterator = iterator
        self.epoch_loss = []
        self.epoch_acc = []
        
    def add(self, layer):
        self.layers.append(layer)

    def forward(self, output):
        for layer in self.layers:
            output = layer.forward(output)
        return output
    
    def backward(self, grad):
        for layer in reversed(self.layers):
            grad = layer.backward(grad)
        return grad
    
    def update_lr(self, epoch, epochs):
        for layer in self.layers:
            self.optimiser.lr(layer, epoch, epochs)
    
    def rg_loss(self):
        rg_loss = 0.0
        for layer in self.layers:
            rg_loss += self.optimiser.rg(layer)
        return rg_loss
    
    def update_weights(self):
        for layer in self.layers:
            self.optimiser.step(layer)
        
    def predict(self, x):
        scores = self.forward(x)
        return np.argmax(scores, axis=0)
        
    def accuracy(self, x, y):
        y_pred = self.predict(x)
        return np.mean(y_pred == y)
    
    def boundary_plot(self, x, y, h=.02):
        x_min, x_max = x[0, :].min() - .1, x[0, :].max() + .1
        y_min, y_max = x[1, :].min() - .1, x[1, :].max() + .1
        xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))
        x_grid = np.c_[xx.ravel(), yy.ravel()].T
        Z = self.predict(x_grid)
        Z = Z.reshape(xx.shape)
        fig = plt.figure(figsize=(6.7,5))
        plt.contourf(xx, yy, Z, cmap=plt.cm.Spectral, alpha=0.6)
        plt.scatter(x[0, :], x[1, :], c=y, s=40, cmap=plt.cm.Spectral)
        plt.title('Decision boundary')
        plt.xlim(xx.min(), xx.max())
        plt.ylim(yy.min(), yy.max())
        
    def performance_plot(self):
        fig, axs = plt.subplots(figsize=(12,4), nrows=1, ncols=2, sharex=True, constrained_layout=True)
        ax = axs[0]
        ax.plot(self.epoch_loss)
        ax.set_title('Loss')
        ax = axs[1]
        ax.plot(self.epoch_acc, label='Acc')
        ax.set_title('Accuracy')
        ax.set_ylim([0, 1])
        plt.show()
        
    def fit(self, x_train, y_train, epochs, verbose=1):
        for epoch in range(epochs):
            epoch_loss = 0.0
            epoch_rg_loss = 0.0
            epoch_acc = 0.0
            for k, batch in enumerate(self.iterator(x_train, y_train)):
                y_pred = self.forward(batch.x)
                epoch_loss += np.sum(self.loss.forward(y_pred, batch.y))
                epoch_rg_loss += self.rg_loss()
                epoch_acc = 1/(k+1) * (np.mean(np.argmax(y_pred, axis=0) == np.argmax(batch.y, axis=0)) + epoch_acc * k)
                grad = self.loss.backward(y_pred, batch.y) / y_pred.shape[1]            
                grad = self.backward(grad)
                self.update_weights()
            
            self.update_lr(epoch, epochs)
            
            epoch_loss /= y_train.shape[1]
            epoch_loss += epoch_rg_loss / (2 * y_train.shape[1])
            self.epoch_loss.append(epoch_loss)
            self.epoch_acc.append(epoch_acc)
        
            if verbose > 1 or (verbose == 1 and epoch % (epochs/10) == 0):
                print('epoch %5.d/%d   loss: %.3f, acc: %.3f, lr: %.4f, rg: %.4f' % (epoch, epochs, epoch_loss, epoch_acc, self.layers[0].lr, self.layers[0].rg))

# (1) Simulation 3 classes and 2 dimensions random data with 1 HL (Perceptron) and 2 HL (MLP)

In [None]:
# Generate data samples

np.random.seed(0)
N = 100 # number of points per class
D = 2 # dimensionality
K = 3 # number of classes
X = np.zeros((N*K,D))
y = np.zeros(N*K, dtype='uint8')
for j in range(K):
  ix = range(N*j,N*(j+1))
  r = np.linspace(0.0,1,N) # radius
  t = np.linspace(j*4,(j+1)*4,N) + np.random.randn(N)*0.2 # theta
  X[ix] = np.c_[r*np.sin(t), r*np.cos(t)]
  y[ix] = j

x_train = X.T
y_train = one_hot(y)
y_train = y_train.T

In [None]:
# 1 HL

nn = Network(loss = LLayer(centropy, centropy_prime), optimiser = SGD(), iterator = BatchIterator(size=64))
nn.add(ZLayer(2, 3, wInit='Xavier', lr_type='exp'))
nn.add(SLayer(softmax, softmax_prime))

nn.fit(x_train, y_train, 100, 1)

print('training accuracy: %.2f' % nn.accuracy(x_train, y))
nn.performance_plot()
nn.boundary_plot(x_train, y)

In [None]:
# 2 HL

nn = Network(loss = LLayer(centropy, centropy_prime), optimiser = SGD(), iterator = BatchIterator(size=64))
nn.add(ZLayer(2, 100, wInit='He', lr_type='step', rg=.7, rg_type='L2'))
nn.add(ALayer(relu, relu_prime))
nn.add(ZLayer(100, 3))
nn.add(SLayer(softmax, softmax_prime))

nn.fit(x_train, y_train, 5_000, 1)

print('training accuracy: %.3f' % nn.accuracy(x_train, y))
nn.performance_plot()
nn.boundary_plot(x_train, y)

# (2) Simulation XOR gate with 1 hidden layer

In [None]:
np.set_printoptions(precision=4)

x_train = np.array([ [0, 0], [1, 0], [0, 1], [1, 1] ]).T
y = np.array([0,1,1,0])
y_train = one_hot(y).T

nn = Network(loss = LLayer(mse, mse_prime), optimiser = SGD())
nn.add(ZLayer(2, 2, eps_W=1.0, eps_B=1.0))
nn.add(ALayer(tanh, tanh_prime))
nn.add(ZLayer(2, 2, eps_W=1.0, eps_B=1.0))

nn.fit(x_train, y_train, 5_000, 1)

print('\ntraining accuracy: %.3f' % nn.accuracy(x_train, y))
nn.performance_plot()
nn.boundary_plot(x_train, y)

print('\nXtrain\tYpred\t\t\tYtrain')
predicted = nn.forward(x_train)
predicted_01 = np.argmax(predicted, axis=0)
for i in range(x_train.shape[1]):
    print(x_train[:,i].T, '\t', predicted[:,i].T, predicted_01[i], '\t', y_train[:,i].T, y[i])

# (3) Simulation sensitivy of hidden layer size 

In [None]:
# Generate data samples

import sklearn.datasets
np.random.seed(0)
X, y = sklearn.datasets.make_moons(200, noise=.2)
fig = plt.figure(figsize=(6.7,5))
plt.scatter(X[:,0], X[:, 1], s=40, c=y, cmap=plt.cm.Spectral)

x_train = X.T
y_train = one_hot(y)
y_train = y_train.T

In [None]:
for n in [1, 3, 10, 50]:
    nn = Network(loss = LLayer(centropy, centropy_prime), optimiser = SGD(), iterator = BatchIterator(size=64))
    nn.add(ZLayer(2, n, eps_W=1/np.sqrt(2)))
    nn.add(ALayer(tanh, tanh_prime))
    nn.add(ZLayer(n, 2, eps_W=1/np.sqrt(n)))
    nn.add(SLayer(softmax, softmax_prime))

    nn.fit(x_train, y_train, 10_000, 0)

    print('training accuracy: %.3f' % nn.accuracy(x_train, y))
    nn.performance_plot()
    nn.boundary_plot(x_train, y)
print('done.')

# (4) Neural network data points belonging to three classes
### https://www.cristiandima.com/neural-networks-from-scratch-in-python

In [None]:
np.random.seed(1)

# generate three Gaussian clouds each holding 500 points
X1 = np.random.randn(500, 2) + np.array([0, -2])
X2 = np.random.randn(500, 2) + np.array([2, 2])
X3 = np.random.randn(500, 2) + np.array([-2, 2])

# put them all in a big matrix
X = np.vstack([X1, X2, X3])

# generate the one-hot-encodings
labels = np.array([0]*500 + [1]*500 + [2]*500)
T = np.zeros((1500, 3))
for i in range(1500):
    T[i, labels[i]] = 1

# visualize the data
fig = plt.figure(figsize=(6.7,5))
plt.scatter(X[:,0], X[:,1], c=labels, s=100, cmap=plt.cm.Spectral, alpha=0.5)
plt.show()

x_train = X.T
y_train = T.T

In [None]:
nn = Network(loss = LLayer(centropy, centropy_prime), optimiser = SGD(alpha=1e-2), iterator = BatchIterator(size=None))
nn.add(ZLayer(2, 5))
nn.add(ALayer(tanh, tanh_prime))
nn.add(ZLayer(5, 3))
nn.add(SLayer(softmax, softmax_prime))

nn.fit(x_train, y_train, 5_000, 1)

print('training accuracy: %.3f' % nn.accuracy(x_train, labels))
nn.performance_plot()
nn.boundary_plot(x_train, labels)

# (5) L2 regularization
### https://github.com/Coding-Lane/L2-Regularization

In [None]:
x_train = np.loadtxt('data/train_x.csv', delimiter = ',')
y = np.loadtxt('data/train_y.csv', delimiter = ',').reshape(1, x_train.shape[1])
y = y.reshape(y.shape[1]).astype(int)
y_train = one_hot(y).T
#y_train = y
print("Shape of train_X : ", x_train.shape)
print("Shape of train_Y : ", y_train.shape)
np.set_printoptions(precision=4)

nn = Network(loss = LLayer(centropy, centropy_prime))
nn.add(ZLayer(2, 100, wInit='He', lr=.3, seed=3))
nn.add(ALayer(relu, relu_prime))
nn.add(ZLayer(100, 10, wInit='He', lr=.3, seed=3))
nn.add(ALayer(relu, relu_prime))
nn.add(ZLayer(10, 2, wInit='He', lr=.3, seed=3))
nn.add(SLayer(softmax, softmax_prime))

nn.fit(x_train, y_train, 10_000, 1)

print('training accuracy: %.3f' % nn.accuracy(x_train, y))
nn.performance_plot()
nn.boundary_plot(x_train, y)

In [None]:

nn = Network(loss = LLayer(centropy, centropy_prime))
nn.add(ZLayer(2, 100, wInit='He', lr=.3, seed=3, rg=.7))
nn.add(ALayer(relu, relu_prime))
nn.add(ZLayer(100, 10, wInit='He', lr=.3, seed=3, rg=.7))
nn.add(ALayer(relu, relu_prime))
nn.add(ZLayer(10, 2, wInit='He', lr=.3, seed=3, rg=.7))
nn.add(SLayer(softmax, softmax_prime))

nn.fit(x_train, y_train, 10_000, 1)

print('training accuracy: %.3f' % nn.accuracy(x_train, y))
nn.performance_plot()
nn.boundary_plot(x_train, y)

In [None]:
import import_ipynb