In [None]:
import pandas as pd
import os
import math
from glob import glob
import numpy as np
import torch
from torch.utils.data import Dataset
from torch.utils.data import random_split
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import transforms
from PIL import Image
import zipfile
import urllib.request
import os.path
from IPython.display import display
from math import ceil
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torchvision.transforms.functional import to_pil_image
from matplotlib import colormaps
import PIL
import torchvision.ops as ops

In [None]:
class DataPreprocessing(Dataset):

    classEncoding = {
        'Atelectasis': torch.FloatTensor([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
        'Consolidation': torch.FloatTensor([0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
        'Infiltration': torch.FloatTensor([0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
        'Pneumothorax': torch.FloatTensor([0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
        'Edema': torch.FloatTensor([0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
        'Emphysema': torch.FloatTensor([0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
        'Fibrosis': torch.FloatTensor([0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]),
        'Effusion': torch.FloatTensor([0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]),
        'Pneumonia': torch.FloatTensor([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]),
        'Pleural_Thickening': torch.FloatTensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0]),
        'Cardiomegaly': torch.FloatTensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0]),
        'Nodule': torch.FloatTensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0]),
        'Hernia': torch.FloatTensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]),
        'Mass': torch.FloatTensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0]),
        'No Finding': torch.FloatTensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1])
    }

    def __init__(self,classEncoding=classEncoding, max_samples=None):
        self.image_names = []
        self.labels = []
        #The original code opened the labels file from the zip file. I upload the labels file manually to the directory and open it manually here
        f = open('/kaggle/input/data/Data_Entry_2017.csv','r')
        #with zipfile.ZipFile('images_samples2.zip', 'r') as z:
          #f = z.open('sample_labels_504.csv','r')
        title = True
        for line in f:
          if (title):
            title = False
            continue
          items = str(line).split(",")
            
          #image_name = items[0][2:]
          image_name = items[0]
          #print(image_name)
          image_name = glob(os.path.join("/kaggle/input/data/*/*", image_name))[0]
          #print(image_name)
          self.image_names.append(image_name)

          label = items[1]  # list of diseases
          diseases_list = label.split("|")
          labelTensor = torch.FloatTensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
          for disease in diseases_list:
            labelTensor = labelTensor.add(classEncoding[disease])
          self.labels.append(labelTensor)

          if max_samples and len(self.image_names) >= max_samples:
            break

#Modified code
    def __getitem__(self, index):
      #The original code opened the images directly from the zip file. As we already unziped them and we manually changed the current directory
      #to where they were unziped, we just open the images from the current directory
      image_path = self.image_names[index]
      #with zipfile.ZipFile('images_samples2.zip', 'r') as z:
      #image = Image.open(z.open(image_path,'r')).convert('RGB')
      image = Image.open(image_path,'r').convert('RGB')
      normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
      preprocess = transforms.Compose([
            transforms.Resize(350),
            transforms.CenterCrop(300),
            transforms.RandomHorizontalFlip(p=0.2),
            transforms.ToTensor()
            #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
      image = preprocess(image)
      return image, self.labels[index]

#Original code
    # def __getitem__(self, index):
    #   image_path = self.image_names[index]
    #   with zipfile.ZipFile('images_samples2.zip', 'r') as z:
    #     image = Image.open(z.open(image_path,'r')).convert('RGB')
    #     normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    #     preprocess = transforms.Compose([
    #         transforms.Resize(256),
    #         transforms.TenCrop(224),
    #         transforms.Lambda
    #         (lambda crops: torch.stack([transforms.ToTensor()(crop) for crop in crops])),
    #         transforms.Lambda
    #         (lambda crops: torch.stack([normalize(crop) for crop in crops]))
    #     ])
    #     image = preprocess(image)
    #   return image, self.labels[index]


    def __len__(self):
        return len(self.image_names)



In [None]:
def compute_score_with_logits(logits, labels):
  logits = torch.max(logits, 1)[1].data # argmax
  one_hots = torch.zeros(*labels.size()).cuda()
  one_hots.scatter_(1, logits.view(-1, 1), 1)
  scores = (one_hots * labels)

  return scores

def tile(a, dim, n_tile):
  init_dim = a.size(dim)
  repeat_idx = [1] * a.dim()
  repeat_idx[dim] = n_tile
  a = a.repeat(*(repeat_idx))
  order_index = torch.LongTensor(np.concatenate([init_dim * np.arange(n_tile) + i for i in range(init_dim)]))
  return torch.index_select(a, dim, order_index)

def get_label_for_image(model, image_path):
  classes = ['Atelectasis', 'Consolidation', 'Infiltration', 'Pneumothorax', 'Edema', 'Emphysema', 'Fibrosis',
      'Effusion', 'Pneumonia', 'Pleural_Thickening', 'Cardiomegaly', 'Nodule', 'Hernia', 'Mass', 'No Finding']
  input_image_grey = Image.open(image_path)
  input_image = input_image_grey.convert('RGB')
  preprocess = transforms.Compose([transforms.Resize(350),
                                   transforms.CenterCrop(300),
                                   transforms.RandomHorizontalFlip(p=0.2),
                                   transforms.ToTensor()
                                   #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
                                   ])
  input_tensor = preprocess(input_image)
  input_batch = input_tensor.unsqueeze(0)
  output = model(input_batch)
  index_tensor = torch.argmax(output)
  index = index_tensor.item()
  return classes[index]

In [None]:
base_model = [
    # layer, expand_ratio, channels, repeats, stride, kernel_size
    [1, 1, 24, 2, 1, 3],
    [2, 4, 32, 3, 2, 3],
    [3, 4, 48, 3, 2, 3],
    [4, 6, 96, 5, 2, 3],
    [5, 6, 136, 5, 1, 5],
    [6, 6, 232, 6, 2, 5],
    [7, 6, 384, 2, 1, 3],
]

In [None]:
class ConvBnAct(nn.Module):
  def __init__(
      self,in_channels,out_channels,kernel_size=3, stride=1, padding=0, groups=1
  ):
    super(ConvBnAct, self).__init__()
    self.cnn = nn.Conv2d(
        in_channels,
        out_channels,
        kernel_size,
        stride,
        padding,
        groups=groups,
        bias=False,
    )
    self.bn = nn.BatchNorm2d(out_channels)
    self.gelu = nn.GELU()

  def forward(self, x):
    return self.gelu(self.bn(self.cnn(x)))

class SqueezeExcitation(nn.Module):
  def __init__(self, in_channels, reduced_dim):
    super(SqueezeExcitation, self).__init__()
    self.se = nn.Sequential(
        nn.AdaptiveAvgPool2d(1),  #Global Average pooling
        nn.Conv2d(in_channels, reduced_dim, kernel_size=1),
        nn.GELU(),
        nn.Conv2d(reduced_dim, in_channels, kernel_size=1),
        nn.Sigmoid(),
    )

  def forward(self, x):
    return x * self.se(x)

class FusedMBConv(nn.Module):
  def __init__(
      self, in_channels, out_channels, kernel_size, stride, expand_ratio, reduction=4
  ):
    super(FusedMBConv, self).__init__()
    hidden_dim = in_channels * expand_ratio
    reduced_dim = int(in_channels / reduction)
    padding = (kernel_size - 1) // 2

    self.conv = nn.Sequential(
        ConvBnAct(
            in_channels,
            hidden_dim,
            kernel_size,
            stride,
            padding=padding,
            groups=1,
            ),
        SqueezeExcitation(hidden_dim, reduced_dim),
        nn.Conv2d(hidden_dim, out_channels, 1, bias=False),
        nn.BatchNorm2d(out_channels),
        nn.GELU()
        )

  def forward(self, x):
    return self.conv(x)

class MBConvN(nn.Module):
  def __init__(
      self, in_channels, out_channels, kernel_size, stride, expand_ratio, reduction=4
  ):
    super(MBConvN, self).__init__()
    hidden_dim = in_channels * expand_ratio
    self.expand = in_channels != hidden_dim
    reduced_dim = int(in_channels / reduction)
    padding = (kernel_size - 1) // 2

    if self.expand:
      self.expand_conv = ConvBnAct(
          in_channels, hidden_dim, kernel_size=1, stride=1, padding=1
      )
    self.conv = nn.Sequential(
        ConvBnAct(
            hidden_dim,
            hidden_dim,
            kernel_size,
            stride,
            padding,
            groups=hidden_dim,
            ),
        SqueezeExcitation(hidden_dim, reduced_dim),
        nn.Conv2d(hidden_dim, out_channels, 1, bias=False),
        nn.BatchNorm2d(out_channels),
        nn.GELU()
        )

  def forward(self, inputs):
    x = self.expand_conv(inputs) if self.expand else inputs
    return self.conv(x)

#efficientnet-b3 (1.2, 1.4, 300, 0.3)
class EfficientNet(nn.Module):
  def __init__(self,width_factor=1.2,depth_factor=1.4,dropout_rate=0.3,num_classes=15):
    super(EfficientNet, self).__init__()
    self.width_factor = width_factor
    self.dropout_rate = dropout_rate
    last_channels = ceil(1280 * width_factor)
    self.pool = nn.AdaptiveAvgPool2d(1)
    self.features = self.create_features(width_factor, last_channels)
    self.classifier = nn.Sequential(
        nn.Dropout(dropout_rate),
        nn.Linear(last_channels,num_classes),
        nn.Softmax(dim=1)
    )

  def create_features(self,width_factor, last_channels):
    channels = 4*ceil(int(32 * width_factor)/4)
    features = [ConvBnAct(3, channels, 3, stride=2, padding=1)]
    in_channels = channels

    for layer_num, expand_ratio, channels, repeats, stride, kernel_size in base_model:
      out_channels = channels
      layers_repeats = repeats
      for layer in range(layers_repeats):
        if layer_num<=3:
          features.append(
              FusedMBConv(
                  in_channels,
                  out_channels,
                  kernel_size=kernel_size,
                  stride=stride if layer==0 else 1,
                  expand_ratio=expand_ratio
              )
          )
        else:
          features.append(
              MBConvN(
                  in_channels,
                  out_channels,
                  kernel_size=kernel_size,
                  stride=stride if layer==0 else 1,
                  expand_ratio=expand_ratio
              )
          )
        in_channels = out_channels

    features.append(
        ConvBnAct(in_channels, last_channels, 1, stride=1, padding=0)
        )
    return nn.Sequential(*features)

  def forward(self, x):
    x = self.pool(self.features(x))
    return self.classifier(x.view(x.shape[0], -1))

In [None]:
class FocalLoss(nn.Module):
    def __init__(self, alpha=0.25, gamma=2.5):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        
    def forward(self, inputs, targets):
        BCE_loss = F.binary_cross_entropy_with_logits(inputs, targets)
        pi = torch.exp(-BCE_loss)
        Focal_loss = self.alpha * (1-pi)**self.gamma * BCE_loss
        return torch.mean(Focal_loss)

In [None]:
%%time
data = DataPreprocessing()
train_set, test_set = random_split(data, [math.ceil(len(data) * 0.8), math.floor(len(data) * 0.2)])
train,validate = random_split(train_set, [math.ceil(len(train_set) * 0.9), math.floor(len(train_set) * 0.1)])

trainloader = torch.utils.data.DataLoader(train, batch_size=20, shuffle=True, num_workers=5)
validateloader = torch.utils.data.DataLoader(validate, batch_size=20, shuffle=True, num_workers=5)
testloader = torch.utils.data.DataLoader(test_set, batch_size=20, shuffle=False)

classes = ['Atelectasis', 'Consolidation', 'Infiltration', 'Pneumothorax', 'Edema', 'Emphysema', 'Fibrosis',
      'Effusion', 'Pneumonia', 'Pleural_Thickening', 'Cardiomegaly', 'Nodule', 'Hernia', 'Mass', 'No Finding']

model = EfficientNet().cuda()
#model = nn.DataParallel(model).cuda()

In [None]:
%%time

loss_list = []
epochs_list = []
val_loss_list = []
#criterion = nn.BCELoss()
criterion = FocalLoss()
optimizer = optim.SGD(model.parameters(), lr=0.004, momentum=0.88)
#optim.AdamW(model.parameters(), lr=0.005, weight_decay=0.005)

for epoch in range(6):
  epochs_list.append(epoch+1)
  running_loss = 0.0
  correct = 0
  total = 0
  for i, (images, labels) in enumerate(trainloader, 0): # get the inputs; data is a list of [images, labels]
    optimizer.zero_grad()

    images = images.cuda()

    #format input
    n_batches, channels, height, width = images.size()
    image_batch = torch.autograd.Variable(images.view(-1, channels, height, width)) #640 images: 64 batches contain 10 crops each decomposed into 640 images

    labels = tile(labels, 0, 1).cuda() #duplicate for each crop the label [1,2],[3,4] => [1,2],[1,2],[3,4],[3,4] -> 640 labels

    # forward + backward + optimize
    outputs = model(image_batch)
    loss = criterion(outputs, labels.float())
    loss.backward()
    optimizer.step()

    # print statistics
    running_loss += loss.item()

    correct += compute_score_with_logits(outputs, labels).sum()
    total += labels.size(0)

  print('Epoch: %d, loss: %.3f, Accuracy: %.3f' %
        (epoch + 1, running_loss, 100 * correct / total))
  loss_list.append(running_loss)
  val_correct = 0
  val_total = 0
  val_running_loss = 0
  with torch.no_grad():
    for i, (images, labels) in enumerate(validateloader, 0):
      images = images.cuda()
      n_batches, channels, height, width = images.size()
      image_batch = torch.autograd.Variable(images.view(-1, channels, height, width))
      labels = tile(labels, 0, 1).cuda()
      outputs = model(image_batch)
      val_correct += compute_score_with_logits(outputs, labels).sum()
      val_total += labels.size(0)
      val_loss = criterion(outputs, labels.float())
      val_running_loss += val_loss.item()

  print('Accuracy on validation set: %.3f' % (100 * val_correct / val_total))
  val_loss_list.append(val_running_loss*9)

print('Finished Training')

In [None]:
plt.plot(epochs_list, loss_list, label='Training_Loss')
plt.plot(epochs_list, val_loss_list, label='Validation_Loss')

plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Plot of loss vs epochs')
plt.legend()

# Display the plot
plt.show()

In [None]:
plt.plot(epochs_list, loss_list, marker='.', linestyle='-', color='b')

# Adding labels and title
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Plot of loss vs epochs')

# Adding a legend
plt.legend()

# Show the plot
plt.show()

In [None]:
model.eval()

In [None]:
correct = 0
total = 0
with torch.no_grad():
  for i, (images, labels) in enumerate(testloader, 0):
    images = images.cuda()
    n_batches, channels, height, width = images.size()
    image_batch = torch.autograd.Variable(images.view(-1, channels, height, width))
    labels = tile(labels, 0, 1).cuda()
    outputs = model(image_batch)
    correct += compute_score_with_logits(outputs, labels).sum()
    total += labels.size(0)

print('Accuracy on test set: %.3f' % (100 * correct / total))

In [None]:
gradients = None
activations = None

def backward_hook(module, grad_input, grad_output):
  global gradients
  gradients = grad_output

def forward_hook(module, args, output):
  global activations
  activations = output

backward_hook = model.features[-1].register_full_backward_hook(backward_hook, prepend=False)
forward_hook = model.features[-1].register_forward_hook(forward_hook, prepend=False)

In [None]:
images_to_test_CAM = [0,3,69,420,666]

for i in images_to_test_CAM:
  image, label = data[i]
  image_to_show = image[1]
  image_batch = image.cuda().unsqueeze(0)
  model(image_batch).mean().backward()

  # pool the gradients across the channels
  pooled_gradients = torch.mean(gradients[0], dim=[0, 2, 3])

  # weight the channels by corresponding gradients
  for i in range(activations.size()[1]):
    activations[:, i, :, :] *= pooled_gradients[i]

  # average the channels of the activations
  heatmap = torch.mean(activations, dim=1).squeeze()

  # relu on top of the heatmap
  heatmap = F.relu(heatmap)

  # normalize the heatmap
  heatmap /= torch.max(heatmap)

  heatmap_cpu = heatmap.cpu()

  overlay = to_pil_image(heatmap_cpu.detach(), mode='F').resize((300,300), resample=PIL.Image.BICUBIC)
  cmap = colormaps['jet']
  overlay = (255 * cmap(np.asarray(overlay) ** 2)[:, :, :3]).astype(np.uint8)

  fig = plt.figure(figsize=(12, 6))
  plt.subplot(1, 2, 1)
  plt.imshow(image_to_show, cmap='gray')
  plt.title('Original X-ray')
  plt.axis('off')

  plt.subplot(1, 2, 2)
  plt.imshow(image_to_show, cmap='gray')
  plt.title('CAM overlayed')
  plt.axis('off')

  # Overlaying the third image on the second one
  plt.imshow(overlay, alpha=0.4, cmap='jet', interpolation='nearest')
  plt.title('CAM overlayed')
  plt.axis('off')

  # Show the plot
  plt.show()
  diseases = []
  for i, disease in enumerate(DataPreprocessing.classEncoding.keys()):
      if label[i] == 1:
          diseases.append(disease)

  print(f"Label for the above image:", diseases)

# References:
- https://www.youtube.com/watch?v=fR_0o25kigM&list=PLhhyoLH6IjfxeoooqP9rhU3HJIAVAJ3Vz&index=20
- https://github.com/thibaultwillmann/CheXNet-Pytorch

# Runs published:
- https://www.kaggle.com/code/armeddinosaur/efficientnextv2
- https://www.kaggle.com/armeddinosaur/efficientnextv2-small