# The Convolutional Neural Network

## Setting up the data loaders

In [16]:
# Setup
import random

import numpy as np
import torch
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
from tqdm import trange
from torch.utils.data import Dataset
from torchvision.transforms import ToTensor
import pandas as pd
import h5py

%matplotlib inline
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Set random seed for reproducibility
seed = 1234
# cuDNN uses nondeterministic algorithms, set some options for reproducibility
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.manual_seed(seed)

2.0.0+cu117


<torch._C.Generator at 0x7f5714dfcab0>

This next cell was modified from the pytorch official docs
https://pytorch.org/tutorials/beginner/basics/data_tutorial.html

In [2]:


class H5ImageDataset(Dataset):
    def __init__(self, data_file, train=True, transform=None, target_transform=None):
        self.file = h5py.File(data_file)
        self.labels = self.file.get("ans")
        self.imgs = self.file.get("images")
        self.train = train
        if train:
            self.index_transform = self.train_index
        else:
            self.index_transform = self.test_index
        self.transform = transform
        self.target_transform = target_transform

    def train_index(self, idx):
        """Here I had to use some clever math to go from a training set index to an overall set index. It skips every 5th number, starting w/ 0"""
        return (idx // 4) * 5 + (idx % 4) + 1
    
    def test_index(self, idx):
        """This is how I got the test indices. Every 5th index in the overall dataset is a testing datapoint, so we get the 5th one."""
        return idx * 5
        
    def __len__(self):
        if self.train: 
            return len(self.labels) // 5 * 4
        return len(self.labels) // 5
                   
    def __getitem__(self, idx):
        idx = self.index_transform(idx)
        image = self.imgs[idx]
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label
    
    def get_mean(self):
        return np.mean(self.imgs)
    
    def get_std(self):
        return np.std(self.imgs)
    
class NPYImageDataset(Dataset):
    def __init__(self, imgs_file, labels_file, train=True, transform=None, target_transform=None):
        self.labels = np.load(labels_file)
        self.imgs = np.load(imgs_file)
        self.train = train
        if train:
            self.index_transform = self.train_index
        else:
            self.index_transform = self.test_index
        self.transform = transform
        self.target_transform = target_transform

    def train_index(self, idx):
        """Here I had to use some clever math to go from a training set index to an overall set index. It skips every 5th number, starting w/ 0"""
        return (idx // 4) * 5 + (idx % 4) + 1
    
    def test_index(self, idx):
        """This is how I got the test indices. Every 5th index in the overall dataset is a testing datapoint, so we get the 5th one."""
        return idx * 5
        
    def __len__(self):
        if self.train: 
            return len(self.labels) // 5 * 4
        return len(self.labels) // 5
                   
    def __getitem__(self, idx):
        idx = self.index_transform(idx)
        image = self.imgs[idx]
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label
    
    def get_mean(self):
        return np.mean(self.imgs)
    
    def get_std(self):
        return np.std(self.imgs)
    

In [3]:
train_data = NPYImageDataset("sample_imgs_large.npy", "sample_labels_large.npy", train=True)
test_data  = NPYImageDataset("sample_imgs_large.npy", "sample_labels_large.npy", train=False)
print(len(train_data))
batch_size = 1
seed = 123
torch.manual_seed(seed)

404


<torch._C.Generator at 0x7f5714dfcab0>

In [4]:
#Set the transforms
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0., 0., 0.), std=(0.2, 0.2, 0.2))
])

train_data.transform = transform
test_data.transform = transform


In [5]:
#get the dataloader objects

train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True,  num_workers=True) 
test_loader  = torch.utils.data.DataLoader(test_data,  batch_size=batch_size, shuffle=False, num_workers=True)

## Setting up the CNN

In [6]:
from torch.nn import Conv2d
from torch.nn import ReLU
from torch.nn import MaxPool2d
from torch.nn import Flatten
from torch.nn import Linear
from torch.nn import LogSoftmax

class CNNClassifier(torch.nn.Module):
    
    def __init__(self):
        super().__init__()
        self.conv1 = Conv2d(in_channels=3, out_channels=10,
            kernel_size=(3, 3))
        self.relu1 = ReLU()
        self.maxpool1 = MaxPool2d(kernel_size=(3, 3), stride=(3, 3))
        
        self.conv2 = Conv2d(in_channels=10, out_channels=20,
            kernel_size=(2, 2))
        self.relu2 = ReLU()
        self.maxpool2 = MaxPool2d(kernel_size=(3, 3), stride=(3, 3))
        
        self.conv3 = Conv2d(in_channels=20, out_channels=50,
            kernel_size=(3, 3))
        self.relu3 = ReLU()
        self.maxpool3 = MaxPool2d(kernel_size=(5, 5), stride=(5, 5))
        
        self.flatten = Flatten()
        self.fc1 = Linear(in_features=1250, out_features=50)
        self.relu3 = ReLU()
        
        self.fc2 = Linear(in_features=50, out_features=10)
        self.logSoftmax = LogSoftmax(dim=1)
    
    def forward(self, x):
        # ------------------
        # Write your implementation here.        

        x = self.conv1(x)
        x = self.relu1(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.maxpool2(x)
        x = self.conv3(x)
        x = self.relu3(x)
        x = self.maxpool3(x)  
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu3(x)
        x = self.fc2(x)
        x = self.logSoftmax(x)
        
        y_output = x
        
        return y_output
        # ------------------

model = CNNClassifier().to(DEVICE)

# sanity check
print(model)

CNNClassifier(
  (conv1): Conv2d(3, 10, kernel_size=(3, 3), stride=(1, 1))
  (relu1): ReLU()
  (maxpool1): MaxPool2d(kernel_size=(3, 3), stride=(3, 3), padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(10, 20, kernel_size=(2, 2), stride=(1, 1))
  (relu2): ReLU()
  (maxpool2): MaxPool2d(kernel_size=(3, 3), stride=(3, 3), padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(20, 50, kernel_size=(3, 3), stride=(1, 1))
  (relu3): ReLU()
  (maxpool3): MaxPool2d(kernel_size=(5, 5), stride=(5, 5), padding=0, dilation=1, ceil_mode=False)
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (fc1): Linear(in_features=1250, out_features=50, bias=True)
  (fc2): Linear(in_features=50, out_features=10, bias=True)
  (logSoftmax): LogSoftmax(dim=1)
)


## Training the CNN

In [7]:
#Function to train one epoch
def train_one_epoch(train_loader, model, device, optimizer, log_interval, epoch):
    model.train()
    losses = []
    counter = []
    
    for i, (img, label) in enumerate(train_loader):
        img, label = img.to(device), label.to(device)
        optimizer.zero_grad()
        output = model(img)
        #print(output, output.shape)
        loss = torch.nn.functional.nll_loss(output, label)
        loss.backward()
        optimizer.step()
    
        # Record training loss every log_interval and keep counter of total training images seen
        if (i+1) % log_interval == 0:
            losses.append(loss.item())
            counter.append(
                (i * batch_size) + img.size(0) + epoch * len(train_loader.dataset))

    return losses, counter

In [8]:
#Function to test one epoch
def test_one_epoch(test_loader, model, device):
    model.eval()
    test_loss = 0
    num_correct = 0
    
    with torch.no_grad():
        for i, (img, label) in enumerate(test_loader):
            img, label = img.to(device), label.to(device)

            # ------------------
            # Write your implementation here.
            
            output = model(img)
            pred = output.max(1)[1] # Get index of largest log-probability and use that as prediction
            num_correct += pred.eq(label).sum().item()
            test_loss /= len(test_loader)
            # ------------------
            
    test_loss /= len(test_loader.dataset)
    return test_loss, num_correct

In [15]:
# Hyperparameters
lr = 0.01
max_epochs=10

# Recording data
log_interval = 100

# Instantiate optimizer (model was created in previous cell)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

train_losses = []
train_counter = []
test_losses = []
test_correct = []
for epoch in trange(max_epochs, leave=True, desc='Epochs'):
    train_loss, counter = train_one_epoch(train_loader, model, DEVICE, optimizer, log_interval, epoch)
    test_loss, num_correct = test_one_epoch(test_loader, model, DEVICE)

    # Record results
    train_losses.extend(train_loss)
    train_counter.extend(counter)
    test_losses.append(test_loss)
    test_correct.append(num_correct)

print(f"Test accuracy: {test_correct[-1]/len(test_loader.dataset)}")

Epochs: 100%|███████████████████████████████████| 10/10 [00:32<00:00,  3.29s/it]

Test accuracy: 0.1485148514851485





In [10]:
torch.save(model.state_dict(), "model_output.txt")