In [53]:
import numpy as np
from typing import Callable
import abc
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

In [2]:
class Optimizer(abc.ABC):
    @abc.abstractmethod
    def __init__(self):
        pass
    @abc.abstractmethod
    def optimize(self):
        pass

In [3]:
class GradientDescentOptimizer(Optimizer):
    def __init__(self, learning_rate: float = 0.01):
        self.learning_rate = learning_rate
    
    def optimize(self, val, grad):
        return val - self.learning_rate * grad

In [4]:
class ActivationFunction(abc.ABC):
    @abc.abstractmethod
    def __call__(self, x):
        pass
    @abc.abstractmethod
    def gradient(self, x):
        pass

In [5]:
class Sigmoid(ActivationFunction):
    def __call__(self, x):
        return 1 / (1 + np.exp(-x))
    
    def gradient(self, x):
        sig = self.__call__(x)
        return sig * (1 - sig)

In [6]:
class ReLU(ActivationFunction):
    def __call__(self, x):
        return np.where(x >= 0, x, 0)
    
    def gradient(self, x):
        return np.where(x >= 0, 1, 0)

In [7]:
class Initializer(abc.ABC):
    @abc.abstractmethod
    def __call__(self, var):
        pass

In [8]:
class UniformInitializer(Initializer):
    def __call__(self, mean, var, shape):
        stddev = np.sqrt(var)
        lim = np.sqrt(3) * stddev
        
        return np.random.uniform(-lim, lim, shape)

In [9]:
class Layer(abc.ABC):
    @abc.abstractmethod
    def __init__(self):
        pass
    
    @abc.abstractmethod
    def set_optimizer(self, optimizer):
        pass
    
    @abc.abstractmethod
    def forward_pass(self, X):
        pass
    
    @abc.abstractmethod
    def backward_pass(self, X):
        pass
    
    @abc.abstractmethod
    def output_shape(self):
        pass

In [10]:
class Dense(Layer):
    def __init__(self, n_units, input_shape):
        self.n_units = n_units
        
        if len(input_shape) != 2:
            raise Exception('Input shape other than 2 not supported.')
        
        self.input_shape = input_shape
    
    def output_shape(self):
        return (self.input_shape[0], self.n_units)
    
    def set_activation(self, activation):
        if not isinstance(activation, Activation):
            raise Exception('The activation object provided is not an instance of Activation class.')
        
        self.activation = activation
    
    def initialize(self, initializer):
        if not isinstance(initializer, Initializer):
            raise Exception('The initializer object provided is not an instance of Initializer class.')
        
        self.initializer = initializer
        
        self.W = self.initializer(mean=0, var=1/self.n_units, shape=(self.input_shape[1], self.n_units))
        self.b = self.initializer(mean=0, var=1/self.n_units, shape=(1, self.n_units))
    
    def set_optimizer(self, optimizer):
        
        if not isinstance(optimizer, Optimizer):
            raise Exception('The optimizer object provided is not an instance of Optimizer class.')
        
        self.optimizer = optimizer
    
    def forward_pass(self, X):
        return X.dot(self.W) + self.b
    
    def backward_pass(self, cum_grad, X):
        grad_W = X.T.dot(cum_grad)
        grad_b = np.sum(cum_grad, axis=0)
        
        self.W = self.optimizer.optimize(self.W, grad_W)
        self.b = self.optimizer.optimize(self.b, grad_b)
        
        return cum_grad.dot(self.W.T)

In [11]:
dense_layer = Dense(n_units=5, input_shape=(3, 12))

In [12]:
dense_layer.output_shape()

(3, 5)

In [13]:
dense_layer.initialize(UniformInitializer())

In [14]:
dense_layer.set_optimizer(GradientDescentOptimizer())

In [15]:
dense_layer.W

array([[ 0.28790194, -0.56390246, -0.12119454, -0.65586433,  0.5268497 ],
       [ 0.10501284, -0.02428236,  0.10796078,  0.48601642, -0.3941557 ],
       [-0.64008093, -0.44022788, -0.49671487,  0.35091914,  0.35538506],
       [-0.29073183, -0.48654303,  0.11626326,  0.06656025, -0.14395966],
       [-0.44386857, -0.46222169, -0.75541802,  0.27534252,  0.18249313],
       [ 0.06081766,  0.70426789, -0.19652398, -0.03197299,  0.27281891],
       [-0.08871465,  0.02247918, -0.36850942, -0.73376416,  0.04873917],
       [-0.43981468,  0.75404555, -0.66764092,  0.40116832,  0.65169349],
       [-0.52365192, -0.47923632, -0.49422007,  0.64281943,  0.71394737],
       [-0.74755722,  0.61740316,  0.25870522, -0.5180662 , -0.13991368],
       [ 0.02254485, -0.47263025,  0.22032634,  0.05873256, -0.2216663 ],
       [ 0.72590164, -0.29757616,  0.05550272,  0.22078828,  0.35942142]])

In [16]:
dense_layer.backward_pass(cum_grad=np.ones((3, 5)), X=np.ones((3, 12)))

array([[-0.67620969,  0.13055199, -1.02071947, -0.888411  , -1.35367263,
         0.65940749, -1.26976986,  0.54945177, -0.29034151, -0.67942871,
        -0.5426928 ,  0.9140379 ],
       [-0.67620969,  0.13055199, -1.02071947, -0.888411  , -1.35367263,
         0.65940749, -1.26976986,  0.54945177, -0.29034151, -0.67942871,
        -0.5426928 ,  0.9140379 ],
       [-0.67620969,  0.13055199, -1.02071947, -0.888411  , -1.35367263,
         0.65940749, -1.26976986,  0.54945177, -0.29034151, -0.67942871,
        -0.5426928 ,  0.9140379 ]])

In [17]:
dense_layer.W

array([[ 0.25790194, -0.59390246, -0.15119454, -0.68586433,  0.4968497 ],
       [ 0.07501284, -0.05428236,  0.07796078,  0.45601642, -0.4241557 ],
       [-0.67008093, -0.47022788, -0.52671487,  0.32091914,  0.32538506],
       [-0.32073183, -0.51654303,  0.08626326,  0.03656025, -0.17395966],
       [-0.47386857, -0.49222169, -0.78541802,  0.24534252,  0.15249313],
       [ 0.03081766,  0.67426789, -0.22652398, -0.06197299,  0.24281891],
       [-0.11871465, -0.00752082, -0.39850942, -0.76376416,  0.01873917],
       [-0.46981468,  0.72404555, -0.69764092,  0.37116832,  0.62169349],
       [-0.55365192, -0.50923632, -0.52422007,  0.61281943,  0.68394737],
       [-0.77755722,  0.58740316,  0.22870522, -0.5480662 , -0.16991368],
       [-0.00745515, -0.50263025,  0.19032634,  0.02873256, -0.2516663 ],
       [ 0.69590164, -0.32757616,  0.02550272,  0.19078828,  0.32942142]])

In [18]:
class LossFunction(abc.ABC):
    @abc.abstractmethod
    def __call__(self):
        pass
    @abc.abstractmethod
    def gradient(self):
        pass

In [19]:
class BinaryCrossEntropyLoss(LossFunction):
    def __call__(self, y, y_pred):
        return -np.sum(y * np.log(y_pred) + (1 - y) * np.log(1 - y_pred), axis=0)
    
    def gradient(self, y, y_pred):
        return (y_pred - y) / (y_pred - y_pred * y_pred)

In [20]:
bce_loss = BinaryCrossEntropyLoss()
bce_loss(np.array([[1, 0, 0, 0]]).T, np.array([[0.9, 0.1, 0.1, 0.1]]).T)

array([0.42144206])

In [21]:
bce_loss(np.array([[1, 0, 0, 0]]).T, np.array([[0.1, 0.9, 0.9, 0.9]]).T)

array([9.21034037])

In [22]:
bce_loss.gradient(np.array([[1, 0, 0, 0]]).T, np.array([[0.99, 0.01, 0.99, 0.9]]).T)

array([[ -1.01010101],
       [  1.01010101],
       [100.        ],
       [ 10.        ]])

In [23]:
class Model(abc.ABC):
    @abc.abstractmethod
    def __init__(self, components=None):
        pass
    @abc.abstractmethod
    def set_optimizer(self, optimizer):
        pass
    @abc.abstractmethod
    def set_loss_function(self, loss_function):
        pass
    @abc.abstractmethod
    def predict_proba(self, X):
        pass
    @abc.abstractmethod
    def predict(self, X, thresh=None):
        pass

In [47]:
class Sequential(Model):
    def __init__(self, components=None):
        self.components = []
        self.loss_function = None
        
        for component in components:
            self.components.append(component)
    
    def set_optimizer(self, optimizer):
        for component in self.components:
            if isinstance(component, Layer):
                component.set_optimizer(optimizer)
    
    def set_loss_function(self, loss_function):
        self.loss_function = loss_function
    
    def initialize(self, initializer):
        
        for component in self.components:
            if isinstance(component, Layer):
                curr = component.initialize(initializer)
    
    def forward_pass(self, X):
        cache = []
        curr = X
        
        for component in self.components:
            cache.append(curr)
            if isinstance(component, Layer):
                curr = component.forward_pass(curr)
            else:
                curr = component(curr)
        
        return curr, cache
    
    def backward_pass(self, loss_grad, cache):
        cum_grad = loss_grad
        
        for component, inp in zip(reversed(self.components), reversed(cache)):
            if isinstance(component, Layer):
                cum_grad = component.backward_pass(cum_grad, inp)
            else:
                cum_grad = component.gradient(inp) * cum_grad
    
    def predict_proba(self, X):
        y_pred, _ = self.forward_pass(X)
        return y_pred
    
    def predict(self, X, thresh=.5):
        y_pred = self.predict_proba(X)
        return np.where(y_pred > thresh, 1, 0)

In [62]:
model = Sequential([Dense(n_units=10, input_shape=(150, 4)),
                    ReLU(),
                    Dense(n_units=1, input_shape=(10, 10)),
                    Sigmoid()])
model.initialize(UniformInitializer())
model.set_loss_function(BinaryCrossEntropyLoss())
model.set_optimizer(GradientDescentOptimizer(learning_rate=0.001))

In [63]:
iris_data = load_iris()
X = iris_data['data'] # Features
y = np.where(iris_data['target'] < 0.5, 0, 1) # Target classes

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

In [64]:
y_train = y_train[:, np.newaxis]

for i in range(50):
    y_pred, cache = model.forward_pass(X_train)
    loss = model.loss_function(y_train, y_pred)
    print('At iteration', i, 'the loss is', loss[0])
    grad = model.loss_function.gradient(y_train, y_pred)
    model.backward_pass(grad, cache)

At iteration 0 the loss is 181.84640475789402
At iteration 1 the loss is 99.35290507937226
At iteration 2 the loss is 67.36270293863662
At iteration 3 the loss is 45.74474588693522
At iteration 4 the loss is 34.986572546050915
At iteration 5 the loss is 29.21473592990995
At iteration 6 the loss is 24.822994649684855
At iteration 7 the loss is 20.88756961130935
At iteration 8 the loss is 17.457589417148807
At iteration 9 the loss is 14.582764586563307
At iteration 10 the loss is 12.248199507501996
At iteration 11 the loss is 10.383144817643197
At iteration 12 the loss is 8.899932158553296
At iteration 13 the loss is 7.795116754852851
At iteration 14 the loss is 6.983365292033278
At iteration 15 the loss is 6.332974122305083
At iteration 16 the loss is 5.831318181855369
At iteration 17 the loss is 5.436541202091599
At iteration 18 the loss is 5.11194165657544
At iteration 19 the loss is 4.832185504643837
At iteration 20 the loss is 4.585929082021753
At iteration 21 the loss is 4.37428654

In [65]:
y_pred = model.predict(X_test)

In [66]:
from sklearn.metrics import accuracy_score
accuracy_score(y_test, y_pred)

1.0

In [67]:
roc_auc_score(y_test, y_pred)

1.0