In [1]:
from numpy import ndarray
from typing import List
import numpy as np
from time import time

In [3]:
class Operation(object):
    """Base class for a single differentiable operation.

    ``forward`` caches the input and the computed output on the instance;
    ``backward`` uses those cached values to turn the gradient with respect
    to the output into the gradient with respect to the input.  Subclasses
    supply the actual maths by overriding ``_output`` and ``_input_grad``.
    """

    def __init__(self):
        # nothing to initialize
        pass

    def forward(self, input_: ndarray) -> ndarray:
        """Store ``input_``, compute ``self.output`` via ``_output`` and return it."""
        self.input_ = input_

        self.output = self._output()

        return self.output

    def backward(self, output_grad: ndarray) -> ndarray:
        """Return the gradient with respect to the input.

        ``output_grad`` must have the same shape as the output produced by
        the preceding ``forward`` call, and the computed input gradient must
        have the same shape as the cached input; both are checked with
        ``assert``.
        """
        assert self.output.shape == output_grad.shape

        self.input_grad = self._input_grad(output_grad)

        assert self.input_.shape == self.input_grad.shape

        return self.input_grad

    def _output(self):
        """Compute the output from ``self.input_``; must be overridden."""
        # Was ``NotImplementError`` (undefined name), which surfaced as a
        # NameError instead of the intended NotImplementedError.
        raise NotImplementedError()

    def _input_grad(self, output_grad):
        """Compute the input gradient from ``output_grad``; must be overridden."""
        raise NotImplementedError()
        
        

In [4]:
class ParamOperation(Operation):
    """An ``Operation`` that also carries a parameter array.

    In addition to the input gradient, ``backward`` computes and stores the
    gradient with respect to the parameter (``self.param_grad``).
    """

    def __init__(self, param: ndarray):
        super().__init__()
        # Kept on the instance; subclasses read it in their _output / gradient hooks.
        self.param = param

    def backward(self, output_grad: ndarray) -> ndarray:
        """Compute both gradients, check their shapes, return the input gradient."""
        # The incoming gradient has to line up with the cached output.
        assert self.output.shape == output_grad.shape

        # Gradient for the input first, then for the parameter.
        self.input_grad = self._input_grad(output_grad)
        self.param_grad = self._param_grad(output_grad)

        # Each gradient must have the shape of the quantity it belongs to.
        assert self.input_.shape == self.input_grad.shape
        assert self.param.shape == self.param_grad.shape

        return self.input_grad

    def _param_grad(self, output_grad: ndarray) -> ndarray:
        """Gradient with respect to ``self.param``; must be overridden."""
        raise NotImplementedError()

In [22]:
class WeightMultiply(ParamOperation):
    """Matrix-multiplies the input by a weight matrix: ``X @ W``."""

    def __init__(self, W: ndarray):
        super().__init__(W)

    def _output(self):
        """Forward pass: input times the stored weights."""
        return np.matmul(self.input_, self.param)

    def _input_grad(self, output_grad):
        """Gradient w.r.t. the input.

        For ``X @ W`` the derivative w.r.t. ``X`` is ``W.T``, applied on the
        right of the incoming gradient.
        """
        weights_t = np.transpose(self.param)
        return np.matmul(output_grad, weights_t)

    def _param_grad(self, output_grad):
        """Gradient w.r.t. the weights.

        For ``X @ W`` the derivative w.r.t. ``W`` is ``X.T``, applied on the
        left of the incoming gradient.
        """
        input_t = np.transpose(self.input_)
        return np.matmul(input_t, output_grad)

In [29]:
class BiasAdd(ParamOperation):
    """Adds a bias row to the input."""

    def __init__(self, B):
        # The bias must have a leading dimension of exactly one.
        assert B.shape[0] == 1
        super().__init__(B)

    def _output(self):
        """Forward pass: input plus the stored bias."""
        return self.input_ + self.param

    def _input_grad(self, output_grad):
        """Gradient w.r.t. the input: the incoming gradient times ones."""
        ones = np.ones_like(self.input_)
        return ones * output_grad

    def _param_grad(self, output_grad):
        """Gradient w.r.t. the bias: incoming gradient summed over axis 0.

        The result is reshaped to a single row so it matches the bias shape.
        """
        per_row = np.ones_like(self.param) * output_grad
        n_cols = per_row.shape[1]
        summed = per_row.sum(axis=0)
        return summed.reshape(1, n_cols)

In [30]:
class Sigmoid(Operation):
    """Element-wise logistic sigmoid activation."""

    def __init__(self):
        super().__init__()

    def _output(self):
        """Forward pass: ``1 / (1 + exp(-x))`` applied to the cached input."""
        denominator = 1.0 + np.exp(-1.0 * self.input_)
        return 1.0 / denominator

    def _input_grad(self, output_grad):
        """Backward pass: ``output * (1 - output)`` times the incoming gradient."""
        local_slope = self.output * (1.0 - self.output)
        return local_slope * output_grad
    

In [32]:
class Linear(Operation):
    """Identity activation: passes values and gradients through unchanged."""

    def __init__(self):
        super().__init__()

    def _output(self):
        """Forward pass: return the cached input as-is."""
        passthrough = self.input_
        return passthrough

    def _input_grad(self, output_grad):
        """Backward pass: return the incoming gradient as-is."""
        passthrough_grad = output_grad
        return passthrough_grad

In [33]:
class Layer(object):
    """A sequence of operations applied one after another.

    Subclasses build ``self.operations`` (and the parameters they use) in
    ``_setup_layer``, which runs on the first ``forward`` call, i.e. while
    ``self.first`` is still true.
    """

    def __init__(self, neurons):
        self.neurons = neurons
        self.first = True
        # Was misspelt ``parmas``, which left ``params`` undefined until a
        # subclass happened to assign it during setup.
        self.params = []
        self.param_grads = []
        self.operations = []
        # May be overwritten from outside (the network assigns a seed to its
        # layers); defaulting to None keeps ``self.seed`` readable regardless.
        self.seed = None

    def _setup_layer(self, num_in):
        """Create parameters and operations; must be overridden.

        Note that ``forward`` passes the whole input array as the argument,
        not just a feature count.
        """
        raise NotImplementedError()

    def forward(self, input_):
        """Run the input through every operation in order and return the result."""
        if self.first:
            # Setup is deferred until the first input is available.
            self._setup_layer(input_)
            self.first = False

        self.input_ = input_

        for operation in self.operations:
            input_ = operation.forward(input_)

        self.output = input_

        return self.output

    def backward(self, output_grad):
        """Propagate ``output_grad`` backwards through the operations.

        Also refreshes ``self.param_grads`` from the operations that own a
        parameter. Returns the gradient with respect to the layer input.
        """
        assert self.output.shape == output_grad.shape

        for operation in reversed(self.operations):
            output_grad = operation.backward(output_grad)

        input_grad = output_grad

        self._param_grads()

        return input_grad

    def _param_grads(self):
        """Collect ``param_grad`` from every parameterised operation."""
        self.param_grads = [
            operation.param_grad
            for operation in self.operations
            if isinstance(operation, ParamOperation)
        ]

    def _params(self):
        """Collect ``param`` from every parameterised operation."""
        self.params = [
            operation.param
            for operation in self.operations
            if isinstance(operation, ParamOperation)
        ]

In [35]:
class Dense(Layer):
    """Fully connected layer: weight multiply, bias add, then an activation.

    Parameters
    ----------
    neurons : number of output units.
    activation : operation applied after the bias add.  When omitted, a new
        ``Sigmoid`` is created for this layer.
    """

    def __init__(self, neurons: int, activation: Operation = None):
        super().__init__(neurons)
        # A ``Sigmoid()`` default in the signature is evaluated only once, so
        # every layer relying on it would share one instance -- and operations
        # cache their input/output between forward and backward.  Build a
        # fresh one per layer instead.
        self.activation = Sigmoid() if activation is None else activation

    def _setup_layer(self, input_: ndarray):
        """Initialise weights and bias from ``input_`` and build the operations.

        Weights have shape ``(input_.shape[1], neurons)`` and the bias has
        shape ``(1, neurons)``; both are drawn with ``np.random.randn``.
        """
        # ``seed`` is attached from outside and may be absent when the layer
        # is used without a seeded network; treat that as "no seed".
        seed = getattr(self, "seed", None)
        if seed:
            np.random.seed(seed)

        self.params = []

        self.params.append(np.random.randn(input_.shape[1], self.neurons))

        self.params.append(np.random.randn(1, self.neurons))

        # The operations hold references to the very arrays in self.params.
        self.operations = [WeightMultiply(self.params[0]),
                           BiasAdd(self.params[1]),
                           self.activation]
        

In [36]:
class Loss(object):
    """Base class for loss functions.

    ``forward`` caches the prediction and target and returns the loss value;
    ``backward`` returns the gradient of the loss with respect to the
    prediction.  Subclasses override ``_output`` and ``_input_grad``.
    """

    def __init__(self):
        pass

    def forward(self, prediction: ndarray, target: ndarray) -> float:
        """Cache both arrays (which must share a shape) and return the loss."""
        assert prediction.shape == target.shape

        self.prediction, self.target = prediction, target

        return self._output()

    def backward(self) -> ndarray:
        """Return the gradient w.r.t. the cached prediction.

        The gradient is stored on ``self.input_grad`` and must have the same
        shape as the prediction.
        """
        grad = self._input_grad()
        self.input_grad = grad

        assert self.prediction.shape == self.input_grad.shape

        return self.input_grad

    def _output(self) -> float:
        """Loss value for the cached prediction/target; must be overridden."""
        raise NotImplementedError()

    def _input_grad(self) -> ndarray:
        """Gradient of the loss w.r.t. the prediction; must be overridden."""
        raise NotImplementedError()
        

In [37]:
class MeanSquaredError(Loss):
    """Sum of squared errors divided by the number of rows in the prediction."""

    def __init__(self):
        super().__init__()

    def _output(self) -> float:
        """Squared differences summed over all entries, divided by ``shape[0]``."""
        squared_errors = np.power(self.prediction - self.target, 2)
        n_rows = self.prediction.shape[0]
        return np.sum(squared_errors) / n_rows

    def _input_grad(self) -> ndarray:
        """Gradient w.r.t. the prediction: ``2 * (prediction - target) / shape[0]``."""
        residual = self.prediction - self.target
        n_rows = self.prediction.shape[0]
        return 2.0 * residual / n_rows

In [42]:
class NeuralNetwork(object):
    """A stack of layers together with a loss function.

    When ``seed`` is truthy it is copied onto every layer as ``layer.seed``.
    """

    def __init__(self, layers: List[Layer], loss: Loss, seed: int = 1):
        self.layers = layers
        self.loss = loss
        self.seed = seed
        if not seed:
            return
        for layer in self.layers:
            layer.seed = self.seed

    def forward(self, x_batch):
        """Feed ``x_batch`` through the layers in order and return the result."""
        activations = x_batch
        for layer in self.layers:
            activations = layer.forward(activations)
        return activations

    def backward(self, loss_grad: ndarray):
        """Send ``loss_grad`` back through the layers, last layer first."""
        upstream = loss_grad
        for layer in reversed(self.layers):
            upstream = layer.backward(upstream)

    def train_batch(self, x_batch: ndarray, y_batch: ndarray) -> float:
        """Forward pass, loss computation and backward pass for one batch.

        Returns the loss value; parameters are not updated here.
        """
        batch_loss = self.loss.forward(self.forward(x_batch), y_batch)
        self.backward(self.loss.backward())
        return batch_loss

    def params(self):
        """Yield every parameter array, layer by layer."""
        for layer in self.layers:
            for param in layer.params:
                yield param

    def param_grads(self):
        """Yield every parameter gradient, layer by layer."""
        for layer in self.layers:
            for param_grad in layer.param_grads:
                yield param_grad

In [43]:
# Linear regression: a single unit with an identity activation.
lr = NeuralNetwork(
    layers=[Dense(neurons=1, activation=Linear())],
    loss=MeanSquaredError(),
    seed=20200720,
)

# One hidden sigmoid layer followed by a linear output unit.
nn = NeuralNetwork(
    layers=[
        Dense(neurons=13, activation=Sigmoid()),
        Dense(neurons=1, activation=Linear()),
    ],
    loss=MeanSquaredError(),
    seed=20200720,
)

# Two hidden sigmoid layers followed by a linear output unit.
d1 = NeuralNetwork(
    layers=[
        Dense(neurons=13, activation=Sigmoid()),
        Dense(neurons=13, activation=Sigmoid()),
        Dense(neurons=1, activation=Linear()),
    ],
    loss=MeanSquaredError(),
    seed=20200720,
)

In [44]:
class Optimizer(object):
    """Base optimizer: stores a learning rate and exposes a no-op ``step``."""

    def __init__(self, lr: float = 0.01):
        # Learning rate used by subclasses when updating parameters.
        self.lr = lr

    def step(self):
        """Perform one update; the base implementation does nothing."""
        return None

In [45]:
class SGD(Optimizer):
    """Plain stochastic gradient descent.

    Expects a ``net`` attribute to have been attached to the instance; it is
    read through ``self.net.params()`` and ``self.net.param_grads()``.
    """

    def __init__(self, lr: float = 0.01):
        super().__init__(lr)

    def step(self):
        """Subtract ``lr * gradient`` from every parameter, in place."""
        params_and_grads = zip(self.net.params(), self.net.param_grads())
        for weights, grad in params_and_grads:
            # In-place on purpose: the update has to land in the existing
            # array object rather than rebinding the local name.
            weights -= self.lr * grad

In [48]:
from copy import deepcopy
from typing import Tuple

class Trainer(object):
    """Trains a network with an optimizer using shuffled mini-batches.

    The optimizer gets the network attached as ``optim.net``.  ``best_loss``
    tracks the lowest evaluation loss seen by ``fit``.
    """

    # Forward-reference strings: the misspelt ``NeuralNetowrk`` annotation
    # used to raise NameError as soon as the class body was executed.
    def __init__(self, net: "NeuralNetwork", optim: "Optimizer"):
        self.net = net
        self.optim = optim
        self.best_loss = 1e9

        # The optimizer reads parameters and gradients through this attribute.
        self.optim.net = self.net

    def permute_data(self, X, y):
        """Return ``X`` and ``y`` shuffled along axis 0 with one shared permutation."""
        perm = np.random.permutation(X.shape[0])
        return X[perm], y[perm]

    def generate_batches(self, X: ndarray, y: ndarray, size: int = 32) -> Tuple[ndarray, ndarray]:
        """Generator yielding consecutive ``(X_batch, y_batch)`` slices.

        Each slice holds at most ``size`` rows; the last one may be shorter.
        ``X`` and ``y`` must have the same number of rows.
        """
        assert X.shape[0] == y.shape[0]

        N = X.shape[0]

        for start in range(0, N, size):
            yield X[start:start + size], y[start:start + size]

    def fit(self, X_train: ndarray, y_train: ndarray,
            X_test: ndarray, y_test: ndarray,
            epochs: int = 100,
            eval_every: int = 10,
            batch_size: int = 32,
            seed: int = 1,
            restart: bool = True):
        """Train for up to ``epochs`` epochs, evaluating every ``eval_every``.

        Parameters
        ----------
        X_train, y_train : training data, reshuffled at every epoch.
        X_test, y_test : data used for the periodic evaluation.
        epochs : maximum number of passes over the training data.
        eval_every : evaluate on the test data every this many epochs.
        batch_size : number of rows per mini-batch.
        seed : passed to ``np.random.seed`` before training starts.
        restart : when true, mark every layer as not yet set up
            (``layer.first = True``) and reset ``best_loss``.

        Early stopping: when an evaluation does not improve on ``best_loss``,
        ``self.net`` (and ``optim.net``) are rebound to a deep copy taken at
        the start of that epoch and training stops.  The object originally
        passed in is not modified by that rollback, so read the result from
        ``trainer.net``.
        """
        np.random.seed(seed)

        if restart:
            for layer in self.net.layers:
                layer.first = True

            self.best_loss = 1e9

        for e in range(epochs):
            is_eval_epoch = (e + 1) % eval_every == 0

            if is_eval_epoch:
                # Snapshot before this epoch's updates, used for rollback.
                last_model = deepcopy(self.net)

            X_train, y_train = self.permute_data(X_train, y_train)

            batch_generator = self.generate_batches(X_train, y_train, batch_size)

            for (X_batch, y_batch) in batch_generator:
                self.net.train_batch(X_batch, y_batch)

                self.optim.step()

            if is_eval_epoch:
                test_preds = self.net.forward(X_test)
                loss = self.net.loss.forward(test_preds, y_test)

                if loss < self.best_loss:
                    print(f"Validation loss after {e+1} epochs is {loss:.3f}")
                    self.best_loss = loss
                else:
                    print(f"Loss increased after epoch {e+1}")
                    self.net = last_model
                    self.optim.net = self.net
                    break

In [53]:
from sklearn.datasets import load_boston 
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score

# Load the data set and split it into train/test parts.
# NOTE(review): `load_boston` is reported as removed in scikit-learn 1.2 --
# confirm the pinned scikit-learn version before re-running this cell.
boston = load_boston()
X = boston.data
y = boston.target
features = boston.feature_names
# NOTE(review): the scaler is fitted on all rows *before* the train/test
# split below, so test rows contribute to the scaling statistics.
s = StandardScaler()
X = s.fit_transform(X)

# 70/30 split with a fixed random_state.
X_train, X_test, y_train, y_test = train_test_split(X,y,test_size=0.3, random_state=42)

# Targets become single-column 2-D arrays.
y_train, y_test = y_train.reshape(-1,1), y_test.reshape(-1,1)

In [54]:
# Train the linear-regression model with SGD.
trainer = Trainer(net=lr, optim=SGD(lr=0.01))

trainer.fit(
    X_train, y_train, X_test, y_test,
    epochs=50,
    eval_every=10,
    seed=20200720,
)


Validation loss after 10 epochs is 26.549
Validation loss after 20 epochs is 24.722
Validation loss after 30 epochs is 22.859
Validation loss after 40 epochs is 22.766
Validation loss after 50 epochs is 22.648


In [55]:
from sklearn.metrics import mean_squared_error

# Evaluate the model held by the trainer rather than the global `lr`:
# on early stopping `fit` rebinds `trainer.net` to a checkpoint copy, and
# the object originally passed in keeps the later, worse weights.
preds = trainer.net.forward(X_test)
mean_squared_error(y_test, preds)

22.647618498359957

In [56]:
# Train the one-hidden-layer network with SGD.
trainer = Trainer(net=nn, optim=SGD(lr=0.01))

trainer.fit(
    X_train, y_train, X_test, y_test,
    epochs=50,
    eval_every=10,
    seed=20200720,
)

Validation loss after 10 epochs is 23.608
Validation loss after 20 epochs is 21.098
Validation loss after 30 epochs is 15.430
Validation loss after 40 epochs is 13.955
Validation loss after 50 epochs is 13.393


In [57]:
# Evaluate the model held by the trainer rather than the global `nn`:
# on early stopping `fit` rebinds `trainer.net` to a checkpoint copy, and
# the object originally passed in keeps the later, worse weights.
preds = trainer.net.forward(X_test)
print("NN MSE: ", mean_squared_error(y_test,preds))

NN MSE:  13.392638894503875


In [58]:
# Train the two-hidden-layer network with SGD.
trainer = Trainer(net=d1, optim=SGD(lr=0.01))

trainer.fit(
    X_train, y_train, X_test, y_test,
    epochs=50,
    eval_every=10,
    seed=20200720,
)

Validation loss after 10 epochs is 31.474
Validation loss after 20 epochs is 19.227
Validation loss after 30 epochs is 15.901
Validation loss after 40 epochs is 14.381
Validation loss after 50 epochs is 12.854


In [59]:
# Evaluate the model held by the trainer rather than the global `d1`:
# on early stopping `fit` rebinds `trainer.net` to a checkpoint copy, and
# the object originally passed in keeps the later, worse weights.
preds = trainer.net.forward(X_test)
print("DL MSE: ", mean_squared_error(y_test, preds))

DL MSE:  12.853543518253922
