In [15]:
import numpy as np
from itertools import chain

In [17]:
# for testing
import torch
import torch.nn as nn
import torch.nn.functional as F

In [16]:
np.random.seed(1)

# Init

In [2]:
### no auto gradient.
def init_xavior_uniform(num_outs, num_ins = None, gain = 1.):
    if num_ins is None:
        var = 1/num_outs
        size = (num_outs,)

    else:
        var = 6/(num_ins + num_outs)
        size = (num_ins, num_outs)

    factor = gain*np.sqrt(var)
    return np.random.uniform(low = -factor, 
                             high = factor, 
                             size = size)

In [3]:
class Parameters():
    def __init__(self, data = None):
        self.data = data
        self.grad = None

In [48]:
class Module():
    def forward(self, x):pass
    
    def __call__(self, x): return self.forward(x)
    
    @property
    def params(cls):
        return (getattr(cls, i) for i in cls.__dict__.keys() if isinstance(getattr(cls, i), Parameters))

# Linear

In [90]:
class Linear(Module):
    def __init__(self, num_ins, num_outs):
        super().__init__()
        self.weight = Parameters(init_xavior_uniform(num_outs, num_ins))
        self.bias = Parameters(init_xavior_uniform(num_outs = num_outs))
    
    def forward(self, x):
        # X: (batch, num_ins)
        z = np.dot(x, self.weight.data) + self.bias.data
        
        ## record all intermediate that used for backward
        back = {'x': x}
        return z, back
    
    def backward(self, dout, back):
        ## return gradients for update
        x = back['x']
        self.bias.grad = np.sum(dout, axis = 0)
        self.weight.grad = np.dot(x.T, dout)
        print(self.weight.grad.shape)
        dx = np.dot(dout, self.weight.data.T)
        return dx


In [132]:
##TESTING
x = np.random.randn(5, 10)
l1 = Linear(10, 15)
weight, bias = l1.weight.data, l1.bias.data

l2 = nn.Linear(in_features= 10, out_features= 15)
l2.weight = torch.nn.Parameter(torch.tensor(weight.T))
l2.bias = torch.nn.Parameter(torch.tensor(bias))

#FORWARD TESTING
out1, back = l1(x) 

x2 = torch.tensor(x, requires_grad = True)
x2.retain_grad()
out2 = l2(x2)
print(((torch.tensor(out1) - out2) < 1e-7).all())

## BACKWARD TESTING
loss = out2.sum()
loss.backward()

dx = l1.backward(np.ones((5, 15)), back)
print(((torch.tensor(l1.weight.grad) - l2.weight.grad.T) < 1e-6).all())

print(((torch.tensor(dx) - x2.grad) < 1e-6).all()) 

tensor(True)
(10, 15)
tensor(True)
tensor(True)


# Activation Relu

In [7]:
class Relu():
    def forward(self, x):
        return np.maximum(0.,x), {'mask': x > 0}
    
    def backward(self, dout, back):
        return dout*back['mask']

# Dropout

In [8]:
class Dropout():
    def __init__(self, p = 0.5):
        self.p = p 
        self.mode = 'Training'
    
    def forward(self, x):
        if self.mode == 'Infer': self.p = 1
        mask = np.random.random(x.shape) > self.p
        return x*mask, {'mask': mask}
    
    def backward(self, dout, back):
        return mask*dout

# Bachnorm

In [9]:
    
class BatchNorm1D():
    def __init__(self, num_outs, beta_m = 0.9):
        self.gamma = Parameters(np.ones(num_outs))
        self.beta = Parameters(np.zeros(num_outs))
        
        self.wma_mean = np.zeros_like(self.gamma)
        self.wma_var = np.zeros_like(self.beta)
        self.beta_m = beta_m
        
        self.mode = 'Training'
        
    def update_stats(self, name, value):
        stat = getattr(self, name)
        setattr(self, name, self.beta_m*stat + (1 - self.beta_m)*value)
    
    def forward(self, x):
        if self.mode == 'Infer': 
            mean, std = self.wma_mean, self.wma_var
        elif self.mode == 'Training':
            mean, std = np.mean(x, axis= 0), np.std(x, axis = 0)
        else: raise NameError('Invalid mode {}'.format(self.mode))
            
        update_stats('wma_mean', mean)
        update_stats('wma_std', std)
        norm_x = (x - mean)/std
        out = self.gamma.data * norm_x + self.beta.data
        return out, {'mean': mean, 'var': var, 'x': x, 'norm_x': norm_x}
    
    def backward(self, dout, back):
        batch_size = dout.shape[0]
        mean, var, x, norm_x = back['mean'], back['var'], back['x'], back['norm_x']
        self.beta.grad = np.sum(dout, axis= 0)
        self.gamma.grad = np.sum(dout*norm_x, axis= 0)
        
        dnorm_x = dout*self.gamma.data
        dx_sub_mean1 = dnorm_x/std
        dstd_invert = np.sum(dnorm*(x - mean), axis= 0)
        dstd = (-1/(std**2))*dstd
        
        dvar = dstd*np.sqrt(std)/2
        dx_sub_mean_square = np.tile(dvar, (batch_size, 1))
        dx_sub_mean2 = 2 * dx_sub_mean_square * (x - mean)
        
        dx_sub_mean = dx_sub_mean1 + dx_sub_mean2
        dmean = -dx_sub_mean
        dx1 = dx_sub_mean
        dx2 = np.tile(dmean, (batch_size, 1))/batch_size
        
        return dx1 + dx2
    
    @property
    def params(cls):
        return (getattr(cls, i) for i in cls.__dict__.keys() if isinstance(getattr(cls, i), Parameters))

# Net

In [11]:
class FCNet():
    def __init__(self, num_ins, num_hids, num_outs):
        ## better to create a sequence here
        self.layers = [Linear(num_ins, num_hids), Relu(), Linear(num_hids, num_outs)]
    
    def forward(self, x):
        backs = [], []
        out = x
        for layer in self.layers:
            out, back = layer.forward(out)
            backs.append(back)
        
        return out, backs
    
    def backward(self, dout, backs):
        for back, layer in zip(backs, layers):
            dout = layer.backward(dout, back)
            
    def training_mode():
        pass
    
    def infer_mode():
        pass
            
    @property
    def params(self):
        return chain(*[layer.params for layer in self.layers if hasattr(layer, 'params')])

# Softmax

In [12]:
def softmax(x):
    x_exp = np.exp(x)
    return x_exp/np.sum(x_exp, axis= 1, keepdims= True)

def one_hot(x,num_cls = None):
    batch_size = x.shape[0]
    num_cls = num_cls if num_cls is not None else int(np.max(x))
    one = np.zeros((batch_size, num_cls))
    one[range(batch_size), x] = 1
    return one

# Cross entropy loss

In [13]:
class CrossEntropyLoss():
    def forward(self, x, target):
        
        def logsumexp(x):
            xmax = np.max(x, axis= -1, keepdims= True)
            return xmax + np.log(np.sum(np.exp(x - xmax), axis= -1, keepdims= True))
        
        def logsoftmax(x):
            return x - logsumexp(x)
        
        logp = logsoftmax(x)
        loss = -np.mean(logp[range(len(target)), target])
        
        return loss, {'x': x, 'target': target}
    
    def backward(self, backs):
        x, target = back['x'], back['target']
        return (softmax(x) - one_hot(target))/x.shape[0]

# Optimizer

In [14]:
class SGD():
    def __init__(self, params, lr = 0.01, weight_decay = 0.):
        self.params = params
        self.lr = lr
        self.weight_decay = weight_decay
        
    def zero_grad(self):
        for param in self.params:
            param.grad = np.zeros_like(param.grad)
        
    def step(self):
        for param in self.params:
            param.data = param.data - self.lr*(param.grad + 2*self.weight_decay*param.data)