In [None]:
from torch import Tensor, max, random
import math

class Module(object):
    '''Module superclass from which all layers will inherit.

    Subclasses override forward() and backward(). Layers with trainable
    parameters expose them through the attributes ``weights`` / ``biases`` and
    their gradient buffers ``gradWeights`` / ``gradBiases``; zero_grad() and
    param() look these attributes up by name.
    '''
    def __init__(self):
        '''Constructor of the Module class.'''
        # attributes needed for all modules
        self.output = Tensor() # output of module (after calling forward method)
        self.gradInput = Tensor() # gradient with respect to input to model (result of backprop)
        self.type = str() # human readable layer name, set by subclasses

    def __call__(self, *inp, **kwargs):
        '''Makes layer callable like a function and directly returns the result of forward().'''
        return self.forward(*inp, **kwargs)

    def forward(self, *inp, **kwargs):
        '''Should get for input, and return, a tensor or a tuple of tensors.
        The base implementation ignores its arguments and returns self.output.'''
        return self.output

    def backward(self, *gradwrtoutput):
        '''Should get as input a tensor or a tuple of tensors containing the gradient of the loss
        with respect to the module's output, accumulate the gradient wrt the parameters, and return a
        tensor or a tuple of tensors containing the gradient of the loss wrt the module's input.
        The base implementation ignores its arguments and returns self.gradInput.'''
        return self.gradInput

    def zero_grad(self):
        '''Sets all the gradients to zero (in place).'''
        # zero_() instead of "*= 0." so that NaN/inf entries are reset as well
        # (nan * 0 is still nan).
        self.gradInput.zero_()
        # Look up the gradient buffers themselves rather than the parameters,
        # so a module is never asked for a buffer it does not own.
        for name in ('gradWeights', 'gradBiases'):
            grad = getattr(self, name, None)
            if grad is not None:
                grad.zero_()

    def param(self):
        '''Return a list of pairs, each composed of a parameter tensor, and a gradient tensor
        of same size. This list should be empty for parameterless modules (e.g. ReLU).
        Weights (if any) come first, followed by biases (if any).'''
        pairs = []
        if hasattr(self, 'weights'):
            pairs.append((self.weights, self.gradWeights))
        if hasattr(self, 'biases'):
            pairs.append((self.biases, self.gradBiases))
        return pairs

class Linear(Module):
    '''Fully connected layer.

    Holds a weight matrix of shape (out_dim, in_dim) and a bias column of shape
    (out_dim, 1), both drawn from a standard normal distribution, together with
    zero-initialised gradient buffers of the same shapes.
    '''
    def __init__(self, in_dim, out_dim):
        super().__init__()
        self.type = 'Linear'
        self.inp = Tensor()
        # Parameters: weights are drawn before biases.
        self.weights = Tensor(out_dim, in_dim).normal_()
        self.biases = Tensor(out_dim, 1).normal_()
        # Gradient accumulators, one per parameter tensor.
        self.gradWeights = Tensor(self.weights.size()).zero_()
        self.gradBiases = Tensor(self.biases.size()).zero_()

    def forward(self, inp):
        '''Apply the affine map to a batch whose rows are samples; the input
        is remembered for the backward pass.'''
        self.inp = inp
        # Work with samples as columns so the bias column broadcasts over them.
        projected = self.weights.mm(inp.t())
        shifted = projected + self.biases
        self.output = shifted.t()
        return self.output

    def backward(self, gradOutput):
        '''Accumulate the parameter gradients and return the gradient with
        respect to the layer input.'''
        grad_out_t = gradOutput.t()
        self.gradInput = self.weights.t().mm(grad_out_t).t()
        # Contributions of every sample in the batch are summed into the buffers.
        self.gradWeights += grad_out_t.mm(self.inp)
        self.gradBiases += grad_out_t.sum(1).unsqueeze(1) # back to a column
        return self.gradInput
        
class Tanh(Module):
    '''Hyperbolic tangent activation layer (no trainable parameters).'''
    def __init__(self):
        super().__init__()
        self.inp = Tensor()
        self.type = 'Tanh'

    def forward(self, inp):
        '''Apply tanh element-wise and remember the input for backward().'''
        self.inp = inp
        activated = inp.tanh()
        self.output = activated
        return activated

    def backward(self, gradOutput):
        '''Return gradOutput scaled element-wise by tanh'(inp) = 1 - tanh(inp)^2.'''
        # The activation is recomputed from the stored input rather than read
        # back from self.output.
        squashed = self.inp.tanh()
        local_slope = 1 - squashed.pow(2)
        return gradOutput.mul(local_slope)
    
class ReLU(Module):
    '''Rectified linear unit module (no trainable parameters).'''
    def __init__(self):
        super(ReLU, self).__init__()
        self.inp = Tensor()
        self.type = 'ReLU'

    def forward(self, inp):
        '''Return max(inp, 0) element-wise.

        The argument is left untouched: a private copy is kept for backward()
        and the output is a new tensor, so the caller's tensor (for instance
        the stored output of the previous layer) is not overwritten.
        '''
        self.inp = inp.clone()
        self.output = inp.clamp(min=0)
        return self.output

    def backward(self, gradOutput):
        '''Return gradOutput masked by the derivative of ReLU.'''
        step = self.inp.clone()
        # derivative of ReLU is step function applied to original input
        # (entries equal to zero stay zero)
        step[step > 0] = 1
        step[step < 0] = 0
        self.gradInput = gradOutput.mul(step)
        return self.gradInput
    
class Sequential(Module):
    '''Container to store several layers sequentially.

    The layers are kept in ``self.modules`` and ``self.size`` tracks how many
    there are. A summary of the network is printed on construction.
    '''
    def __init__(self, *args):
        super(Sequential, self).__init__()
        self.modules = []
        self.size = 0
        self.type = 'Sequential container'
        for arg in args:
            self.add(arg)
        print(self)

    def __str__(self):
        '''One line per layer; Linear layers also show the shape of their weights.'''
        lines = ['New neural net\n']
        for ind, module in enumerate(self.modules):
            if module.type == 'Linear':
                lines.append('   Layer ' + str(ind) + ': ' + module.type + ', ' + str(module.weights.shape) + '\n')
            else:
                lines.append('   Layer ' + str(ind) + ': ' + module.type + '\n')
        return ''.join(lines)

    def add(self, module, index=None):
        '''Add new layer at position index. By default is added as new last layer.

        Raises ValueError when index is negative or larger than the current
        number of layers.
        '''
        # identity test: None is a singleton, and "==" may be overloaded
        if index is None:
            index = self.size
        if index < 0 or index > self.size:
            raise ValueError('Supplied index is out of range for number of modules in this sequence.')
        self.modules.insert(index, module)
        self.size += 1

    def forward(self, inp):
        '''Feed a copy of inp through every layer in order and return the result.'''
        temp = inp.clone()
        for module in self.modules:
            temp = module(temp) # feed forward loop
        return temp # deliberately not stored in self.output

    def backward(self, gradOutput):
        '''Propagate a copy of gradOutput through the layers in reverse order and
        return the gradient with respect to the network input.'''
        temp = gradOutput.clone()
        for module in reversed(self.modules):
            temp = module.backward(temp)
        return temp # deliberately not stored in self.gradInput

    def param(self):
        '''Returns a flattened list of each module's parameters with a tuple of the
        actual parameter and its gradient.'''
        return [p for module in self.modules for p in module.param()]

    def zero_grad(self):
        '''Set the gradient of each parameter in all the modules to zero.'''
        for module in self.modules:
            module.zero_grad()
            
class LossMSE(Module):
    '''Squared-error criterion.

    Note: the value returned by forward() is the SUM of the squared differences
    over all entries (no division by the number of elements), and backward()
    returns the matching gradient 2 * (inp - targets).
    '''
    def __init__(self):
        super().__init__()
        self.type = 'MSE loss'
        self.inp = Tensor()

    def forward(self, inp, targets):
        '''Store a copy of the predictions and return the summed squared error.'''
        self.inp = inp.clone()
        residual = inp - targets
        self.output = residual.pow(2).sum()
        return self.output

    def backward(self, targets):
        '''Gradient of the loss with respect to the predictions seen by the
        last forward() call.'''
        residual = self.inp - targets
        self.gradInput = 2. * residual
        return self.gradInput
    
class LossCrossEntropy(Module):
    '''Softmax cross-entropy criterion.

    Follows the same calling convention as the squared-error criterion of this
    file: forward(inp, targets) returns the loss summed over the batch and
    backward(targets) returns its gradient with respect to the inp given to
    the last forward() call.

    Assumes inp is a 2-D tensor of raw scores with one row per sample, and
    targets has the same shape with per-class weights (e.g. one-hot rows).
    '''
    def __init__(self):
        super(LossCrossEntropy, self).__init__()
        self.inp = Tensor()
        self.type = 'Cross-entropy loss'

    @staticmethod
    def _log_softmax(scores):
        '''Row-wise log-softmax, shifted by the row maximum so exp() cannot overflow.'''
        shifted = scores - scores.max(1, keepdim=True)[0]
        return shifted - shifted.exp().sum(1, keepdim=True).log()

    def forward(self, inp, targets):
        '''Return -sum(targets * log_softmax(inp)) over the whole batch.'''
        self.inp = inp.clone()
        self.output = -(targets * self._log_softmax(inp)).sum()
        return self.output

    def backward(self, targets):
        '''Return d(loss)/d(inp) = softmax(inp) * sum_c(targets) - targets.

        For one-hot targets the row sums are 1 and this reduces to the usual
        softmax(inp) - targets.
        '''
        probabilities = self._log_softmax(self.inp).exp()
        self.gradInput = probabilities * targets.sum(1, keepdim=True) - targets
        return self.gradInput
    
class optimizer(object):
    '''Plain gradient-descent optimizer over a collection of
    (parameter, gradient) pairs.'''
    def __init__(self, parameters, eta):
        self.eta = eta # learning rate
        self.params = parameters # iterable of (parameter, gradient) pairs

    def step(self):
        '''Move every parameter one step against its gradient, scaled by the
        learning rate. Parameters are updated in place.'''
        for pair in self.params:
            tensor, grad = pair
            update = self.eta * grad
            tensor -= update


In [None]:
def generate_disc_set(nb):
    '''Draw nb points uniformly in the unit square [0,1]^2 and label them.

    A point gets label 1 when it lies strictly inside the disk centered at
    [0.5, 0.5] of radius 1/sqrt(2*pi), and label 0 otherwise. Returns the
    (nb, 2) point tensor and the label tensor (integer, one entry per point).
    '''
    points = Tensor(nb, 2).uniform_(0, 1)
    radius_sq = 1./(2*math.pi)
    dist_sq = points.sub(0.5).pow(2).sum(1)
    # sign is -1 inside, 0 on the boundary and +1 outside the disk ...
    side = dist_sq.sub(radius_sq).sign()
    # ... which maps to 1, 0.5 and 0; the integer cast truncates 0.5 to 0.
    labels = side.sub(1).div(-2).long()
    return points, labels

def convert_to_one_hot_labels(inp, target):
    '''Turn a vector of integer class labels into a one-hot matrix.

    The result has one row per label and (largest label + 1) columns; it is
    allocated from inp (via new_zeros), which is otherwise not read.
    '''
    nb_rows = target.size(0)
    nb_classes = target.max() + 1
    one_hot = inp.new_zeros(nb_rows, nb_classes)
    # write a 1 in the column given by each row's label
    columns = target.view(-1, 1)
    one_hot.scatter_(1, columns, 1.0)
    return one_hot

def compute_stats(model, data_input, data_target, mini_batch_size=100):
    '''Print the accuracy and the error rate (both in percent) of model on the given data.

    model must provide forward(batch) returning one row of class scores per
    sample; the predicted class is the index of the largest score.
    data_target is expected to be a tensor holding one integer class label
    per sample. The data is processed in chunks of mini_batch_size samples;
    the last chunk may be smaller. Nothing is returned.
    '''
    nb_samples = data_input.size(0)
    nb_data_errors = 0
    for b in range(0, nb_samples, mini_batch_size):
        # clamp the last chunk so narrow() never reads past the end of the data
        batch_size = min(mini_batch_size, nb_samples - b)
        output = model.forward(data_input.narrow(0, b, batch_size))
        # Tensor.max along dim 1 returns (values, indices); using the method
        # avoids depending on which "max" is bound at module level.
        _, predicted_classes = output.max(1)
        batch_target = data_target.narrow(0, b, batch_size).reshape(-1)
        nb_data_errors += int(predicted_classes.ne(batch_target).sum())
    accuracy = 100 - (100*(nb_data_errors / len(data_target)))
    print('Accuracy : ', accuracy, '%')
    print('Error rate: ', 100*nb_data_errors/len(data_target), '%')
    return


def train_model(model, train_input, train_one_hot_target, criterion, optimizer, nb_epochs=100, mini_batch_size=5, verbose=False):
    '''Train the given model using the given loss function with the given optimizer.

    For every epoch the training data is visited in consecutive chunks of
    mini_batch_size samples (the last chunk may be smaller). For each chunk:
    forward pass, loss evaluation, gradient reset, backward pass, then one
    optimizer step. With verbose=True the loss accumulated over the epoch,
    divided by the number of samples and multiplied by 100, is printed every
    10th epoch.
    '''
    nb_samples = train_input.size(0)
    for e in range(0, nb_epochs):
        loss = 0
        for b in range(0, nb_samples, mini_batch_size):
            # clamp the last chunk so narrow() never reads past the end of the data
            batch_size = min(mini_batch_size, nb_samples - b)
            batch_input = train_input.narrow(0, b, batch_size)
            batch_target = train_one_hot_target.narrow(0, b, batch_size)
            output = model.forward(batch_input)
            loss += criterion.forward(output, batch_target)
            model.zero_grad() # zero all parameter gradients
            model.backward(criterion.backward(batch_target))
            optimizer.step() # train with one step of stochastic gradient descent
        if verbose and e%10 == 0:
            # float() accepts both a one-element tensor and a plain number
            print('Mean loss epoch {} : {:.2f} %'.format(e, 100*float(loss)/nb_samples))
     

In [None]:
# Seed torch's random number generator so that the run is reproducible.
random.manual_seed(20)

train_inp, train_target = generate_disc_set(1000)
test_inp, test_target = generate_disc_set(100)

# make variance = 1, mean = 0
# The statistics are computed on the training set only and the very same
# shift/scale is applied to the test set, so that both sets go through the
# same preprocessing the model was trained with.
train_mean, train_std = train_inp.mean(), train_inp.std()
train_inp = train_inp.sub(train_mean).div(train_std)
test_inp = test_inp.sub(train_mean).div(train_std)
train_target_hot = convert_to_one_hot_labels(train_inp, train_target)
criterion = LossMSE()

# 2 inputs -> two hidden layers of 25 tanh units -> 2 output scores
model = Sequential(Linear(2, 25), Tanh(), Linear(25, 25), Tanh(), Linear(25, 2))
optim = optimizer(model.param(), 0.001)

train_model(model, train_inp, train_target_hot, criterion, optim, mini_batch_size=100, nb_epochs=100, verbose=True)
print('\nTraining set')
compute_stats(model, train_inp, train_target, mini_batch_size=1)
print('\nTest set')
compute_stats(model, test_inp, test_target, mini_batch_size=1)
