In [None]:
from google.colab import drive
drive.mount('/content/drive/')

In [None]:
path_header = "/content/drive/My Drive/data/"

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sn
import random
import sklearn
import torch
from torch import nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision
from torchvision import datasets, models, transforms
from torchvision.models.resnet import ResNet, BasicBlock
import scipy.stats as st
from sklearn import metrics
from sklearn.metrics import confusion_matrix, roc_curve, auc
from torchvision import transforms
import torchvision.models as models
import time
import copy
from torch.utils.data import Dataset, DataLoader, random_split, SubsetRandomSampler, WeightedRandomSampler
from sklearn.metrics import RocCurveDisplay, auc, roc_curve, confusion_matrix, roc_auc_score

References:

https://pytorch.org/tutorials/beginner/basics/data_tutorial.html

https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html

https://towardsdatascience.com/pytorch-basics-sampling-samplers-2a0f29f0bf2a

https://github.com/bdrad/bdr-cv-onboarding-KellyTrinh/tree/master/bdr-cv-onboarding-KellyTrinh

https://biasedml.com/roc-comparison/

## Load dataset

In [None]:
# Load the labels and split into train and test 
all_labels = pd.read_csv(path_header+"labels.csv")
test_labels = all_labels[all_labels["File Name"].str.contains("Test")]
train_labels = all_labels[all_labels["File Name"].str.contains("Train")]

test_labels.to_csv(path_header+"test_labels.csv", index=False)
train_labels.to_csv(path_header+"train_labels.csv", index=False)

In [None]:
class Mammogram_Dataset(Dataset):
  """
    A custom dataset class for Mammogram files. 

    """

  def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
    '''
    Class inherits from PyTorch's Dataset. Acts as custom 
    Pytorch dataset. Initialize the directory containing the images, 
    the annotation files, and th transforms.

    Parameters
    ----------
    annotations_file: str
        A csv with the file names and labels of the images in the files. 
    img_dir: str 
        The directory that contains the images
    transform: transform
        Modify the features using this transform
    target_transform: transform
        Modify the labels using this transform
    '''
      
    self.img_labels = pd.read_csv(annotations_file)
    self.img_dir = img_dir
    self.transform = transform 
    self.target_transform = target_transform

  def __len__(self):
    '''
    Return the number of samples in the dataset
    '''
    return len(self.img_labels)


  def __getitem__(self, idx):

      # get image
      img_name = path_header +"numpyz_files/" + self.img_labels["File Name"][idx]
      image = np.load(img_name)["arr_0"]
      image = np.array(image).astype("float32")
      
      # get density
      density = int(self.img_labels["Density"][idx])
      density = np.array(density).astype("float32")

      if self.transform:
            image = self.transform(image)
          
      image = image.type(torch.float32)
      density = torch.tensor(density, dtype=torch.int64)

      return image, density

In [None]:
tensor_transform = transforms.Compose([transforms.ToTensor()])
test_dataset = Mammogram_Dataset(annotations_file=path_header+"test_labels.csv", img_dir=path_header+"numpyz_files", transform=tensor_transform)
train_dataset = Mammogram_Dataset(annotations_file=path_header+"train_labels.csv", img_dir=path_header + "numpyz_files", transform=tensor_transform)

train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=True)

In [None]:
# Display 10 mammogram images 

test_iter =  iter(test_dataloader)
features, labels = test_iter.next()

fig = plt.figure(figsize=(20, 10))
rows = 2
columns = 5

num_img = 10
for idx in range(num_img):
  img = features[idx].squeeze()
  label = labels[idx].item()
  fig.add_subplot(rows, columns, idx + 1)
  plt.imshow(img, cmap="gray")
  plt.title("Breast density:" + str(label))

## Train model

In [None]:
model = models.resnet18(num_classes=2)
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model.to(device)
model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=0.0001)

In [None]:
def train_model_val(model, criterion, optimizer, num_epochs=25):
    '''
    Return the trained model and print training and validation loss after each epoch

    Parameters
    ----------
      model: Pytorch model
          Starting model
      criterion: Pytorch criterion
          Compute a gradient using the given loss function
      optimizer: Pytorch optimizer
          Performs updates
      num_epochs: int
          Number of iterations to perform
    '''
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    np.random.seed(0)
    torch.manual_seed(0)

    train_dataset2, val_dataset = random_split(train_dataset, (round(len(train_dataset) * .80), round(len(train_dataset) * .20)))
    train_loader = DataLoader(dataset=train_dataset2, shuffle=True, batch_size=1)
    val_loader = DataLoader(dataset=val_dataset, shuffle=False, batch_size=1)


    dataloaders = {'train': train_loader, 'val': val_loader}
    dataset_sizes = {'train': len(train_loader), 'val': len(val_loader)}
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [None]:
train_model_val(model, criterion, optimizer, num_epochs=5)

## Test and evaluate model

In [None]:
correct = 0
total = 0

# set model to prediction mode
model.eval()

# keep track of output probabilities and true labels for model evaluation
y_predict = []
y_probs = []
y_true = []

# not training so don't need to calculate the gradients
with torch.no_grad():
    for data in test_dataloader:
      
      # send data to device
      images, labels = data[0].to(device), data[1].to(device)

      # perform prediction 
      outputs = model(images)
      # pass through sigmoid layer
      predicted_probs = torch.sigmoid(outputs.data)
      # choose the class with the highest energy as the prediction
      _, predicted = torch.max(predicted_probs, 1)

      # for later model evaluation
      y_predict.extend(predicted.cpu().numpy())
      y_true.extend(labels.cpu().numpy())
      y_probs.extend(predicted_probs.cpu().numpy())

      # keeping track of number of correct predictions
      total += labels.size(0)
      correct += (predicted == labels).sum().item()

print('Accuracy: %d %%' % (100 * correct / total))

In [None]:
def confusion_matrix(y_true, y_pred):
    '''
    y_pred: predictions array
    y_true: true labels array
    ------
    Display a confusion matrix. 
    '''
    cm = sklearn.metrics.confusion_matrix(y_true, y_pred, labels = [0, 1])
    df_cm = pd.DataFrame(cm, ["True 0", "True 1"], ["Predicted 0", "Predicted 1"])
    sn.heatmap(df_cm, annot=True, annot_kws={"size": 16})

def roc_curve(y_prob, y_true):
    '''
    y_prob: (NUMPY ARRAY) predicted probabilities
    y_true: (NUMPY ARRAY) true labels
    ------
    Display ROC curve.
    '''

    y_prob = np.array(y_prob)

    # calculate values to draw the roc curve 
    fpr, tpr, _ = sklearn.metrics.roc_curve(y_true, np.array(y_prob)[:,1])
    # calculate the area under the curve value 
    roc_auc = auc(fpr, tpr)

    # plot the roc curve with the auc value 
    plt.figure()
    plt.plot(fpr, tpr, color='darkorange',
            lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC curve')
    plt.legend(loc="lower right")
    plt.show()

def bootstrap_auc(y_true, y_pred, random_state=24, percentile=95):
    '''
    y_pred: (NUMPY ARRAY) predictions
    y_true: (NUMPY ARRAY) true labels
    random_state: (INT) random seed used when bootstrapping
    percentile: (INT) which percentile to bootstrap (i.e 95, 80, etc.)
    ------
    Bootstrap the AUC values and visualize the histogram 
    of the bootstrapped values. Return the lower bound
    and upper bound of the confidence interval around the 
    bootstrap AUC values. 
    '''
    
    # bootstrap the AUC value 

    # set the seed
    seed = 100
    random.seed(seed)
    num_pred = len(y_pred)

    num_bootstrap = 1000 

    # list of bootstrapped AUC values
    bootstrapped_auc = np.array([])

    # perform bootstrap 
    for i in range(num_bootstrap):
      # sample with replacement from the indices of the y prediction list
      random_idx = random.choices(range(num_pred), k = num_pred)

      b_y_pred = np.array(y_pred)[random_idx]
      b_y_true = np.array(y_true)[random_idx]

      # generate new AUC value from the bootstrapped y prediction values 
      fpr, tpr, _ = sklearn.metrics.roc_curve(b_y_true, b_y_pred)
      roc_auc = auc(fpr, tpr)

      bootstrapped_auc = np.append(bootstrapped_auc, roc_auc)

    # plot the histogram 
    plt.title("Histogram of bootstrapped AUC values")    
    plt.hist(bootstrapped_auc, bins=50);

    lower_bound = np.percentile(bootstrapped_auc, (100 - percentile) / 2)
    upper_bound = np.percentile(bootstrapped_auc, (100 - percentile) / 2 + percentile)

    return lower_bound, upper_bound


In [None]:
confusion_matrix(y_true, y_predict)

In [None]:
roc_curve(y_probs, y_true)

In [None]:
bootstrap_auc(y_true, y_predict)

In [None]:
# create the Delong test 

def z_score(var_A, var_B, covar_AB, auc_A, auc_B):
    '''
    Calculate the z-score.
    Let auc_a = auc of first classifier; 
    auc_b = auc of second classifier. 

    z-score = (auc_b - auc_b) / Var(auc_a - auc_b)
    where 
    Var(auc_a - auc_b) = Var(auc_a) + Var(auc_b) - 2*Cov(auc_a, auc_b)
    '''
    return (auc_A - auc_B)/((var_A + var_B - 2*covar_AB)**(.5))

def auc(X, Y):
    '''
    Calculates the auc which is equal to the Mann-Whitney statistic 
    applied to the two samples X and Y. 
    '''
    return 1/(len(X)*len(Y)) * sum([self.kernel(x, y) for x in X for y in Y])

def kernel(X, Y):
    '''
    Helper function to calculate the auc.
    If X = Y return 1/2, if X < Y return 0, else return 1.
    '''
    return .5 if Y==X else int(Y < X)

def structural_components(X, Y):
    '''
    Estimate the variance V (arising from the finite test set) 
    and the covariance C (due to the common test set).
    '''
    V10 = [1/len(Y) * sum([kernel(x, y) for y in Y]) for x in X]
    V01 = [1/len(X) * sum([kernel(x, y) for x in X]) for y in Y]
    return V10, V01
      
def get_S_entry(V_A, V_B, auc_A, auc_B):
    '''
    Using the theory on generalized U-statistics, 
    the function get_S_entry(V_A, V_B, auc_A, auc_B): 
    generates an estimated variance-covariance matrix S.
    '''
    return 1/(len(V_A)-1) * sum([(a-auc_A)*(b-auc_B) for a,b in zip(V_A, V_B)])

def group_preds_by_label(preds, actual):
  
    X = [p for (p, a) in zip(preds, actual) if a] # prediction with actual label = 1
    Y = [p for (p, a) in zip(preds, actual) if not a] # prediction with actual label = 0
    return X, Y
  
def DeLongTest(y_probs, y_true):
    '''
    Performs a DeLong test to test if the different in the auc value 
    of the classifier (classifer A) with the auc of a random classifier 
    (classifier B) is statistically significant. 
    '''
    
    preds_A = np.array(y_probs)[:,1]
    actual = y_true
    preds_B = np.random.rand(preds_A.shape[0]) # predictions from a random classifier
    X_A, Y_A = group_preds_by_label(preds_A, actual) # model A
    X_B, Y_B = group_preds_by_label(preds_B, actual) # model B

    V_A10, V_A01 = structural_components(X_A, Y_A) 
    V_B10, V_B01 = structural_components(X_B, Y_B) 
    auc_A = auc(X_A, Y_A)
    auc_B = auc(X_B, Y_B)

    # Compute entries of covariance matrix S (covar_AB = covar_BA)
    var_A = (get_S_entry(V_A10, V_A10, auc_A, auc_A) * 1/len(V_A10)
            + get_S_entry(V_A01, V_A01, auc_A, auc_A) * 1/len(V_A01))
    var_B = (get_S_entry(V_B10, V_B10, auc_B, auc_B) * 1/len(V_B10)
            + get_S_entry(V_B01, V_B01, auc_B, auc_B) * 1/len(V_B01))
    covar_AB = (get_S_entry(V_A10, V_B10, auc_A, auc_B) * 1/len(V_A10)
                + get_S_entry(V_A01, V_B01, auc_A, auc_B) * 1/len(V_A01))
    # Two tailed test
    z = z_score(var_A, var_B, covar_AB, auc_A, auc_B)
    p = st.norm.sf(abs(z))*2

    return p

In [None]:
DeLongTest(y_probs, y_true) 