In [1]:
!pip install pydicom
!pip install torchcam
!pip install torchinfo
!pip install -U albumentations
!pip install iterative-stratification
!pip install ipywidgets
!pip install SciencePlots

Collecting albumentations
  Downloading albumentations-2.0.8-py3-none-any.whl.metadata (43 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.1/43.1 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
Collecting albucore==0.0.24 (from albumentations)
  Downloading albucore-0.0.24-py3-none-any.whl.metadata (5.3 kB)
Downloading albumentations-2.0.8-py3-none-any.whl (369 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m369.4/369.4 kB[0m [31m13.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading albucore-0.0.24-py3-none-any.whl (15 kB)
Installing collected packages: albucore, albumentations
  Attempting uninstall: albucore
    Found existing installation: albucore 0.0.23
    Uninstalling albucore-0.0.23:
      Successfully uninstalled albucore-0.0.23
  Attempting uninstall: albumentations
    Found existing installation: albumentations 2.0.5
    Uninstalling albumentations-2.0.5:
      Successfully uninstalled albumentations-2.0.5
Successfully installed albuc

In [None]:
import os
import random
import re
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pydicom
import scienceplots
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, f1_score, roc_curve, auc, roc_auc_score
from tqdm import tqdm
from albumentations import Compose, CenterCrop, HorizontalFlip, Normalize, RandomRotate90
from albumentations.pytorch import ToTensorV2
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import label_binarize

import torch
import torch.nn as nn
import torchvision
from torch import optim
from torch.utils.data import Dataset
from torchvision import transforms

random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

IMG_SIZE = 256
MEAN = 0.5
STD = 1.0
BATCH_SIZE = 64
NUM_CLASSES = 6
LEARNING_RATE = 2e-3
N_EPOCHS = 8
TRAIN_TEST_SPLIT = 0.15
NUM_WORKERS = 4
IMG_DIR = '/kaggle/input/rsna-intracranial-hemorrhage-detection/rsna-intracranial-hemorrhage-detection/stage_2_train'
CSV_PATH = '/kaggle/input/rsna-intracranial-hemorrhage-detection/rsna-intracranial-hemorrhage-detection/stage_2_train.csv'
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.backends.cudnn.benchmark = True

class DicomDataset(Dataset):
    def __init__(self, img_dir, df, transform=None, labels=True):
        self.img_dir = img_dir
        self.df = df
        self.transform = transform
        self.labels=labels

    def correct_dcm(self, dcm):
        x = dcm.pixel_array + 1000
        x = np.where(x >= 4096, x - 4096, x)
        dcm.PixelData = x.tobytes()
        dcm.RescaleIntercept = -1000

    def window_image(self, dcm, window_center, window_width):
        if (dcm.BitsStored == 12) and (dcm.PixelRepresentation == 0) and (int(dcm.RescaleIntercept) > -100):
            self.correct_dcm(dcm)
        img = dcm.pixel_array * dcm.RescaleSlope + dcm.RescaleIntercept
        img_min = window_center - window_width // 2
        img_max = window_center + window_width // 2
        img = np.clip(img, img_min, img_max)
        return img

    def bsb_window(self, dcm):
        windows = [
            (40, 80, 0, 80),
            (80, 200, -20, 200),
            (40, 380, -150, 380)]
        images = [(self.window_image(dcm, center, width) - low) / denom
                  for (center, width, low, denom) in windows]
        bsb_img = np.stack(images, axis=-1)
        return bsb_img

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        img_path = os.path.join(self.img_dir, self.df.iloc[idx, 0]+'.dcm')
        data = pydicom.dcmread(img_path)
        img = self.bsb_window(data)
        if self.transform:
            augmented = self.transform(image=img)
            img = augmented['image']
        if self.labels:
            label = torch.tensor(self.df.iloc[idx, 1:],dtype=torch.float32)
            return {'image': img, 'labels': label}
        return {'image': img}


def preprocess(csv_path):
    df= pd.read_csv(csv_path)
    df[['ID','Subtype']]= df['ID'].str.rsplit(pat='_',n=1,expand=True)
    df= df.pivot_table(index='ID',columns='Subtype',values='Label').reset_index()
    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    df['any']= df['any'].apply(lambda x : 0.0 if x==1.0 else 1.0)
    return sample(df)


def sample(df):
    subtypes = ['epidural', 'intraparenchymal', 'intraventricular', 'subarachnoid', 'subdural']
    not_any = df[df['any'] == 1.0]
    subtype_samples = [df[df[subtype] == 1.0] for subtype in subtypes]
    lim = min(len(sub) for sub in subtype_samples)
    balanced = pd.concat([sub.sample(lim) for sub in subtype_samples], axis=0)
    noise = not_any.sample(lim * 4)
    final_df = pd.concat([balanced, noise], axis=0)
    return final_df.sample(frac=1).reset_index(drop=True)

def train_one_epoch(model, data_loader, optimizer, criterion, device):
    model.train()
    running_loss = 0.0
    y_true = []
    y_pred = []
    loop = tqdm(data_loader, desc="Training", leave=False)
    for batch in loop:
        inputs = batch["image"].to(device, dtype=torch.float32)
        labels = batch["labels"].to(device, dtype=torch.float32)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        y_true.extend(labels.detach().cpu().numpy())
        y_pred.extend(outputs.detach().cpu().numpy())
    avg_loss = running_loss / len(data_loader)
    return avg_loss, y_true, y_pred

def calculate_metrics(y_true, y_pred):
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    y_pred_probs = 1 / (1 + np.exp(-y_pred))

    y_true_labels = np.argmax(y_true, axis=1)
    y_pred_labels = np.argmax(y_pred_probs, axis=1)

    accuracy = accuracy_score(y_true_labels, y_pred_labels)
    precision = precision_score(y_true_labels, y_pred_labels, average='weighted')
    f1 = f1_score(y_true_labels, y_pred_labels, average='weighted')
    auc = roc_auc_score(y_true, y_pred_probs, average='macro', multi_class='ovr')

    return accuracy, precision, f1, auc

def train_model(model, data_loader_train, optimizer, criterion, model_name, device, n_epochs):
    history = {'accuracy': [], 'precision': [], 'loss': [], 'f1': [], 'auc':[]}
    for epoch in range(1, n_epochs + 1):
        print(f"\nEpoch {epoch}/{n_epochs}")
        print("-" * 30)
        avg_loss, y_true, y_pred = train_one_epoch(model, data_loader_train, optimizer, criterion, device)
        accuracy, precision, f1,auc = calculate_metrics(y_true, y_pred)
        print(f"Loss: {avg_loss:.4f} | Accuracy: {accuracy:.4f} | Precision: {precision:.4f} | F1: {f1:.4f} | AUC: {auc:.4f}")
        history['loss'].append(avg_loss)
        history['accuracy'].append(accuracy)
        history['precision'].append(precision)
        history['f1'].append(f1)
        history['auc'].append(auc)
        class_names = ['epidural', 'intraparenchymal', 'intraventricular', 'subarachnoid', 'subdural', 'any']
        plot_roc_curve(np.array(y_true), np.array(y_pred), class_names, epoch, model_name)

    return history

def plot_roc_curve(y_true, y_pred, class_names, epoch, model_name):
    plt.figure(figsize=(15, 10))
    n_classes = y_true.shape[1]
    y_pred_probs = 1 / (1 + np.exp(-y_pred))

    for i in range(n_classes):
        fpr, tpr, _ = roc_curve(y_true[:, i], y_pred_probs[:, i])
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, label=f'{class_names[i]} (AUC = {roc_auc:.2f})')

    plt.plot([0, 1], [0, 1], 'k--', linewidth=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.style.use(['no-latex'])
    plt.title(f'ROC curve for {model_name}', fontsize=18)
    plt.xlabel('False Positive Rate', fontsize=16)
    plt.ylabel('True Positive Rate', fontsize=16)
    plt.legend(loc="lower right", fontsize=14)
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.show()

    output_dir = "/Users/matilda/Desktop/"
    os.makedirs(output_dir, exist_ok=True)
    plt.savefig(f"{output_dir}/roc_curve_{model_name.lower()}_epoch_{epoch}.png", dpi=300, bbox_inches='tight')
    plt.close()


transform_train = Compose([CenterCrop(IMG_SIZE,IMG_SIZE),
                           Normalize(mean=MEAN, std=STD),
                           A.OneOf([RandomRotate90(p=0.3),
                                HorizontalFlip(p=0.3)],p=0.75),ToTensorV2()])
transform_test= Compose([CenterCrop(IMG_SIZE,IMG_SIZE),
                         Normalize(mean=MEAN,std=STD),
                         ToTensorV2()])

REDUCTION_FACTOR = 0.05
df = preprocess(CSV_PATH)
X,Y= train_test_split(df, test_size=TRAIN_TEST_SPLIT, shuffle=True)

train_dataset= DicomDataset(IMG_DIR, X, transform=transform_train, labels=True)
test_dataset= DicomDataset(IMG_DIR, Y, transform=transform_test, labels=True)

data_loader_train = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS, pin_memory=True)
data_loader_test = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=True)

In [None]:
from torchvision.models import googlenet

model = googlenet(pretrained=False, aux_logits=False)
model.fc = torch.nn.Linear(model.fc.in_features, NUM_CLASSES)
model_name = "GoogLeNet"
model.to(DEVICE)

optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = torch.nn.BCEWithLogitsLoss()
history = train_model(model, data_loader_train, optimizer, criterion, model_name, device=DEVICE, n_epochs=N_EPOCHS)

In [None]:
from torchvision.models import shufflenet_v2_x1_0

model = shufflenet_v2_x1_0(pretrained=False)
model.fc = torch.nn.Linear(model.fc.in_features, NUM_CLASSES)
model_name = "ShuffleNet"

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(DEVICE)

optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = torch.nn.BCEWithLogitsLoss()
history = train_model(model, data_loader_train, optimizer, criterion,  model_name, device=DEVICE, n_epochs=N_EPOCHS)

In [None]:
from torchvision.models import resnet50

model = resnet50(pretrained=False)
model.fc = nn.Linear(model.fc.in_features, NUM_CLASSES)
model_name = "ResNet50"

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(DEVICE)

optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = torch.nn.BCEWithLogitsLoss()
history = train_model(model, data_loader_train, optimizer, criterion, model_name, device=DEVICE, n_epochs=N_EPOCHS)

In [None]:
from torchinfo import summary

model = googlenet(pretrained=False)
model.fc = torch.nn.Linear(model.fc.in_features, NUM_CLASSES)

summary(model, input_size=(BATCH_SIZE, 3, IMG_SIZE, IMG_SIZE), device=str(device))

model = shufflenet_v2_x1_0(pretrained=False)
model.fc = torch.nn.Linear(model.fc.in_features, NUM_CLASSES)

summary(model, input_size=(BATCH_SIZE, 3, IMG_SIZE, IMG_SIZE), device=str(device))

model = resnet50(pretrained=False)
model.fc = nn.Linear(model.fc.in_features, NUM_CLASSES)

summary(model, input_size=(BATCH_SIZE, 3, IMG_SIZE, IMG_SIZE), device=str(device))

--------------------

In [None]:
import matplotlib.pyplot as plt
import scienceplots
plt.style.use(['no-latex'])

results = {
    "GoogLeNet": {
        "loss": [0.4676, 0.4254, 0.4028, 0.3878, 0.3713, 0.3606, 0.3546, 0.3457],
        "accuracy": [0.4247, 0.4806, 0.5188, 0.5320, 0.5424, 0.5577, 0.5637, 0.5732],
        "precision": [0.3499, 0.4197, 0.4668, 0.5037, 0.5298, 0.5564, 0.5577, 0.5705],
        "f1": [0.3679, 0.4190, 0.4603, 0.4841, 0.5067, 0.5284, 0.5369, 0.5487],
        "auc": [0.6667, 0.7517, 0.7835, 0.8060, 0.8288, 0.8406, 0.8476, 0.8571]
    },
    "ShuffleNet": {
        "loss": [0.4106, 0.3695, 0.3482, 0.3322, 0.3187, 0.3129, 0.3034, 0.2969],
        "accuracy": [0.5176, 0.5541, 0.5753, 0.5894, 0.6052, 0.6107, 0.6222, 0.6335],
        "precision": [0.4613, 0.5300, 0.5689, 0.5918, 0.6102, 0.6170, 0.6313, 0.6440],
        "f1": [0.4612, 0.5149, 0.5439, 0.5645, 0.5850, 0.5918, 0.6056, 0.6190],
        "auc": [0.7751, 0.8267, 0.8513, 0.8659, 0.8782, 0.8832, 0.8913, 0.8960]
    },
    "ResNet50": {
        "loss": [0.4699, 0.4524, 0.4144, 0.3923, 0.3742, 0.3635, 0.3548, 0.3438],
        "accuracy": [0.4230, 0.4363, 0.5085, 0.5364, 0.5498, 0.5594, 0.5671, 0.5797],
        "precision": [0.3612, 0.3812, 0.4354, 0.4786, 0.5208, 0.5497, 0.5594, 0.5752],
        "f1": [0.3770, 0.3919, 0.4423, 0.4806, 0.5075, 0.5253, 0.5347, 0.5514],
        "auc": [0.6731, 0.7031, 0.7649, 0.7962, 0.8202, 0.8345, 0.8453, 0.8559]
    }
}

def plot_metric(results, metric_name):
    plt.figure(figsize=(8, 6))
    for model_name, metrics in results.items():
        plt.plot(metrics[metric_name], label=model_name, linewidth=2, marker='o')

    plt.title(f"{metric_name.capitalize()} over epochs", fontsize=14)
    plt.xlabel("Epoch", fontsize=14)
    plt.ylabel(metric_name.capitalize(), fontsize=14)
    plt.xticks(ticks=range(8), labels=[f"{i+1}" for i in range(8)])
    plt.legend()
    plt.grid(True, linestyle="--", alpha=0.5)
    plt.tight_layout()
    plt.show()

for metric in ["loss", "accuracy", "precision", "f1", "auc"]:
    plot_metric(results, metric)