<a href="https://colab.research.google.com/github/pc-Jiang/math_in_deep_learning/blob/main/DSC_291_SP23.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DSC 291 Mathematics in Deep Learning, SP23
Advisor: Mikhail Belkin

Authors: Pengcen Jiang, Bin Wang

## Import packages and set random seed.



In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import os
import math
import numpy as np
from torch.autograd import Variable

import torchvision
from torchvision import datasets, transforms       
import torchvision.transforms as transforms

import matplotlib.pyplot as plt
%matplotlib inline
import sklearn.metrics
import seaborn as sns
import random

# Decide once whether a GPU is available and derive the device handles from it.
USE_CUDA = torch.cuda.is_available()
if USE_CUDA:
    DEVICE = torch.device("cuda")
    # Note the mixed types: a plain string here, a torch.device in the CPU
    # branch. NOTE(review): MAP_LOC is not read by any cell visible here;
    # presumably it is meant for torch.load(map_location=...) -- confirm.
    MAP_LOC = "cuda:0"
else:
    DEVICE = torch.device("cpu")
    MAP_LOC = torch.device('cpu')

def set_seed(seed=1234):
    """Seed every random number generator used in this notebook.

    Covers Python's ``random`` module, NumPy's legacy global generator and
    PyTorch (CPU generator plus the current CUDA device), and exports the
    seed as ``PYTHONHASHSEED``.

    Args:
        seed: integer seed shared by all generators (default ``1234``).
    """
    # Environment values must be strings.
    # NOTE(review): hash randomisation is normally fixed at interpreter
    # start-up, so this likely only affects subprocesses -- confirm.
    os.environ['PYTHONHASHSEED'] = str(seed)

    random.seed(seed)
    np.random.seed(seed)

    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)


# Seed immediately so every later cell starts from a known RNG state.
set_seed()

## Define basic blocks and functions. 

### Define configurations class. 

In [14]:
class LearnFixedPointConfig:
    """Hyper-parameter container for the fixed-point learning experiment.

    Every setting is a plain instance attribute, so it can be overridden
    after construction (``config.lr = 0.01``) or copied from another config
    object with :meth:`update`.
    """

    def __init__(self):
        settings = vars(self)

        # Model: layer sizes, activation, initialisation scheme, number of
        # unrolled steps, and the leaky ("ct") update with its time constant.
        settings.update(
            hidden_size=100,
            nonlinearity=None,
            dim_in=10,
            dim_out=10,
            batch_size=1,
            initialization=None,
            use_bias=True,
            time_steps=10,
            ct=False,
            tau=0,
        )

        # Training: optimiser choice and its hyper-parameters.
        settings.update(
            lr=0.001,
            optimizer_type='SGD',
            momentum=0,
            wdecay=0,
            num_ep=100,
            add_noise=False,
        )

        # Data generation: scale and offset applied to the random fixed point.
        settings.update(
            std_fp=1,
            mean_fp=0,
        )

    def update(self, new_config):
        """Copy every attribute of ``new_config`` onto this object.

        Existing attributes are overwritten; attributes that only exist on
        ``new_config`` are added.
        """
        vars(self).update(vars(new_config))

    def __str__(self):
        """Return the attribute dictionary rendered as a string."""
        return str(vars(self))

### Define RNN class. 

In [17]:
class IdentityAct:
    """Pass-through activation used when no nonlinearity is configured."""

    def __call__(self, tensor):
        """Return ``tensor`` itself: the same object, with no copy made."""
        return tensor


class VanillaRNNCell(nn.Module):
    """Single-step vanilla RNN cell with a linear read-out.

    Discrete update (``config.ct`` false)::

        h_new = act(W_ih x + W_hh h + b)

    Leaky update (``config.ct`` true, ``alpha = 1 / tau``)::

        h_new = (1 - alpha) * h + alpha * act(W_ih x + W_hh h + b)

    In both cases the read-out is ``y = W_ho h_new + b_out``.

    Args:
        config: object exposing ``dim_in``, ``dim_out``, ``hidden_size``,
            ``nonlinearity`` (``None``, ``"tanh"`` or ``"relu"``),
            ``use_bias``, ``initialization`` (``None`` or ``"uniform"``),
            ``ct`` and ``tau``.

    Raises:
        ValueError: if ``ct`` is enabled with a non-positive ``tau``.
        RuntimeError: on an unknown ``initialization`` or ``nonlinearity``.
    """

    def __init__(self, config):
        super(VanillaRNNCell, self).__init__()
        self.input_size = config.dim_in
        self.output_size = config.dim_out
        self.hidden_size = config.hidden_size
        self.nonlinearity = config.nonlinearity
        self.use_bias = config.use_bias
        self.initialization = config.initialization
        self.ct = config.ct
        self.tau = config.tau

        # forward() computes alpha = 1 / tau in leaky mode; reject a bad time
        # constant here instead of failing with ZeroDivisionError mid-training.
        if self.ct and self.tau <= 0:
            raise ValueError(
                "tau must be positive when ct is enabled, got {}".format(self.tau))

        # Every parameter starts at zero; reset_parameters() may overwrite them.
        self.weight_ih = nn.Parameter(
            torch.zeros((self.hidden_size, self.input_size)))
        self.weight_hh = nn.Parameter(
            torch.zeros((self.hidden_size, self.hidden_size)))
        self.weight_ho = nn.Parameter(
            torch.zeros((self.output_size, self.hidden_size)))
        # Biases are column vectors so they broadcast over the batch axis.
        self.bias = nn.Parameter(torch.zeros(self.hidden_size, 1))
        self.bias_out = nn.Parameter(torch.zeros(self.output_size, 1))
        self.reset_parameters()

        if self.nonlinearity is None:
            self.act = IdentityAct()
        elif self.nonlinearity == "tanh":
            self.act = torch.tanh
        elif self.nonlinearity == "relu":
            self.act = F.relu
        else:
            raise RuntimeError("Unknown nonlinearity: {}".format(
                self.nonlinearity))

    def reset_parameters(self):
        """Initialise the parameters according to ``self.initialization``.

        * ``None`` leaves every parameter at the zeros it was created with.
        * ``"uniform"`` draws from ``U(-1/sqrt(hidden_size), 1/sqrt(hidden_size))``.

        When ``use_bias`` is false, both bias parameters are frozen
        (``requires_grad = False``) and keep their current value.

        Raises:
            RuntimeError: if ``self.initialization`` is not a known scheme.
        """
        stdv = 1.0 / math.sqrt(self.hidden_size)

        if self.initialization not in (None, 'uniform'):
            raise RuntimeError("Unknown initialization: {}".format(self.initialization))

        # One pass in named_parameters() order, so seeded runs draw the same
        # random numbers as before.
        for name, param in self.named_parameters():
            if 'bias' in name and not self.use_bias:
                param.requires_grad = False
            elif self.initialization == 'uniform':
                nn.init.uniform_(param, -stdv, stdv)

    def forward(self, inp, hidden_in):
        """Advance the cell by one time step.

        Args:
            inp: input of shape ``(batch, dim_in)``.
            hidden_in: previous hidden state of shape ``(batch, hidden_size)``.

        Returns:
            ``[hidden_out, output]`` with shapes ``(batch, hidden_size)`` and
            ``(batch, dim_out)``.
        """
        # Add a trailing axis so each sample is a column vector for matmul.
        inp = torch.unsqueeze(inp, 2)
        hidden_in = torch.unsqueeze(hidden_in, 2)

        activated = self.act(
            torch.matmul(self.weight_ih, inp)
            + torch.matmul(self.weight_hh, hidden_in)
            + self.bias)

        if self.ct:
            # Leaky integration: blend the old state with the new activation.
            alpha = 1 / self.tau
            hidden_out = (1 - alpha) * hidden_in + alpha * activated
        else:
            hidden_out = activated

        output = torch.matmul(self.weight_ho, hidden_out) + self.bias_out
        return [torch.squeeze(hidden_out, 2), torch.squeeze(output, 2)]

    def init_hidden(self, batch_s):
        """Return a zero hidden state of shape ``(batch_s, hidden_size)``.

        The tensor is created with the device and dtype of this module's own
        parameters, so it stays compatible after ``.to(...)`` or ``.double()``.
        """
        return torch.zeros(batch_s, self.hidden_size,
                           device=self.weight_hh.device,
                           dtype=self.weight_hh.dtype)

### Define the function to get the fixed point. 

In [19]:
def get_fixed_point(config):
    """Draw a random target vector of shape ``(1, config.dim_in)`` on DEVICE.

    Entries are standard-normal samples scaled by ``config.std_fp`` and
    shifted by ``config.mean_fp``.
    """
    standard_normal = torch.randn(1, config.dim_in).to(DEVICE)
    return standard_normal * config.std_fp + config.mean_fp

def add_noise(fixed_point, mean=0, std=0.1):
    """Return ``fixed_point`` perturbed by Gaussian noise.

    Args:
        fixed_point: tensor to perturb; it is not modified.
        mean: offset added to every noise sample (default ``0``).
        std: scale applied to the standard-normal samples (default ``0.1``).

    Returns:
        A new tensor ``fixed_point + randn * std + mean`` with the same shape,
        on the same device as ``fixed_point``.
    """
    # Sampling is unchanged; only the destination differs. The noise follows
    # the input's device rather than the notebook-level DEVICE global, so the
    # addition cannot fail with a device mismatch.
    noise = torch.randn(fixed_point.size()).to(fixed_point.device)
    return fixed_point + noise * std + mean

### Define the training function

In [34]:
def train_fixed_point(config, model, fixed_point):
    """Train ``model`` to output ``fixed_point`` at every unrolled time step.

    Each epoch resets the hidden state, feeds the fixed point (optionally
    perturbed by ``add_noise``) for ``config.time_steps`` steps, sums the MSE
    between each output and the clean fixed point, and takes one optimiser
    step on that summed loss.

    Args:
        config: object exposing ``optimizer_type`` (``'Adam'`` or ``'SGD'``),
            ``lr``, ``momentum``, ``wdecay``, ``num_ep``, ``batch_size``,
            ``time_steps`` and ``add_noise``.
        model: module providing ``init_hidden(batch_size)`` and a forward pass
            ``model(inp, hidden) -> [hidden, output]``.
        fixed_point: target tensor; also the (noise-free) input.

    Returns:
        ``(model, optimizer, task_loss_list, ep_list)``, where
        ``task_loss_list`` holds the per-epoch loss averaged over time steps
        and ``ep_list`` the matching 1-based epoch numbers.

    Raises:
        NotImplementedError: for an unsupported ``config.optimizer_type``.
        ValueError: if ``config.time_steps`` is smaller than 1.
    """
    if config.time_steps < 1:
        raise ValueError(
            'time_steps must be at least 1, got {}'.format(config.time_steps))

    # Only optimise parameters that were not frozen (e.g. disabled biases).
    trainable = [p for p in model.parameters() if p.requires_grad]
    if config.optimizer_type == 'Adam':
        optimizer = torch.optim.Adam(trainable,
                                     lr=config.lr,
                                     weight_decay=config.wdecay)
    elif config.optimizer_type == 'SGD':
        optimizer = torch.optim.SGD(trainable,
                                    lr=config.lr,
                                    momentum=config.momentum,
                                    weight_decay=config.wdecay)
    else:
        raise NotImplementedError('optimizer not implemented')

    criterion = nn.MSELoss()
    task_loss_list = []
    ep_list = []

    for ep in range(1, config.num_ep + 1):
        hidden = model.init_hidden(config.batch_size)
        model.train()
        optimizer.zero_grad()

        loss = 0.0
        for _ in range(config.time_steps):
            # Perturb a fresh copy on every step. Rebinding `fixed_point`
            # here would let the noise pile up across steps and epochs and
            # would move the training target along with it.
            if config.add_noise:
                step_input = add_noise(fixed_point)
            else:
                step_input = fixed_point
            hidden, output = model(step_input, hidden)
            loss = loss + criterion(output, fixed_point)

        loss.backward()
        optimizer.step()

        mean_loss = (loss / config.time_steps).item()
        task_loss_list.append(mean_loss)
        ep_list.append(ep)
        if ep % 10 == 0:
            print('TRAIN | Epoch: {}/{} | Loss: {:.2f}'.format(ep, config.num_ep, mean_loss))

    return model, optimizer, task_loss_list, ep_list


## Train the network. 

In [18]:
config = LearnFixedPointConfig()
# get_fixed_point() places the target on DEVICE, so the model's parameters
# must live there too; otherwise the first matmul fails on a CUDA machine.
model = VanillaRNNCell(config).to(DEVICE)
fixed_point = get_fixed_point(config)
print('Fixed point {} \n'.format(fixed_point))

Fixed point tensor([[ 0.0461,  0.4024, -1.0115,  0.2167, -0.6123,  0.5036,  0.2310,  0.6931,
         -0.2669,  2.1785]]) 



In [36]:
# Run the training loop. `task_loss_list` holds the per-epoch loss averaged
# over the time steps and `ep_list` the matching 1-based epoch numbers.
# The same `model` object is trained and returned, so re-running this cell
# continues training instead of starting from scratch.
model, optimizer, task_loss_list, ep_list = train_fixed_point(config, model, fixed_point)

TRAIN | Epoch: 10/100 | Loss: 0.44
TRAIN | Epoch: 20/100 | Loss: 0.43
TRAIN | Epoch: 30/100 | Loss: 0.41
TRAIN | Epoch: 40/100 | Loss: 0.39
TRAIN | Epoch: 50/100 | Loss: 0.38
TRAIN | Epoch: 60/100 | Loss: 0.36
TRAIN | Epoch: 70/100 | Loss: 0.35
TRAIN | Epoch: 80/100 | Loss: 0.33
TRAIN | Epoch: 90/100 | Loss: 0.32
TRAIN | Epoch: 100/100 | Loss: 0.31
