In [65]:
import numpy as np
from numpy import ndarray
from abc import ABC, abstractmethod

In [66]:
def assert_same_shape(array: ndarray, array_grad: ndarray):
    '''
    Check that two ndarrays have identical shapes.

    Raises an AssertionError whose message reports both shapes when
    they differ; returns None otherwise.
    '''
    shapes_match = array.shape == array_grad.shape
    assert shapes_match, \
    f'''
    Two ndarrays should have the same shape;
    instead, first ndarray's shape is {array.shape}
    and second ndarray's shape is {array_grad.shape}
    '''

<h1> <code>Operation</code> and <code>ParamOperation</code></h1>

In [67]:
class Operation(ABC):
    '''
    Base class for an "operation" in a neural network.

    Subclasses implement _output() (the forward computation) and
    _input_grad() (the gradient with respect to the input). forward()
    caches the input and output on the instance so that backward() can
    use them and check gradient shapes against them.
    '''
    def __init__(self):
        pass

    def forward(self, input_: ndarray) -> ndarray:
        '''
        Stores input in the instance variable self.input_,
        calls the self._output() method, stores the result in
        self.output and returns it.
        '''
        self.input_: ndarray = input_

        self.output = self._output()

        return self.output

    def backward(self, output_grad: ndarray) -> ndarray:
        '''
        Calls the self._input_grad() function.
        Checks that the appropriate shapes match:
        output_grad against self.output, and the computed input
        gradient against self.input_.

        Must be called after forward(), which sets self.input_ and
        self.output.
        '''
        # The gradient flowing in must look like what this operation produced.
        assert_same_shape(self.output, output_grad)

        self.input_grad = self._input_grad(output_grad)

        # The gradient flowing out must look like what this operation received.
        assert_same_shape(self.input_, self.input_grad)

        return self.input_grad

    @abstractmethod
    def _output(self) -> ndarray:
        '''
        The _output method must be defined for each Operation
        '''
        pass

    @abstractmethod
    def _input_grad(self, output_grad: ndarray) -> ndarray:
        '''
        The _input_grad method must be defined for each Operation
        '''
        pass

In [68]:
class ParamOperation(Operation):
    '''
    An operation with parameters.

    In addition to the input gradient, subclasses compute the gradient
    with respect to self.param via _param_grad().
    '''

    def __init__(self, param: ndarray) -> None:
        '''
        Store param as self.param.
        '''
        super().__init__()
        self.param = param

    def backward(self, output_grad: ndarray) -> ndarray:
        '''
        Calls the self._input_grad and self._param_grad.
        Checks that output_grad matches self.output, that the input
        gradient matches self.input_ and that the parameter gradient
        matches self.param. Stores both gradients on the instance and
        returns the input gradient.
        '''
        assert_same_shape(self.output, output_grad)

        self.input_grad = self._input_grad(output_grad)
        self.param_grad = self._param_grad(output_grad)

        assert_same_shape(self.input_, self.input_grad)
        assert_same_shape(self.param, self.param_grad)

        return self.input_grad

    @abstractmethod
    def _param_grad(self, output_grad: ndarray) -> ndarray:
        '''
        Every subclass of ParamOperation must implement _param_grad.
        '''
        pass

# Specific Operations

In [69]:
class WeightMultiply(ParamOperation):
    '''
    Multiplies the input by a weight matrix held in self.param.
    '''

    def __init__(self, W: ndarray):
        '''
        Store W as the operation's parameter (self.param).
        '''
        super().__init__(W)

    def _output(self) -> ndarray:
        '''
        Forward pass: np.dot of the cached input with the weights.
        '''
        weights = self.param
        return np.dot(self.input_, weights)

    def _input_grad(self, output_grad: ndarray) -> ndarray:
        '''
        Gradient w.r.t. the input: output_grad times the transposed weights.
        '''
        weights_t = np.transpose(self.param, (1, 0))
        return np.dot(output_grad, weights_t)

    def _param_grad(self, output_grad: ndarray) -> ndarray:
        '''
        Gradient w.r.t. the weights: transposed input times output_grad.
        '''
        input_t = np.transpose(self.input_, (1, 0))
        return np.dot(input_t, output_grad)

In [75]:
class BiasAdd(ParamOperation):
    '''
    Adds a bias row (self.param) to the input.
    '''

    def __init__(self, B: ndarray):
        '''
        Initialize Operation with self.param = B
        Check appropriate shape: B must have exactly one row.
        '''
        assert B.shape[0] == 1

        super().__init__(B)

    def _output(self) -> ndarray:
        '''
        Computes the output: input plus bias.
        '''
        return self.input_ + self.param

    def _input_grad(self, output_grad: ndarray) -> ndarray:
        '''
        Computes input gradient: output_grad multiplied elementwise by
        an array of ones shaped like the input.
        '''
        return np.ones_like(self.input_) * output_grad

    def _param_grad(self, output_grad: ndarray) -> ndarray:
        '''
        Computes parameter gradient: output_grad summed over axis 0 and
        reshaped to a single row so it matches the shape of self.param.
        '''
        param_grad = np.ones_like(self.param) * output_grad
        return np.sum(param_grad, axis=0).reshape(1, param_grad.shape[1])

In [76]:
class Sigmoid(Operation):
    '''
    Sigmoid activation function
    '''

    def __init__(self):
        super().__init__()

    def _output(self):
        '''
        Computes 1 / (1 + exp(-input)) elementwise
        '''
        exp_neg_input = np.exp(-1.0 * self.input_)
        return 1.0 / (1.0 + exp_neg_input)

    def _input_grad(self, output_grad: ndarray):
        '''
        Computes input gradient from the cached forward output:
        output * (1 - output) * output_grad
        '''
        return self.output * (1.0 - self.output) * output_grad

In [77]:
class Linear(Operation):
    '''
    Identity activation function
    '''

    def __init__(self):
        super().__init__()

    def _output(self):
        '''
        Passes the cached input through unchanged
        '''
        passthrough = self.input_
        return passthrough

    def _input_grad(self, output_grad: ndarray):
        '''
        Passes the output gradient through unchanged
        '''
        passthrough_grad = output_grad
        return passthrough_grad

# Layer

In [79]:
class Layer(ABC):
    '''
    A "layer" of neurons in a neural network.

    A layer owns an ordered list of operations. The operations are
    built lazily by _setup_layer() on the first forward pass (while
    self.first is True).
    '''

    def __init__(self, neurons: int):
        '''
        The number of neurons roughly corresponds to the breadth of the layer
        '''
        self.neurons = neurons
        # True until the first forward pass has built the operations.
        self.first = True
        self.params: list[ndarray] = []
        self.param_grads: list[ndarray] = []
        self.operations: list[Operation] = []

    @abstractmethod
    def _setup_layer(self, input_: ndarray) -> None:
        '''
        The _setup_layer function must be implemented for each layer.
        forward() calls it with the first input array it receives.
        '''
        pass

    def forward(self, input_: ndarray) -> ndarray:
        '''
        Passes input forward through a series of operations.
        Builds the operations first if this is the first call.
        '''
        if self.first:
            self._setup_layer(input_)
            self.first = False

        self.input_ = input_

        for operation in self.operations:
            input_ = operation.forward(input_)

        self.output = input_

        return self.output

    def backward(self, output_grad: ndarray) -> ndarray:
        '''
        Passes output_grad backward through a series of operations.
        Checks appropriate shapes, then refreshes self.param_grads.
        '''
        assert_same_shape(self.output, output_grad)

        for operation in reversed(self.operations):
            output_grad = operation.backward(output_grad)

        input_grad = output_grad

        self._param_grads()

        return input_grad

    def _param_grads(self) -> None:
        '''
        Extracts the _param_grads from a layer's operations
        into self.param_grads.
        '''
        self.param_grads = [
            operation.param_grad
            for operation in self.operations
            if isinstance(operation, ParamOperation)
        ]

    def _params(self) -> None:
        '''
        Extract the _params from a layer's operations into self.params.
        '''
        self.params = [
            operation.param
            for operation in self.operations
            if isinstance(operation, ParamOperation)
        ]

In [80]:
class Dense(Layer):
    '''
    A fully connected layer that inherits from "Layer".
    '''
    def __init__(self, neurons: int, activation: Operation = None) -> None:
        '''
        Requires an activation function upon initialization.

        When activation is omitted (None), a fresh Sigmoid() is created
        for this layer. An Operation instance caches its input and
        output, so a single default instance must not be shared
        between layers.
        '''
        super().__init__(neurons)
        self.activation = Sigmoid() if activation is None else activation
        # May be overwritten from outside (e.g. by the owning network);
        # None means "do not reseed numpy when building the layer".
        self.seed = None

    def _setup_layer(self, input_: ndarray) -> None:
        '''
        Defines the operations of a fully connected layer:
        weight multiplication, bias addition, then the activation.
        Weights are drawn with shape (input_.shape[1], neurons) and the
        bias with shape (1, neurons).
        '''
        if self.seed is not None:
            np.random.seed(self.seed)

        # Draw order matters for reproducibility: weights first, then bias.
        weights = np.random.randn(input_.shape[1], self.neurons)
        bias = np.random.randn(1, self.neurons)
        self.params = [weights, bias]

        self.operations = [
            WeightMultiply(self.params[0]),
            BiasAdd(self.params[1]),
            self.activation
        ]

        return None

# Loss and MeanSquaredError

In [82]:
class Loss(ABC):
    '''
    The "Loss" of a neural network
    '''
    def __init__(self):
        pass

    def forward(self, prediction: ndarray, target: ndarray) -> float:
        '''
        Checks that prediction and target have the same shape, caches
        both on the instance and returns the value of self._output()
        '''
        assert_same_shape(prediction, target)

        self.prediction, self.target = prediction, target

        return self._output()

    def backward(self) -> ndarray:
        '''
        Computes the gradient of the loss value wrt the input to the loss function,
        caches it in self.input_grad and checks its shape against the prediction
        '''
        grad = self._input_grad()
        self.input_grad = grad

        assert_same_shape(self.prediction, grad)

        return grad

    @abstractmethod
    def _output(self) -> float:
        '''
        Every subclass of Loss must implement _output
        '''

    @abstractmethod
    def _input_grad(self) -> ndarray:
        '''
        Every subclass of Loss must implement _input_grad
        '''
    
    

In [83]:
class MeanSquaredError(Loss):
    '''
    Squared-error loss: sum of squared differences divided by the
    number of rows in the prediction
    '''

    def __init__(self):
        super().__init__()

    def _output(self) -> float:
        '''
        Sum of squared (prediction - target) divided by prediction.shape[0]
        '''
        squared_errors = np.power(self.prediction - self.target, 2)
        n_rows = self.prediction.shape[0]

        return np.sum(squared_errors) / n_rows

    def _input_grad(self) -> ndarray:
        '''
        Gradient of the loss wrt the prediction:
        2 * (prediction - target) / prediction.shape[0]
        '''
        residual = self.prediction - self.target
        return 2.0 * residual / self.prediction.shape[0]

# NeuralNetwork

In [84]:
class NeuralNetwork(ABC):
    '''
    A stack of layers plus a loss.

    forward() feeds a batch through every layer in order; backward()
    feeds a loss gradient through the layers in reverse order.
    '''
    def __init__(self, layers: list[Layer], loss: Loss, seed: int = 1) -> None:
        '''
        Store the layers, the loss and the seed. A truthy seed is
        attached to every layer as layer.seed; otherwise each layer is
        only guaranteed to have a seed attribute (None if it had none).
        '''
        self.layers = layers
        self.loss = loss
        self.seed = seed
        for layer in self.layers:
            if seed:
                setattr(layer, "seed", self.seed)
            elif not hasattr(layer, "seed"):
                # Layers read self.seed while being set up; make sure the
                # attribute exists even when no seed is propagated.
                setattr(layer, "seed", None)

    def forward(self, x_batch: ndarray) -> ndarray:
        '''
        Pass x_batch through every layer in order and return the result.
        '''
        x_out = x_batch

        for layer in self.layers:
            x_out = layer.forward(x_out)

        return x_out

    def backward(self, loss_grad: ndarray) -> None:
        '''
        Pass loss_grad backward through the layers in reverse order.
        '''
        grad = loss_grad
        for layer in reversed(self.layers):
            grad = layer.backward(grad)

        return None

    def train_batch(self, x_batch, y_batch):
        '''
        Run a forward pass, compute the loss against y_batch, run the
        backward pass from the loss gradient and return the loss value.
        Parameters are not updated here.
        '''
        predictions = self.forward(x_batch)
        loss = self.loss.forward(predictions, y_batch)
        self.backward(self.loss.backward())

        return loss

    def params(self):
        '''
        Yield every entry of layer.params, layer by layer.
        '''
        for layer in self.layers:
            yield from layer.params

    def param(self):
        '''
        Backward-compatible alias for params().
        '''
        return self.params()

    def param_grads(self):
        '''
        Yield every entry of layer.param_grads, layer by layer.
        '''
        for layer in self.layers:
            yield from layer.param_grads
            

# Optimizer and SGD

In [85]:
class Optimizer(ABC):
    '''
    Base class for optimizers; holds the learning rate.
    '''
    def __init__(self, lr: float = 0.01):
        '''
        Store the learning rate as self.lr.
        '''
        self.lr = lr

    @abstractmethod
    def step(self):
        '''
        Every subclass of Optimizer must implement step.
        '''

In [86]:
class SGD(Optimizer):
    '''
    Gradient-descent update: each parameter is decreased in place by
    lr times its gradient.
    '''
    def __init__(self, lr: float = 0.01):
        super().__init__(lr)

    def step(self):
        '''
        Update every parameter of self.net in place.

        self.net is not set in __init__; it has to be attached to the
        optimizer before step() is called.
        '''
        paired = zip(self.net.params(), self.net.param_grads())
        for weights, grad in paired:
            # In-place so the arrays held by the network are the ones modified.
            weights -= self.lr * grad

# Trainer

In [87]:
from copy import deepcopy

class Trainer(ABC):
    '''
    Trains a NeuralNetwork with an Optimizer, with early stopping
    based on the loss on held-out data.
    '''
    def __init__(self, net: NeuralNetwork, optim: Optimizer) -> None:
        '''
        Store the network and optimizer, and attach the network to the
        optimizer as optim.net so the optimizer can reach its parameters.
        '''
        self.net = net
        self.optim = optim
        self.best_loss = 1e9
        setattr(self.optim, 'net', self.net)

    def generate_batches(self, X: ndarray, y: ndarray, size: int = 32) -> tuple[ndarray]:
        '''
        Generator yielding (X_batch, y_batch) pairs of at most `size`
        consecutive rows each. The row-count check runs when the first
        batch is requested.
        '''
        assert X.shape[0] == y.shape[0], \
        f'''
        features and target must have the same number of rows, instead
        features has {X.shape[0]} and target has {y.shape[0]}
        '''

        N = X.shape[0]

        for ii in range(0, N, size):
            X_batch, y_batch = X[ii:ii+size], y[ii:ii+size]

            yield X_batch, y_batch

    def fit(self, X_train, y_train,
            X_test, y_test,
            epochs: int=100,
            eval_every: int=10,
            batch_size: int=32,
            seed: int = 1,
            restart: bool = True) -> None:
        '''
        Train for up to `epochs` epochs, shuffling the training data
        each epoch and stepping the optimizer after every batch.

        Every `eval_every` epochs the loss on (X_test, y_test) is
        computed. If it improved on self.best_loss, the network is
        snapshotted; otherwise training stops and the network is rolled
        back to the snapshot taken at the previous evaluation (or to
        the network as it was when fit() was called, if no evaluation
        has succeeded yet).

        With restart=True every layer is flagged to be set up again on
        its next forward pass and self.best_loss is reset.
        '''
        np.random.seed(seed)

        if restart:
            for layer in self.net.layers:
                layer.first = True

            self.best_loss = 1e9

        # Model to roll back to: the state at the most recent evaluation
        # that improved the loss. Before any evaluation this is the
        # network as passed into fit() ("epoch 0").
        last_model = deepcopy(self.net)

        for e in range(epochs):

            X_train, y_train = permute_data(X_train, y_train)

            batch_generator = self.generate_batches(X_train, y_train, batch_size)

            for X_batch, y_batch in batch_generator:

                self.net.train_batch(X_batch, y_batch)

                self.optim.step()

            if (e+1) % eval_every == 0:

                test_preds = self.net.forward(X_test)
                loss = self.net.loss.forward(test_preds, y_test)

                if loss < self.best_loss:
                    print(f"Validation loss after {e+1} epochs is {loss:.3f}")
                    self.best_loss = loss
                    # Snapshot the validated model so the rollback below
                    # really restores the model from this evaluation.
                    last_model = deepcopy(self.net)
                else:
                    print(f"""Loss increased after epoch {e+1}, final loss was {self.best_loss:.3f}, using the model from epoch {e+1-eval_every}""")
                    self.net = last_model
                    # ensure self.optim is still updating self.net
                    setattr(self.optim, 'net', self.net)
                    break
            
        

In [88]:
def mae(y_true: ndarray, y_pred: ndarray):
    '''
    Mean absolute error: the mean of |y_true - y_pred|.
    '''
    abs_errors = np.abs(y_true - y_pred)
    return np.mean(abs_errors)

def rmse(y_true: ndarray, y_pred: ndarray):
    '''
    Root mean squared error: the square root of the mean of
    (y_true - y_pred) squared.
    '''
    squared_errors = np.power(y_true - y_pred, 2)
    mean_squared_error = np.mean(squared_errors)
    return np.sqrt(mean_squared_error)

def eval_regression_model(model: NeuralNetwork,
                          X_test: ndarray,
                          y_test: ndarray):
    '''
    Print the mean absolute error and root mean squared error of the
    model's predictions on X_test against y_test.

    Predictions are reshaped to a single column before the metrics
    are computed.
    '''
    column_preds = model.forward(X_test).reshape(-1, 1)

    mae_report = "Mean absolute error: {:.2f}".format(mae(column_preds, y_test))
    print(mae_report)
    print()
    rmse_report = "Root mean squared error {:.2f}".format(rmse(column_preds, y_test))
    print(rmse_report)

In [89]:
# Three models of increasing depth, all with mean squared error loss
# and the same seed.

# A single linear unit with no hidden layer.
lr = NeuralNetwork(
    layers=[
        Dense(neurons=1, activation=Linear()),
    ],
    loss=MeanSquaredError(),
    seed=20190501,
)

# One sigmoid hidden layer of 13 neurons, then a linear output unit.
nn = NeuralNetwork(
    layers=[
        Dense(neurons=13, activation=Sigmoid()),
        Dense(neurons=1, activation=Linear()),
    ],
    loss=MeanSquaredError(),
    seed=20190501,
)

# Two sigmoid hidden layers of 13 neurons each, then a linear output unit.
dl = NeuralNetwork(
    layers=[
        Dense(neurons=13, activation=Sigmoid()),
        Dense(neurons=13, activation=Sigmoid()),
        Dense(neurons=1, activation=Linear()),
    ],
    loss=MeanSquaredError(),
    seed=20190501,
)

In [90]:
import pandas as pd

data_url = "http://lib.stat.cmu.edu/datasets/boston"
# Raw string: "\s" is not a valid Python escape sequence, so the regex
# separator must not go through string-escape processing.
# The first 22 lines of the file are skipped.
raw_df = pd.read_csv(data_url, sep=r"\s+", skiprows=22, header=None)
# Each record is read from two consecutive rows: all columns of the
# even rows plus the first two columns of the odd rows form the
# features; the third column of the odd rows is the target.
data = np.hstack([raw_df.values[::2, :], raw_df.values[1::2, :2]])
target = raw_df.values[1::2, 2]

In [92]:
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
# NOTE(review): the scaler is fit on the full dataset, before the
# train/test split below, so held-out rows influence the scaling.
s = StandardScaler().fit(data)
data = s.transform(data)

In [93]:
# Hold out 30% of the rows for testing, with a fixed random_state.
X_train, X_test, y_train, y_test = train_test_split(
    data,
    target,
    test_size=0.3,
    random_state=80718,
)

# Reshape the targets into single-column 2D arrays.
y_train = y_train.reshape(-1, 1)
y_test = y_test.reshape(-1, 1)

In [94]:
# helper function

def permute_data(X, y):
    '''
    Shuffle the rows of X and y with one shared random permutation
    (drawn from numpy's global random state), so rows stay paired.
    '''
    n_rows = X.shape[0]
    shuffled_idx = np.random.permutation(n_rows)
    X_shuffled = X[shuffled_idx]
    y_shuffled = y[shuffled_idx]
    return X_shuffled, y_shuffled

In [None]:
trainer = 