In [None]:
from google.colab import drive
drive.mount('/content/drive')
import zipfile, os, urllib.request, glob, math, shutil, sys
import torchvision
import torch.optim as optim
from torchvision import datasets, models, transforms
from sklearn.metrics import confusion_matrix, balanced_accuracy_score, precision_recall_fscore_support, roc_auc_score, accuracy_score
import sklearn.metrics
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import PIL
import PIL.Image

sys.path.append('drive/MyDrive')
import modelArchs

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
resume = 0

Οι εικόνες είναι αποθηκευμένες στο drive σε zip.

Υπάρχουν 16 zip files, ένα για κάθε κλάση από το train και test dataset.

Κάνω extract τις εικόνες.

In [None]:
os.makedirs('train_imgs')
os.makedirs('val_imgs')

prefix = 'drive/MyDrive/'

for i in ['train_imgs', 'val_imgs']:
  for j in ['MEL','NV','BCC','AK','BKL','DF','VASC','SCC']:

    name = i + '_' + j
    src_zip = prefix + name + '.zip'
    dest_zip = i + '/' + j

    with zipfile.ZipFile(src_zip, 'r') as zip_ref:
      zip_ref.extractall(dest_zip)

    print("Finished with: ", name)

In [None]:
val_dataset = pd.read_csv('drive/MyDrive/val_dataset.csv')
train_dataset = pd.read_csv('drive/MyDrive/train_dataset.csv')
train_dataset.head()

Κάνω load τα pretrained μοντέλα trained πάνω στο imagenet. Στο τελευταίο layer αλλάζουμε τα out_features σε 8, καθώς έχουμε 8 διαφορετικές κλάσεις προς ταξινόμηση.

In [None]:
#model = modelArchs.get_EfficientNet(8,True)
#model = modelArchs.get_GoogleNet(8,True)
#model = modelArchs.get_MobileNet(8,True)
#model = modelArchs.get_Resnet(8,True)
model = modelArchs.get_MobileNetSa(8,True)
model.to(device)

Ορίζω διάφορα transformations ώστε να κάνουμε augment το training set, ώστε να πετύχουμε καλύτερο generalization του μοντέλου.

In [None]:
transforms_train = transforms.Compose([
    transforms.RandomResizedCrop(size = 224, scale = (0.4,1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ColorJitter(brightness = [0.7, 1.3], contrast = [0.7, 1.3], saturation = [0.7, 1.3], hue = [-0.1,0.1]),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

transforms_val = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

Ορισμός υπερ-παραμέτρων

Load προηγούμενο checkpoint.

In [None]:
epochs = 30
batch_len = 48

criterion = nn.CrossEntropyLoss()
learning_rate = 0.001
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size = 5, gamma=0.2)

if resume:
  checkpoint = torch.load('drive/MyDrive/checkpoint6.pt')
  model.load_state_dict(checkpoint['model'])
  optimizer.load_state_dict(checkpoint['optimizer'])
  scheduler.load_state_dict(checkpoint['scheduler'])
  epoch_count = checkpoint['epoch']
  print(scheduler.get_last_lr())
else:
  epoch_count = 1

print(epoch_count)

Εδώ ορίζω τα dataset και dataloaders, χρησιμοποιώντας το weightRandomSampler για να δημιουργήσει balanced batches:

https://towardsdatascience.com/demystifying-pytorchs-weightedrandomsampler-by-example-a68aceccb452

In [None]:
def get_weights_for_imgs(images, n_categories):

    n_images = len(images)

    #εδώ θα μπορούσα να χρησιμοποιήσω έτοιμους τους αριθμούς images per class από το train_dataset.category.value_counts() , ωστόσο στην συνέχεια όταν κάνω enumerate(images) στην
    #τελευταία for, αυτό μπορεί να δημιουργήσει πρόβλημα, καθώς δεν ξέρουμε το image_category σε ποιό label πραγματικά αντιστοιχεί...
    imgs_per_category = [0] * n_categories
    for _, image_category in images:
      imgs_per_category[image_category] += 1

    weight_per_category = [0] * n_categories # έχουμε 8 κατηγορίες
    for i in range(n_categories):
      weight_per_category[i] = 1 / float(imgs_per_category[i])#float(n_images) / float(imgs_per_category[i])

    weights = [0] * n_images
    for idx, (_, image_category) in enumerate(images):
        weights[idx] = weight_per_category[image_category]

    return weights

train_dataset = datasets.ImageFolder('train_imgs', transforms_train)
val_dataset = datasets.ImageFolder('val_imgs', transforms_val)

len_train_dataset = len(train_dataset)
len_val_dataset = len(val_dataset)

print("Length of train_dataset: ", len_train_dataset)
print("Length of val dataset: ", len_val_dataset)

weights = get_weights_for_imgs(train_dataset.imgs, len(train_dataset.classes))
weights = torch.DoubleTensor(weights)
train_sampler = torch.utils.data.sampler.WeightedRandomSampler(weights, len(weights))

train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size = batch_len, sampler = train_sampler, pin_memory=True)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size = 16, shuffle=False)

Ορίζω συναρτήσεις για υπολογισμό του accuracy και του confusion matrix πάνω στο test set. Στη συνέχεια θα δημιουργήσω και άλλες συναρτήσεις για υπολογισμό F1-score, sensitivity, specificity, precision.

In [None]:
def test_model(model,test_dataloader):

  test_acc = 0.0
  pred_list = []
  label_list = []
  output_probs = []
  metrics = []

  for i, (inputs, labels) in enumerate(test_dataloader,1):

    label_list.extend(labels.detach().tolist())

    inputs = inputs.to(device)
    labels = labels.to(device)
    outputs = model(inputs)
    probs = torch.softmax(outputs.detach().cpu(), dim = 1).tolist()
    output_probs.extend(probs)

    pred =  torch.max(outputs, 1)[1].detach().cpu().tolist()
    pred_list.extend(pred)

  conf_matrix = confusion_matrix(label_list , pred_list)

  metrics.append(accuracy_score(label_list, pred_list))
  metrics.extend(precision_recall_fscore_support(label_list,pred_list, average = 'macro'))
  metrics.append(roc_auc_score(label_list,output_probs, average = 'macro', multi_class = 'ovr'))
  metrics.extend(precision_recall_fscore_support(label_list,pred_list, average = 'weighted'))
  metrics.append(roc_auc_score(label_list,output_probs, average = 'weighted', multi_class = 'ovr'))

  return metrics, conf_matrix

Το βασικό train loop.

In [None]:
for epoch in range(epoch_count, epochs + 1):

  model.train()
  train_running_loss = 0

  print("Starting epoch: " , epoch)

  for i, (inputs, labels) in enumerate(train_dataloader,1):
      inputs = inputs.to(device)
      labels = labels.to(device)

      optimizer.zero_grad()
      outputs = model(inputs)
      loss = criterion(outputs, labels)
      loss.backward()
      optimizer.step()

      train_running_loss += loss.detach().item()

      if i % 100 == 0:
        print("Batch number: %d" %(i))

  print('Epoch: %d | Loss: %.4f ' %(epoch, train_running_loss / i))
  scheduler.step()

  model.eval()
  try:
    metrics,conf_matrix = test_model(model,val_dataloader)
    print("Metrics on epoch %d: " %epoch)
    print(metrics)
    print("Confusion matrix:")
    print(conf_matrix)
  except:
    print("Error in calculating metrics")

  try:
    checkpoint = {'epoch': epoch+1, 'model': model.state_dict(), 'optimizer': optimizer.state_dict(), 'scheduler': scheduler.state_dict()}
    checkpoint_fn = 'drive/MyDrive/checkpoint' + str(epoch) + '.pt'
    torch.save(checkpoint, checkpoint_fn)
  except:
    print("Error in writing general checkpoint")
  if epoch - epoch_count >= 5:
    break

Το επόμενο cell είναι για έλεγχο των διαστάσεων input, output και της κατανομής των κλάσεων μέσα σε ένα batch.

In [None]:
from collections import Counter

for inputs, labels in train_dataloader:
  inputs = inputs.to(device)
  print("Input size:", inputs.shape)
  out = model(inputs)
  print("Output size: ", out.shape)

  labels = labels.tolist()
  print("Test if batch is balanced: ")
  print(Counter(labels).keys())
  print(Counter(labels).values())
  break

Input size: torch.Size([64, 3, 224, 224])
Output size:  torch.Size([64, 8])
Test if batch is balanced: 
dict_keys([1, 3, 7, 5, 4, 6, 0, 2])
dict_values([6, 10, 10, 11, 8, 7, 7, 5])
