In [1]:
import numpy as np
from sklearn.metrics import *
from einops import rearrange, einsum
from sklearn.datasets import make_regression

M, N = 5000, 100
X, y = make_regression(M, N)
y = ((y-y.mean())/abs(y-y.mean()).max()).reshape(-1, 1)
print(X.shape, y.shape)

(5000, 100) (5000, 1)


In [2]:
def get_activation_fn(activation='relu'):
    if activation == 'relu':
        def relu(x): return 0 + x * (x > 0).astype(int)
        return relu
    
def get_grad_fn(activation='relu'):
    if activation == 'relu':
        def grad_relu(z): return (z >= 0).astype(np.float32)
        return grad_relu

In [3]:
from einops import rearrange, einsum


def xavier_uniform(*shape):
    assert len(shape) >= 2
    in_fan, out_fan = shape[-2], shape[-1]
    ret =  2 * (np.random.rand(*shape) - 0.5)
    ret *= (6 / (in_fan + out_fan)) ** 0.5
    return ret
    

class MLPRegressor():
    def __init__(self, hidden_layer_sizes=(100,), solver='sgd', activation='relu', lr=1e-3, max_iters=200, optim_kwargs={}):
        self.max_iters = max_iters
        self.lr = lr
        assert solver in ['sgd'], "Solver is not supported yet."
        assert activation in ['identity', 'logistic', 'tanh', 'relu'], "Activation is not supported yet."
        self.activation = activation
        self.hidden_layer_sizes = hidden_layer_sizes
        self.optim_kwargs = optim_kwargs
        self.act_fn = get_activation_fn(activation)
        self.grad_fn = get_grad_fn(activation)
        self.eps = 1e-4
        
    def _init_optimizer(self):
        if self.solver == 'sgd':
            self.optimizer = SGD(**self.optim_kwargs)
        if self.solver == 'adam':
            self.optimizer = Adam(**self.optim_kwargs)
        

    def _init_weights(self):
        self.weights = []
        self.biases = []
        prev_size = self.input_size
        for hidden_layer_size in self.hidden_layer_sizes:
            self.weights.append(xavier_uniform(hidden_layer_size, prev_size))
            self.biases.append(np.array(0.0))
            prev_size = hidden_layer_size
        self.weights.append(xavier_uniform(1, prev_size))
        self.biases.append(np.array(0.0))
    
    def fit(self, X, y):
        M, N = X.shape
        
        self.input_size = N
        self._init_weights()
        y = y.reshape(M, 1)
        
        
        prev_loss = float('inf')
        for iteration in range(self.max_iters):
            grad_weights, grad_biases = self.calc_grad(X, y)
            for layer_idx in range(len(self.weights)):
                self.weights[layer_idx] -= grad_weights[layer_idx] * self.lr
                self.biases[layer_idx] -= grad_biases[layer_idx] * self.lr
            
            if (iteration + 1) % (self.max_iters // 10) == 0:
                print(prev_loss)
            curr_loss = self.calc_loss(X, y)
            if curr_loss < prev_loss - self.eps:
                prev_loss = curr_loss
            else:
                break
              
        return self

    def predict(self, X):
        Zs = [X.copy()]
        As = [X.copy()]
        L = len(self.weights)
        for layer_idx in range(L - 1):
            Z = As[-1].copy() @ self.weights[layer_idx].T + self.biases[layer_idx]
            Zs.append(Z)
            As.append(self.act_fn(Z))
            
        y_pred = As[-1] @ self.weights[-1].T + self.biases[-1]
        return y_pred

    def calc_loss(self, X, y):
        y_pred = self.predict(X)
        return ((y - y_pred) ** 2).mean()
        
    def calc_grad(self, X, y):
        Zs = [X.copy()]
        As = [X.copy()]
        L = len(self.weights)
        for layer_idx in range(L - 1):
            Z = As[-1].copy() @ self.weights[layer_idx].T + self.biases[layer_idx]
            Zs.append(Z)
            As.append(self.act_fn(Z))
        y_pred = As[-1] @ self.weights[-1].T + self.biases[-1]
        self.Zs = Zs
        self.As = As
        
        E = (y_pred - y)
        dL_dWlast = 2 * einsum(As[-1], E, 'm a, m b -> m a b').mean(axis=0)
        dL_dblast = 2 * E.mean(axis=0).sum()
        gradW_list = []
        gradb_list = []
        gradW_list.append(dL_dWlast.T)
        gradb_list.append(dL_dblast)
        leading_term = self.weights[-1].T[None].repeat(M, axis=0)
        L = len(self.weights)
        for layer_idx in range(L - 1, 0, -1):
          Z = Zs[layer_idx]
          A = As[layer_idx]
          Z_prev = Zs[layer_idx - 1]
          A_prev = As[layer_idx - 1]
          dA_dW = einsum(self.grad_fn(Z), A_prev, 'm b, m a -> m a b')
          dE_dW = einsum(leading_term, dA_dW, 'm b c, m a b -> m a b c')
          dL_dW = 2 * einsum(dE_dW, E, 'm a b c, m c -> m a b c').mean(axis=0).mean(axis=-1)
          gradW = dL_dW.T
          gradW_list.append(gradW)
          
          dA_db = self.grad_fn(Z)
          dE_db = einsum(leading_term, dA_db, 'm b c, m b -> m b c')
          dL_db = 2 * einsum(dE_db, E, 'm b c, m c -> m b c').mean(axis=0).sum()
          gradb = dL_db
          gradb_list.append(gradb)

          if layer_idx > 1:
            dA_dA = einsum(self.grad_fn(Z), self.weights[layer_idx - 1].T, 'm b, a b -> m a b')
            leading_term = einsum(leading_term, dA_dA, 'm b c, m a b -> m a c')
    
        return gradW_list[::-1], gradb_list[::-1]

In [4]:
mlp = MLPRegressor(hidden_layer_sizes=(100, 64), max_iters=100, lr=1e-2).fit(X, y)
mlp.calc_loss(X, y)

0.2960851363444067
0.23675113324494682
0.20076168018480675
0.17651492814149097
0.159126974756254
0.14610521642505178
0.136028142647592
0.12799087408862556
0.12140896949094176
0.11592111980319915


0.11542439570657036