# Made by

Lucas Kristiansson
980320-5971

Rikard Radovac
010826-8376

Carolina Rönnewall
980322-7900

In [None]:
# Colab-only setup: mount Google Drive at /content/drive so the dataset
# archive stored there can be read by the following cells.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# IPython shell escape: extract the dataset archive from Drive into ./data.
!unzip drive/MyDrive/data/a5_data.zip -d data

In [None]:
import torch.nn as nn
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import torchvision
import torch
from torchvision.transforms import v2
import torch.nn.functional as F


def load_vggmodel(device):
    """Load a frozen, ImageNet-pretrained VGG16 together with its preprocessing.

    Args:
        device: Device the model is moved to.

    Returns:
        A ``(model, transforms)`` tuple: the VGG16 in eval mode with all
        parameters excluded from gradient computation, and the preprocessing
        transforms that belong to the selected weights.
    """
    pretrained = torchvision.models.VGG16_Weights.IMAGENET1K_V1
    backbone = torchvision.models.vgg16(weights=pretrained)
    backbone.eval()
    preprocessing = pretrained.transforms()

    # Freeze every parameter in one call.
    backbone.requires_grad_(False)

    backbone.to(device)
    return backbone, preprocessing

def load_resnetmodel(device):
    """Load a frozen, ImageNet-pretrained ResNet-50 together with its preprocessing.

    The ImageNet classification layer (``fc``) is left in place.

    Args:
        device: Device the model is moved to.

    Returns:
        A ``(model, transforms)`` tuple: the ResNet-50 in eval mode with all
        parameters excluded from gradient computation, and the preprocessing
        transforms that belong to the selected weights.
    """
    pretrained = torchvision.models.ResNet50_Weights.IMAGENET1K_V1
    backbone = torchvision.models.resnet50(weights=pretrained)
    backbone.eval()
    preprocessing = pretrained.transforms()

    # Freeze every parameter in one call.
    backbone.requires_grad_(False)

    backbone.to(device)
    return backbone, preprocessing


In [None]:
# Our CNN baseline
class CNNBaseline(nn.Module):
    """Baseline CNN: three conv -> ReLU -> max-pool blocks and a two-layer head.

    Args:
        num_classes: Number of output logits per sample.
        input_size: Side length of the square input images. Each of the three
            poolings halves the spatial size, so the flattened vector fed to
            ``fc1`` has ``128 * (input_size // 8) ** 2`` entries.

    Raises:
        ValueError: If ``input_size`` is smaller than 8, which would leave no
            spatial positions after the three poolings.
    """

    def __init__(self, num_classes=1, input_size=128):
        super().__init__()
        if input_size < 8:
            raise ValueError(f"input_size must be at least 8, got {input_size}")

        # Convolutional layer block 1
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)  # Reduces image size to half

        # Convolutional layer block 2
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)

        # Convolutional layer block 3
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)

        # Three poolings with floor division: input_size -> input_size // 8
        # (128 -> 16 for the default).
        pooled_size = input_size // 8
        self.fc1 = nn.Linear(128 * pooled_size * pooled_size, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return raw logits of shape ``(batch, num_classes)``.

        ``x`` is expected to be ``(batch, 3, input_size, input_size)``.
        """
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))

        # Flatten everything but the batch dimension; unlike view() this also
        # works for non-contiguous activations.
        x = torch.flatten(x, 1)

        x = F.relu(self.fc1(x))
        return self.fc2(x)

# Our CNN with batch normalization
class CNNBatchNorm(nn.Module):
    """Baseline CNN with batch normalization after every convolution.

    Args:
        num_classes: Number of output logits per sample.
        input_size: Side length of the square input images. Each of the three
            poolings halves the spatial size, so the flattened vector fed to
            ``fc1`` has ``128 * (input_size // 8) ** 2`` entries.

    Raises:
        ValueError: If ``input_size`` is smaller than 8, which would leave no
            spatial positions after the three poolings.
    """

    def __init__(self, num_classes=1, input_size=128):
        super().__init__()
        if input_size < 8:
            raise ValueError(f"input_size must be at least 8, got {input_size}")

        # Convolutional layer block 1
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)  # Reduces image size to half

        # Convolutional layer block 2
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)

        # Convolutional layer block 3
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)

        # Three poolings with floor division: input_size -> input_size // 8
        # (128 -> 16 for the default).
        pooled_size = input_size // 8
        self.fc1 = nn.Linear(128 * pooled_size * pooled_size, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return raw logits of shape ``(batch, num_classes)``.

        ``x`` is expected to be ``(batch, 3, input_size, input_size)``.
        """
        # Each block: conv -> batch norm -> ReLU -> pool.
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))

        # Flatten everything but the batch dimension; unlike view() this also
        # works for non-contiguous activations.
        x = torch.flatten(x, 1)

        x = F.relu(self.fc1(x))
        return self.fc2(x)

# Our CNN with layer normalization
class CNNLayerNorm(nn.Module):
    """Baseline CNN with layer normalization after every convolution.

    Each LayerNorm normalizes over the full ``(channels, height, width)``
    activation of its block, so its shape is tied to the input resolution.

    Args:
        num_classes: Number of output logits per sample.
        input_size: Side length of the square input images. The LayerNorm
            shapes and the ``fc1`` input width are derived from it.

    Raises:
        ValueError: If ``input_size`` is smaller than 8, which would leave no
            spatial positions after the three poolings.
    """

    def __init__(self, num_classes=1, input_size=128):
        super().__init__()
        if input_size < 8:
            raise ValueError(f"input_size must be at least 8, got {input_size}")

        # Spatial side length seen by blocks 2 and 3 and by the classifier
        # (each pooling floors the halved size).
        half = input_size // 2
        quarter = half // 2
        eighth = quarter // 2

        # Convolutional layer block 1
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.ln1 = nn.LayerNorm([32, input_size, input_size])
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)  # Reduces image size to half

        # Convolutional layer block 2 (input already pooled once)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.ln2 = nn.LayerNorm([64, half, half])

        # Convolutional layer block 3 (input already pooled twice)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.ln3 = nn.LayerNorm([128, quarter, quarter])

        # Fully connected layers
        self.fc1 = nn.Linear(128 * eighth * eighth, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return raw logits of shape ``(batch, num_classes)``.

        ``x`` must be ``(batch, 3, input_size, input_size)``; other sizes do
        not match the LayerNorm shapes.
        """
        # Each block: conv -> layer norm -> ReLU -> pool.
        x = self.pool(F.relu(self.ln1(self.conv1(x))))
        x = self.pool(F.relu(self.ln2(self.conv2(x))))
        x = self.pool(F.relu(self.ln3(self.conv3(x))))

        # Flatten everything but the batch dimension; unlike view() this also
        # works for non-contiguous activations.
        x = torch.flatten(x, 1)

        x = F.relu(self.fc1(x))
        return self.fc2(x)

# Our CNN with group normalization
class CNNGroupNorm(nn.Module):
    """Baseline CNN with group normalization after every convolution.

    Args:
        num_classes: Number of output logits per sample.
        num_groups: Number of groups used by all three GroupNorm layers; it is
            passed unchanged to ``nn.GroupNorm`` for 32, 64 and 128 channels.
        input_size: Side length of the square input images. Each of the three
            poolings halves the spatial size, so the flattened vector fed to
            ``fc1`` has ``128 * (input_size // 8) ** 2`` entries.

    Raises:
        ValueError: If ``input_size`` is smaller than 8, which would leave no
            spatial positions after the three poolings.
    """

    def __init__(self, num_classes=1, num_groups=8, input_size=128):
        super().__init__()
        if input_size < 8:
            raise ValueError(f"input_size must be at least 8, got {input_size}")

        # Convolutional layer block 1
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.gn1 = nn.GroupNorm(num_groups=num_groups, num_channels=32)

        # Convolutional layer block 2
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.gn2 = nn.GroupNorm(num_groups=num_groups, num_channels=64)

        # Convolutional layer block 3
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.gn3 = nn.GroupNorm(num_groups=num_groups, num_channels=128)

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)  # Reduces image size to half

        # Three poolings with floor division: input_size -> input_size // 8
        # (128 -> 16 for the default).
        pooled_size = input_size // 8
        self.fc1 = nn.Linear(128 * pooled_size * pooled_size, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return raw logits of shape ``(batch, num_classes)``.

        ``x`` is expected to be ``(batch, 3, input_size, input_size)``.
        """
        # Each block: conv -> group norm -> ReLU -> pool.
        x = self.pool(F.relu(self.gn1(self.conv1(x))))
        x = self.pool(F.relu(self.gn2(self.conv2(x))))
        x = self.pool(F.relu(self.gn3(self.conv3(x))))

        # Flatten everything but the batch dimension; unlike view() this also
        # works for non-contiguous activations.
        x = torch.flatten(x, 1)

        x = F.relu(self.fc1(x))
        return self.fc2(x)

# Our CNN baseline without pool, will be used to compare with/without residual
class CNNWithoutPool(nn.Module):
    """Small CNN without pooling, the non-residual counterpart of CNNResidual.

    The three padded 3x3 convolutions keep the spatial size and map the
    channels 3 -> 8 -> 16 -> 3, so the flattened activation has
    ``3 * input_size ** 2`` entries.

    Args:
        num_classes: Number of output logits per sample.
        input_size: Side length of the square input images.

    Raises:
        ValueError: If ``input_size`` is not positive.
    """

    def __init__(self, num_classes=1, input_size=128):
        super().__init__()
        if input_size < 1:
            raise ValueError(f"input_size must be positive, got {input_size}")

        # Convolutional layer block 1
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=8, kernel_size=3, padding=1)

        # Convolutional layer block 2
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, padding=1)

        # Convolutional layer block 3
        self.conv3 = nn.Conv2d(in_channels=16, out_channels=3, kernel_size=3, padding=1)

        # No pooling anywhere: the classifier sees all 3 channels at full
        # resolution.
        self.fc1 = nn.Linear(3 * input_size * input_size, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return raw logits of shape ``(batch, num_classes)``.

        ``x`` is expected to be ``(batch, 3, input_size, input_size)``.
        """
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))

        # Flatten everything but the batch dimension; unlike view() this also
        # works for non-contiguous activations.
        x = torch.flatten(x, 1)

        x = F.relu(self.fc1(x))
        return self.fc2(x)

# Our CNN with Residual
class CNNResidual(nn.Module):
    """Small CNN without pooling and with one skip connection around the convs.

    The three padded 3x3 convolutions keep the spatial size and map the
    channels 3 -> 8 -> 16 -> 3, which lets the network input be added to the
    output of the last convolution before the final ReLU.

    Args:
        num_classes: Number of output logits per sample.
        input_size: Side length of the square input images.

    Raises:
        ValueError: If ``input_size`` is not positive.
    """

    def __init__(self, num_classes=1, input_size=128):
        super().__init__()
        if input_size < 1:
            raise ValueError(f"input_size must be positive, got {input_size}")

        # Convolutional layer block 1
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=8, kernel_size=3, padding=1)

        # Convolutional layer block 2
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, padding=1)

        # Convolutional layer block 3 (back to 3 channels so it can be summed
        # with the input)
        self.conv3 = nn.Conv2d(in_channels=16, out_channels=3, kernel_size=3, padding=1)

        # No pooling anywhere: the classifier sees all 3 channels at full
        # resolution.
        self.fc1 = nn.Linear(3 * input_size * input_size, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return raw logits of shape ``(batch, num_classes)``.

        ``x`` is expected to be ``(batch, 3, input_size, input_size)``.
        """
        residual = x

        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))

        # Skip connection: add the input before the last activation.
        # Out-of-place add so no tensor is modified behind autograd's back.
        x = F.relu(self.conv3(x) + residual)

        # Flatten everything but the batch dimension; unlike view() this also
        # works for non-contiguous activations.
        x = torch.flatten(x, 1)

        x = F.relu(self.fc1(x))
        return self.fc2(x)

In [None]:
# Root of the extracted dataset; the "train" and "val" sub-folders are read below.
folder_path = "data/a5_data/"

# Preprocessing without augmentation: convert to an image tensor, then to
# float32 with value scaling enabled.
transform_function = v2.Compose(
    [v2.ToImage(), v2.ToDtype(torch.float32, scale=True)]
)

# Training split, shuffled, in batches of 16.
folder = ImageFolder(root=folder_path + "train", transform=transform_function)
loader = DataLoader(dataset=folder, batch_size=16, shuffle=True)

# Validation split, same preprocessing and batch size.
folder_test = ImageFolder(root=folder_path + "val", transform=transform_function)
loader_test = DataLoader(dataset=folder_test, batch_size=16, shuffle=True)


In [None]:

@torch.no_grad()
def evaluate_model(model, test_data, device, batch_size, type_set: str = "val"):
    """Print and return the binary classification accuracy of ``model``.

    A sample counts as correct when the rounded sigmoid of its logit equals
    its label.

    Args:
        model: Module that produces one logit per sample.
        test_data: Iterable of ``(inputs, labels)`` batches, e.g. a DataLoader.
        device: Device the batches are moved to before the forward pass.
        batch_size: Unused; kept so existing callers keep working. Samples are
            counted directly, so a smaller last batch no longer skews the
            accuracy.
        type_set: Name of the evaluated split, used only in the printed line.

    Returns:
        Fraction of correctly classified samples, or ``nan`` when
        ``test_data`` yields no samples.
    """
    # Remember the mode so a training loop calling this keeps training in
    # train mode afterwards.
    was_training = model.training
    model.eval()
    correct = 0
    total = 0

    try:
        for x, y in test_data:
            x = x.to(device)
            y = y.to(device)

            # reshape(-1) instead of squeeze(): squeeze() turns a batch of one
            # sample into a 0-d tensor.
            predicted = torch.round(model(x).sigmoid()).reshape(-1)

            correct += (predicted == y.reshape(-1)).sum().item()
            total += y.numel()
    finally:
        model.train(was_training)

    accuracy = correct / total if total else float("nan")
    print(f"Accuracy on {type_set}: ", accuracy)
    return accuracy


def train_model(model, dataloader, loader_test, device, epochs: int = 5):
    """Train a binary classifier with Adam and BCE-with-logits loss.

    After every epoch the accuracy on the training and validation loaders is
    reported through ``evaluate_model`` and the mean batch loss is printed.

    Args:
        model: Module that produces one logit per sample.
        dataloader: Loader yielding ``(inputs, labels)`` training batches.
        loader_test: Loader yielding ``(inputs, labels)`` validation batches.
        device: Device used for the model and the batches.
        epochs: Number of passes over ``dataloader``.
    """
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.BCEWithLogitsLoss()

    for epoch in range(epochs):
        # Evaluation switches the model to eval mode, so train mode has to be
        # re-enabled at the start of every epoch (matters for BatchNorm).
        model.train()
        loss_sum = 0.0
        for x, y in dataloader:
            x = x.to(device)
            y = y.to(device)

            output = model(x)

            # reshape(-1) instead of squeeze(): squeeze() turns a batch of one
            # sample into a 0-d tensor that no longer matches the target.
            loss = criterion(output.reshape(-1), y.to(torch.float32).reshape(-1))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            loss_sum += loss.item()

        # Use each loader's own batch size; 16 is the fallback for iterables
        # that do not expose one.
        train_batch_size = getattr(dataloader, "batch_size", None) or 16
        test_batch_size = getattr(loader_test, "batch_size", None) or 16
        evaluate_model(model, dataloader, device, batch_size=train_batch_size, type_set="train")
        evaluate_model(model, loader_test, device, batch_size=test_batch_size)
        # max(..., 1) avoids a ZeroDivisionError for an empty loader.
        print(f"Epoch {epoch + 1} with mean loss: {loss_sum / max(len(dataloader), 1)}")
        print()


# Prefer the GPU when one is available.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"

# Train every architecture variant in turn on the same loaders.
models = [
    CNNBaseline,
    CNNBatchNorm,
    CNNLayerNorm,
    CNNGroupNorm,
    CNNWithoutPool,
    CNNResidual,
]
for model_class in models:
    model = model_class(num_classes=1).to(device)
    print(f"Training model {model_class}")
    train_model(model, loader, loader_test, device)

In [None]:
 # # With augmentation
# Training transform: random crop/flip augmentation followed by conversion to
# float32 and ImageNet mean/std normalization.
transform_function = v2.Compose([
    v2.ToImage(),
    v2.RandomResizedCrop(size=(128, 128), antialias=True),
    v2.RandomHorizontalFlip(p=0.5),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Validation transform: same conversion and normalization but no random
# operations, so the reported validation accuracy is deterministic.
# NOTE(review): assumes the images are already 128x128, as the un-augmented
# pipeline feeds them to the same fixed-size models without resizing - confirm.
eval_transform_function = v2.Compose([
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

folder = ImageFolder(folder_path + "train", transform=transform_function)
loader = DataLoader(folder, batch_size=16, shuffle=True)

folder_test = ImageFolder(folder_path + "val", transform=eval_transform_function)
# Accuracy does not depend on the order, so validation is not shuffled.
loader_test = DataLoader(folder_test, batch_size=16, shuffle=False)

model = CNNBaseline(num_classes=1)
train_model(model, loader, loader_test, device)

In [None]:
def compute_and_save_features(model, train_folder, eval_folder, transform_function, folder_path, device):
    """Run ``model`` over the train and val splits and collect its outputs.

    Despite the name, nothing is written to disk here; the caller saves the
    returned tensors.

    Args:
        model: Feature extractor; it is put into eval mode.
        train_folder: Fallback training dataset, used only when
            ``transform_function`` is ``None``.
        eval_folder: Fallback validation dataset, used only when
            ``transform_function`` is ``None``.
        transform_function: Preprocessing that matches ``model``. When given,
            the splits are re-read from ``folder_path`` with this transform so
            the features are not computed on data prepared with a different
            (e.g. randomly augmented) transform.
        folder_path: Dataset root ending in a path separator; the ``train``
            and ``val`` sub-folders are read from it.
        device: Device the inputs are moved to for the forward pass.

    Returns:
        ``((train_features, train_labels), (val_features, val_labels))`` where
        each element is a list with one tensor per sample (batch size 1).
        Features are moved to the CPU so they do not accumulate in GPU memory.
    """
    model.eval()
    if transform_function is not None:
        train_folder = ImageFolder(folder_path + "train", transform=transform_function)
        eval_folder = ImageFolder(folder_path + "val", transform=transform_function)

    all_features = []
    with torch.no_grad():
        for dataset in (train_folder, eval_folder):
            current_features = []
            current_labels = []
            # Fixed order so features and labels stay aligned with the files.
            for x, y in DataLoader(dataset, batch_size=1, shuffle=False):
                features = model(x.to(device))
                current_features.append(features.cpu())
                current_labels.append(y)
            all_features.append((current_features, current_labels))
    return all_features[0], all_features[1]


device = "cuda" if torch.cuda.is_available() else "cpu"

# For each pretrained backbone (ResNet-50 first, then VGG16): drop its last
# child module, compute features for the train and val splits, and store the
# features and labels in four files.
for load_backbone, (train_x_file, train_y_file, val_x_file, val_y_file) in (
    (
        load_resnetmodel,
        ("resnet.pkl", "resnet_labels.pkl", "resnet_test.pkl", "resnet_labels_test.pkl"),
    ),
    (
        load_vggmodel,
        ("vggmodel.pkl", "vggmodel_labels.pkl", "vggmodel_test.pkl", "vggmodel_labels_test.pkl"),
    ),
):
    model, transform_function = load_backbone(device)
    # Keep all layers except the classification layer.
    model = nn.Sequential(*list(model.children())[:-1])

    train, test = compute_and_save_features(
        model, folder, folder_test, transform_function, folder_path, device
    )
    torch.save(torch.concat(train[0]), train_x_file)
    torch.save(torch.concat(train[1]), train_y_file)

    torch.save(torch.concat(test[0]), val_x_file)
    torch.save(torch.concat(test[1]), val_y_file)

Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to /root/.cache/torch/hub/checkpoints/vgg16-397923af.pth
100%|██████████| 528M/528M [00:04<00:00, 136MB/s] 


**Create a classifier head that uses the pre-trained features as inputs**

In [None]:

def get_model_input_size(model, model_type: str):
    """Return the number of input features of the model's classification head.

    Args:
        model: A model exposing ``classifier[0]`` (for ``"vgg"``) or ``fc``
            (for ``"resnet50"``), each with an ``in_features`` attribute.
        model_type: Either ``"vgg"`` or ``"resnet50"``.

    Returns:
        The ``in_features`` value of the first classifier layer.

    Raises:
        ValueError: If ``model_type`` is not one of the supported names
            (previously ``None`` was returned silently).
    """
    if model_type == "vgg":
        return model.classifier[0].in_features
    if model_type == "resnet50":
        return model.fc.in_features
    raise ValueError(
        f"Unsupported model_type {model_type!r}; expected 'vgg' or 'resnet50'"
    )


device = "cuda" if torch.cuda.is_available() else "cpu"
n_classes = 1


def _make_classifier_head(num_inputs, num_outputs):
    """Build the head: Linear -> BatchNorm -> ReLU -> Dropout -> Linear."""
    return nn.Sequential(
        nn.Linear(num_inputs, 2048),
        nn.BatchNorm1d(num_features=2048),
        nn.ReLU(inplace=True),
        nn.Dropout(0.5),
        nn.Linear(2048, num_outputs),
    )


# The backbones are loaded here only to read the input width of their
# original classification layers.
model, transform_function = load_vggmodel(device)
in_features = get_model_input_size(model, "vgg")
vgg_classifier = _make_classifier_head(in_features, n_classes)

model, transform_function = load_resnetmodel(device)
in_features = get_model_input_size(model, "resnet50")
resnet_classifier = _make_classifier_head(in_features, n_classes)

# The backbone and its transforms are not needed any more.
del model, transform_function



Train the classifier heads on the pre-computed features

In [None]:
def train_pretrained_model(model, dataloader, loader_test, device, epochs: int = 5):
    """Train a binary classifier head on pre-computed feature tensors.

    Every batch of features is flattened to ``(batch, -1)`` before it is fed
    to ``model``. After each epoch the accuracy on the validation and training
    loaders is reported through ``evaluate_pretrained_model`` and the mean
    batch loss is printed.

    Args:
        model: Module that produces one logit per flattened feature vector.
        dataloader: Loader yielding ``(features, labels)`` training batches.
        loader_test: Loader yielding ``(features, labels)`` validation batches.
        device: Device used for the model and the batches.
        epochs: Number of passes over ``dataloader``.
    """
    model.to(device)
    model.train()
    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.BCEWithLogitsLoss()

    for epoch in range(epochs):
        loss_sum = 0.0
        for x, y in dataloader:
            x = x.view(x.size()[0], -1)
            x = x.to(device)
            y = y.to(device)

            output = model(x)

            # reshape(-1) instead of squeeze(): squeeze() turns a batch of one
            # sample into a 0-d tensor that no longer matches the target.
            loss = criterion(output.reshape(-1), y.to(torch.float32).reshape(-1))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            loss_sum += loss.item()

        # Use each loader's own batch size; 16 is the fallback for iterables
        # that do not expose one.
        train_batch_size = getattr(dataloader, "batch_size", None) or 16
        test_batch_size = getattr(loader_test, "batch_size", None) or 16
        evaluate_pretrained_model(model, loader_test, device, batch_size=test_batch_size)
        evaluate_pretrained_model(model, dataloader, device, batch_size=train_batch_size, type_set="train")
        # Evaluation switches to eval mode; go back to train mode.
        model.train()
        # max(..., 1) avoids a ZeroDivisionError for an empty loader.
        print(f"Epoch {epoch + 1} with mean loss: {loss_sum / max(len(dataloader), 1)}")



@torch.no_grad()
def evaluate_pretrained_model(model, test_data, device, batch_size, type_set: str = "val"):
    """Print and return the accuracy of a classifier head on feature batches.

    Every batch of features is flattened to ``(batch, -1)`` before the forward
    pass. A sample counts as correct when the rounded sigmoid of its logit
    equals its label.

    Args:
        model: Module that produces one logit per flattened feature vector.
        test_data: Iterable of ``(features, labels)`` batches.
        device: Device the batches are moved to before the forward pass.
        batch_size: Unused; kept so existing callers keep working. Samples are
            counted directly, so a smaller last batch no longer skews the
            accuracy.
        type_set: Name of the evaluated split, used only in the printed line.

    Returns:
        Fraction of correctly classified samples, or ``nan`` when
        ``test_data`` yields no samples.
    """
    # Remember the mode so the caller's train/eval state is left untouched.
    was_training = model.training
    model.eval()
    correct = 0
    total = 0

    try:
        for x, y in test_data:
            x = x.view(x.size()[0], -1)
            x = x.to(device)
            y = y.to(device)

            # reshape(-1) instead of squeeze(): squeeze() turns a batch of one
            # sample into a 0-d tensor.
            predicted = torch.round(model(x).sigmoid()).reshape(-1)

            correct += (predicted == y.reshape(-1)).sum().item()
            total += y.numel()
    finally:
        model.train(was_training)

    accuracy = correct / total if total else float("nan")
    print(f"Accuracy on {type_set}: ", accuracy)
    return accuracy

In [None]:

# Train one classifier head per backbone (VGG16 first, then ResNet-50) on the
# feature and label tensors stored in the listed files.
for (train_x_file, train_y_file, val_x_file, val_y_file), classifier_head in (
    (
        ("vggmodel.pkl", "vggmodel_labels.pkl", "vggmodel_test.pkl", "vggmodel_labels_test.pkl"),
        vgg_classifier,
    ),
    (
        ("resnet.pkl", "resnet_labels.pkl", "resnet_test.pkl", "resnet_labels_test.pkl"),
        resnet_classifier,
    ),
):
    train_dataset = torch.utils.data.TensorDataset(
        torch.load(train_x_file), torch.load(train_y_file)
    )
    test_dataset = torch.utils.data.TensorDataset(
        torch.load(val_x_file), torch.load(val_y_file)
    )
    train = DataLoader(train_dataset, batch_size=16, shuffle=True)
    test = DataLoader(test_dataset, batch_size=16)

    train_pretrained_model(classifier_head, train, test, device)

Accuracy on val:  0.8132911324501038
Accuracy on train:  0.8770211338996887
Epoch 1 with mean loss: 0.4029626365917832
Accuracy on val:  0.8156645894050598
Accuracy on train:  0.8990982174873352
Epoch 2 with mean loss: 0.33343674983847793
Accuracy on val:  0.8291139602661133
Accuracy on train:  0.8936566710472107
Epoch 3 with mean loss: 0.3062849574865986
Accuracy on val:  0.8291139602661133
Accuracy on train:  0.9357897639274597
Epoch 4 with mean loss: 0.27203851192262934
Accuracy on val:  0.8346519470214844
Accuracy on train:  0.9404539465904236
Epoch 5 with mean loss: 0.24828576045435163
Accuracy on val:  0.8101266026496887
Accuracy on train:  0.829135537147522
Epoch 1 with mean loss: 0.4516307460653841
Accuracy on val:  0.8283228278160095
Accuracy on train:  0.8700248599052429
Epoch 2 with mean loss: 0.3729506469614322
Accuracy on val:  0.8409810066223145
Accuracy on train:  0.8813743591308594
Epoch 3 with mean loss: 0.3575314890908365
Accuracy on val:  0.814873456954956
Accuracy o

In [None]:
def compute_and_save_features(model, test_folder_path, transform_function, device):
    """Run ``model`` over every image below ``test_folder_path``.

    Despite the name, nothing is written to disk here; the caller saves the
    returned tensors.

    Args:
        model: Feature extractor; it is put into eval mode.
        test_folder_path: Root directory handed to ``ImageFolder``.
        transform_function: Preprocessing applied to every image.
        device: Device the inputs are moved to for the forward pass.

    Returns:
        ``(features, labels)``: two lists with one tensor per image, in the
        unshuffled order of the dataset.
    """
    model.eval()
    dataset = ImageFolder(test_folder_path, transform=transform_function)

    outputs, targets = [], []
    with torch.no_grad():
        # One image per step, fixed order.
        for image, target in DataLoader(dataset, batch_size=1, shuffle=False):
            outputs.append(model(image.to(device)))
            targets.append(target)

    return outputs, targets



# Prefer the GPU when one is available.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"

model, transform_function = load_resnetmodel(device)
# Keep all layers except the classification layer.
model = nn.Sequential(*list(model.children())[:-1])

# Compute the features of the blind test images and store them with the
# labels that ImageFolder assigned.
blind_test, blind_labels = compute_and_save_features(
    model, "/content/data/a5_data/test_blind", transform_function, device
)
for tensors, out_file in (
    (blind_test, "resnet_blind_test.pkl"),
    (blind_labels, "resnet_blind_test_labels.pkl"),
):
    torch.save(torch.concat(tensors), out_file)



In [None]:
@torch.no_grad()
def evaluate_pretrained_model(model, test_data, device, batch_size, type_set: str = "val"):
    """Return the rounded sigmoid predictions of ``model`` for every sample.

    Every batch of features is flattened to ``(batch, -1)`` before the forward
    pass. The labels in ``test_data`` are ignored. The model is left in eval
    mode.

    Args:
        model: Module that produces logits for flattened feature vectors.
        test_data: Iterable of ``(features, labels)`` batches.
        device: Device the features are moved to before the forward pass.
        batch_size: Unused; kept for signature compatibility.
        type_set: Unused; kept for signature compatibility.

    Returns:
        A list with one CPU tensor per sample (one row of the rounded model
        output), in the order the samples were yielded.
    """
    model.eval()

    all_predictions = []
    for x, _ in test_data:
        x = x.view(x.size()[0], -1).to(device)
        predicted = torch.round(model(x).sigmoid()).cpu()
        # extend() iterates over the batch dimension: one entry per sample.
        all_predictions.extend(predicted)
    return all_predictions


# Load the stored blind-test features and labels, then predict with the
# ResNet classifier head in batches of 16.
blind_features = torch.load("resnet_blind_test.pkl")
blind_targets = torch.load("resnet_blind_test_labels.pkl")
test_dataset = torch.utils.data.TensorDataset(blind_features, blind_targets)

test = DataLoader(dataset=test_dataset, batch_size=16)

predictions = evaluate_pretrained_model(resnet_classifier, test, device, batch_size=16)

In [None]:
# Class index -> class name. Named `class_names` rather than `map` so the
# builtin map() is not shadowed for the rest of the notebook.
class_names = {0: "MEL", 1: "NV"}

# Each prediction is a one-element tensor holding 0.0 or 1.0.
labeled_predictions = [class_names[int(pred)] for pred in predictions]

In [None]:
# Write the predicted class names to test.txt, one per line.
with open("test.txt", "w") as f:
    f.writelines(label + "\n" for label in labeled_predictions)