<h3>Imports</h3>

In [15]:
from PIL import Image
from collections import Counter
import datetime, os
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import ConcatDataset, Dataset, DataLoader, random_split
from torch.utils.tensorboard import SummaryWriter
import torchvision.models as models
from torchmetrics.classification import MulticlassAccuracy, MulticlassPrecision, MulticlassRecall, MulticlassF1Score, MulticlassConfusionMatrix

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Device:", DEVICE)

DataLoader = torch.utils.data.DataLoader

SEED = 1
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

Device: cpu


<h3>Set-up classes and mappings</h3>

In [16]:
classes = ["ACK", "BCC", "MEL", "NEV", "SCC", "SEK"]
class_to_idx = {cls_name: idx for idx, cls_name in enumerate(classes)}
idx_to_class = {idx: cls_name for cls_name, idx in class_to_idx.items()}

<h3>Pre-processing: cropping and lowering resolution, per paper.</h3>

In [17]:
def crop_center(img, crop_ratio):
    width, height = img.size
    new_size = int(crop_ratio * min(width, height))
    left = (width - new_size) // 2
    top = (height - new_size) // 2
    right = left + new_size
    bottom = top + new_size
    return img.crop((left, top, right, bottom))


transform = transforms.Compose([
    transforms.Lambda(lambda img: crop_center(img, 0.8)),
    transforms.Resize((224, 224)), # 224x224 is a common choice for RESNET-18, I'm told..  
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], 
                         [0.229, 0.224, 0.225]),
])

transform_flip = transforms.Compose([
    transform,
    transforms.RandomHorizontalFlip(.5), # .5 is used in the Pacheco code 
    transforms.RandomVerticalFlip(.2), # .2 is used in the Pacheco code 
])

<h3>Dataset Class</h3>

In [18]:
class PAD_UFES_Dataset(Dataset):
    def __init__(self, img_dir, label_dict, transform=None):
        self.img_dir = img_dir
        self.label_dict = label_dict
        self.transform = transform
        self.image_files = [f for f in os.listdir(img_dir) 
                            if f.endswith('.png') and f in label_dict]

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_name = self.image_files[idx]
        label_name = self.label_dict[img_name]
        label = class_to_idx[label_name]
        img_path = os.path.join(self.img_dir, img_name)
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label

<h3>Establish Paths</h3>

In [19]:
# data_path = os.path.expanduser('~/Desktop/230PRJ/PAD-UFES-20/')
data_path = os.path.expanduser('~/Desktop/CS230/Project/PAD-UFES-20/')
metadata_path = os.path.join(data_path, 'metadata.csv')
images_path = os.path.join(data_path, 'images')

<h3>Load Labels, Initialize and Split dataset</h3>

In [20]:
metadata = pd.read_csv(metadata_path)

label_dict = dict(zip(metadata['img_id'], metadata['diagnostic']))
label_dict = {f"{key}": value for key, value in label_dict.items()}

dataset = PAD_UFES_Dataset(img_dir=images_path, label_dict=label_dict, transform=transform)

dataset_size = len(dataset)
train_size = int(0.8 * dataset_size)
val_size = int(0.1 * dataset_size)
test_size = dataset_size - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])
train_dataset.transform = transform_flip # apply flip transforms just to train set

# Sanity check
assert train_size + test_size + val_size == dataset_size == 2298

<h3>Get class weights for later: potentially weighted softmax, or re-sampling, etc.</h3>

In [21]:
train_labels = [label for _, label in train_dataset]
label_counts = Counter(train_labels)
total_samples = sum(label_counts.values())
class_weights = [total_samples / label_counts[i] for i in range(len(classes))]
class_weights = torch.FloatTensor(class_weights).to(DEVICE)

KeyboardInterrupt: 

<h3>Set-up Data Loaders</h3>

In [None]:
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

<h3>Display some images</h3>

In [None]:
def imshow(img):
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.axis('off')
    plt.show()

images, _ = next(iter(train_loader))
imshow(torchvision.utils.make_grid(images))

<h1>Models</h1>

<h3>ResNet-18</h3>

In [None]:
assert len(classes) == 6

model_resnet18 = models.resnet18(pretrained=True)
num_ftrs = model_resnet18.fc.in_features
model_resnet18.fc = nn.Linear(num_ftrs, len(classes))

# Unfreeze just the last CONV layer and last FC layer, and tweak it for 6 outputs
for param in model_resnet18.parameters():
    param.requires_grad = False
for param in model_resnet18.layer4.parameters():
    param.requires_grad = True
for param in model_resnet18.fc.parameters():
    param.requires_grad = True

model_resnet18 = model_resnet18.to(DEVICE)

In [None]:
criterion_resnet18 = nn.CrossEntropyLoss(weight=class_weights) # Weighted CEL
optimizer_resnet18 = optim.SGD(model_resnet18.parameters(), lr=0.01, momentum=0.9) # Keeping this the same as the paper.
scheduler_resnet18 = optim.lr_scheduler.StepLR(optimizer_resnet18, step_size=7, gamma=0.1) # LR decay.

<h3>ResNet-50</h3>

In [None]:
assert len(classes) == 6

model_resnet50 = models.resnet50(pretrained=True)
num_ftrs = model_resnet50.fc.in_features
model_resnet50.fc = nn.Linear(num_ftrs, len(classes))

# Unfreeze just the last CONV layer and last FC layer, and tweak it for 6 outputs
for param in model_resnet50.parameters():
    param.requires_grad = False
for param in model_resnet50.layer4.parameters():
    param.requires_grad = True
for param in model_resnet50.fc.parameters():
    param.requires_grad = True

model_resnet50 = model_resnet50.to(DEVICE)

In [None]:
criterion_resnet50 = nn.CrossEntropyLoss(weight=class_weights)
optimizer_resnet50 = optim.SGD(model_resnet50.parameters(), lr=0.01, momentum=0.9, weight_decay=0.001)
scheduler_resnet50 = optim.lr_scheduler.StepLR(optimizer_resnet50, step_size=7, gamma=0.1)

<h3>EfficientNet B4</h3>

In [None]:
assert len(classes) == 6

model_efficientnet_b4 = models.efficientnet_b4(pretrained=True)

for param in model_efficientnet_b4.parameters():
    param.requires_grad = False
for param in model_efficientnet_b4.features[-1].parameters():
    param.requires_grad = True
for param in model_efficientnet_b4.classifier.parameters():
    param.requires_grad = True

model_efficientnet_b4 = model_efficientnet_b4.to(DEVICE)

model_efficientnet_b4.classifier = nn.Sequential(
    nn.Dropout(p=0.4, inplace=True),
    nn.Linear(model_efficientnet_b4.classifier[1].in_features, len(classes))
)

In [None]:
criterion_efficientnet_b4 = nn.CrossEntropyLoss(weight=class_weights)
optimizer_efficientnet_b4 = optim.SGD(model_efficientnet_b4.parameters(), lr=0.01, momentum=0.9)
scheduler_efficientnet_b4 = optim.lr_scheduler.StepLR(optimizer_efficientnet_b4, step_size=7, gamma=0.1)

<h3>Custom (Simple) CNN Model</h3>

In [None]:
class SimpleCNN(nn.Module):
    def __init__(self, num_classes):
        super(SimpleCNN, self).__init__()
        # 3 CONV
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        # 1 MAXPOOL
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # 2 FC
        self.fc1 = nn.Linear(128 * 28 * 28, 256)
        self.fc2 = nn.Linear(256, len(classes))
        # 1 DO
        self.dropout = nn.Dropout(0.5)
    
    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))  # OP Dims: [batch_size, 32, 112, 112]
        x = self.pool(F.relu(self.conv2(x)))  # OP Dims: [batch_size, 64, 56, 56]
        x = self.pool(F.relu(self.conv3(x)))  # OP Dims: [batch_size, 128, 28, 28]
        x = x.view(-1, 128 * 28 * 28)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [None]:
simpleCNN = SimpleCNN(num_classes=len(classes))
assert len(classes) == 6, "incorrect number of classes"
simpleCNN = simpleCNN.to(DEVICE)

criterion_simpleCNN = nn.CrossEntropyLoss(weight=class_weights)
optimizer_simpleCNN = optim.Adam(simpleCNN.parameters(), lr=0.001)

<h3>ResNet-18 with Dropout</h3>

In [None]:
assert len(classes) == 6

model_resnet18WithDropout = models.resnet18(pretrained=True)
num_ftrs = model_resnet18WithDropout.fc.in_features
model_resnet18WithDropout.fc = nn.Sequential(nn.Flatten(),
                                             nn.Dropout(0.5),
                                             nn.Linear(num_ftrs, 256),
                                             nn.ReLU(),
                                             nn.Dropout(0.5),
                                             nn.Linear(256, len(classes)))

for param in model_resnet18WithDropout.parameters():
    param.requires_grad = False
for param in model_resnet18WithDropout.layer4.parameters():
    param.requires_grad = True
for param in model_resnet18WithDropout.fc.parameters():
    param.requires_grad = True

model_resnet18WithDropout = model_resnet18WithDropout.to(DEVICE)

In [None]:
criterion_resnet18WithDropout = nn.CrossEntropyLoss(weight=class_weights)
optimizer_resnet18WithDropout = optim.Adam(model_resnet18WithDropout.parameters(), lr=0.01, weight_decay=0.001) # use Adam and weight decay
scheduler_resnet18WithDropout = optim.lr_scheduler.StepLR(optimizer_resnet18WithDropout, step_size=7, gamma=0.1)

<h3>ResNet-50 with Dropout</h3>

In [None]:
assert len(classes) == 6

model_resnet50WithDropout = models.resnet50(pretrained=True)
num_ftrs = model_resnet50WithDropout.fc.in_features
model_resnet50WithDropout.fc = nn.Sequential(nn.Flatten(),
                                             nn.Dropout(0.5),
                                             nn.Linear(num_ftrs, 256),
                                             nn.ReLU(),
                                             nn.Dropout(0.5),
                                             nn.Linear(256, len(classes)))

for param in model_resnet50WithDropout.parameters():
    param.requires_grad = False
for param in model_resnet50WithDropout.layer4.parameters():
    param.requires_grad = True
for param in model_resnet50WithDropout.fc.parameters():
    param.requires_grad = True

model_resnet50WithDropout = model_resnet50WithDropout.to(DEVICE)

In [None]:
criterion_resnet50WithDropout = nn.CrossEntropyLoss(weight=class_weights)
optimizer_resnet50WithDropout = optim.Adam(model_resnet50WithDropout.parameters(), lr=0.01, weight_decay=0.001) # use Adam and weight decay
scheduler_resnet50WithDropout = optim.lr_scheduler.StepLR(optimizer_resnet50WithDropout, step_size=7, gamma=0.1)

<h3>Helper Functions</h3>

In [None]:
def train(log_name, model, train_loader, optimizer, criterion, scheduler, num_epochs):
    writer = SummaryWriter('runs/' + log_name + '_' + str(datetime.datetime.now()).replace(" ", "_"))
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        running_corrects = 0
        
        for inputs, labels in train_loader:
            inputs = inputs.to(DEVICE)
            labels = labels.to(DEVICE)
            
            optimizer.zero_grad()
            
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            loss.backward()
            optimizer.step()
            
            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        if scheduler is not None:
            scheduler.step()
        
        epoch_loss = running_loss / train_size
        epoch_acc = running_corrects.double() / train_size
        
        # Validation
        model.eval()
        val_running_loss = 0.0
        val_running_corrects = 0
        
        with torch.no_grad():
            for val_inputs, val_labels in val_loader:
                val_inputs = val_inputs.to(DEVICE)
                val_labels = val_labels.to(DEVICE)
                
                val_outputs = model(val_inputs)
                val_loss = criterion(val_outputs, val_labels)
                
                _, val_preds = torch.max(val_outputs, 1)
                val_running_loss += val_loss.item() * val_inputs.size(0)
                val_running_corrects += torch.sum(val_preds == val_labels.data)
        
        val_loss = val_running_loss / val_size
        val_acc = val_running_corrects.double() / val_size
        
        print(f'Epoch {epoch+1}/{num_epochs}')
        print(f'Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
        print(f'Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}')
        writer.add_scalar('Loss/train', epoch_loss, epoch+1)
        writer.add_scalar('Loss/validation', val_loss, epoch+1)
        writer.add_scalar('Accuracy/train', epoch_acc, epoch+1)
        writer.add_scalar('Accuracy/validation', val_acc, epoch+1)

    writer.close()

In [None]:
def test(log_name, model, test_loader):
    writer = SummaryWriter('runs/' + 'test/' + log_name + '_' + str(datetime.datetime.now()).replace(" ", "_"))
    
    model.eval()
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for test_inputs, test_labels in test_loader:
            test_inputs = test_inputs.to(DEVICE)
            test_labels = test_labels.to(DEVICE)
            
            test_outputs = model(test_inputs)
            _, test_preds = torch.max(test_outputs, 1)
            all_preds.append(test_preds)
            all_labels.append(test_labels)
    
    all_preds = torch.cat(all_preds)
    all_labels = torch.cat(all_labels)

    num_classes = len(classes)
    accuracy_metric = MulticlassAccuracy(num_classes=num_classes, average='macro')
    precision_metric = MulticlassPrecision(num_classes=num_classes, average='macro')
    recall_metric = MulticlassRecall(num_classes=num_classes, average='macro')
    f1_metric = MulticlassF1Score(num_classes=num_classes, average='macro')
    confusion_matrix_metric = MulticlassConfusionMatrix(num_classes=num_classes)

    accuracy = accuracy_metric(all_preds, all_labels)
    precision = precision_metric(all_preds, all_labels)
    recall = recall_metric(all_preds, all_labels)
    f1_score = f1_metric(all_preds, all_labels)
    confusion_matrix = confusion_matrix_metric(all_preds, all_labels)
    
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1_score:.4f}")

    writer.add_scalar('Accuracy/test', accuracy)
    writer.add_scalar('Precision/test', precision)
    writer.add_scalar('Recall/test', recall)
    writer.add_scalar('F1_score/test', f1_score)
    
    plt.figure(figsize=(10, 8))
    sns.heatmap(confusion_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=range(num_classes), yticklabels=range(num_classes))
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title('Confusion Matrix')
    plt.show()
    
    writer.add_figure('Confusion Matrix', plt.gcf(), global_step=0)
    
    writer.close()

<h3>Training Time</h3>

In [None]:
train("ResNet18", model_resnet18, train_loader, optimizer_resnet18, criterion_resnet18, scheduler_resnet18, num_epochs=20)

In [None]:
train("ResNet50", model_resnet50, train_loader, optimizer_resnet50, criterion_resnet50, scheduler_resnet50, num_epochs=20)

In [None]:
train("EfficientNetB4", model_efficientnet_b4, train_loader, optimizer_efficientnet_b4, criterion_efficientnet_b4, scheduler_efficientnet_b4, num_epochs=20)

In [None]:
train("simpleCNN", simpleCNN, train_loader, optimizer_simpleCNN, criterion_simpleCNN, None, num_epochs=20)

In [None]:
train("ResNet18WithDropout", model_resnet18WithDropout, train_loader, optimizer_resnet18WithDropout, criterion_resnet18WithDropout, scheduler_resnet18WithDropout, num_epochs=20)

In [None]:
train("ResNet50WithDropout", model_resnet50WithDropout, train_loader, optimizer_resnet50WithDropout, criterion_resnet50WithDropout, scheduler_resnet50WithDropout, num_epochs=20)

<h3>Testing</h3>

In [None]:
test("ResNet18", model_resnet18, test_loader)

In [None]:
test("ResNet50", model_resnet50, test_loader)

In [None]:
test("EfficientNetB4", model_efficientnet_b4, test_loader)

In [None]:
test("simpleCNN", simpleCNN, test_loader)

In [None]:
test("ResNet18WithDropout", model_resnet18WithDropout, test_loader)

In [None]:
test("ResNet50WithDropout", model_resnet50WithDropout, test_loader)

<h3>Results</h3>

In [None]:
%load_ext tensorboard

In [None]:
%tensorboard --logdir ./runs