<a href="https://colab.research.google.com/github/trogdentyler/cs611/blob/main/hmwk/hw12_prob5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip3 install torch
!pip3 install tqdm

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import matplotlib.pyplot as plt
from torchvision import transforms, utils, datasets
from tqdm import tqdm

In [3]:
# create mnist dataset object

class MNISTProcessedDataset(Dataset):
  """Dataset wrapper around torchvision's MNIST with a ToTensor transform applied.

  Args:
    root: directory handed to torchvision for storing/reading the MNIST files.
    train: forwarded to torchvision; True selects the training split,
      False the held-out split.
  """

  def __init__(self, root, train=True):
    # download=True lets torchvision fetch the files into `root` when needed
    to_tensor = transforms.ToTensor()
    self.data = datasets.MNIST(root, train=train, transform=to_tensor, download=True)

  def __len__(self):
    """Number of samples in the wrapped split."""
    return len(self.data)

  def __getitem__(self, i):
    """Return the (image, label) pair stored at index `i`."""
    image, label = self.data[i]
    return image, label

In [4]:
# create CNN architecture

class CustomCNN(nn.Module):
  """Three-stage CNN classifier for single-channel 28x28 MNIST digits.

  Each stage is Conv2d -> ReLU -> MaxPool2d. The flattened output of the
  third stage feeds a single linear layer that emits one logit per class.
  """

  def __init__(self):
    super().__init__()

    # number of classes we're fitting to
    out_dim = 10
    # side length of the square MNIST input images
    in_size = 28

    ############################
    # Input/output channels, kernel size, padding and max-pool size for
    # each convolutional stage, plus the in-feature size of the final
    # fully connected layer.
    #
    # Note: each choice depends on the previous ones -- a stage's input
    # channels must equal the previous stage's output channels, and
    # in_feat must equal the flattened size of the last stage's output.

    out1 = 16
    k_size1 = 3
    padd1 = 1
    max_pool_size1 = 2

    in2 = out1
    out2 = 32
    k_size2 = 3
    padd2 = 1
    max_pool_size2 = 2

    in3 = out2
    out3 = 64
    k_size3 = 3
    padd3 = 1
    max_pool_size3 = 2

    def stage_size(size, k_size, padd, pool):
      # stride-1 convolution, then a max pool whose stride defaults to its
      # kernel size and which floors the result
      return (size + 2 * padd - k_size + 1) // pool

    size1 = stage_size(in_size, k_size1, padd1, max_pool_size1)  # 28 -> 14
    size2 = stage_size(size1, k_size2, padd2, max_pool_size2)    # 14 -> 7
    size3 = stage_size(size2, k_size3, padd3, max_pool_size3)    # 7 -> 3

    # channels * height * width of the conv3 output: 64 * 3 * 3 = 576
    in_feat = out3 * size3 * size3

    ############################

    self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=out1, kernel_size=k_size1, padding=padd1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=max_pool_size1),
        )

    self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=in2, out_channels=out2, kernel_size=k_size2, padding=padd2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=max_pool_size2),
        )

    self.conv3 = nn.Sequential(
            nn.Conv2d(in_channels=in3, out_channels=out3, kernel_size=k_size3, padding=padd3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=max_pool_size3),
        )

    # fully connected layer, output 10 classes
    self.out = nn.Linear(in_features=in_feat, out_features=out_dim)

  def forward(self, x):
    """Map a batch of images (N, 1, 28, 28) to class logits (N, 10)."""
    x = self.conv1(x)
    x = self.conv2(x)
    x = self.conv3(x)

    # flatten the output of conv3, keeping the batch dimension
    x = x.flatten(start_dim=1)
    output = self.out(x)

    return output

In [None]:
# define train and val datasets
train_data = MNISTProcessedDataset('/tmp/mnist')
val_data = MNISTProcessedDataset('/tmp/mnist', train=False)

# use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
use_cuda = device.type == 'cuda'

# define dataloaders which hand us a batch when prompted; only the training
# loader shuffles, so every epoch sees the samples in a different order while
# validation stays deterministic
batch_size = 42
train_loader = DataLoader(
    train_data,
    batch_size=batch_size,
    shuffle=True,
    pin_memory=use_cuda,  # pinned host memory only helps host-to-GPU copies
)
val_loader = DataLoader(val_data, batch_size=batch_size)

# instantiate model and move its parameters to the chosen device
model = CustomCNN()
model.to(device)

# instantiate optimizer
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# instantiate loss
objective = nn.CrossEntropyLoss()

In [None]:
# Run your training/validation loops

def mean_validation_loss(model, loader, objective, device):
  """Return the mean of the per-batch losses of `model` over `loader` as a float.

  Runs in eval mode with autograd disabled so no graph is built or kept, then
  restores the model's previous train/eval mode. Returns NaN for an empty loader.
  """
  was_training = model.training
  model.eval()
  batch_losses = []

  with torch.no_grad():
    for val_x, val_y_truth in loader:
      val_x = val_x.to(device, non_blocking=True)
      val_y_truth = val_y_truth.to(device, non_blocking=True)
      batch_losses.append(objective(model(val_x), val_y_truth).item())

  model.train(was_training)

  return sum(batch_losses) / len(batch_losses) if batch_losses else float('nan')


train_losses = []
validation_losses = []
# global training step at which each pair of losses above was recorded
logged_steps = []

# play with number of epochs if training loss is too high
num_epochs = 5

# feed batches to whichever device the model's parameters live on
device = next(model.parameters()).device

# loop status bar
loop = tqdm(total=len(train_loader) * num_epochs, position=0)

for epoch in range(num_epochs):
  # train
  for batch, (x, y_truth) in enumerate(train_loader):
    # send tensors to the model's device
    x, y_truth = x.to(device, non_blocking=True), y_truth.to(device, non_blocking=True)

    optimizer.zero_grad()

    # evaluate model on input and get loss
    y_hat = model(x)
    loss = objective(y_hat, y_truth)

    # every 25 batches, record the training loss and run through the whole
    # validation set
    if batch % 25 == 0:
      train_losses.append(loss.item())
      validation_losses.append(mean_validation_loss(model, val_loader, objective, device))
      logged_steps.append(epoch * len(train_loader) + batch)

    loop.set_description('epoch:{} batch:{} loss:{:.4f} val_loss:{:.4f}'.format(epoch, batch, loss.item(), validation_losses[-1]))
    loop.update()

    loss.backward()
    optimizer.step()

loop.close()

# create a plot of your loss over time, against the real iteration count
fig, ax = plt.subplots()

ax.plot(logged_steps, validation_losses, label="Validation loss")
ax.plot(logged_steps, train_losses, label="Training loss")

ax.set_xlabel("Time (Iterations)")
ax.set_ylabel("Loss")
ax.legend()
plt.show()

In [None]:
# test prediction against actual labels on the first validation batch
imgs, lbls = next(iter(val_loader))
# move the images to whichever device the model's parameters live on
imgs = imgs.to(next(model.parameters()).device)

# inference only: eval mode, and no autograd graph needed
model.eval()
with torch.no_grad():
  test_output = model(imgs)

# index of the largest logit per row is the predicted digit
pred_lbls = test_output.argmax(dim=1).cpu().numpy().squeeze()

actual_lbls = lbls.numpy()

print(f'Prediction number: {pred_lbls}')
print(f'Actual number: {actual_lbls}')
print('\n', pred_lbls == actual_lbls)