In [None]:
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/master_thesis/Resnet
!pip install carbontracker

In [None]:
import os
import sys
import json
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets
from tqdm import tqdm

from model import resnet34
from carbontracker.tracker import CarbonTracker
from sklearn.model_selection import StratifiedKFold

In [None]:
def evaluate_model_on_testset(model, device, test_path):
    # Assume the test data is structured in a similar way to the training data
    test_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    test_dataset = datasets.ImageFolder(root=test_path, transform=test_transform)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=4)

    model.eval()
    correct = 0
    total = 0
    test_loss = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f'Accuracy on the test set: {accuracy:.2f}%')




def exponential_moving_average(data, alpha):
    """Calculate the exponential moving average using the specified alpha value."""
    return pd.Series(data).ewm(alpha=alpha, adjust=False).mean().values

def plot_and_save_training_results(train_loss_values, val_loss_values, train_accuracy_values, val_accuracy_values, save_path, filename):
    if len(train_loss_values) == 0 or len(val_loss_values) == 0:
        print("No data to plot.")
        return

    # Calculate smoothed data
    smoothed_train_loss = exponential_moving_average(train_loss_values, 0.5)
    smoothed_val_loss = exponential_moving_average(val_loss_values, 0.5)
    smoothed_train_accuracy = exponential_moving_average(train_accuracy_values, 0.5)
    smoothed_val_accuracy = exponential_moving_average(val_accuracy_values, 0.5)

    epochs_range = range(1, len(train_loss_values) + 1)
    plt.figure(figsize=(12, 6))

    # Plot training and validation loss
    plt.subplot(1, 2, 1)
    plt.plot(epochs_range, smoothed_train_loss, label='Training Loss', color='blue')
    plt.plot(epochs_range, smoothed_val_loss, label='Validation Loss', color='red')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()

    # Plot training and validation accuracy
    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, smoothed_train_accuracy, label='Training Accuracy', color='blue')
    plt.plot(epochs_range, smoothed_val_accuracy, label='Validation Accuracy', color='red')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.legend()
    plt.ylim([0, 1])

    plt.tight_layout()
    plt.savefig(os.path.join(save_path, filename))
    plt.show()


def count_parameters(model, trainable_only=True):
    """Calculate the total number of parameters in the model. If trainable_only is True, only count the trainable parameters."""
    if trainable_only:
        return sum(p.numel() for p in model.parameters() if p.requires_grad)
    else:
        return sum(p.numel() for p in model.parameters())


In [None]:
class NewFC(nn.Module):
    def __init__(self, in_features):
        super(NewFC, self).__init__()
        self.flatten = nn.Flatten()  # Flatten layer
        self.fc1 = nn.Linear(in_features, 512)  # Fully connected layer from in_features to 512 features
        self.relu = nn.ReLU()  # Activation function
        self.dropout = nn.Dropout(0.5)  # Dropout with a rate of 50%
        self.fc2 = nn.Linear(512, 2)  # Final layer mapping to 2 classes

    def forward(self, x):
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x


**Cross-validation of fine-tuning or freezing CNN layers**

In [None]:
def main():
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("using {} device.".format(device))

    data_transform = {
        "train": transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    }

    image_path = "/content/drive/MyDrive/master_thesis/Datasets/Herlev"  # Update this path to your dataset location
    assert os.path.exists(image_path), f"{image_path} path does not exist."

    dataset = datasets.ImageFolder(root=image_path, transform=data_transform['train'])
    targets = [s[1] for s in dataset.samples]  # Extract class labels

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    fold_performance = []

    num_epochs = 200

    for fold, (train_idx, val_idx) in enumerate(skf.split(np.zeros(len(targets)), targets)):
        print(f"Training fold {fold+1}")
        train_sampler = torch.utils.data.SubsetRandomSampler(train_idx)
        val_sampler = torch.utils.data.SubsetRandomSampler(val_idx)

        train_loader = torch.utils.data.DataLoader(dataset, batch_size=16, sampler=train_sampler)
        validate_loader = torch.utils.data.DataLoader(dataset, batch_size=16, sampler=val_sampler)

        # net = resnet34(pretrained=True)  # Make sure to adapt this if your model differs
        net = resnet34()
        # Transfer learning part
        model_weight_path = "./resnet34-pre.pth"
        assert os.path.exists(model_weight_path), "file {} does not exist.".format(model_weight_path)
        net.load_state_dict(torch.load(model_weight_path, map_location='cpu'))

        # Freezing Layer1
        for name, param in net.named_parameters():
            if "layer1" in name:
              param.requires_grad = False

        # Freezing Layer1 and Layer2
        # for name, param in net.named_parameters():
        #     if "layer1" in name or "layer2" in name:
        #         param.requires_grad = False

        # Freezing Layer1, Layer2, and Layer3
        # for name, param in net.named_parameters():
        #     if "layer1" in name or "layer2" in name or "layer3" in name:
        #         param.requires_grad = False

        # Freezing Layer1, Layer2, Layer3, and Layer4
        # for name, param in net.named_parameters():
        #     if "layer1" in name or "layer2" in name or "layer3" in name or "layer4" in name:
        #         param.requires_grad = False

        # Freezing the entire CNN
        # for param in net.parameters():
        #     param.requires_grad = False

        ######
        # Structure A: Basic ResNet34 with modified output layer (choose one structure at a time)
        in_channel = net.fc.in_features
        net.fc = nn.Linear(in_channel, 2)  # Output layer changed to classify 2 classes
        net.to(device)

        ######
        # Structure B: Add a new fully connected layer without replacing the existing one
        # Uncomment the following block if you want to test Structure B:
        # in_features = net.fc.in_features
        # net.fc = nn.Sequential(
        #     nn.Linear(in_features, 1000),  # Original fully connected layer
        #     nn.ReLU(),
        #     nn.Linear(1000, 2)  # New layer from 1000 classes to 2 classes
        # )
        # net.to(device)

        ######
        # Structure C: Replace the original fully connected layer with a new one
        # Uncomment the following block if you want to test Structure C:
        # in_features = net.fc.in_features
        # net.fc = NewFC(in_features)  # Replace with custom fully connected layer
        # net.to(device)

        total_params = count_parameters(net, trainable_only=False)
        trainable_params = count_parameters(net, trainable_only=True)
        print(f"Total parameters: {total_params}")
        print(f"Trainable parameters: {trainable_params}")

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam([p for p in net.parameters() if p.requires_grad], lr=0.0001)

        # Train and Validate
        train_acc, val_acc = train_validate(net, train_loader, validate_loader, criterion, optimizer, device, num_epochs)
        fold_performance.append((train_acc, val_acc))

    # Calculate mean and standard deviation for accuracies
    train_accs = [fp[0] for fp in fold_performance]
    val_accs = [fp[1] for fp in fold_performance]
    train_mean = np.mean(train_accs)
    train_std = np.std(train_accs)
    val_mean = np.mean(val_accs)
    val_std = np.std(val_accs)

    print(f"Train Accuracy: Mean = {train_mean:.4f}, Std Dev = {train_std:.4f}")
    print(f"Validation Accuracy: Mean = {val_mean:.4f}, Std Dev = {val_std:.4f}")

def train_validate(net, train_loader, validate_loader, criterion, optimizer, device, num_epochs=10):
    net.train()
    for epoch in range(num_epochs):  # Add epoch loop
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        epoch_loss = running_loss / len(train_loader)
        epoch_acc = correct / total
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_acc:.4f}')

    # Validate after all training epochs
    net.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in validate_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            val_loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_loss /= len(validate_loader)
    val_acc = correct / total
    print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.4f}')

    return epoch_acc, val_acc


if __name__ == '__main__':
    main()


**Cross-validation of progressive learning methods**

In [None]:
import os
import json
import numpy as np
import sys
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets
from tqdm import tqdm
from sklearn.model_selection import StratifiedKFold

from model import resnet34  # Ensure you have a resnet34 or change to the model you are using

def main():
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("using {} device.".format(device))

    data_transform = {
        "train": transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    }

    image_path = "/content/drive/MyDrive/master_thesis/Datasets/Herlev"
    assert os.path.exists(image_path), f"{image_path} path does not exist."

    dataset = datasets.ImageFolder(root=image_path, transform=data_transform['train'])
    targets = [s[1] for s in dataset.samples]  # extract class labels

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    fold_performance = []

    num_epochs_stage1 = 150
    num_epochs_stage2 = 50

    for fold, (train_idx, val_idx) in enumerate(skf.split(np.zeros(len(targets)), targets)):
        print(f"Training fold {fold+1}")
        train_sampler = torch.utils.data.SubsetRandomSampler(train_idx)
        val_sampler = torch.utils.data.SubsetRandomSampler(val_idx)

        train_loader = torch.utils.data.DataLoader(dataset, batch_size=16, sampler=train_sampler)
        validate_loader = torch.utils.data.DataLoader(dataset, batch_size=16, sampler=val_sampler)

        # Initialize the model
        net = resnet34()
        model_weight_path = "./resnet34-pre.pth"
        assert os.path.exists(model_weight_path), "file {} does not exist.".format(model_weight_path)
        net.load_state_dict(torch.load(model_weight_path, map_location='cpu', weights_only=True))

        # Stage 1: Freeze layer1 and layer2
        #for name, param in net.named_parameters():
        #    if "layer1" in name:
        #        param.requires_grad = False

        for name, param in net.named_parameters():
            if "layer1" in name or "layer2" in name:
                param.requires_grad = False

        # Replace the fully connected layer
        in_features = net.fc.in_features
        net.fc = nn.Sequential(
            nn.Linear(in_features, 1000),
            nn.ReLU(),
            nn.Linear(1000, 2)
        )
        net.to(device)
        #in_features = net.fc.in_features
        #net.fc = NewFC(in_features)
        #net.to(device)

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam([p for p in net.parameters() if p.requires_grad], lr=0.0001)



        # Train Stage 1
        print(f"Training Stage 1 for fold {fold+1}")
        train_model(net, train_loader, criterion, optimizer, device, num_epochs_stage1)

        # Stage 2: Unfreeze layer1 and layer2 for fine-tuning
        #for name, param in net.named_parameters():
        #    if "layer1" in name:
        #        param.requires_grad = True

        for name, param in net.named_parameters():
            if "layer1" in name or "layer2" in name:
               param.requires_grad = True

        # Fine-tuning with a smaller learning rate
        optimizer = optim.Adam(net.parameters(), lr=1e-5)


        # Train Stage 2
        print(f"Fine-tuning Stage 2 for fold {fold+1}")
        train_model(net, train_loader, criterion, optimizer, device, num_epochs_stage2)

        # Perform validation after both training stages
        print(f"Validating after 200 epochs for fold {fold+1}")
        val_acc = validate_model(net, validate_loader, criterion, device)
        fold_performance.append((None, val_acc))  # No separate train accuracy, only val accuracy after full 200 epochs

    # Calculate mean and standard deviation for accuracies
    val_accs = [fp[1] for fp in fold_performance]
    val_mean = np.mean(val_accs)
    val_std = np.std(val_accs)

    print(f"Validation Accuracy: Mean = {val_mean:.4f}, Std Dev = {val_std:.4f}")

def train_model(net, train_loader, criterion, optimizer, device, num_epochs):
    net.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        epoch_loss = running_loss / len(train_loader)
        epoch_acc = correct / total
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_acc:.4f}')

def validate_model(net, validate_loader, criterion, device):
    net.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in validate_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            val_loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_loss /= len(validate_loader)
    val_acc = correct / total
    print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.4f}')

    return val_acc

if __name__ == '__main__':
    main()
