# Module

In [2]:
'''
Module
Forward and Backward passes need to be defined explicitly
For Activation and Loss Functions, the backward pass is defined by the forward's gradient
'''
import torch
from torch import FloatTensor
from torch import LongTensor
import numpy as np
import math

In [3]:
class Module():
    '''
    input: self, string identifier
    output: none
    set id to None by default
    standard instantiation
    '''
    def __init__(self, id):
        if(id == None):
            self.id = 'None'
        else :
            self.id = id


    '''
    has to be overridden
    '''
    def forward(self, *args):
        raise NotImplementedError


    '''
    has to be overridden
    '''
    def backward(self, *args):
        raise NotImplementedError


    '''
    has to be overridden
    '''
    def param(self):
        raise NotImplementedError


    '''
    does not have to be overridden
    should be overridden for Module instances that have parameter gradients
    '''
    def zero_grad(self):
        pass

In [4]:
class Linear(Module):
    '''
    input: input and output sizes for layer determination, weights and biases and standard deviation
    output: none
    weights and biases are by default initialized with the std provided according to a normal distribution
    gradients for weights and biases are set to zero
    standard instantiation
    '''
    def __init__(self, input, output, std, w = None, b = None):
        Module.__init__(self, 'Linear')
        if(w != None):
            self.w = FloatTensor(output,input).normal_(0, std)
        self.w = FloatTensor(output,input).normal_(0, std)
        if(b != None):
            self.b = FloatTensor(output).normal_(0, std)
        self.b = FloatTensor(output).normal_(0, std)
        self.dw = FloatTensor(output,input).zero_()
        self.db = FloatTensor(output).zero_()

    

    '''
    sets gradients to zero
    '''    
    def zero_grad(self):
        self.dw.zero_()
        self.db.zero_()



    '''
    input: self, input
    output: input after forward pass
    this computes forward according to the formula: input * weight + bias
    the input is flattened into a vector and weights are multipled with it
    '''
    def forward(self, x):
        s = self.w.mv(x.view(-1)) + self.b
        return s



    '''
    input: input, gradient
    output: gradient from before forward pass
    this allows the backpropagation of errors
    the weight and biases are updated thanks to the loss function gradient parameter
    '''
    def backward(self, x, dl_dx):
        self.dw.add_(dl_dx.view(-1, 1).mm(x.view(1, -1)))
        self.db.add_(dl_dx.view(-1))
        dx_previous = self.w.t().mm(dl_dx)
        return dx_previous



    '''
    returns a list of parameters to use for optimization
    '''
    def param(self):
        return [[self.w, self.dw], [self.b, self.db]]

In [5]:
class ReLU(Module):
    '''
    input: self
    output: none
    standard instantiation
    '''
    def __init__(self):
        Module.__init__(self, 'Relu')



    '''
    input: self, input
    output: input after forward pass
    '''
    def forward(self, x):
        return np.maximum(0, x)



    '''
    input: self, input, gradient
    output: backward pass propagation according to input
    allows the computation of error backward propagation
    '''
    def backward(self, x, dl_dx):
        mask = (x > 0).float()
        x = torch.mul(x, mask).view(-1, 1)
        return torch.mul(x, dl_dx)



    '''
    input: self
    output: None 
    differentiates this Module instance from those with parameters
    '''
    def param(self):
        return None

In [6]:
class Tanh(Module):
    '''
    input: self
    output: none
    standard instantiation
    '''
    def __init__(self):
        Module.__init__(self, 'Tanh')



    '''
    input: self, input
    output: input after forward pass
    '''
    def forward(self,s0):
        return s0.tanh()



    '''
    input: self, input, gradient
    output: backward pass propagation according to input
    allows the computation of error backward propagation
    '''
    def backward(self, x, dl_dx):
        return 4 * (x.view(-1,1).exp() + x.view(-1,1).mul(-1).exp()).pow(-2) * dl_dx
    


    '''
    input: self
    output: None 
    differentiates this Module instance from those with parameters
    '''
    def param(self):
        return None

In [7]:
class MSELoss(Module):
    '''
    input: self
    output: none
    standard instantiation
    '''
    def __init__(self):
        Module.__init__(self, 'MSE')



    '''
    input: self, input
    output: input after forward pass
    '''
    def forward(self, v, t):
        return (v.view(2,1) - t).pow(2).sum()



    '''
    input: self, input, gradient
    output: backward pass propagation according to input
    allows the computation of error backward propagation
    '''
    def backward(self, v, t):
        return 2 * (v.view(2,1) - t)
    


    '''
    input: self
    output: None 
    differentiates this Module instance from those with parameters
    '''
    def param(self):
        return None

# Sequential

In [8]:
'''
Sequential
'''
import torch
from torch import FloatTensor
from torch import LongTensor
import numpy as np
import math

In [9]:
class Sequential():
    '''
    input: self
    output: none
    within elements, each element is an instance of the Module class
    the cache contains an array of an input after it is passed through each forward pass
    standard instantiation
    '''
    def __init__(self, *args):
        self.elements = args
        self.cache = []


     
    '''
    input: self, data
    output: input after having gone through all Module instances' forward pass
    the forward pass computes an output through a list of Module instances
    the starting input, as well as each input after it is passed through a layer, is saved
    this list of saved inputs is necessary to compute the backward-propagatino gradients
    '''    
    def forward(self, input):
        self.cache = []
        self.cache.append(input)
        for i, elt in enumerate(self.elements):
            input = elt.forward(input);
            self.cache.append(input)
        return input



    '''
    input: self, data and model gradient -- weights and bias gradients
    output: gradient
    the backward pass computes an output through a list of Module instances
    the starting gradient as well as each gradient after it is passed through a layer, is saved
    the backpropagation can happen because a list of inputs at each layer/point has been cached
    this backpropagation algorithm in fine allows the update of weights
    '''
    def backward(self, input, grad):
        error = []
        error.append(grad)
        for i, elt in reversed(list(enumerate(self.elements))):
            grad = elt.backward(self.cache[i], grad)
            error.append(grad)
        error.reverse()
        return grad, error



    '''
    input: self
    output: none
    iterates over elements within the Sequential instance and sets their respective gradients to zero
    this is used after each model training iteration, to make sure all gradients are computed over one iteration
    '''
    def zero_grad(self):
        for element in self.elements:
            element.zero_grad()



    '''
    input: self
    output: parameters of the whole sequence as a list
    iterates over elements within the Sequential instance, and fetches their parameters
    this is used to get parameters (weights, biases, and their gradients) for optimization
    '''
    def param(self):
        parameters = []
        for elt in self.elements:
            if elt.param() != None:
                parameters += elt.param()
        return parameters

# Optimizer

In [10]:
'''
Optimizer
'''
import torch
from torch import FloatTensor
from torch import LongTensor
import numpy as np
import math

In [11]:
class Optimization():
    '''
    needs to be overridden
    '''
    def __init__(self, *args):
        raise NotImplementedError

In [14]:
class SGD(Optimization):
    '''
    input: self, parameters to be optimized, learning rate
    output: none
    standard instantiation
    '''
    def __init__(self, parameters, eta):
        self.eta = eta
        self.parameters = parameters



    '''
    input: self
    output: none
    Iterates over the parameters and updates them according to the following rule:
    param = gradient_param * learning_rate
    '''
    def step(self):
        for w in self.parameters:
            w[0]-= self.eta * w[1]

In [15]:
class SGDMomentum(Optimization):
    '''
    input: self, parameters to be optimized, learning rate, update rate/momentum
    output: none
    standard instantiation
    '''
    def __init__(self, parameters, eta, gamma = 0.9):
        self.parameters = input_parameters
        self.eta = eta
        self.gamma = gamma



    '''
    input: self, previous update vector
    output: none
    iterates over the parameters and updates them according to the following rule:
    param = gradient_param * learning_rate
    updates the learning rate according to the following rate:
    learning_rate = momentum * previous_update_vector
    '''
    def step(self, prev_update):
        for w in self.parameters:
            w[0] -= self.eta * w[1]
            self.eta += (self.gamma * prev_update)

# utils

In [16]:
'''
Utils
'''
import math
import torch
from torch import FloatTensor
from torch import LongTensor
import numpy as np
import matplotlib.pyplot as plt
import time


In [17]:
'''
input: number of data points to generate
output: binary input and corresponding class according to the following rule: 
target is 1 if point pair is within circle of radius R = sqrt(1/2pi)
''' 
def data_gen(n):
    input = FloatTensor(n, 2).uniform_(0, 1)
    target = FloatTensor(n, 2).uniform_(0, 1)
    target = input.pow(2).sum(1).sub(1 / (2*math.pi)).sign().add(1).div(2).float()
    return input, target

In [18]:
'''
input: input and class to be plotted
output: none
plots the binary data as two different classes according to their labeled class
saves the figure
'''
def data_plot(input, target):
    cmap = []
    for i in range(len(target)):
        if(target[i]):
            cmap.append('green')
        else:
            cmap.append('red')
    fig, ax = plt.subplots()
    plt.scatter(input[:,0], input[:,1], s=1,c = cmap)
    circle1 = plt.Circle((0,0),np.sqrt(1/(2*math.pi)), color = 'black', fill = False)
    plt.gcf().gca().add_artist(circle1)
    plt.ylim(0, 1);
    plt.xlim(0, 1);
    plt.title('Dataset')
    plt.show()
    fig.savefig('Dataset.png')

In [19]:
'''
input: model to be trained; binary data and corresponding class; loss and optimization criterions for training, learning rate, epochs, and verbose parameter
output: total losses for each epoch, and model parameters (weights, biases and corresponding gradients) after the last epoch training
this is SGD type update;
iteratively calls: the model's forward pass, then the loss's foward pass for a single binary data point and class
then adds the running loss to the total loss
then set all gradients to zero before computing the model's backward pass for a single binary data point and the loss function's gradient
then updates the parameter weights through the optimizer.
'''
def train(model, train_input, train_target, loss_criterion, optimizer, eta, epochs, verbose):
    losses = []
    for e in range(epochs):
        sum_loss = 0
        for s in range(len(train_input)):
            output = model.forward(train_input.narrow(0, s, 1))
            loss = loss_criterion.forward(output, train_target.narrow(0, s, 1))
            sum_loss += loss.item()
            model.zero_grad()
            grad, grad_err = model.backward(output, loss_criterion.backward(output, train_target.narrow(0, s, 1)))
            optimizer.step()
        losses.append(sum_loss)
        if((e%5 == 0 or e == 0 or e == epochs) and verbose):
            print('epoch', e,'loss', sum_loss)
    weights = model.param()
    return losses, weights

In [20]:
'''
input: model to be trained; binary data and corresponding class; loss criterion for testing, and verbose parameter
output: number of misclassified points, coordinates for the vector of misclassified as well as for classified points, and a label vector of the same size as the input data
for each sample within the test set: computes the model forward to precict label as a probability of being one class or the other
the most likely prediction (index corresponding to the max value of the last layer's output) is compared to the actual label 
errors, points, and labels that were misclassifed are logged accordingly
the l parameter returned makes it easier to keep track of which labels were misclassified
'''
def eval(model, test_input, test_target, loss_criterion, verbose):
    n_errors = 0
    cx, cy, ix, iy = [], [], [], []
    l = []
    for s in range(len(test_input)):
        output = model.forward(test_input.narrow(0, s, 1))
        t = test_target.narrow(0, s, 1).item()
        i = test_input.narrow(0, s, 1)
        p = np.argmax(output)
        if(t == p):
            cx.append(i[0][0].item())
            cy.append(i[0][1].item())
            l.append(1)
        else:
            n_errors += 1
            ix.append(i[0][0].item())
            iy.append(i[0][1].item())
            l.append(0)
    return n_errors, ix, iy, cx, cy, l

In [21]:
'''
input: incorrectly labeled and correctl labeled point coordinates (for binary data)
output: none
plots the data points, with two classes -- not according to the true label (which can be determined with the circle boundary)),
but according to the classification truth
'''
def plot_results(ix, iy, cx, cy):
    fig, ax = plt.subplots()
    plt.scatter(ix, iy, s = 1, c = 'red', label = 'Misclassified')
    plt.scatter(cx, cy, s = 1, c = 'green', label = 'Correctly Classified')
    boundary = plt.Circle((0,0),np.sqrt(1/(2*math.pi)), color = 'black', fill = False)
    plt.gcf().gca().add_artist(boundary)
    plt.ylim(-0.01, 1.1);
    plt.xlim(-0.01, 1.1);
    plt.legend(loc = 1)
    plt.title('Results')
    plt.show()
    fig.savefig('Results.png')

In [22]:
'''
input: text to be outputted, file idenifier for easy saving
output: none
allows saving model results to csv format file
'''
def write_to_csv(text, file_id):

    with open('Output/test_{}.csv'.format(file_id), mode = 'w') as to_csv:
        for i in range(len(text)):
            to_csv.write(text[i])
            to_csv.write('\n')

# test

In [24]:
# '''
# test
# '''
# from Sequential import *
# from Module import *
# from Optimizer import *
# from Utils import *
# import time

In [30]:
'''
main
creates a dataset, a model, an optimizer, a loss criterion, with learning rate, level of layer standard deviation initialization for weights and biases, batch size
prints losses and training/testing errors
'''
def main():


    torch.set_grad_enabled(False)

    verbose = False
    n_train, n_test = 1000, 1000
    train_input, train_target = data_gen(n_train)
    test_input, test_target = data_gen(n_test)
    std = 1e-1
    batch_size = 20
    epochs = 50
    eta = 1e-1


    model = Sequential(Linear(2,25, std),
                        Tanh(),
                        Linear(25,25, std),
                        Tanh(),
                        Linear(25,25, std),
                        Tanh(),
                        Linear(25,25, std),
                        Tanh(),
                        Linear(25,2, std))


    optimizer = SGD(model.param(), eta)


    loss_criterion = MSELoss()


    print('\n Training...')
    losses, weights = train(model, train_input, train_target, loss_criterion, optimizer, eta, epochs, verbose)
    print('\n Testing...')
    n_error_train, ix_train, iy_train, cx_train, cy_train, l_train = eval(model, train_input, train_target, loss_criterion, verbose)
    n_error_test,  ix_test, iy_test, cx_test, cy_test,l_test = eval(model, test_input, test_target, loss_criterion, verbose)
    print('train error {} %'.format(n_error_train/len(train_input)))
    print('test error {} %'.format(n_error_test/len(test_input)))

In [42]:
if __name__ == '__main__':
    main()


 Training...

 Testing...
train error 0.582 %
test error 0.576 %


# Visualization

In [59]:
def data_gen_visualization(n):
    print('n =', n, '\n \n')
    input = FloatTensor(n, 2).uniform_(0, 1)
    print('input = ', input, '\n', 'input size =', input.size(), '\n \n')
    target = FloatTensor(n, 2).uniform_(0, 1)
    print('target1 = ', target, '\n', 'target1 size =', target.size(), '\n \n')
    target = input.pow(2).sum(1).sub(1 / (2*math.pi)).sign().add(1).div(2).float()
    print('target2 = ', target, '\n', 'target2 size =', target.size(), '\n \n')
    return input, target

In [65]:
input_, target_ = data_gen_visualization(5)

n = 5 
 

input =  tensor([[0.0973, 0.1485],
        [0.6530, 0.9692],
        [0.3458, 0.3742],
        [0.0394, 0.4236],
        [0.1115, 0.7990]]) 
 input size = torch.Size([5, 2]) 
 

target1 =  tensor([[0.9688, 0.8497],
        [0.4597, 0.1884],
        [0.5708, 0.6112],
        [0.8108, 0.5679],
        [0.7561, 0.3849]]) 
 target1 size = torch.Size([5, 2]) 
 

target2 =  tensor([0., 1., 1., 1., 1.]) 
 target2 size = torch.Size([5]) 
 

