In [19]:
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import seaborn as sns

In [20]:
class NeuralNetwork:
    def __init__(self, input_layer, hidden_layer, output_layer, n_classes, lr=0.005, alpha=0.7):
        self.w1 = np.random.normal(0, 1, size=(input_layer, hidden_layer))
        self.w2 = np.random.normal(0, 1, size=(hidden_layer, output_layer))
        self.w3 = np.random.normal(0, 1, size=(output_layer, n_classes))
        self.b1 = np.random.normal(0, 1, size=(1, hidden_layer))
        self.b2 = np.random.normal(0, 1, size=(1, output_layer))
        self.b3 = np.random.normal(0, 1, size=(1, n_classes))
        self.lr = lr
        self.alpha = alpha
    
    def activation_function(self, vec):
        relu = np.vectorize(lambda x: max(0, x))
        return relu(vec)
    
    def derivative_function(self, vec):
        relu_df = np.vectorize(lambda x: 1 if x >= 0 else 0)
        return relu_df(vec)
    
    def softmax(self, vec):
        vec = np.exp(vec)
        sum_exp = np.sum(vec)
        return vec / sum_exp
        
    def forward(self, x):
        weights = [self.w1, self.w2]
        bias = [self.b1, self.b2]
        inputs, outputs = [], [x.reshape((1, -1))] 
        for w, b in zip(weights, bias):
            x = x @ w + b
            inputs.append(x)
            x = self.activation_function(x)
            outputs.append(x)
        x = x @ self.w3 + self.b3
        proba = self.softmax(x)
        return inputs, outputs, proba
    
    def train(self, X, y, epochs=20, batch=64):
        momentum_w = [np.zeros(w.shape) for w in (self.w1, self.w2, self.w3)]
        momentum_b = [np.zeros(b.shape) for b in (self.b1, self.b2, self.b3)]
        for k in tqdm(range(epochs)):
            indx = np.random.permutation(len(y))
            X, y = X[indx], y[indx]
            for i in range(len(y) // batch + bool(len(y) % batch)):
                X_batch, y_batch = X[i*batch:(i+1)*batch], y[i*batch:(i+1)*batch]
                dw = [np.zeros(w.shape) for w in (self.w1, self.w2, self.w3)]
                db = [np.zeros(b.shape) for b in (self.b1, self.b2, self.b3)]
                for x_i, y_i in zip(X_batch, y_batch):
                    inputs, outputs, df = self.forward(x_i)

                    df[0, y_i] -= 1
                    dw[-1] += outputs[-1].T @ df
                    db[-1] += df

                    df = (df @ self.w3.T) * self.derivative_function(inputs[-1])
                    dw[-2] += outputs[-2].T @ df
                    db[-2] += df
                    
                    df = (df @ self.w2.T) * self.derivative_function(inputs[-2])
                    dw[-3] += outputs[-3].T @ df
                    db[-3] += df
                
                for j in range(len(momentum_w)):
                    momentum_w[j] = self.alpha * momentum_w[j] + (1 - self.alpha) * dw[j]
                    momentum_b[j] = self.alpha * momentum_b[j] + (1 - self.alpha) * db[j]
                
                self.w1 -= self.lr * momentum_w[0]
                self.w2 -= self.lr * momentum_w[1]
                self.w3 -= self.lr * momentum_w[2] 
                self.b1 -= self.lr * momentum_b[0]
                self.b2 -= self.lr * momentum_b[1]
                self.b3 -= self.lr * momentum_b[2]
    
    def score(self, X, y):
        pred = np.array([np.argmax(self.forward(x)[2]) for x in X])
        return np.mean(pred == y)

In [21]:
class ReLU:
    _type = "activation_function"
    def activation(self, vec):
        relu = np.vectorize(lambda x: max(0, x))
        return relu(vec)
    
    def derivative(self, vec):
        relu_df = np.vectorize(lambda x: 1 if x >= 0 else 0)
        return relu_df(vec) 

class CrossEntropyLoss:
    _type = "loss"
    def loss(self, vec):
        vec = np.exp(vec)
        sum_exp = np.sum(vec)
        return vec / sum_exp
    
    def derivative(self, p, y):
        p[0, y] -= 1
        return p

class Linear:
    _type = "linear"
    def __init__(self, inp, outp):
        self.weight = np.random.normal(0, 1, size=(inp, outp))
        self.bias = np.random.normal(0, 1, size=(1, outp))

class Sequential:
    def __init__(self, *args):
        self.layers = args
        
    def forward(self, x):
        x = x.reshape((1, -1))
        inputs, outputs = [], [x]
        for layer in self.layers:
            if layer._type == "loss":
                proba = layer.loss(x)
            elif layer._type == "linear":
                x = x @ layer.weight + layer.bias
                inputs.append(x)
            elif layer._type == "activation_function":
                x = layer.activation(x)
                outputs.append(x)
        return inputs, outputs, proba
    
    def train(self, X, y, lr=0.003, epochs=40, batch=64):
        linear = [l for l in self.layers if l._type == "linear"]
        for _ in tqdm(range(epochs)):
            indx = np.random.permutation(len(y))
            X, y = X[indx], y[indx]
            for i in range(len(y) // batch + bool(len(y) % batch)):
                X_batch, y_batch = X[i*batch:(i+1)*batch], y[i*batch:(i+1)*batch]
                dw = [np.zeros(l.weight.shape) for l in linear]
                db = [np.zeros(l.bias.shape) for l in linear]
                for x_i, y_i in zip(X_batch, y_batch):
                    inputs, outputs, df = self.forward(x_i)
                    output_ind, input_ind = -2, -2
                    for layer in self.layers[::-1]:
                        if layer._type == "loss":
                            df = layer.derivative(df, y_i)
                            dw[-1] += outputs[-1].T @ df
                            db[-1] += df
                        elif layer._type == "linear":
                            df = (df @ layer.weight.T) 
                        elif layer._type == "activation_function":
                            df = df * layer.derivative(inputs[input_ind])
                            dw[output_ind] += outputs[output_ind].T @ df
                            db[output_ind] += df
                            output_ind -= 1
                            input_ind -= 1
                
                for i in range(len(linear)):
                    linear[i].weight -= lr * dw[i]
                    linear[i].bias -= lr * db[i]
    
    def score(self, X, y):
        pred = np.array([np.argmax(self.forward(x)[2]) for x in X])
        return np.mean(pred == y)

In [22]:
X, y = make_classification(10000, 5, n_classes=3, n_informative=5, n_redundant=0, random_state=17)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.15, random_state=17, stratify=y)

In [23]:
nn = NeuralNetwork(5, 6, 8, 10)
nn.train(X_train, y_train)

100%|██████████████████████████████████████████████████████████████████████████████████| 20/20 [00:21<00:00,  1.08s/it]


In [24]:
nn.score(X_test, y_test)

0.8546666666666667

In [25]:
layers = [Linear(5, 20),
          ReLU(),
          Linear(20, 20),
          ReLU(),
          Linear(20, 10),
          CrossEntropyLoss()]

model = Sequential(*layers)

In [26]:
model.train(X_train, y_train)

100%|██████████████████████████████████████████████████████████████████████████████████| 40/40 [00:48<00:00,  1.21s/it]


In [27]:
model.score(X_test, y_test)

0.922