In [1]:
from torch import FloatTensor, LongTensor, Tensor
import math

## Module Superclass

In [2]:
class Module(object):
    """Abstract base class shared by every layer, activation and loss.

    Subclasses override ``forward`` and ``backward``; modules that own
    trainable parameters also override ``param`` and ``zero_grad``.
    """

    def forward(self, *input_):
        """Compute the module's output from its input(s); must be overridden."""
        raise NotImplementedError

    def backward(self, *gradwrtoutput):
        """Propagate the gradient w.r.t. the output back to the input; must be overridden."""
        raise NotImplementedError

    def param(self):
        """Return a list of (parameter, gradient) pairs; empty for parameter-free modules."""
        return []

    def zero_grad(self):
        """Reset accumulated gradients; a no-op for modules without parameters.

        Sequential calls ``zero_grad`` on every module it holds, so the base
        class provides this default to keep parameter-free modules usable there.
        """
        pass

## ReLU Module

ReLU function: 
\begin{equation}
f(x) = \max(0, x)
\end{equation}

The derivative of ReLU is

\begin{equation} 
f'(x)=
    \begin{cases}
      1, & \text{if}\ x>0 \\
      0, & \text{otherwise}
    \end{cases}
\end{equation}

In [3]:
# This module represents the ReLU activation function
class ReLU(Module):
    """Rectified linear unit activation: f(x) = max(0, x), applied element-wise."""

    def __init__(self):
        # Copy of the most recent forward input, needed to evaluate f'(z) in backward.
        self.z = None

    def forward(self, input_):
        """Return max(0, input_) element-wise without modifying ``input_``.

        input_: the tensor output by the current layer (pre-activation z).
        """
        self.z = input_.clone()
        # clamp() returns a new tensor, so the caller's data is left intact
        # (an in-place masked assignment would overwrite the caller's tensor).
        return input_.clamp(min=0)

    def backward(self, gradwrtoutput):
        """Return dl/dz = dl/da * f'(z), with f'(z) = 1 where z > 0 and 0 elsewhere.

        gradwrtoutput: gradient of the loss w.r.t. this module's output (da).
        """
        # f'(z) as a 0/1 mask, cast to the same tensor type as the gradient.
        g_prime = (self.z > 0).type_as(gradwrtoutput)
        # dz[l]
        return gradwrtoutput.mul(g_prime)

    def param(self):
        """ReLU has no trainable parameters."""
        return []

    def zero_grad(self):
        """Nothing to reset: ReLU accumulates no gradients."""
        pass

## Tanh Module

In [4]:
class Tanh(Module):
    """Hyperbolic tangent activation, applied element-wise."""

    def __init__(self):
        # Most recent forward input, kept for the derivative in backward.
        self.z = None

    def forward(self, input_):
        """Remember the pre-activation ``input_`` and return tanh(input_).

        input_: the tensor output by the current layer.
        """
        self.z = input_
        activated = input_.tanh()
        return activated

    def backward(self, gradwrtoutput):
        """Return dl/dz = dl/da * (1 - tanh(z)^2) for the stored input z.

        gradwrtoutput: gradient of the loss w.r.t. this module's output (da).
        """
        squashed = self.z.tanh()
        # Derivative of tanh: g'(z) = 1 - tanh(z)^2
        slope = 1 - squashed.pow(2)
        return gradwrtoutput.mul(slope)

    def param(self):
        """Tanh has no trainable parameters."""
        return []

    def zero_grad(self):
        """Nothing to reset: Tanh accumulates no gradients."""
        return None

## Linear Module
A fully connected (dense) layer: every output is an affine combination of all inputs, $z = Wx + b$.

In [5]:
class Linear(Module):
    """Fully connected layer computing W.x + b for every sample of a batch."""

    def __init__(self, in_dim, out_dim):
        # Weights (out_dim x in_dim) and biases (out_dim x 1), both drawn from
        # a normal distribution with mean 0.
        self.w = Tensor(out_dim, in_dim).normal_(0)
        self.b = Tensor(out_dim, 1).normal_(0)
        # Input received by the last forward pass (output of the previous
        # layer's activation function); backward needs it to compute dl/dw.
        self.x_previous_layer = None
        # Running sums of dl/dw and dl/db, shaped like w and b.
        self.grad_w_sum = Tensor(self.w.size()).zero_()
        self.grad_b_sum = Tensor(self.b.size()).zero_()

    def forward(self, input_):
        """Return the layer output for ``input_`` (one sample per row).

        input_: the output of the previous layer's activation function.
        """
        self.x_previous_layer = input_
        # Work with samples as columns so the (out_dim x 1) bias broadcasts,
        # then transpose back to one sample per row.
        pre_activation = self.w.mm(input_.t()) + self.b
        return pre_activation.t()

    def backward(self, gradwrtoutput):
        """Accumulate dl/dw and dl/db, and return the gradient w.r.t. the input.

        gradwrtoutput: gradient of the loss w.r.t. this layer's output,
        one sample per row.
        """
        grad_z = gradwrtoutput.t()
        # Add this batch's contribution to the stored gradients; the bias
        # gradient is summed over the samples (columns of grad_z).
        self.grad_w_sum += grad_z.mm(self.x_previous_layer)
        self.grad_b_sum += grad_z.sum(1).unsqueeze(1)
        grad_input = self.w.t().mm(grad_z)
        return grad_input.t()

    def param(self):
        """Return [(w, dl/dw), (b, dl/db)] as (parameter, gradient) pairs."""
        weights = (self.w, self.grad_w_sum)
        biases = (self.b, self.grad_b_sum)
        return [weights, biases]

    def zero_grad(self):
        """Reset both gradient accumulators to zero, in place."""
        for accumulator in (self.grad_w_sum, self.grad_b_sum):
            accumulator.zero_()

## Sequential Module
Combines several modules into a basic sequential structure: the output of each module is fed to the next one.

In [6]:
# This module allows to combine several modules (layers, activation functions) in a basic sequential structure
class Sequential(Module):
    """Container that applies its modules one after another."""

    def __init__(self, *layers_):
        # Modules in the order they are applied during the forward pass.
        self.modules = layers_

    def forward(self, input_):
        """Feed ``input_`` through every module in order and return the result.

        input_: a minibatch whose columns are features and rows are samples.
        """
        activation = input_
        for layer in self.modules:
            activation = layer.forward(activation)
        return activation

    def backward(self, gradwrtoutput):
        """Propagate ``gradwrtoutput`` through the modules from last to first."""
        grad = gradwrtoutput
        for layer in self.modules[::-1]:
            grad = layer.backward(grad)
        return grad

    def param(self):
        """Return a flat list with the parameters of every module.

        Each entry is a tuple holding the parameter tensor (e.g. w) and its
        gradient tensor (e.g. dl/dw).
        """
        collected = []
        for layer in self.modules:
            collected.extend(layer.param())
        return collected

    def zero_grad(self):
        """Reset the gradients of every module before the next batch is processed."""
        for layer in self.modules:
            layer.zero_grad()

## MSE Loss Function

In [7]:
class LossMSE(Module):
    """Squared-error loss, summed over every element of the batch."""

    def __init__(self):
        # Residual (preds - labels) of the latest forward call.
        self.error = None

    def forward(self, preds, labels):
        """Store the residual ``preds - labels`` and return the sum of its squares."""
        residual = preds - labels
        self.error = residual
        return residual.pow(2).sum()

    def backward(self):
        """Return the gradient of the loss w.r.t. the predictions: 2 * (preds - labels)."""
        return self.error.mul(2)

    def param(self):
        """The loss has no trainable parameters."""
        return []

## SGD optimization

In [8]:
class optim_SGD(Module):
    """Plain gradient-descent optimizer over (parameter, gradient) pairs."""

    def __init__(self, parameters, learning_rate):
        # parameters: the parameters of the Sequential module, as
        # (parameter tensor, gradient tensor) pairs.
        # NOTE(review): this attribute hides the inherited Module.param()
        # method on instances -- confirm that no caller needs to call it.
        self.lr = learning_rate
        self.param = parameters

    def step(self):
        """Perform one gradient step p <- p - lr * grad on every parameter, in place."""
        for weights, gradient in self.param:
            update = self.lr * gradient
            weights.sub_(update)

## TODO

zero_grad() method: should we put it in the Module class, so that we can remove it from ReLU and Tanh?

## Helpers

In [9]:
def generate_disc_set(nb):
    """Sample ``nb`` points uniformly in the unit square and label them.

    A point is labelled 1 when its squared distance to the origin is below
    2/pi, and 0 otherwise.

    Returns the (nb x 2) tensor of points and the tensor of integer labels.
    """
    points = Tensor(nb, 2).uniform_(0, 1)
    squared_norm = points.pow(2).sum(1)
    threshold = 2 / math.pi
    inside = squared_norm < threshold
    return points, inside.long()

# Draw two independent sets of 1000 labelled points: one for training, one for testing.
(train_input, train_target) = generate_disc_set(nb=1000)
(test_input, test_target) = generate_disc_set(nb=1000)

In [10]:
# converts 'target' Tensor to one hot labels
def convert_to_one_hot(target, nb_classes=2):
    """Return a float one-hot encoding of the integer class labels in ``target``.

    target: tensor holding one integer class index per sample.
    nb_classes: number of columns of the encoding (defaults to 2).

    Returns a FloatTensor with one row per sample; row k holds a 1 in
    column target[k] and 0 everywhere else.
    """
    one_hot = FloatTensor(target.size(0), nb_classes).zero_()
    # Index with the ``target`` argument itself: reading a global label tensor
    # here would make every call encode the same labels regardless of input.
    indices = target.long().contiguous().view(-1, 1)
    # Write a 1 into column indices[k] of every row k in a single call.
    one_hot.scatter_(1, indices, 1.0)
    return one_hot

In [11]:
def compute_accuracy(model, input_, target):
    """Return the percentage of samples in ``input_`` that ``model`` classifies correctly.

    model: object whose ``forward(input_)`` returns one row of class scores per sample.
    input_: batch of samples, one per row.
    target: expected class index of each sample.

    The predicted class of a sample is the index of its largest score.
    """
    output = model.forward(input_)

    _, predicted_classes = output.max(1)
    # Compare every prediction with its label in one tensor operation rather
    # than indexing the tensors sample by sample in a Python loop.
    mismatches = predicted_classes.view(-1) != target.contiguous().view(-1)
    # int() accepts both a 0-dim tensor and a plain Python number.
    nb_data_errors = int(mismatches.sum())
    return 100 - (100 * (nb_data_errors / input_.size(0)))

## Test file

In [12]:
# One-hot encode the integer class labels of the training and test sets.
train_one_hot_target = convert_to_one_hot(target=train_target)
test_one_hot_target = convert_to_one_hot(target=test_target)

In [20]:
# Network: 2 inputs -> two hidden layers of 25 units -> 2 outputs, with a Tanh after every layer.
model = Sequential(Linear(2,25), Tanh(), Linear(25,25), Tanh(), Linear(25,2), Tanh())

criterion = LossMSE()
optimizer = optim_SGD(model.param(), 1e-3)
nb_epochs = 100
mini_batch_size = 5
nb_train_samples = train_input.size(0)

for e in range(0, nb_epochs):
    loss = 0
    for b in range(0, nb_train_samples, mini_batch_size):
        # the last batch is shorter when the number of samples is not a
        # multiple of mini_batch_size; narrow() would fail on a full-size slice
        batch_size = min(mini_batch_size, nb_train_samples - b)
        output = model.forward(train_input.narrow(0, b, batch_size))
        # sum the loss for each batch to get the current epoch's loss;
        # float() keeps the running total a plain Python number
        loss += float(criterion.forward(output, train_one_hot_target.narrow(0, b, batch_size)))
        # set the gradients of all layers to zero before the backward pass,
        # because Linear adds each new gradient to its stored sums
        model.zero_grad()
        model.backward(criterion.backward())
        optimizer.step() # performs a gradient step to optimize the parameters
    print("Epoch", e+1, ":", loss)

Epoch 1 : 2584.6125940491916
Epoch 2 : 2562.9227145841924
Epoch 3 : 2559.1819210344256
Epoch 4 : 2556.34194057982
Epoch 5 : 2552.1511669458014
Epoch 6 : 2545.065045519408
Epoch 7 : 2539.3356628947777
Epoch 8 : 2533.7793361872205
Epoch 9 : 2520.6038914110977
Epoch 10 : 2498.386121905313
Epoch 11 : 2471.576382369508
Epoch 12 : 2432.252320668423
Epoch 13 : 2380.168343811312
Epoch 14 : 2335.1762190285895
Epoch 15 : 1996.769668612924
Epoch 16 : 169.453153177641
Epoch 17 : 116.98498995069725
Epoch 18 : 102.1098946521922
Epoch 19 : 94.87358936803119
Epoch 20 : 90.31354990904754
Epoch 21 : 87.0080864146947
Epoch 22 : 84.40811606649241
Epoch 23 : 82.24396087384596
Epoch 24 : 80.36168614791774
Epoch 25 : 78.66548084288162
Epoch 26 : 77.0935193261362
Epoch 27 : 75.60558312391112
Epoch 28 : 74.17643984092948
Epoch 29 : 72.79092507846734
Epoch 30 : 71.440971131852
Epoch 31 : 70.12345024647114
Epoch 32 : 68.83929716515237
Epoch 33 : 67.59255797627554
Epoch 34 : 66.39006276126318
Epoch 35 : 65.240241

In [21]:
# Percentage of training samples the trained model classifies correctly.
print(
    "Train accuracy :",
    compute_accuracy(model, train_input, train_target),
    "%",
)

Train accuracy : 98.7 %


In [22]:
# Percentage of held-out test samples the trained model classifies correctly.
print(
    "Test accuracy :",
    compute_accuracy(model, test_input, test_target),
    "%",
)

Test accuracy : 99.2 %
