Import statements

In [2]:
import torch
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
from torchvision.datasets import ImageFolder

In [3]:
# Since ResNet50 was trained on ImageNet, should resize and normalize accordingly
# Described in "Deep Residual Learning for Image Recognition" https://arxiv.org/abs/1512.03385

data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val_test': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
}

In [4]:
from torch.utils.data import Subset

# Load dataset
data_dir = '../data/raw-img/' 
datasets = ImageFolder(root=data_dir) # TODO Labels are in Italian and should be translated
class_names = datasets.classes

# Define sizes for validation and test sets
train_size = int(0.7 * len(datasets))  # 70% for training
val_size = int(0.15 * len(datasets))   # 15% for validation
test_size = len(datasets) - train_size - val_size  # Remaining 15% for testing


# Split the val_test dataset into validation and test sets
train_set, val_set, test_set = random_split(datasets, [train_size, val_size, test_size])

# SANITY CHECK
val_set = train_set = Subset(train_set, range(20))

# Apply custom transforms

class TransformedSubset(torch.utils.data.Dataset):
    def __init__(self, subset, transform):
        self.subset = subset
        self.transform = transform

    def __getitem__(self, index):
        image, label = self.subset[index]
        image = self.transform(image)
        return image, label

    def __len__(self):
        return len(self.subset)

datasets = {
    'train' : TransformedSubset(train_set, data_transforms['train']),
    'val' : TransformedSubset(val_set, data_transforms['val_test']),
    'test' : TransformedSubset(test_set, data_transforms['val_test'])
}


# Set batch_size
batch_size = 128

# Create DataLoaders
dataloaders = {
    x: DataLoader(datasets[x], batch_size=batch_size, shuffle=(x == 'train')) for x in ['train', 'val', 'test']
               } # NOTE: Consider setting num_workers (maybe write a script that sets num_workers device independently?)


# dataset sizes
dataset_sizes = {x: len(datasets[x]) for x in ['train', 'val', 'test']}

In [5]:
dataset_sizes

{'train': 20, 'val': 20, 'test': 3928}

In [6]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [7]:
from torch import nn
from torchvision import models


# Load ResNet50
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)

# Freeze earlier layers
for param in model.parameters():
    param.requires_grad = False

# Modify the final layer to match the number of classes in Animals10 (10)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, len(class_names))

model = model.to(device)

# Define loss function and optimizer (only optimizing the last layer)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.fc.parameters(), lr=0.01)

In [8]:

len([param for param in model.fc.parameters()])

2

In [None]:
sum(p.numel() for p in model.parameters() if p.requires_grad)

20490

In [11]:
model.named_children

<bound method Module.named_children of ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(6

In [16]:
import time
import copy

def train_model(model, criterion, optimizer, num_epochs=10):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        # Start timing the epoch
        start_time = time.time()

    
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data
            for inputs, labels in dataloaders[phase]:
                inputs, labels = inputs.to(device), labels.to(device)
                print('.', end="")
                # Zero the parameter gradients
                optimizer.zero_grad()

                # Forward pass
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backward pass and optimize if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # Statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            print()


            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Deep copy the model if the accuracy improves
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        # End timing the epoch
        epoch_time = time.time() - start_time
        print(f'Epoch completed in {epoch_time // 60:.0f}m {epoch_time % 60:.0f}s')

    # Load best model weights
    model.load_state_dict(best_model_wts)
    return model

# Start overall training timer
overall_start_time = time.time()

# Train the model
model = train_model(model, criterion, optimizer, num_epochs=100)

# Calculate total time taken
total_time = time.time() - overall_start_time
print(f'Total training time: {total_time // 60:.0f}m {total_time % 60:.0f}s')

# Save the best model
torch.save(model.state_dict(), 'best_model1.pth')


Epoch 1/100
----------
.
train Loss: 2.2675 Acc: 0.1500
.
val Loss: 1.5519 Acc: 0.7000
Epoch completed in 0m 3s
Epoch 2/100
----------
.
train Loss: 1.1588 Acc: 0.6000
.
val Loss: 0.9766 Acc: 0.9500
Epoch completed in 0m 3s
Epoch 3/100
----------
.
train Loss: 0.6956 Acc: 0.8500
.
val Loss: 0.5971 Acc: 1.0000
Epoch completed in 0m 3s
Epoch 4/100
----------
.
train Loss: 0.4289 Acc: 1.0000
.
val Loss: 0.3689 Acc: 1.0000
Epoch completed in 0m 3s
Epoch 5/100
----------
.
train Loss: 0.2172 Acc: 1.0000
.
val Loss: 0.2300 Acc: 1.0000
Epoch completed in 0m 3s
Epoch 6/100
----------
.
train Loss: 0.2307 Acc: 0.9500
.
val Loss: 0.1429 Acc: 1.0000
Epoch completed in 0m 3s
Epoch 7/100
----------
.
train Loss: 0.0985 Acc: 1.0000
.
val Loss: 0.0884 Acc: 1.0000
Epoch completed in 0m 3s
Epoch 8/100
----------
.
train Loss: 0.0618 Acc: 1.0000
.
val Loss: 0.0595 Acc: 1.0000
Epoch completed in 0m 3s
Epoch 9/100
----------
.
train Loss: 0.0987 Acc: 0.9500
.
val Loss: 0.0401 Acc: 1.0000
Epoch completed i

In [61]:
def evaluate_model(model):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in dataloaders['test']:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print(f'Accuracy of the model on the validation images: {100 * correct / total:.2f}%')

# Evaluate the model
evaluate_model(model)


KeyboardInterrupt: 