In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms

if torch.cuda.is_available():
    device = torch.device("cuda")
    print("GPU is available")
else:
    device = torch.device("cpu")
    print("GPU is not available")
dataset_root = "sample_data"
model_path = "part2-model.pth"
model = None

GPU is available



# Part 2 - Custom Classification

In [None]:
# Adapted from https://github.com/mcmerdith/cisc484/blob/hw4/HW%204_CNN-1.ipynb

class VGG16_Block(nn.Module):
    def __init__(self, input_channels, out_channels, rate=0.3, drop=True):
        super().__init__()
        self.conv = nn.Conv2d(input_channels, out_channels, 3, 1, 1)  #This line creates a 2D convolutional layer using PyTorch's nn.Conv2d module.
        # self.bn = nn.BatchNorm2d(out_channels) # normalize the activations of the convolutional layer in the neural network.
        self.relu = nn.ReLU(inplace=True) #introduces non-linearity (-ve --> zeros)
        self.dropout = nn.Dropout(rate)
        self.drop = drop

    def forward(self, x):  #This method defines the forward pass of the convolutional block.
        # x = self.relu(self.bn(self.conv(x)))
        x = self.relu(self.conv(x))
        if self.drop:
            x = self.dropout(x)
        return x
    
def vgg16_layer(input_channels, out_channels, num, dropout=0.3, pool=True):
    layers = []
    for i in range(num):
        layers.append(VGG16_Block(input_channels, out_channels, dropout, drop=False))
        input_channels = out_channels
    if pool:
        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
    return nn.Sequential(*layers)

class ModifiedVGG16(nn.Module):
    def __init__(self, n_classes: int):
        super(ModifiedVGG16, self).__init__()
        self.features = nn.Sequential(
            vgg16_layer(3, 64, 1), # 224x224 -> 112x112
            vgg16_layer(64, 128, 1), #112x112 -> 56x56
            vgg16_layer(128, 256, 1), # 56x56 -> 28x28
            vgg16_layer(256, 256, 1), # 28x28 -> 14x14
            vgg16_layer(256, 256, 1), # 14x14 -> 7x7
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256*7*7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, n_classes),
            nn.Flatten(),
        )

    # defines how data flows through the network during the forward pass.
    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x
    
def make_model(n_classes=100):
    return ModifiedVGG16(n_classes).to(device)

# setup transformers
data_transforms = transforms.Compose([
    transforms.Resize((224,224)),
    # transforms.Resize((112,112)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])

# Training

In [None]:
# load the training set
batch_size = 64
trainset = datasets.CIFAR100(dataset_root, train=True, transform = data_transforms, download=True)
train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True,
                                           num_workers=2, pin_memory=True, persistent_workers=True)

# build the model
model = make_model()

# training time
num_epochs = 30
criterion = nn.CrossEntropyLoss()
# higher learning rate to train in less epochs
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

# save the best model
best_epoch = 0
best_acc = 0.0
best_weights = None

for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")
    
    curr_loss = 0.0
    curr_correct = 0.0
    for images, labels in train_loader:            
        images, labels = images.to(device), labels.to(device)
        # reset gradients
        model.train()
        optimizer.zero_grad()
        # calculate loss and backprop
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        
        optimizer.step()

        # score the model
        _, preds = torch.max(outputs, 1)
        curr_loss += loss.item() * images.size(0)
        curr_correct += torch.sum(preds == labels)

    # finished with this epoch
    scheduler.step()
    epoch_loss = curr_loss / len(trainset)
    epoch_acc = curr_correct / len(trainset)
    print(f"    train loss: {epoch_loss} acc: {epoch_acc}")

    # store if this is the best iteration
    if epoch_acc > best_acc or not best_weights:
        best_epoch = epoch
        best_acc = epoch_acc
        best_weights = model.state_dict()

# force types
assert best_weights is not None, "no iterations were successful???"

print(f"Best iteration: {best_epoch + 1}, acc: {best_acc}")

# reset to the best iteration
model.load_state_dict(best_weights)

torch.save(best_weights, model_path)
print(f"Saved model to {model_path}")

Epoch 1/10


KeyboardInterrupt: 

# Testing

In [5]:
# if the model wasn't just trained, create a new one
if not model:
    model = make_model()

model.load_state_dict(torch.load(model_path))

<All keys matched successfully>

In [6]:
# load the test set
batch_size = 64
testset = datasets.CIFAR100(dataset_root, train=False, transform = data_transforms, download=True)
test_loader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=True)

correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)

        outputs = model(images)

        # score the model
        _, preds = torch.max(outputs, 1)
        correct += torch.sum(preds == labels.data)



print(f"Test accuracy: {correct / len(testset)}")

Test accuracy: 0.5536999702453613


# Debugging

In [48]:
from torchsummary import summary
model = make_model()
summary(model, (3, 224, 224))

RuntimeError: mat1 and mat2 shapes cannot be multiplied (2x12544 and 25088x4096)