# **MNIST_RNN**
- ELEC 576 HW 2
- Robert Heeter
- 25 October 2023

## **Structure**:
- Adjustments:
    - Number of nodes in the hidden layer
    - Learning rate
    - Number of iterations
    - Optimizer
1) Set PyTorch metadata
    - Seed
    - TensorBoard output
    - Whether to transfer to gpu (cuda)
2) Import data
    - Download data
    - Create data loaders with batchsize, transforms, scaling
3) Define model architecture, loss, and optimizer
4) Define test and training loops
    - Train:
        - Get next batch
        - Forward pass through model
        - Calculate loss
        - Backward pass from loss (calculates the gradient for each parameter)
        - Optimizer: performs weight updates
5) Perform training over multiple epochs
    - Each epoch:
        - Call train loop
        - Call test loop

## **Acknowledgements**:
- Worked with Arielle Sanford


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.autograd import Variable
from torch.utils.data import DataLoader
import numpy as np

from torch.utils.tensorboard import SummaryWriter
from datetime import datetime
import os

import matplotlib.pyplot as plt

%load_ext tensorboard


In [None]:
# 1. Set PyTorch metadata

# --- data loading ---
batch_size = 64         # samples per training mini-batch
test_batch_size = 1000  # samples per evaluation mini-batch

# --- optimisation ---
epochs = 10             # number of train/test passes to run
lr = 0.001              # learning rate passed to the optimizer

# --- reproducibility / device ---
seed = 1000             # RNG seed
try_cuda = True         # use CUDA when it is available

# --- logging ---
logging_interval = 10   # how many batches to wait before logging
logging_dir = None      # not referenced elsewhere in the visible code

# TensorBoard logging: every run writes to its own timestamped
# sub-directory of <cwd>/log/MNIST so successive runs do not overwrite
# each other
log_dir = os.path.join(
    os.getcwd(),
    'log/MNIST',
    datetime.now().strftime('%b%d_%H-%M-%S'),
)
writer = SummaryWriter(log_dir)

# decide whether to run on the GPU (only when requested AND available)
cuda = try_cuda and torch.cuda.is_available()

# always seed the default generator: it drives DataLoader shuffling and
# CPU-side weight initialisation regardless of the device used for training
torch.manual_seed(seed)
if cuda:
    # explicitly seed the current CUDA device as well
    torch.cuda.manual_seed(seed)
    

In [None]:
# 2. Import data

# convert images to tensors, then normalise with the mean/std constants below
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])

# download (if not already present under ./data) the train and test splits
train_dataset = datasets.MNIST('data', train=True, transform=transform, download=True)
test_dataset = datasets.MNIST('data', train=False, transform=transform, download=True)

# shuffle only the training data; evaluation order does not matter
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=test_batch_size, shuffle=False)

# example of data
# (`data` / `targets` replace the deprecated `train_data` / `train_labels`
# accessors, which emit a warning on every access)
print(train_dataset.data.size())
print(train_dataset.targets.size())

plt.imshow(train_dataset.data[0].numpy(), cmap='gray')
plt.title('%i' % train_dataset.targets[0])
plt.show()


In [None]:
# 3. Defining model architecture, loss, and optimizer

class Net(nn.Module):
    """Recurrent classifier that reads an image one row per time step.

    The input is fed through a 2-layer recurrent network (RNN, GRU or LSTM)
    with 28 input features per time step, and the output at the last time
    step is mapped to 10 class scores by a linear layer.

    Args:
        model_type: one of 'RNN', 'GRU' or 'LSTM'.
        hidden_size: number of hidden units in each recurrent layer.

    Raises:
        ValueError: if ``model_type`` is not one of the supported names.
    """

    # supported recurrent layer types, keyed by the public model_type name
    _RECURRENT_LAYERS = {'RNN': nn.RNN, 'GRU': nn.GRU, 'LSTM': nn.LSTM}

    def __init__(self, model_type, hidden_size):
        super(Net, self).__init__()

        # fail fast: without this check an unknown type would leave
        # self.network undefined and only surface as an AttributeError
        # on the first forward pass
        if model_type not in self._RECURRENT_LAYERS:
            raise ValueError(
                f"unsupported model_type {model_type!r}; "
                f"expected one of {sorted(self._RECURRENT_LAYERS)}"
            )

        self.model_type = model_type
        self.hidden_size = hidden_size

        # define recurrent layer
        self.network = self._RECURRENT_LAYERS[model_type](
            input_size=28, # input size (number of features in each time step)
            hidden_size=self.hidden_size, # number of hidden units
            num_layers=2, # number of recurrent layers
            batch_first=True # input and output tensors are (batch, time_step, input_size)
        )

        # define output layer
        self.out = nn.Linear(self.hidden_size, 10) # output size = 10

    def forward(self, x):
        """Return class scores of shape (batch, 10) for x of shape (batch, time_step, 28)."""
        # None -> initial hidden state defaults to zeros
        n_out, _ = self.network(x, None)
        # classify from the output of the last time step only
        return self.out(n_out[:, -1, :])

model = Net(model_type='LSTM', hidden_size=30)
if cuda:
    # the train/test loops move each batch to the GPU when `cuda` is set,
    # so the model parameters must live there as well
    model.cuda()
print(model)

# build the optimizer after the (possible) device move so it holds
# references to the parameters actually used for training
optimizer = optim.Adam(model.parameters(), lr=lr)
# optimizer = optim.SGD(model.parameters(), lr=lr)
print(optimizer)


In [None]:
# 4. Define test and training loops

def train(epoch):
    """Run one training epoch over train_loader and log the results.

    For every batch: forward pass, cross-entropy loss, backward pass and an
    optimizer step. The per-sample average loss and the accuracy over the
    whole epoch are printed and written to TensorBoard under 'Loss/Train'
    and 'Accuracy/Train'.

    Args:
        epoch: epoch number, used in the progress output and as the
            TensorBoard step.
    """
    model.train()

    criterion = nn.CrossEntropyLoss()
    num_samples = len(train_loader.dataset)
    total_loss = 0.0
    correct = 0

    for batch_idx, (data, target) in enumerate(train_loader):
        if cuda:
            data, target = data.cuda(), target.cuda()

        # drop the channel dimension: each image row becomes one time step
        data = data.view(-1, 28, 28)

        optimizer.zero_grad()
        output = model(data) # forward pass
        loss = criterion(output, target)
        loss.backward() # backward pass
        optimizer.step()

        # the criterion returns the batch MEAN; weight it by the batch size
        # so that dividing by the number of samples below yields a true
        # per-sample average (the last batch may be smaller than the rest)
        batch_loss = loss.item()
        total_loss += batch_loss * target.size(0)

        # calculate training accuracy
        pred = output.argmax(dim=1)
        correct += pred.eq(target).sum().item()

        if batch_idx % logging_interval == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{num_samples} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {batch_loss}')

    # calculate and log training metrics
    train_loss = total_loss / num_samples
    train_accuracy = 100. * correct / num_samples
    print(f"Epoch {epoch}: Training Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.2f}%")

    # add to TensorBoard
    writer.add_scalar('Loss/Train', train_loss, epoch)
    writer.add_scalar('Accuracy/Train', train_accuracy, epoch)

def test(epoch):
    """Evaluate the model on test_loader and log the results.

    Runs without gradient tracking, prints the per-sample average
    cross-entropy loss and the accuracy, and writes both to TensorBoard
    under 'Loss/Test' and 'Accuracy/Test'.

    Args:
        epoch: epoch number, used as the TensorBoard step.
    """
    model.eval()
    num_samples = len(test_loader.dataset)
    test_loss = 0.0
    correct = 0

    # reduction='sum' so batch losses can be added up and then averaged over
    # all test samples; the default 'mean' reduction would make the final
    # division by the dataset size under-report the loss
    criterion = nn.CrossEntropyLoss(reduction='sum')

    with torch.no_grad():
        for data, target in test_loader:
            if cuda:
                data, target = data.cuda(), target.cuda()

            # drop the channel dimension: each image row becomes one time step
            data = data.view(-1, 28, 28)
            output = model(data)

            test_loss += criterion(output, target).item() # sum up batch loss
            pred = output.argmax(dim=1) # get index of max
            # .item() keeps `correct` a plain Python int rather than a tensor
            correct += pred.eq(target).sum().item()

    # calculate and log testing metrics
    test_loss /= num_samples
    test_accuracy = 100. * correct / num_samples
    print(f"Test Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.2f}%")

    # add to TensorBoard
    writer.add_scalar('Loss/Test', test_loss, epoch)
    writer.add_scalar('Accuracy/Test', test_accuracy, epoch)
    

In [None]:
# 5. Perform training over multiple epochs

# start training: one train pass followed by one test pass per epoch.
# The finally clause makes sure pending TensorBoard events are flushed and
# the writer is closed even when training fails or is interrupted.
try:
    for epoch in range(1, epochs + 1):
        train(epoch)
        test(epoch)
finally:
    writer.close()


In [None]:
# open TensorBoard inside the notebook, reading runs from log/MNIST
# (relative to the working directory) and serving on port 8008
%tensorboard --logdir log/MNIST --port=8008
