In [2]:
import torch
from torch.utils.data import DataLoader
from torchvision import transforms, datasets
import torch.nn as nn
import torch.nn.functional as F # import convolution functions like Relu
from torch.optim.lr_scheduler import StepLR

In [3]:
# the transformers for the training and testing data
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224), # resize the image to a random scale + aspect ratio and crop to 224x224 (helps model generalise better)
        transforms.RandomHorizontalFlip(), # randomly flip the image horizontally (50/50 chance) 
        transforms.ToTensor(), # convert image from PIL to tensor
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) # normalise the image with mean and standard deviation
    ]),
    'test': transforms.Compose([
        transforms.Resize(256), # resize the image to 256x256
        transforms.CenterCrop(224), # crop the image to 224x224 (center)
        transforms.ToTensor(), # convert image from PIL to tensor
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) # normalise the image with mean and standard deviation
    ]),
    'val': transforms.Compose([
        transforms.Resize(256), # resize the image to 256x256
        transforms.CenterCrop(224), # crop the image to 224x224 (center)
        transforms.ToTensor(), # convert image from PIL to tensor
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) # normalise the image with mean and standard deviation
    ])
}
# load dataset flowers102
image_datasets = {
    x: datasets.Flowers102(
        root='data',
        split=x,
        download=True,
        transform=data_transforms[x]
    )
    for x in ['train', 'test', 'val']
}


In [4]:
# create dataloaders for the training and testing data
dataloaders = {
    x: DataLoader(
        image_datasets[x], # create a dataloader for the training and testing data
        batch_size=4, # process 60 images at a time
        shuffle=True, # shuffle the images to prevent any patterns being learnt unintentionally
        num_workers=4 # use 4 subprocesses to load the data
    )
    for x in ['train', 'test', 'val']
}

In [8]:
class CNN(nn.Module):
    def __init__(self, num_channels=3, num_out_ch=[32, 64, 128], img_w=100, img_h=100, num_classes=102):
        super(CNN, self).__init__()
        # our first conv layer will have 32 output channels, a kernel size of 3, a stride of 1, and a padding of 1
        self.conv1 = nn.Conv2d(num_channels, num_out_ch[0], kernel_size=(3,3), stride=(1,1), padding=(1,1))
        # batch normalization layer with 32 channels (same as output of conv layer)
        self.bn1 = nn.BatchNorm2d(num_out_ch[0])
        # our second conv layer will have 64 output channels, a kernel size of 3, a stride of 1, and a padding of 1
        self.conv2 = nn.Conv2d(num_out_ch[0], num_out_ch[1], kernel_size=(3,3), stride=(1,1), padding=(1,1))
        # batch normalization layer with 64 channels (same as output of conv layer)
        self.bn2 = nn.BatchNorm2d(num_out_ch[1])
        # our third conv layer will have 128 output channels, a kernel size of 3, a stride of 1, and a padding of 1
        self.conv3 = nn.Conv2d(num_out_ch[1], num_out_ch[2], kernel_size=(3,3), stride=(1,1), padding=(1,1))
        # batch normalization layer with 128 channels (same as output of conv layer)
        self.bn3 = nn.BatchNorm2d(num_out_ch[2])
        # max pooling layer with kernel size 2 and stride 2
        self.pool = nn.MaxPool2d(kernel_size=(2,2), stride=(2,2))
        # fully connected layer that transforms the output of the conv layers into num_classes
        self.fc = nn.Linear(int(img_w/8)*int(img_h/8)*num_out_ch[2], num_classes)
    
    def forward(self, x):
        # apply first conv layer, then relu, then batch norm, then max pool
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        # apply second conv layer, then relu, then batch norm, then max pool
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        # apply third conv layer, then relu, then batch norm, then max pool
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        # flatten the output of the conv layers
        x = self.fc(x.reshape(x.shape[0], -1))
        
        return x
model = CNN()
x = torch.randn(1, 3, 100, 100)
print(model(x).shape)
y = model(x)
print(y.shape)

torch.Size([1, 102])
torch.Size([1, 102])


In [11]:
# PARAMS
NUM_CHANNELS = [8, 16, 32] # number of output channels for each conv layer
IMG_W = 200 # image width
IMG_H = 200 # image height
NUM_CLASSES = 102 # number of classes (flowers)
BATCH_SIZE = 32 # batch size (number of images to process at once)
NUM_EPOCHS = 12 # number of epochs (times to run the training loop)
LEARNING_RATE = 0.0001 # learning rate

In [9]:
def count_parameters(model)->int:
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
print(count_parameters(model))

1973862


In [10]:
criteria = nn.CrossEntropyLoss() # loss function
optimiser = torch.optim.Adam(model.parameters(), lr=0.001) # optimiser
scheduler = StepLR(optimiser, step_size=7, gamma=0.1)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # train on gpu if supported
print("USING: ", device)
print("IS CUDA AVAILABLE: ", torch.cuda.is_available())
model.to(device) # move model to gpu

USING:  cuda:0
IS CUDA AVAILABLE:  True


CNN(
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn3): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (fc): Linear(in_features=18432, out_features=102, bias=True)
)

In [13]:
def check_accuracy(loader, model):
    correct = 0 
    samples = 0
    model.eval()

    with torch.no_grad():
        for x, y in loader:
            x = x.to(device) # move images to device
            y = y.to(device) # move labels to device
            y_hat = model(x) # get predictions
            _, predictions = y_hat.max(1) # get predictions
            correct += (predictions == y).sum() # add number of correct predictions
            samples += predictions.size(0) # add number of samples
    print(f"Accuracy: {correct}/{samples}: %{correct/samples*100:.2f}") # print accuracy
    model.train() # set model back to train mode

for epoch in range(NUM_EPOCHS):
    running_loss = 0
    val_running_loss = 0
    for i, (x, y) in enumerate(dataloaders['train']):
        x = x.to(device)
        y = y.to(device)

        y_hat = model(x)
        loss = criteria(y_hat, y)
        running_loss += loss

        optimiser.zero_grad()
        loss.backward()
        optimiser.step()
    
    # Calculate validation loss
    model.eval()
    with torch.no_grad():
        for x_val, y_val in dataloaders["val"]:
            x_val = x_val.to(device)
            y_val = y_val.to(device)
            
            y_val_hat = model(x_val)
            val_loss = criteria(y_val_hat, y_val)
            val_running_loss += val_loss

    model.train()

    scheduler.step()
    print(f"Epoch {epoch+1} of {NUM_EPOCHS}, Train Loss: {running_loss}, Val Loss: {val_running_loss}")
    check_accuracy(dataloaders["val"], model)

RuntimeError: mat1 and mat2 shapes cannot be multiplied (4x100352 and 18432x102)

In [14]:
import time
model.eval()
test_loss = 0.0
test_correct = 0

with torch.no_grad():
    for inputs, labels in dataloaders["test"]:
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criteria(outputs, labels)

        test_loss += loss.item() * inputs.size(0)
        test_correct += torch.sum(preds == labels.data)
print(f"Test Loss: {test_loss/len(image_datasets['test']):.4f} Acc: {test_correct.double()/len(image_datasets['test']):.4f}")
print("Saving model...")
torch.save(model.state_dict(), f"./models/model_{test_correct.double()/len(image_datasets['test']):.4f}.pth")
print("Model saved!")


Test Loss: 6.6630 Acc: 0.0124
Saving model...
Model saved!
