## Mar 15, 2019

In [31]:
import warnings
from functools import reduce

In [32]:
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [33]:
np.random.seed(42)

In [34]:
num = 100000

In [35]:
X = np.random.rand(num, 2)
Y = np.zeros(num)
Y[X[:, 0] * X[:, 0] > X[:, 1]] = 1

In [36]:
train, test = train_test_split(np.arange(num))
train_x, train_y = X[train], Y[train]
test_x, test_y = X[test], Y[test]

In [37]:
sigmoid = lambda x: 1 / (1 + np.exp(-x))

In [38]:
class BaseLayer(object):
    def __init__(self):
        pass
    def forward(self, X: np.ndarray) \
            -> np.ndarray:
        raise Exception("Base class forward not implemented")
    def backward(self, grad: np.ndarray) \
            -> np.ndarray:
        raise Exception("Base class backward not implemented")

In [68]:
class Layer(BaseLayer):
    def __init__(self, head: int, size: int = 1):
        self.head = head
        self.params = np.random.rand(size, head)
        self.bias = np.random.rand(size, 1)
        
        self.after, self.before = None, None
        self.X, self.A = None, None
        self.dW, self.db = 0, 0
    
    def forward(self, X: np.ndarray, grad: bool = True):
        if grad:
            if self.X is not None:
                warnings.warn("Layer backward is not called after forward.")
            self.X = X
            
        self.A = sigmoid(np.dot(self.params, X) + self.bias)
        
        return self.after.forward(self.A) if self.after else self.A
    
    def backward(self, grad: np.ndarray):
        def process(g):
            return np.multiply(np.dot(self.after.params.T, g), 1 - np.power(self.A, 2))
        
        if self.X is None:
            raise Exception("Layer backward call must after forward.")
            
        dZ = process(self.after.backward(grad)) if self.after else (self.A - grad)
        
        self.dW += np.dot(dZ, self.X.T) / np.size(self.X.T, 0)
        self.db += np.sum(dZ, axis=1, keepdims=True) / np.size(self.X.T, 0)
        
        # Release previous input
        self.X = None

        return dZ
    
    def update(self, lr: float):
        self.params = self.params - lr * self.dW
        self.bias = self.bias - lr * self.db
        
        self.dW, self.db = 0, 0
        
        if self.after:
            self.after.update(lr)
        
    def append(self, layer):
        self.after = layer
        layer.before = self
        return self

In [69]:
class Activation(Layer):
    pass

In [70]:
class relu(Activation):
    def forward(self, X):
        return 1. / (1 + np.exp(-X))

    def backward(self, grad):
        return grad * (1. - grad)

In [114]:
epoch = 2000

In [115]:
network = Layer(2, 4).append(
    Layer(4)
)

In [116]:
with tqdm(total=epoch) as t:
    for e in range(epoch):
        forward = network.forward(train_x.T)
        loss = -(train_y * np.log(forward) + (1 - train_y) * np.log(1 - forward))
        backward = network.backward(train_y)
        network.update(.5)
        
        t.set_postfix(loss=loss.mean())
        t.update()

100%|██████████| 2000/2000 [00:47<00:00, 42.05it/s, loss=0.122]


In [117]:
np.mean(np.rint(network.forward(test_x.T, grad=False)) == test_y)

0.95164

In [None]:
Network([
    Layer(2, 4),
    ReLU(),
    Layer(4, 8),
    ReLU(),
    Layer(8, 4),
    ReLU(),
    Layer(4, 1),
    ReLU(),
])

In [None]:
class Regressor(object):
    sigmoid = lambda x: 1 / (1 + np.exp(-x))
    
    def __init__(self, shape: tuple = (2,), lr: int = .1):
        self.shape = shape
        self.lr = lr
        self.weights = np.random.rand(*shape)
        self.bias = np.random.rand(1)
    
    def train(self, X: np.ndarray, Y: np.ndarray, epoch: int = 500):
        with tqdm(total=epoch) as t:
            for e in range(epoch):
                pred = np.vectorize(Regressor.sigmoid)(np.dot(self.weights, X.T) + self.bias)

                loss = -(Y * np.log(pred) + (1 - Y) * np.log(1 - pred))

                dz = pred - Y
                self.weights -= np.mean(np.multiply(dz, X.T), axis=1) * self.lr
                self.bias -= np.mean(dz) * self.lr

                t.set_postfix(loss=loss.mean())
                t.update()
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        return np.vectorize(Regressor.sigmoid)(np.dot(self.weights, X.T) + self.bias)
    
    def test(self, X: np.ndarray, Y: np.ndarray) -> float:
        return np.mean(np.rint(self.predict(X)) == Y)

In [None]:
network = Regressor()

In [None]:
network.train(train_x, train_y)

In [None]:
print ('score:', network.test(test_x, test_y))

In [None]:
network = Regressor()

In [None]:
network.weights

In [None]:
network.bias

In [None]:
X, Y = train_x, train_y

In [None]:
pred = np.vectorize(Regressor.sigmoid)(np.dot(network.weights, X.T) + network.bias)

In [None]:
(-(Y * np.log(pred) + (1 - Y) * np.log(1 - pred))).mean()

In [77]:
pred

array([0.75093799, 0.79072221, 0.79027247, ..., 0.7927926 , 0.83392317,
       0.82973434])

In [None]:
pred = np.vectorize(Regressor.sigmoid)(np.dot(self.weights, X.T) + self.bias)

loss = 

dz = pred - Y
self.weights -= np.mean(np.multiply(dz, X.T), axis=1) * self.lr
self.bias -= np.mean(dz) * self.lr

t.set_postfix(loss=loss.mean())
t.update()

In [None]:
head = Layer(2)

In [None]:
head.X = train_x.T
head.A = sigmoid(np.dot(head.params, train_x.T) + head.bias)

In [None]:
(-(Y * np.log(head.A) + (1 - Y) * np.log(1 - head.A))).mean()

In [None]:

        dZ = process(self.after.backward(grad)) if self.after else (self.A - grad)
        
        self.dW += np.dot(dZ, self.X.T) / np.size(self.X, 0)
        self.db += np.sum(dZ, axis=1, keepdims=True) / np.size(self.X, 0)
        
        # Release previous input
        self.X = None

In [None]:
head = Layer(2, 4).append(
    Layer(4)
)

In [149]:
head = Layer(2)

In [None]:
class ReLU

In [None]:
class Layer(BaseLayer):
    def __init__(self, head: int, size: int = 1):
        self.head = head
        self.params = np.random.rand(size, head)
        self.bias = np.random.rand(size, 1)
        
        self.after, self.before = None, None
        self.X, self.A = None, None
        self.dW, self.db = 0, 0
    
    def forward(self, X: np.ndarray, grad: bool = True):
        if grad:
            if self.X is not None:
                warnings.warn("Layer backward is not called after forward.")
            self.X = X
            
        self.A = sigmoid(np.dot(self.params, X) + self.bias)
        
        return self.after.forward(self.A) if self.after else self.A
    
    def backward(self, grad: np.ndarray):
        def process(g):
            return np.multiply(np.dot(self.after.params.T, g), 1 - np.power(self.A, 2))
        
        if self.X is None:
            raise Exception("Layer backward call must after forward.")
            
        dZ = process(self.after.backward(grad)) if self.after else (self.A - grad)
        
        self.dW += np.dot(dZ, self.X.T) / np.size(self.X.T, 0)
        self.db += np.sum(dZ, axis=1, keepdims=True) / np.size(self.X.T, 0)
        
        # Release previous input
        self.X = None

        return dZ
    
    def update(self, lr: float):
        self.params = self.params - lr * self.dW
        self.bias = self.bias - lr * self.db
        
        self.dW, self.db = 0, 0
        
        if self.after:
            self.after.update(lr)
        
    def append(self, layer):
        self.after = layer
        layer.before = self
        return self