<a href="https://colab.research.google.com/github/itissandeep98/ML-Assignments/blob/master/Assignment3/ML_Assignment3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Imports 

In [61]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import metrics
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
import torchvision.transforms as transforms
import torchvision.datasets as datasets

# Pre Processing

In [129]:
class MyPreProcessor():
  """
  My steps for pre-processing for the All datasets.
  """

  def __init__(self):
    pass

  def pre_process(self, dataset):
    """
    Reading the file and preprocessing the input and output.
    Note that you will encode any string value and/or remove empty entries in this function only.
    Further any pre processing steps have to be performed in this function too. 

    Parameters
    ----------

    dataset : integer with acceptable values 0, 1, or 2
    0 ->  Dataset
    1 ->  Dataset
    2 ->  Dataset

    Returns
    -------
    X : 2-dimensional numpy array of shape (n_samples, n_features)
    y : 1-dimensional numpy array of shape (n_samples,)
    """
    scaler = StandardScaler()
    if dataset == 0:
      df=pd.read_csv("/content/sample_data/mnist_train_small.csv",header=None)
      X=df.iloc[:,1:].to_numpy()
      X=scaler.fit_transform(X) 

      y=df[0].to_numpy()
      b = np.zeros((y.size, y.max()+1))
      b[np.arange(y.size),y] = 1
      y=b
           
    
    elif dataset == 1:
      df=pd.read_csv("/content/drive/MyDrive/ML_Assignment3/largeTrain.csv",header=None)
      X=df.iloc[:,1:].to_numpy()
      X=scaler.fit_transform(X) 
      y=df[0].to_numpy()
      # b = np.zeros((y.size, y.max()+1))
      # b[np.arange(y.size),y] = 1
      # y=b

    elif dataset == 2:
      df=pd.read_csv("/content/drive/MyDrive/ML_Assignment3/largeValidation.csv",header=None)
      X=df.iloc[:,1:].to_numpy()
      y=df[0].to_numpy()
      # b = np.zeros((y.size, y.max()+1))
      # b[np.arange(y.size),y] = 1
      # y=b
    
    return X, y

preprocessor = MyPreProcessor()


#My Neural Network

In [63]:
class MyNeuralNetwork():
    """
    My implementation of a Neural Network Classifier.
    """

    acti_fns = ['relu', 'sigmoid', 'linear', 'tanh', 'softmax']
    weight_inits = ['zero', 'random', 'normal']

    def __init__(self, n_layers, layer_sizes, activation, learning_rate, weight_init, batch_size, num_epochs):
        """
        Initializing a new MyNeuralNetwork object

        Parameters
        ----------
        n_layers : int value specifying the number of layers

        layer_sizes : integer array of size n_layers specifying the number of nodes in each layer

        activation : string specifying the activation function to be used
                     possible inputs: relu, sigmoid, linear, tanh

        learning_rate : float value specifying the learning rate to be used

        weight_init : string specifying the weight initialization function to be used
                      possible inputs: zero, random, normal

        batch_size : int value specifying the batch size to be used

        num_epochs : int value specifying the number of epochs to be used
        """

        if activation not in self.acti_fns:
            raise Exception('Incorrect Activation Function')

        if weight_init not in self.weight_inits:
            raise Exception('Incorrect Weight Initialization Function')
        
        np.random.seed(0)
        self.n_layers=n_layers
        self.layer_sizes=layer_sizes 
        self.activation=activation 
        self.learning_rate=learning_rate 
        self.weight_init=weight_init
        self.batch_size=batch_size
        self.num_epochs=num_epochs
        
        weights=[]
        bias=[]
        for i in range(self.n_layers-1):
          weights.append(np.array(self.weight_func((self.layer_sizes[i],self.layer_sizes[i+1]))))
          bias.append(np.zeros(self.layer_sizes[i+1])) 

        self.weights=np.array(weights)
        self.bias=np.array(bias)
        

    def activation_func(self,X):
      if self.activation=="relu":
        return self.relu(X),self.relu_grad(X)
      elif self.activation=="sigmoid":
        return self.sigmoid(X),self.sigmoid_grad(X)
      elif self.activation=="linear":
        return self.linear(X),self.linear_grad(X)
      elif self.activation=="tanh":
        return self.tanh(X),self.tanh_grad(X)
      else:
        return self.softmax(X),self.softmax_grad(X)

    def relu(self, X):
        """
        Calculating the ReLU activation for a particular layer

        Parameters
        ----------
        X : 1-dimentional numpy array 

        Returns
        -------
        x_calc : 1-dimensional numpy array after calculating the necessary function over X
        """
        x_calc=np.maximum(0,X)
        return x_calc

    def relu_grad(self, X):
        """
        Calculating the gradient of ReLU activation for a particular layer

        Parameters
        ----------
        X : 1-dimentional numpy array 

        Returns
        -------
        x_calc : 1-dimensional numpy array after calculating the necessary function over X
        """
        # print(X)
        x_calc=np.zeros(X.shape)
        x_calc[X>0]=1

        return x_calc

    def sigmoid(self, X):
        """
        Calculating the Sigmoid activation for a particular layer

        Parameters
        ----------
        X : 1-dimentional numpy array 

        Returns
        -------
        x_calc : 1-dimensional numpy array after calculating the necessary function over X
        """

        x_calc= 1/(1+np.exp(-X))

        return x_calc

    def sigmoid_grad(self, X):
        """
        Calculating the gradient of Sigmoid activation for a particular layer

        Parameters
        ----------
        X : 1-dimentional numpy array 

        Returns
        -------
        x_calc : 1-dimensional numpy array after calculating the necessary function over X
        """
        sig=self.sigmoid(X)
        x_calc=sig*(1-sig)
        return x_calc

    def linear(self, X):
        """
        Calculating the Linear activation for a particular layer

        Parameters
        ----------
        X : 1-dimentional numpy array 

        Returns
        -------
        x_calc : 1-dimensional numpy array after calculating the necessary function over X
        """
        x_calc=X
        return x_calc

    def linear_grad(self, X):
        """
        Calculating the gradient of Linear activation for a particular layer

        Parameters
        ----------
        X : 1-dimentional numpy array 

        Returns
        -------
        x_calc : 1-dimensional numpy array after calculating the necessary function over X
        """
        x_calc=np.zeros(X.shape)
        x_calc[X>0]=1
        x_calc[X<0]=-1
        return x_calc

    def tanh(self, X):
        """
        Calculating the Tanh activation for a particular layer

        Parameters
        ----------
        X : 1-dimentional numpy array 

        Returns
        -------
        x_calc : 1-dimensional numpy array after calculating the necessary function over X
        """
        x_calc=(np.exp(X)-np.exp(-X))/(np.exp(X)+np.exp(-X))
        return x_calc

    def tanh_grad(self, X):
        """
        Calculating the gradient of Tanh activation for a particular layer

        Parameters
        ----------
        X : 1-dimentional numpy array 

        Returns
        -------
        x_calc : 1-dimensional numpy array after calculating the necessary function over X
        """
        tanh=self.tanh(X)
        x_calc=1-tanh**2
        return x_calc

    def softmax(self, X):
        """
        Calculating the ReLU activation for a particular layer

        Parameters
        ----------
        X : 1-dimentional numpy array 

        Returns
        -------
        x_calc : 1-dimensional numpy array after calculating the necessary function over X
        """
        expo = np.exp(X)
        x_calc=expo/expo.sum(axis=0)
        return x_calc

    def softmax_grad(self, X):
        """
        Calculating the gradient of Softmax activation for a particular layer

        Parameters
        ----------
        X : 1-dimentional numpy array 

        Returns
        -------
        x_calc : 1-dimensional numpy array after calculating the necessary function over X
        """
        s = self.softmax(X)
        print(s.shape,X.shape)
        si_sj = - s * s.reshape(X.shape[0], 1)
        x_calc = np.diag(s) + si_sj
        
        return x_calc
    

    def weight_func(self,shape):
      if self.weight_init=="zero":
        return self.zero_init(shape)
      elif self.weight_init=="random":
        return self.random_init(shape)
      else:
        return self.normal_init(shape)

    def zero_init(self, shape):
        """
        Calculating the initial weights after Zero Activation for a particular layer

        Parameters
        ----------
        shape : tuple specifying the shape of the layer for which weights have to be generated 

        Returns
        -------
        weight : 2-dimensional numpy array which contains the initial weights for the requested layer
        """
        weight= np.zeros(shape)
        return weight 

    def random_init(self, shape):
        """
        Calculating the initial weights after Random Activation for a particular layer

        Parameters
        ----------
        shape : tuple specifying the shape of the layer for which weights have to be generated 

        Returns
        -------
        weight : 2-dimensional numpy array which contains the initial weights for the requested layer
        """
        weight= np.random.rand(shape[0],shape[1])*10
        return weight 

    def normal_init(self, shape):
        """
        Calculating the initial weights after Normal(0,1) Activation for a particular layer

        Parameters
        ----------
        shape : tuple specifying the shape of the layer for which weights have to be generated 

        Returns
        -------
        weight : 2-dimensional numpy array which contains the initial weights for the requested layer
        """
        weight=np.random.normal(size=shape)
        return weight
    
    def cross_entropy(self,y_hat,y):
        samples=y.shape[0]
        error=y_hat-y
        return error/samples


    def fit(self, X, y):
        """
        Fitting (training) the linear model.

        Parameters
        ----------
        X : 2-dimensional numpy array of shape (n_samples, n_features) which acts as training data.

        y : 1-dimensional numpy array of shape (n_samples,) which acts as training labels.
        
        Returns
        -------
        self : an instance of self
        """
        

        weights=self.weights
        bias=self.bias

        for epoch in range(self.num_epochs):
          for batch in range(0,X.shape[0],self.batch_size):
            X_sample=X[batch:batch+self.batch_size,:]
            y_sample=y[batch:batch+self.batch_size]
            input=X_sample
            output=y_sample
            preActivation_H = {}
            postActivation_H = {}

            # Forward Propagation
            for layer in range(self.n_layers-1):
              layer_weight=weights[layer]
              hidden_output=input.dot(layer_weight)+bias[layer]
              hidden_output_activate,_=self.activation_func(hidden_output)          
              input=hidden_output_activate  
              preActivation_H[layer]=hidden_output     
              postActivation_H[layer]=hidden_output_activate

            
            # Backward Propagation
            dW=self.cross_entropy( postActivation_H[self.n_layers-2],output)
            weights[self.n_layers-2]-=self.learning_rate*(postActivation_H[self.n_layers-3].T.dot(dW))
            bias[self.n_layers-2]-=self.learning_rate*np.sum(dW)

            for layer in range(self.n_layers-3,0,-1):
              delta=dW.dot(weights[layer+1].T)
              _,derv=self.activation_func(preActivation_H[layer])
              dW=delta*derv

              weights[layer]-=self.learning_rate*postActivation_H[layer-1].T.dot(dW)
              bias[layer]-=self.learning_rate*np.sum(dW,axis=0)

            delta=dW.dot(weights[1].T)
            _,derv=self.activation_func(preActivation_H[0])
            dW=delta*derv
            weights[0]-=self.learning_rate*X_sample.T.dot(dW)
            bias[0]-=self.learning_rate*np.sum(dW,axis=0)

        self.weights=weights
        self.bias=bias
        return self

    def predict_proba(self, X):
        """
        Predicting probabilities using the trained linear model.

        Parameters
        ----------
        X : 2-dimensional numpy array of shape (n_samples, n_features) which acts as testing data.

        Returns
        -------
        y : 2-dimensional numpy array of shape (n_samples, n_classes) which contains the 
            class wise prediction probabilities.
        """
        y=X
        for w, b in zip(self.weights, self.bias):
            z = np.dot(y,w) + b
            y,_ = self.activation_func(z)

        # return the numpy array y which contains the probability of predicted values
        return y

    def predict(self, X):
        """
        Predicting values using the trained linear model.

        Parameters
        ----------
        X : 2-dimensional numpy array of shape (n_samples, n_features) which acts as testing data.

        Returns
        -------
        y : 1-dimensional numpy array of shape (n_samples,) which contains the predicted values.
        """
        y=self.predict_proba(X)

        # return the numpy array y which contains the predicted values
        return y.argmax(axis=1)

    def score(self, X, y):
        """
        Predicting values using the trained linear model.

        Parameters
        ----------
        X : 2-dimensional numpy array of shape (n_samples, n_features) which acts as testing data.

        y : 1-dimensional numpy array of shape (n_samples,) which acts as testing labels.

        Returns
        -------
        acc : float value specifying the accuracy of the model on the provided testing set
        """

        y_pred=self.predict(X)
        y=y.argmax(axis=1)
        acc=metrics.accuracy_score(y,y_pred)
        return acc

## Testing

In [64]:
x_train= np.array([[1,2],[2,3]])
y_train = np.array([[1,0],[0,1]])
a=MyNeuralNetwork( 3,[2,5,2], "sigmoid", 0.01,"zero", 1,6)
a.fit(x_train,y_train)
a.predict(x_train)
print(a.weights)

[array([[3.80351334e-05, 3.80351334e-05, 3.80351334e-05, 3.80351334e-05,
        3.80351334e-05],
       [7.56573171e-05, 7.56573171e-05, 7.56573171e-05, 7.56573171e-05,
        7.56573171e-05]])
 array([[-4.69244305e-05,  4.69244305e-05],
       [-4.69244305e-05,  4.69244305e-05],
       [-4.69244305e-05,  4.69244305e-05],
       [-4.69244305e-05,  4.69244305e-05],
       [-4.69244305e-05,  4.69244305e-05]])]


In [65]:
X, y = preprocessor.pre_process(0)
print(X.shape,y.shape)
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.20)

(20000, 784) (20000, 10)


In [66]:
classifier=MyNeuralNetwork(5,[784,256, 128, 64,10],'linear',0.1,'random',600,50)
classifier.fit(X_train,y_train)
# print(classifier.weights)
classifier.score(X_test,y_test)



0.098

In [67]:
classifier.predict(X_test)



array([0, 0, 0, ..., 0, 0, 0])

# Q3

In [130]:
X_train,y_train= preprocessor.pre_process(1)
X_val,y_val= preprocessor.pre_process(2)
X_train.shape,y_train.shape,X_val.shape,y_val.shape

((9000, 128), (9000,), (1000, 128), (1000,))

In [141]:
hidden_units=[5, 20, 50, 100 ,200]
input_size=128
hidden_sizes=5
output_size=10
BATCH_SIZE=600
model = nn.Sequential(nn.Linear(input_size, hidden_sizes),
                      nn.ReLU(),
                      nn.Linear(hidden_sizes, output_size),
                      nn.Softmax(dim=1))

In [142]:
class MyDataset(data.Dataset):

  def __init__(self,X,y):
    self.X=X
    self.y=y

  def __len__(self):
    return self.X.shape[0]

  def __getitem__(self,i):
    return self.X[i],self.y[i]
  
train_data=MyDataset(X_train,y_train)
train_iterator = data.DataLoader(train_data, shuffle = True, batch_size = BATCH_SIZE)

In [143]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
criterion = nn.CrossEntropyLoss()
model = model.to(device)
criterion = criterion.to(device)
optimizer = optim.Adam(model.parameters())

In [144]:
def calculate_accuracy(y_pred, y):
    top_pred = y_pred.argmax(1, keepdim = True)
    correct = top_pred.eq(y.view_as(top_pred)).sum()
    acc = correct.float() / y.shape[0]
    return acc

In [145]:
def train(model, iterator, optimizer, criterion, device):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.train()
    
    for (x, y) in iterator:
        
        x = x.to(device)
        y = y.to(device)
        
        optimizer.zero_grad()
                
        y_pred = model(x.float())
        # print(y_pred)
        
        loss = criterion(y_pred, y)
        
        acc = calculate_accuracy(y_pred, y)
        
        loss.backward()
        
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
    return epoch_loss / len(iterator),epoch_acc / len(iterator)

In [146]:
train(model,train_iterator,optimizer,criterion,device)

(2.297959836324056, 0.10255555609862009)