### 0. Make Imports and Set Up Device

In [1]:
from shutil import copy
from os import listdir, makedirs
from os.path import isdir, join, splitext

import torch
from torch import nn
from torch import optim
from torch.utils.data import DataLoader

import torchvision
from torchvision.datasets import ImageFolder
from torchvision import transforms as tr

In [2]:
def setup_device() -> torch.device:
    if (not torch.cuda.is_available()):
        print("No CUDA GPUs found. CPU selected as training device.")
        return torch.device("cpu")
    
    device_id = 0
    device = torch.device(f"cuda:{device_id}")
    count = torch.cuda.device_count()
    name = torch.cuda.get_device_name(device_id)
    capability = torch.cuda.get_device_capability(device_id)
    print(f"{count} CUDA GPUs available. Using {name} with CUDA {capability[0]}.{capability[1]} capability.")
    return device


device = setup_device()

1 CUDA GPUs available. Using GeForce RTX 2080 SUPER with CUDA 7.5 capability.


### 1. Divide the Dataset into Categories
The used dataset doesn't contain any explicit category labels. However, the source images are sorted by category, with each of the 17 categories having exactly 80 images. Therefore we can label the images by simply counting them. The code assumes the raw images are placed into _data/jpg_ directory. It splits the data into training, validation and testing subsets, then creates a subdirectory for each subset in _data_ dir. In each of those, another set of subdirs is created -- one for each category, named after the category index (0 through 16) and flower images from that category are copied inside. The number of copied files for each subset is defined in the _subset_splits_ dict.

In [3]:
def is_file_jpg(file_path: str) -> bool:
    path_root, extension = splitext(file_path)
    return extension.lower() == ".jpg"

In [4]:
raw_data_path = "data/jpg"
category_count = 17
images_per_category = 80

# How many images in each category should fall into a data subset
subset_splits = {"training": 56, "validation": 16, "testing": 8}
assert sum(subset_splits.values()) == images_per_category

image_list = [file for file in listdir(raw_data_path) if is_file_jpg(file)]
assert len(image_list) == category_count * images_per_category
image_list[:10]

['image_0001.jpg',
 'image_0002.jpg',
 'image_0003.jpg',
 'image_0004.jpg',
 'image_0005.jpg',
 'image_0006.jpg',
 'image_0007.jpg',
 'image_0008.jpg',
 'image_0009.jpg',
 'image_0010.jpg']

In [5]:
def assign_images_to_categories(category_count: int, images_per_category: int, images: list) -> dict:
    categories = {}
    for category_index in range(category_count):
        first_image_in_category = category_index * images_per_category
        last_image_in_category = first_image_in_category + images_per_category
        categories[category_index] = images[first_image_in_category:last_image_in_category]
    return categories
        

category_dict = assign_images_to_categories(category_count, images_per_category, image_list)

assert len(category_dict.keys()) == category_count
assert len(category_dict[category_count - 1]) == images_per_category

In [6]:
def split_data_into_subsets(category_dict: dict, subset_splits: dict) -> (dict, dict, dict):
    train, valid, test = {}, {}, {}
    first_validation_image = subset_splits["training"]
    first_testing_image = first_validation_image + subset_splits["validation"]
    
    for cat_index, cat_images  in category_dict.items():
        train[cat_index] = cat_images[:first_validation_image]
        valid[cat_index] = cat_images[first_validation_image:first_testing_image]
        test[cat_index] = cat_images[first_testing_image:]
    
    return train, valid, test
    
    
training_images, validation_images, testing_images = split_data_into_subsets(category_dict, subset_splits)

assert len(training_images.keys()) == len(validation_images.keys()) == len(testing_images.keys()) == category_count
assert len(training_images[10]) == subset_splits["training"]
assert len(validation_images[4]) == subset_splits["validation"]
assert len(testing_images[16]) == subset_splits["testing"]

In [7]:
def create_directory(path: str) -> None:
    if not isdir(path):
        makedirs(path)
        

def create_subset_data_directories(subset_dict: dict, subset_name: str, raw_data_path: str) -> None:
    subset_dir_path = join("data", subset_name)
    create_directory(subset_dir_path)
    
    for category_index, category_images in subset_dict.items():
        category_path = join(subset_dir_path, str(category_index))
        create_directory(category_path)
        
        for image in category_images:
            source_path = join(raw_data_path, image)
            destination = join(category_path, image)
            copy(source_path, destination)
        

create_subset_data_directories(training_images, "training", raw_data_path)
create_subset_data_directories(validation_images, "validation", raw_data_path)
create_subset_data_directories(testing_images, "testing", raw_data_path)

### 2. Pre-process Data and Load into Data Loaders

In [8]:
'''
The network that will be used in transfer learning has been pre-trained using normalized data. Therefore the same
transformation must be performed for new data, for the training to be effective. Below are the values used for original
normalization.
'''
normalization_means = [0.485, 0.456, 0.406]
normalization_stds = [0.229, 0.224, 0.225]
final_image_size = 224

In [9]:
'''
These transformations should help the network to learn translation, rotation and size invariance, to reduce over-training. 
Additionally they normalize the input data to make it more statistically similar to the data that the network was 
pre-trained on.
'''
training_transforms = [tr.RandomRotation(degrees=10, expand=True),
                       tr.RandomResizedCrop(size=final_image_size, scale=[0.75, 1.0]),
                       tr.ToTensor(),
                       tr.Normalize(mean=normalization_means, std=normalization_stds)]

testing_transforms = [tr.Resize(size=final_image_size + 8),
                      tr.CenterCrop(size=final_image_size),
                      tr.ToTensor(),
                      tr.Normalize(mean=normalization_means, std=normalization_stds)]

In [10]:
batch_size = 64


def make_data_loader(data_path: str, transforms: list, batch_size: int) -> DataLoader:
    image_transformation = tr.Compose(transforms)
    data_set = ImageFolder(root=data_path, transform=image_transformation)
    should_pin_memory = torch.cuda.is_available()
    loader = DataLoader(dataset=data_set, shuffle=True, pin_memory=should_pin_memory, batch_size=batch_size)
    return loader
    
    
training_loader = make_data_loader("data/training", training_transforms, batch_size)
validation_loader = make_data_loader("data/validation", training_transforms, batch_size)
testing_loader = make_data_loader("data/testing", testing_transforms, batch_size)

### 3. Build the Network

In [11]:
'''
According to comparisons (such as this one https://learnopencv.com/wp-content/uploads/2019/06/Pre-Trained-Model-Comparison.png),
ResNet50 seems to offer good balance between training time and accuracy.
'''
pre_trained_model = torchvision.models.resnet50(pretrained=True)
pre_trained_model

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [12]:
pre_trained_feature_count = pre_trained_model.fc.in_features
pre_trained_feature_count

2048

In [13]:
'''
This custom classifier will be used to replace the original ResNet's classifier layers. This way, we can adapt the model 
to classify our 17 flower categories using its pre-trained feature extraction layers. It will have three layers, as this
is often sufficient for good accuracy.
'''
class CustomClassifier(nn.Module):
    def __init__(self, 
                 input_features: int, 
                 hidden1_size: int, 
                 hidden2_size: int, 
                 output_categories: int,
                 dropout: float) -> None:
        
        super().__init__()
        self.hidden1 = nn.Linear(input_features, hidden1_size)
        self.hidden2 = nn.Linear(hidden1_size, hidden2_size)
        self.output = nn.Linear(hidden2_size, output_categories)
        self.dropout = nn.Dropout(p=dropout)
        
    def forward(self, input_tensor: torch.Tensor) -> torch.Tensor:
        x = self.dropout(nn.functional.relu(self.hidden1(input_tensor)))
        x = self.dropout(nn.functional.relu(self.hidden2(x)))
        raw_category_scores = self.output(x)
        return raw_category_scores

In [14]:
def disable_feature_detector_training(network: nn.Module) -> None:
    for parameter in network.parameters():
        parameter.requires_grad = False
        
        
def prepare_network_for_transfer_learning(network: nn.Module) -> nn.Module:
    disable_feature_detector_training(network)
    custom_classifier = CustomClassifier(pre_trained_feature_count, 1024, 512, category_count, dropout=0.05)
    network.fc = custom_classifier
    network = network.to(device)
    return network
    
    
network = prepare_network_for_transfer_learning(pre_trained_model)

### 4. Train the Network

In [15]:
def run_training_step(network: nn.Module, training_loader: DataLoader, device: torch.device, optimizer, criterion) -> float:
    network.train()
    avg_loss = 0
    
    for images, labels in training_loader:
        images = images.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad()
        raw_output = network.forward(images)
        loss = criterion(input=raw_output, target=labels)
        loss.backward()
        optimizer.step()
        avg_loss += loss.item()
        
    avg_loss /= len(training_loader)
    return avg_loss

In [16]:
def calculate_accuracy(raw_output: torch.Tensor, labels: torch.Tensor) -> float:
    class_probabilities = nn.functional.softmax(raw_output, dim=1)
    predicted_classes = torch.topk(input=class_probabilities, k=1, dim=1)[1]
    resized_labels = labels.view(predicted_classes.shape[0], -1)
    prediction_matches = predicted_classes == resized_labels
    batch_average_accuracy = torch.mean(prediction_matches.type(torch.FloatTensor))
    return batch_average_accuracy


def run_evaluation_step(network: nn.Module, loader: DataLoader, device: torch.device, criterion) -> (float, float):
    network.eval()
    avg_loss = 0
    avg_accuracy = 0
    
    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device)
            labels = labels.to(device)
            
            raw_output = network.forward(images)
            loss = criterion(input=raw_output, target=labels)
            accuracy = calculate_accuracy(raw_output, labels)
            
            avg_loss += loss
            avg_accuracy += accuracy
    
    avg_loss /= len(loader)
    avg_accuracy /= len(loader)
    return avg_loss, avg_accuracy

In [17]:
def train_network(network: nn.Module, training_loader: DataLoader, validation_loader: DataLoader, device: torch.device) -> None:
    
    # Hyperparameters
    learn_rate = 0.0003
    optimizer = optim.Adam(params=network.fc.parameters(), lr=learn_rate)
    criterion = nn.CrossEntropyLoss()
    epoch_count = 10
    
    for epoch in range(epoch_count):
        print(f"Epoch {epoch+1} / {epoch_count}")
        train_loss = run_training_step(network, training_loader, device, optimizer, criterion)
        print(f"Training Loss: {train_loss}")
        valid_loss, valid_accuracy =  run_evaluation_step(network, validation_loader, device, criterion)
        print(f"Validation Loss: {valid_loss}")
        print(f"Validation Accuracy: {valid_accuracy * 100:.2f}%\n")
        
    test_loss, test_accuracy =  run_evaluation_step(network, testing_loader, device, criterion)
    print("\nTest Results")
    print(f"Test Loss: {test_loss}")
    print(f"Test Accuracy: {test_accuracy * 100:.2f}%\n")

In [18]:
train_network(network, training_loader, validation_loader, device)

Epoch 1 / 10
Training Loss: 2.488348197937012
Validation Loss: 2.0703892707824707
Validation Accuracy: 55.94%

Epoch 2 / 10
Training Loss: 1.4780928532282511
Validation Loss: 1.2181166410446167
Validation Accuracy: 72.50%

Epoch 3 / 10
Training Loss: 0.7229420105616252
Validation Loss: 0.790314793586731
Validation Accuracy: 81.88%

Epoch 4 / 10
Training Loss: 0.4000472366809845
Validation Loss: 0.576329231262207
Validation Accuracy: 86.56%

Epoch 5 / 10
Training Loss: 0.2668887515862783
Validation Loss: 0.5161110758781433
Validation Accuracy: 85.94%

Epoch 6 / 10
Training Loss: 0.17539071142673493
Validation Loss: 0.5010616183280945
Validation Accuracy: 84.69%

Epoch 7 / 10
Training Loss: 0.17492026885350545
Validation Loss: 0.48568567633628845
Validation Accuracy: 85.00%

Epoch 8 / 10
Training Loss: 0.12750202069679897
Validation Loss: 0.4384729564189911
Validation Accuracy: 87.50%

Epoch 9 / 10
Training Loss: 0.09021254802743593
Validation Loss: 0.40239444375038147
Validation Accurac