In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import os
import random
import copy
import shutil

import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torchvision import models, transforms
from sklearn.metrics import auc

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
use_gpu = torch.cuda.is_available()

class mymodel(nn.Module):

  def blocktype1(self, in_channels, out_channels, kernel=5):
    block = torch.nn.Sequential(
        torch.nn.Conv2d(kernel_size=kernel, in_channels=in_channels, out_channels=in_channels, padding=2, stride=1),
        torch.nn.ReLU(),
        torch.nn.BatchNorm2d(in_channels),
        torch.nn.Dropout2d(p=0.15),
        torch.nn.Conv2d(kernel_size=kernel, in_channels=in_channels, out_channels=out_channels, padding=2, stride=1 ),
        torch.nn.ReLU()
    )
    return block

  def blocktype2(self, in_channels, out_channels, kernel=3):
    block = torch.nn.Sequential(
        torch.nn.Conv2d(kernel_size=kernel, in_channels=in_channels, out_channels=in_channels, stride=1, padding=1),
        torch.nn.ReLU(),
        torch.nn.Conv2d(kernel_size=kernel, in_channels=in_channels, out_channels=in_channels, stride=1, padding=1),
        torch.nn.ReLU(),
        torch.nn.BatchNorm2d(in_channels),
        torch.nn.Dropout2d(p=0.2),
        torch.nn.Conv2d(kernel_size=kernel, in_channels=in_channels, out_channels=out_channels, stride=1, padding=1),
        torch.nn.ReLU()
    )
    return block

  def blocktype3(self, in_channels, out_channels, kernel=7):
    block = torch.nn.Sequential(
        torch.nn.Conv2d(kernel_size=kernel, in_channels=in_channels, out_channels=in_channels, padding=3, stride=1),
        torch.nn.ReLU(), 
        torch.nn.BatchNorm2d(in_channels),
        torch.nn.Dropout2d(p=0.1),
        torch.nn.Conv2d(kernel_size=kernel, in_channels=in_channels, out_channels=out_channels, padding=3, stride=1),
        torch.nn.ReLU()
    )
    return block

  def special_convolutions(self, in_channels, out_channels, step, kernel=3):
    block = torch.nn.Sequential(
        torch.nn.Conv2d(kernel_size=kernel, in_channels = in_channels, out_channels=out_channels, padding=1, stride=2**step)
    )
    return block

  def fclayer(self, in_features, out_features):
    layer = torch.nn.Sequential(
        torch.nn.Linear(in_features=in_features, out_features=out_features),
        torch.nn.BatchNorm1d(num_features=out_features),
        torch.nn.Dropout(p=0.25),
        torch.nn.ReLU()
    )
    return layer

  def pointwiseconv(self, in_channels, out_channels):
    layer = torch.nn.Conv2d(kernel_size=1, in_channels=in_channels, out_channels=out_channels)
    return layer

  def __init__(self):
    super(mymodel, self).__init__()
    self.first = self.pointwiseconv(3,4)
    self.resize1 = self.special_convolutions(4,128,step=1)
    self.resize2 = self.special_convolutions(4,512,step=4)
    self.conv1 = self.blocktype3(4,64)
    self.p1 = self.pointwiseconv(4,64)
    self.maxpool1 = torch.nn.MaxPool2d(kernel_size=2)

    self.conv2 = self.blocktype1(64,128)
    self.p2 = self.pointwiseconv(64,128)
    self.maxpool2 = torch.nn.MaxPool2d(kernel_size=2)

    self.conv3 = self.blocktype1(128,256)
    self.p3 = self.pointwiseconv(128,256)
    self.maxpool3 = torch.nn.MaxPool2d(kernel_size=2)

    self.conv4 = self.blocktype2(256,256)
    self.maxpool4 = torch.nn.MaxPool2d(kernel_size=2)

    self.conv5 = self.blocktype2(256,512)
    self.p5 = self.pointwiseconv(256,512)
    self.conv6 = self.blocktype2(512,512)
    self.conv7 = self.blocktype2(512,1024)
    self.maxpool5 = torch.nn.MaxPool2d(kernel_size=2)

    self.fc1 = self.fclayer(7*7*1024,64)
    self.fc2 = self.fclayer(64,2)
    self.fc3 = self.fclayer(16,2)

  def activations_hook(self, grad):
    self.gradients = grad

  def forward(self, x):
    x = self.first(x)
    r1 = self.resize1(x)
    r2 = self.resize2(x)
    x1 = self.conv1(x)
    x = self.p1(x)
    x = x+x1
    x = self.maxpool1(x)

    x1 = self.conv2(x)
    x = self.p2(x)
    x = x+x1
    x = x+r1
    x = self.maxpool2(x)

    x1 = self.conv3(x)
    x = self.p3(x)
    x = x+x1
    x = self.maxpool3(x)

    x1 = self.conv4(x)
    x = x+x1
    x = self.maxpool4(x)

    x1 = self.conv5(x)
    x = self.p5(x)
    x = x+x1
    x = x+r2
    x1 = self.conv6(x)
    x = x+x1
    xdash = self.conv7(x)
    #hook = xdash.register_hook(self.activations_hook)  #comment this line when training
    x = self.maxpool5(xdash)
    x = torch.flatten(x, 1)
    fclayer1 = self.fc1(x)
    fclayer2 = self.fc2(fclayer1)
    #fclayer3 = self.fc3(fclayer2)
    return fclayer2

In [None]:
"""use_gpu = torch.cuda.is_available()

class mymodel(nn.Module):

  def blocktype1(self, in_channels, out_channels, kernel=5):
    block = torch.nn.Sequential(
        torch.nn.Conv2d(kernel_size=kernel, in_channels=in_channels, out_channels=in_channels, padding=2, stride=1),
        torch.nn.ReLU(),
        torch.nn.Conv2d(kernel_size=kernel, in_channels=in_channels, out_channels=out_channels, stride=1, padding=2),
        torch.nn.BatchNorm2d(out_channels),
        torch.nn.ReLU()
    )
    return block

  def blocktype2(self, in_channels, out_channels, kernel=3):
    block = torch.nn.Sequential(
        torch.nn.Conv2d(kernel_size=kernel, in_channels=in_channels, out_channels=in_channels, stride=1, padding=1),
        torch.nn.ReLU(),
        torch.nn.Conv2d(kernel_size=kernel, in_channels=in_channels, out_channels=in_channels, stride=1, padding=1),
        torch.nn.LeakyReLU(),
        torch.nn.Conv2d(kernel_size=kernel, in_channels=in_channels, out_channels=out_channels, stride=1, padding=1),
        torch.nn.BatchNorm2d(out_channels),
        torch.nn.ReLU()
    )
    return block

  def fclayer(self, in_features, out_features):
    layer = torch.nn.Sequential(
        torch.nn.Linear(in_features=in_features, out_features=out_features),
        torch.nn.BatchNorm1d(out_features),
        torch.nn.ReLU()
    )
    return layer

  def pointwiseconv(self, in_channels, out_channels):
    layer = torch.nn.Conv2d(kernel_size=1, in_channels=in_channels, out_channels=out_channels)
    return layer

  def __init__(self):
    super(mymodel, self).__init__()
    self.conv1 = self.blocktype1(3, 64)
    self.p1 = self.pointwiseconv(3,64)
    self.maxpool1 = torch.nn.MaxPool2d(kernel_size=2)
    self.conv2 = self.blocktype1(64,128)
    self.p2 = self.pointwiseconv(64,128)
    self.maxpool2 = torch.nn.MaxPool2d(kernel_size=2)
    self.conv3 = self.blocktype2(128,256)
    self.p3 = self.pointwiseconv(128,256)
    self.maxpool3 = torch.nn.MaxPool2d(kernel_size=2)
    self.conv4 = self.blocktype2(256,512)
    self.p4 = self.pointwiseconv(256,512)
    self.maxpool4 = torch.nn.MaxPool2d(kernel_size=2)
    self.conv5 = self.blocktype2(512,512)
    self.p5 = self.pointwiseconv(512,512)
    self.conv6 = self.blocktype2(512,512)
    self.maxpool5 = torch.nn.MaxPool2d(kernel_size=2)
    self.fc1 = self.fclayer(7*7*512, 64)
    self.fc2 = self.fclayer(64,2)
    self.gradients = None

  def activations_hook(self, grad):
    self.gradients = grad

  def forward(self, x):
    x1 = self.conv1(x)
    x = self.p1(x)
    x+=x1
    x = self.maxpool1(x)
    x1 = self.conv2(x)
    x = self.p2(x)
    x+=x1
    x = self.maxpool2(x)
    x1 = self.conv3(x)
    x = self.p3(x)
    x+=x1
    x = self.maxpool3(x)
    x1 = self.conv4(x)
    x = self.p4(x)
    x+=x1
    x = self.maxpool4(x)
    x1 = self.conv5(x)
    x = self.p5(x)
    x+=x1
    xdash = self.conv6(x)
    #hook = xdash.register_hook(self.activations_hook)  #comment this line when training
    x = self.maxpool5(xdash)
    x = torch.flatten(x, 1)
    fclayer1 = self.fc1(x)
    fclayer2 = self.fc2(fclayer1)
    return fclayer2"""

In [None]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs):
  val_loss_history = []
  train_loss_history = []
  train_acc_history = []
  val_acc_history = []
  best_model_wts = copy.deepcopy(model.state_dict())
  best_acc = 0
  for epoch in range(num_epochs):
    print(f"Epoch Number: {epoch+1} / {num_epochs}")

    for phase in ['train','val']:
      if phase == "train":
        model.train()
      else:
        model.eval()
      running_loss = 0
      running_corrects = 0
      for inputs, labels in dataloaders[phase]:
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        with torch.set_grad_enabled(phase == "train"):
          outputs = model(inputs)
          loss = criterion(outputs, labels)
          _,preds = torch.max(outputs, 1)
          if phase == "train":
            loss.backward()
            optimizer.step()

        running_loss += loss.item()*inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)  

      if phase=="val":
          scheduler.step(epoch_loss)
      

      epoch_loss = running_loss / len(dataloaders[phase].dataset)
      epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
      print(f'{phase} loss: {epoch_loss} and acc: {epoch_acc}')

      if phase == "val" and epoch_acc>best_acc:
        best_acc = epoch_acc
        best_model_wts = copy.deepcopy(model.state_dict())
      if phase == "val":
        val_loss_history.append(epoch_loss)
        val_acc_history.append(epoch_acc)
      if phase == "train":
        train_loss_history.append(epoch_loss)
        train_acc_history.append(epoch_acc)
    print()
  
  print(f'Best val acc: {best_acc}')
  model.load_state_dict(best_model_wts)
  return model, val_loss_history, train_loss_history, val_acc_history, train_acc_history

In [None]:
if use_gpu:
  device = torch.device("cuda:0")
model = mymodel()
model = model.to(device)
num_classes = 2
input_size = 224
batch_size = 32
num_epochs = 400
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=0.05) #weight_decay implements L2 regularization
weights = torch.tensor([1.0,234/191]).cuda()
criterion = nn.CrossEntropyLoss(weight=weights)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.75, patience=50)

In [None]:
data_dir = "
train_transforms = transforms.Compose([transforms.Resize((input_size, input_size)), transforms.ToTensor()])
train_data = torchvision.datasets.ImageFolder(root=data_dir + 'train/', transform=train_transforms)
train_dataloader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle = False)

train_mean = []
train_std = []
for i, data in enumerate(train_dataloader, 0):
    # shape (batch_size, 3, height, width)
    numpy_image = data[0].numpy()
    batch_mean = np.mean(numpy_image, axis=(0,2,3))
    batch_std = np.std(numpy_image, axis=(0,2,3))
    train_mean.append(batch_mean)
    train_std.append(batch_std)

train_mean = np.array(train_mean).mean(axis=0)
train_std = np.array(train_std).mean(axis=0)
print(train_mean, train_std)

[0.5980851  0.59779495 0.5975965 ] [0.3109232  0.310951   0.31100973]


In [None]:
data_dir = "

data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((input_size,input_size)),
        transforms.RandomRotation((-15,15)),
        transforms.ColorJitter(brightness=0.25, contrast=0.15),
        transforms.RandomAffine(0, translate=None, scale=[0.5,1.5], shear=None, resample=False, fillcolor=0),
        transforms.ToTensor(),
        transforms.Normalize(train_mean,train_std)]),
    'val': transforms.Compose([
        transforms.Resize((input_size, input_size)),
        transforms.ToTensor(),
        transforms.Normalize(train_mean, train_std)])
}   

train_data = torchvision.datasets.ImageFolder(root=data_dir + 'train/', transform=data_transforms['train'])
train_dataloader = torch.utils.data.DataLoader(train_data, batch_size = batch_size, num_workers=4, shuffle = True)
val_data = torchvision.datasets.ImageFolder(root=data_dir + 'val/', transform=data_transforms['val'])
val_dataloader = torch.utils.data.DataLoader(val_data, batch_size = batch_size, num_workers=4, shuffle = True)
x = train_data.class_to_idx
print(x)
x = val_data.class_to_idx
print(x)

dataloaders_dict = {'train':train_dataloader, 'val':val_dataloader}

{'negative': 0, 'positive': 1}
{'negative': 0, 'positive': 1}


In [None]:
model, vhist, thist, vacc, tacc = train_model(model, dataloaders_dict, criterion, optimizer, num_epochs)

In [None]:
vhist = [v for v in vhist]
thist = [t for t in thist]

vacc = [v for v in vacc]
tacc = [t for t in tacc]

plt.plot(np.arange(num_epochs)+1, thist, 'r', label="Training loss")
plt.plot(np.arange(num_epochs)+1, vhist, 'g', label="Validation loss")
plt.ylim(0,1) 
plt.legend(loc = "lower right")
plt.show()

In [None]:
plt.plot(np.arange(num_epochs)+1, tacc, 'r', label="Training acc")
plt.plot(np.arange(num_epochs)+1, vacc, 'g', label="Validation acc")
plt.legend(loc = "lower right")
plt.ylim(0,1) 
plt.show()

In [None]:
test_transform = transforms.Compose([
        transforms.Resize((input_size,input_size)),
        transforms.ToTensor(),
        transforms.Normalize(train_mean, train_std)])
test_data = torchvision.datasets.ImageFolder(root=data_dir + 'test/', transform=test_transform)
test_dataloader = torch.utils.data.DataLoader(test_data, batch_size = 4, num_workers=4, shuffle = False)
x = test_data.class_to_idx
print(x)
def predict(model, test_loader, device):
    model.eval()
    y_true = torch.tensor([], dtype=torch.long, device=device)
    all_outputs = torch.tensor([], device=device)
    with torch.no_grad():
        for data in test_loader:
            inputs = [i.to(device) for i in data[:-1]]
            labels = data[-1].to(device)
            
            outputs = model(*inputs)
            y_true = torch.cat((y_true, labels), 0)
            all_outputs = torch.cat((all_outputs, outputs), 0)
    
    y_true = y_true.cpu().numpy()  
    _, y_pred = torch.max(all_outputs, 1)
    y_pred = y_pred.cpu().numpy()
    y_pred_prob = F.softmax(all_outputs, dim=1).cpu().numpy()
    
    return y_true, y_pred, y_pred_prob

y_true, y_pred, y_pred_prob = predict(model, test_dataloader, "cuda:0")
print(y_true, y_pred)

In [None]:
def true_positives(y_true, y_pred_prob, th):
  TP = 0
  thresholded_probs = y_pred_prob >= th
  TP = np.sum((y_true==1) & (thresholded_probs==1))
  return TP

def true_negatives(y_true, y_pred_prob, th):
  TN = 0
  thresholded_probs = y_pred_prob >= th
  TN = np.sum((y_true==0) & (thresholded_probs==0))
  return TN

def false_positives(y_true, y_pred_prob, th):
  FP = 0
  thresholded_probs = y_pred_prob >= th
  FP = np.sum((y_true==0) & (thresholded_probs==1))
  return FP

def false_negatives(y_true, y_pred_prob, th):
  FN = 0
  thresholded_probs = y_pred_prob >= th
  FN = np.sum((y_true==1) & (thresholded_probs==0))
  return FN

In [None]:
def get_accuracy(y_true, y_pred_prob, th):
  TP = true_positives(y_true, y_pred_prob, th)
  TN = true_negatives(y_true, y_pred_prob, th)
  FP = false_positives(y_true, y_pred_prob, th)
  FN = false_negatives(y_true, y_pred_prob, th)
  accuracy = (TP + TN)/(TP + TN + FP + FN)
  return accuracy

def get_prevalence(y_true):
  prevalence = 0
  prevalence = np.sum(y_true)/len(y_true)
  return prevalence

def get_sensitivity(y_true, y_pred_prob, th):
  sensitivity = 0
  TP = true_positives(y_true, y_pred_prob, th)
  FN = false_negatives(y_true, y_pred_prob, th)
  sensitivity = TP / (TP + FN)
  return sensitivity

def get_specificity(y_true, y_pred_prob, th):
  specificity = 0
  TN = true_negatives(y_true, y_pred_prob, th)
  FP = false_positives(y_true, y_pred_prob, th)
  specificity = TN / (TN + FP)
  return specificity

def get_ppv(y_true, y_pred_prob, th):
  ppv = 0
  TP = true_positives(y_true, y_pred_prob, th)
  FP = false_positives(y_true, y_pred_prob, th)
  ppv = TP / (FP + TP)
  return ppv

def get_npv(y_true, y_pred_prob, th):
  npv = 0
  TN = true_negatives(y_true, y_pred_prob, th)
  FN = false_negatives(y_true, y_pred_prob, th)
  npv = TN / (FN + TN)
  return npv

def get_roc_curve_info(y_true, y_pred_prob):
  threshold_set = np.arange(0,1,0.00001)
  TPR = np.zeros(len(threshold_set))
  FPR = np.zeros(len(threshold_set))
  i=0
  for th in threshold_set:
    TPR[i] = get_sensitivity(y_true, y_pred_prob, th)
    spec = get_specificity(y_true, y_pred_prob, th)
    FPR[i] = (1 - spec)
    i+=1
  return TPR, FPR

def precision_recall_curve_info(y_true, y_pred_prob):
  threshold_set = np.arange(0,1,0.00001)
  precision = np.zeros(len(threshold_set))
  recall = np.zeros(len(threshold_set))
  i=0
  for th in threshold_set:
    precision[i] = get_ppv(y_true, y_pred_prob, th)
    recall[i] = get_sensitivity(y_true, y_pred_prob, th)
    i+=1
  precision = np.array([i for i in precision if (str(i)!='nan')])
  precision = np.concatenate((precision, np.ones(len(recall) - len(precision))))
  return precision, recall

def get_statistics(y_true, y_pred_prob, th):
  accuracy = get_accuracy(y_true, y_pred_prob, th)
  prevalence = get_prevalence(y_true)
  sensitivity = get_sensitivity(y_true, y_pred_prob, th)
  specificity = get_specificity(y_true, y_pred_prob, th)
  ppv = get_ppv(y_true, y_pred_prob, th)
  npv = get_npv(y_true, y_pred_prob, th)
  tpr, fpr = get_roc_curve_info(y_true, y_pred_prob)
  AUC = auc(fpr,tpr)
  prec_list, rec_list = precision_recall_curve_info(y_true, y_pred_prob)
  score = auc(rec_list, prec_list)
  f1_score = 2*ppv*sensitivity/(ppv + sensitivity)
  stat_dict = {'Accuracy': accuracy, 
               'Prevalence':prevalence,
               'Sensitivity':sensitivity,
               'Specificity':specificity,
               'PPV':ppv,
               'NPV':npv,
               'AUC':AUC,
               'F1 Score':f1_score,
               'Score': score}
  return stat_dict, tpr, fpr, prec_list, rec_list

In [None]:
def decide_th(y_true, y_pred_prob):
  threshold_set = np.arange(0,1,0.00001)
  TPR = np.zeros(len(threshold_set))
  FPR = np.zeros(len(threshold_set))
  J = np.zeros(len(threshold_set))
  i=0
  for th in threshold_set:
    TPR[i] = get_sensitivity(y_true, y_pred_prob, th)
    spec = get_specificity(y_true, y_pred_prob, th)
    FPR[i] = (1 - spec)
    J[i] = TPR[i] - FPR[i]
    i+=1
  good_th = np.argmax(J)
  return threshold_set[good_th]

good_th = decide_th(y_true, y_pred_prob[:,1])
print(good_th)

0.48573000000000005


In [None]:
stat_dict, tpr, fpr, prec_list, rec_list = get_statistics(y_true, y_pred_prob[:,1], 0.5)
print(stat_dict)

In [None]:
def plot_prc(precision, recall):
  plt.plot(recall, precision, 'r')
  plt.xlabel("Recall")
  plt.ylabel("Precision")
  plt.title("Precision-Recall Curve")
  plt.show()

def plot_roc(TPR, FPR):
  plt.plot(FPR, TPR, 'r', label = "ROC Curve")
  plt.plot(np.arange(0,1.05,0.05), np.arange(0,1.05,0.05), 'k', label = "y=x line")
  plt.legend(loc = "lower right")
  plt.xlabel('FPR')
  plt.ylabel('TPR')
  plt.title("ROC Curve")
  plt.show()

In [None]:
plot_roc(tpr, fpr)

In [None]:
plot_prc(prec_list, rec_list)

In [None]:
#torch.save(model.state_dict(), 'weights.pth')

In [None]:
"""ppv_set = []
acc_set = []
npv_set = []
sens_set = []
spec_set = []
threshold_set1 = np.arange(0,1,0.05)
for th in threshold_set1:
  stat_dict1,_,_,_,_ = get_statistics(y_true, y_pred_prob[:,1], th)
  acc_set.append(stat_dict1['Accuracy'])
  npv_set.append(stat_dict1['NPV'])
  spec_set.append(stat_dict1['Specificity'])
  sens_set.append(stat_dict1['Sensitivity'])
  ppv_set.append(stat_dict1['PPV'])
plt.plot(threshold_set1, ppv_set,  label = "PPV")
plt.plot(threshold_set1, acc_set,  label = "Accuracy")
plt.plot(threshold_set1, npv_set,  label = "NPV")
plt.plot(threshold_set1, sens_set,  label = "Sensitivity")
plt.plot(threshold_set1, spec_set,  label = "Specificity")
plt.xlabel('Thresholds')
plt.legend(loc = "best")
plt.show()"""