# Identifying pneumonia in X-ray images
Maciej Lorens

In [None]:
import os
from pathlib import Path
import glob
from tqdm import tqdm
from joblib import dump, load
import warnings
import numpy as np
import pandas as pd

# PyTorch
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchmetrics
from torch.utils.data import Dataset, TensorDataset, DataLoader
from torchvision import  models, transforms
import lightning.pytorch as pl
from lightning.pytorch.callbacks.early_stopping import EarlyStopping

# Sklearn
from sklearn.svm import LinearSVC, SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    log_loss, 
    accuracy_score,
    average_precision_score,
    balanced_accuracy_score,
    roc_auc_score,
    roc_curve,
    f1_score,
    classification_report,
    confusion_matrix,
    ConfusionMatrixDisplay
)
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

# Hyperparameter optimization
import optuna as opt
from optuna.integration import PyTorchLightningPruningCallback

# Image processing
from PIL import Image
from skimage.io import imread
import cv2

# Visualizations
import matplotlib.pyplot as plt
import seaborn as sn
sn.set_theme(style="darkgrid")

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Plot the loss curve based on the provided folder or the latest folder from lightning logs
def plot_loss_curve(fpath=None):
    if fpath == None:
        latest_file = os.path.join(max([os.path.join("/home/maciej/Uni/Studia II stopień/II rok/Machine Learning 2/GRU-ML-presentation/lightning_logs", f) 
                                        for f in os.listdir("lightning_logs")], 
                                        key=os.path.getctime), "metrics.csv")
    else:
        latest_file = os.path.join(fpath, "metrics.csv")
    loss_data = pd.read_csv(latest_file)
    loss_data['val_loss'] = loss_data.val_loss.shift(1)
    loss_data = loss_data[["epoch", "train_loss", "val_loss"]].dropna()
    loss_data['epoch'] = loss_data.epoch

    plt.figure(figsize=(10,5))
    plt.plot(loss_data['epoch'], loss_data['train_loss'], label="Training loss")
    plt.plot(loss_data['epoch'], loss_data['val_loss'], label = "Validation loss")
    plt.title("Loss curve", fontsize=18)
    plt.xlabel("Epoch", fontsize=16)
    plt.ylabel("Loss metric", fontsize=16)
    plt.xticks(loss_data['epoch'])
    plt.legend(loc="upper right", fontsize=16)
    plt.tight_layout()
    plt.show()

# Data exploration

In [None]:
# Path to the images of X-rays
data_dir = Path("chest_xray")

# Path to the train directory
train_dir = data_dir / "train"

# Path to test directory
test_dir = data_dir / "test"

In [None]:
# Get a dataframe of image paths and labels
def get_image_df(img_dir):
    # Paths to normal and pneumonia xrays
    normal_dir = img_dir / "NORMAL"
    pneumonia_dir = img_dir / "PNEUMONIA"

    # List of all the images
    normal_cases = normal_dir.glob("*.jpeg")
    pneumonia_cases = pneumonia_dir.glob("*.jpeg")

    data = []

    for img in normal_cases:
        data.append((img, 0))

    for img in pneumonia_cases:
        data.append((img, 1))

    data = pd.DataFrame(data, columns=["Image_dir", "Label"])

    return data

In [None]:
train_data = get_image_df(train_dir)
test_data = get_image_df(test_dir)

In [None]:
train_data.head()

In [None]:
# Get the counts for each class
cases_count = train_data['Label'].value_counts()

# Plot the results 
plt.figure(figsize=(10, 8))
sn.barplot(x=cases_count.index, y=cases_count.values)
plt.title('Number of cases', fontsize=14)
plt.xlabel('Case type', fontsize=12)
plt.ylabel('Count', fontsize=12)
plt.xticks(range(len(cases_count.index)), ['Normal', 'Pneumonia'])
plt.show()

In [None]:
# Get few samples for both the classes
pneumonia_samples = (train_data[train_data['Label']==1]['Image_dir'].iloc[:3]).tolist()
normal_samples = (train_data[train_data['Label']==0]['Image_dir'].iloc[:3]).tolist()

# Concat the data in a single list
samples = pneumonia_samples + normal_samples

# Plot the data 
f, ax = plt.subplots(2, 3, figsize=(20, 10))
for i in range(6):
    img = imread(samples[i])
    ax[i//3, i%3].imshow(img, cmap='gray')
    if i<3:
        ax[i//3, i%3].set_title("Normal")
    else:
        ax[i//3, i%3].set_title("Pneumonia")
    ax[i//3, i%3].axis('off')
    ax[i//3, i%3].set_aspect('auto')
plt.show()

In [None]:
# Splitting train data into train and validation set
train_data, val_data = train_test_split(train_data, test_size=0.3, shuffle=True, stratify=train_data["Label"], random_state=42)
train_data, val_data = train_data.reset_index(drop=True), val_data.reset_index(drop=True)

In [None]:
print(f"Pneumonia cases in the trainining set: {train_data['Label'].value_counts()[1]}, Normal cases in the trainining set: {train_data['Label'].value_counts()[0]}")
print(f"Pneumonia cases in the validation set: {val_data['Label'].value_counts()[1]}, Normal cases in the validation set: {val_data['Label'].value_counts()[0]}")

# Modeling

## Dataset and Dataloader

In [None]:
# Custom PyTorch dataset of images
class ImageDataset(Dataset):
    def __init__(self, data, transform, device=device):
        self.data = data
        self.transform = transform
        self.device = device
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        image_dir = self.data.Image_dir[index]
        image = Image.open(image_dir)
        label = self.data.Label[index]
        image = self.transform(image)
        # If image is greyscale then dstack
        if image.shape[0] == 1:
            image = torch.cat((image, image, image), dim=0)

        return (image, torch.tensor(label).to(torch.float32).to(self.device))

In [None]:
# PyTorch Lightning dataloader
class PneumoniaDataLoader(pl.LightningDataModule):
    def __init__(self, train_data, val_data, test_data, random_state, batch_size, num_workers, device):
        super().__init__()
        self.train_data = train_data
        self.val_data = val_data
        self.test_data = test_data
        self.random_sate = random_state
        self.batch_size = batch_size
        self.num_workers = num_workers

    def setup(self, stage):
        # Data transformations for training
        train_transforms = transforms.Compose([
            transforms.Resize((150,150)),
            transforms.RandomRotation(30),
            transforms.RandomHorizontalFlip(p=0.4),
            transforms.ToTensor()
        ])

        # Data transformations for validation and testing
        test_transforms = transforms.Compose([
            transforms.Resize((150,150)),
            transforms.ToTensor()
        ])

        # Create three instances of the custom dataset class
        self.image_train = ImageDataset(self.train_data, transform=train_transforms)
        self.image_val = ImageDataset(self.val_data, transform=test_transforms)
        self.image_test = ImageDataset(self.test_data, transform=test_transforms)

    def train_dataloader(self):
        # Return the dataloader of train data
        return DataLoader(self.image_train
                          , shuffle=True
                          , batch_size=self.batch_size
                          , drop_last=False
                          , num_workers=self.num_workers
                          , persistent_workers=True
                          )

    def val_dataloader(self):
        # Return the dataloader of validation data
        return DataLoader(self.image_val
                          , shuffle=False
                          , batch_size=self.batch_size
                          , drop_last=False
                          , num_workers=self.num_workers
                          , persistent_workers=True
                          )

    def test_dataloader(self):
        # Return the dataloader of test data
        return DataLoader(self.image_test
                          , shuffle=False
                          , batch_size=self.batch_size
                          , drop_last=False
                          , num_workers=self.num_workers
                          , persistent_workers=True
                          )

## PyTorch Models

In [None]:
# Benchmark simple CNN model class
class BenchmarkCNNModel(nn.Module):
    def __init__(self, dropout):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(18496, 120) # 16 * 34 * 34
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 1)
        self.pool = nn.MaxPool2d(2, 2)
        self.softmax = nn.Sigmoid()
        self.dropout = nn.Dropout(p=0.2)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(self.dropout(F.relu(self.conv2(x))))
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.softmax(self.fc3(x))
        return x
    

# Transfer learning model with VGG16 for the stacking ensemble
class VGG16Model(nn.Module):
    def __init__(self, dropout):
        super().__init__()
        self.vgg16 = models.vgg16_bn(weights=torchvision.models.VGG16_BN_Weights.DEFAULT)
        num_features = self.vgg16.classifier[-1].in_features
        features = list(self.vgg16.classifier.children())[:-1]
        self.fn = nn.Linear(num_features, 1)
        self.vgg16.classifier = nn.Sequential(*features)
        self.dropout = nn.Dropout(p=dropout)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        x = self.vgg16(x)
        x = self.fn(self.dropout(x))

        return self.sigmoid(x)


# Transfer learning model with DenseNet169 for the stacking ensemble
class DenseNetModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.densenet = models.densenet169(weights=torchvision.models.DenseNet169_Weights.DEFAULT)
        self.densenet.classifier = nn.Sequential(nn.Identity()) # or nn.Dropout(p=0.2), nn.Linear(num_features, 1)
    
    def forward(self, x):
        x = self.densenet(x)

        return x
    

# Meta Model for combining outputs of two other models
class MetaModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fn1 = nn.Linear(2, 4)
        self.fn2 = nn.Linear(4, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        x = self.relu(self.fn1(x))
        x = self.relu(self.fn2(x))


        return self.sigmoid(x)

In [None]:
# Pneumonia classification model created with lightning
class PneumoniaModel(pl.LightningModule):
    def __init__(self, model, lr=0.001):
        super().__init__()
        # Model of choice
        self.model = model
        # Learning rate
        self.lr = lr
        # Metrics
        self.bce = nn.BCELoss()
        self.acc = torchmetrics.Accuracy(task="binary")
        self.auroc = torchmetrics.AUROC(task="binary")


    def training_step(self, batch, batch_idx):
        X, y = batch
        y_pred = self.model(X)
        train_loss = self.bce(y_pred.squeeze(), y)
        self.log("train_loss", train_loss, logger=True, prog_bar=True, on_step=False, on_epoch=True)
        return train_loss
    
    def validation_step(self, batch, batch_idx):
        X, y = batch
        y_pred = self.model(X)
        val_loss = self.bce(y_pred.squeeze(), y)
        self.log("val_loss", val_loss, logger=True, prog_bar=True, on_step=False, on_epoch=True)

    def test_step(self, batch, batch_idx):
        X, y = batch
        y_pred = self.model(X)
        test_bce = self.bce(y_pred.squeeze(), y)
        test_acc = self.acc(y_pred.squeeze(), y.to(torch.int64))
        test_roc = self.auroc(y_pred.squeeze(), y.to(torch.int64))
        self.log_dict({"BCE": test_bce, "Accuracy": test_acc, "ROC": test_roc}, logger=True, prog_bar=True, on_step=False, on_epoch=True)
        # self.log("test_loss", test_loss, prog_bar=True, on_step=False, on_epoch=True, sync_dist=True)
    
    def predict_step(self, batch, batch_id):
        X, y = batch
        y_pred = self.model(X)

        return y_pred

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)
        return optimizer

## Model training and validation

In [None]:
# Initialize dataloader
pneumonia_loader = PneumoniaDataLoader(train_data=train_data
                                       , val_data=val_data
                                       , test_data=test_data
                                       , random_state=42
                                       , batch_size=128
                                       , num_workers=8
                                       , device=device
                                       )

### Benchmark model

#### Hyperparameter optimization

In [None]:
pl.seed_everything(42, workers=True)
warnings.filterwarnings("ignore", category=UserWarning, module="optuna")

def objective(trial):
    batch_size = trial.suggest_categorical("batch_size", [64, 128, 256])
    dropout = trial.suggest_categorical("dropout", [0.2, 0.4])
    lr = trial.suggest_categorical("lr", [1e-3, 1e-2])

    benchmark_model = BenchmarkCNNModel(dropout=dropout).to(device)
    model = PneumoniaModel(benchmark_model, lr=lr)

    optuna_loader = PneumoniaDataLoader(train_data=train_data
                                        , val_data=val_data
                                        , test_data=test_data
                                        , random_state=42
                                        , batch_size=batch_size
                                        , num_workers=0
                                        , device=device
                                        )
    
    optuna_trainer = pl.Trainer(max_epochs=10
                                , enable_checkpointing=False
                                , logger=True
                                , log_every_n_steps=1
                                , check_val_every_n_epoch=1
                                # , limit_train_batches=0.2
                                # , limit_val_batches=0.2
                                , deterministic=True
                                , default_root_dir="benchmark_optuna_logs/"
                                , callbacks=[PyTorchLightningPruningCallback(trial, monitor="val_loss"),
                                             EarlyStopping(monitor="val_loss", mode="min")])
    
    hyperparameters = dict(batch_size=batch_size, dropout=dropout, lr=lr)
    optuna_trainer.logger.log_hyperparams(hyperparameters)
    optuna_trainer.fit(model, datamodule=optuna_loader)

    return optuna_trainer.callback_metrics["val_loss"].item()

study = opt.create_study(direction="minimize", study_name="benchmark_model", pruner=opt.pruners.MedianPruner(n_warmup_steps=5), load_if_exists=True)

study.optimize(objective, n_trials=12, n_jobs=1)

print("Best hyperparameters: ", study.best_params)
print("Score on validation: ", study.best_value)

#### Training best model

In [None]:
pl.seed_everything(42, workers=True)
benchmark_model = PneumoniaModel(BenchmarkCNNModel(dropout=0.2).to(device), lr=0.01)
benchmark_trainer = pl.Trainer(max_epochs=20
                               , check_val_every_n_epoch=1
                               , log_every_n_steps=1
                               , deterministic=True
                               , default_root_dir="benchmark_logs/"
                               , callbacks=[EarlyStopping(monitor="val_loss", mode="min")])

In [None]:
benchmark_trainer.fit(benchmark_model, datamodule=pneumonia_loader)

In [None]:
benchmark_trainer.test(benchmark_model, dataloaders=pneumonia_loader.test_dataloader())

In [None]:
# Get predictions from the benchmark model
benchmark_preds = benchmark_trainer.predict(benchmark_model, dataloaders=pneumonia_loader.test_dataloader())

### VGG16 Model

#### Hyperparameter tuning

In [None]:
pl.seed_everything(42, workers=True)
warnings.filterwarnings("ignore", category=UserWarning, module="optuna")

def objective(trial):
    batch_size = trial.suggest_categorical("batch_size", [64, 128, 256])
    dropout = trial.suggest_categorical("dropout", [0.2, 0.4])
    lr = trial.suggest_categorical("lr", [1e-3, 1e-2])

    vgg16_model = VGG16Model(dropout=dropout).to(device)
    vgg16_model.vgg16.requires_grad_(False)
    model = PneumoniaModel(vgg16_model, lr=lr)

    optuna_loader = PneumoniaDataLoader(train_data=train_data
                                        , val_data=val_data
                                        , test_data=test_data
                                        , random_state=42
                                        , batch_size=batch_size
                                        , num_workers=0
                                        , device=device
                                        )
    
    optuna_trainer = pl.Trainer(max_epochs=10
                                , enable_checkpointing=False
                                , logger=True
                                , log_every_n_steps=1
                                , check_val_every_n_epoch=1
                                # , limit_train_batches=0.2
                                # , limit_val_batches=0.2
                                , deterministic=True
                                , default_root_dir="vgg16_optuna_logs/"
                                , callbacks=[PyTorchLightningPruningCallback(trial, monitor="val_loss"),
                                             EarlyStopping(monitor="val_loss", mode="min")])
    
    hyperparameters = dict(batch_size=batch_size, dropout=dropout, lr=lr)
    optuna_trainer.logger.log_hyperparams(hyperparameters)
    optuna_trainer.fit(model, datamodule=optuna_loader)

    return optuna_trainer.callback_metrics["val_loss"].item()

study = opt.create_study(direction="minimize", study_name="vgg16_model", pruner=opt.pruners.MedianPruner(n_warmup_steps=5), load_if_exists=True)

study.optimize(objective, n_trials=12, n_jobs=1)

print("Best hyperparameters: ", study.best_params)
print("Score on validation: ", study.best_value)

#### Training the best model

In [None]:
pl.seed_everything(42, workers=True)
vgg16_model = VGG16Model(dropout=0.2).to(device)
vgg16_model.vgg16.requires_grad_(False)

vgg16_model_lightning = PneumoniaModel(vgg16_model, lr=0.02)
vgg16_trainer = pl.Trainer(max_epochs=2
                           , check_val_every_n_epoch=1
                           , log_every_n_steps=2
                           , deterministic=True
                           , default_root_dir="vgg16_logs/"
                           , callbacks=[EarlyStopping(monitor="val_loss", mode="min")])

In [None]:
vgg16_trainer.fit(vgg16_model_lightning, datamodule=pneumonia_loader)

In [None]:
vgg16_trainer.test(vgg16_model_lightning, dataloaders=pneumonia_loader.test_dataloader())

In [None]:
# Get predictions from VGG16
benchmark_preds = benchmark_trainer.predict(benchmark_model, dataloaders=pneumonia_loader.test_dataloader())

### Densenet169 and SVM Classifier

#### Special dataloaders for Densenet169 with SVM Classifier

In [None]:
train_transforms = transforms.Compose([
    transforms.Resize((150,150)),
    transforms.RandomRotation(30),
    transforms.RandomHorizontalFlip(p=0.4),
    transforms.ToTensor()
])

test_transforms = transforms.Compose([
    transforms.Resize((150,150)),
    transforms.ToTensor()
])

# Create three instances of the custom dataset class
image_train = ImageDataset(train_data, transform=train_transforms)
image_val = ImageDataset(val_data, transform=train_transforms)
image_test = ImageDataset(test_data, transform=test_transforms)

train_loader = DataLoader(image_train
                          , shuffle=False 
                          # Batch size doesn't matter here, since we're not training the CNN
                          , batch_size=128
                          , drop_last=False
                          , num_workers=8
                          )

val_loader = DataLoader(image_val
                        , shuffle=False
                        # Batch size doesn't matter here, since we're not training the CNN
                        , batch_size=128
                        , drop_last=False
                        , num_workers=8
                        )

test_loader = DataLoader(image_test
                         , shuffle=False
                         # Batch size doesn't matter here, since we're not training the CNN
                         , batch_size=128
                         , drop_last=False
                         , num_workers=8
                         )

#### Initialize DenseNet169 with pretrained weights

In [None]:
pl.seed_everything(42, workers=True)
densenet_model = DenseNetModel().to(device)
densenet_model.densenet.requires_grad_(False)

In [None]:
def get_densenet_predictions(dataloader, model, device):
    model.eval()
    cnn_preds_all = []

    for X, _ in tqdm(dataloader, total=len(dataloader)):
        cnn_preds = model(X)
        cnn_preds_all += [cnn_preds.detach().cpu().numpy()]
    
    cnn_preds_all = np.concatenate(cnn_preds_all, axis=0)
    
    return cnn_preds_all

In [None]:
train_features = get_densenet_predictions(train_loader, densenet_model, device)
val_features = get_densenet_predictions(val_loader, densenet_model, device)
test_features = get_densenet_predictions(test_loader, densenet_model, device)

#### SVM Classifier

##### Hyperparameter tuning

In [None]:
# Tuning LinearSVC hyperparameters with Optuna
def objective(trial):
    
    params = dict(
        C = trial.suggest_float("C", 1e-2, 1e-1),
        kernel = trial.suggest_categorical("kernel", ["linear", "rbf"]),
        class_weight = trial.suggest_categorical("class_weight", ["balanced", None])
    )

    svc_model = SVC(**params, probability=True, random_state=42)

    svc_pipe = Pipeline([("scaler", StandardScaler())
                     , ("model", svc_model)])

    svc_pipe.fit(train_features, np.array(train_data["Label"]))

    score = log_loss(np.array(val_data["Label"]), svc_pipe.predict(val_features))

    return score

study = opt.create_study(direction="minimize", study_name="svc", pruner=opt.pruners.HyperbandPruner())

study.optimize(objective, n_trials=100, n_jobs=-1)

print("Best hyperparameters: ", study.best_params)
print("Score on validation: ", study.best_value)

##### Training SVM

In [None]:
svc_model = SVC(**study.best_params, probability=True, random_state=42)

svc_pipe = Pipeline([("scaler", StandardScaler())
                    , ("model", svc_model)])

In [None]:
svc_pipe.fit(train_features, np.array(train_data["Label"]))

In [None]:
svc_train_preds = svc_pipe.predict(train_features)
svc_val_preds = svc_pipe.predict(val_features)
svc_test_preds = svc_pipe.predict(test_features)

In [None]:
print(f"Training set ROC-AUC score: {roc_auc_score(svc_train_preds, np.array(train_data['Label']))}")
print(f"Training set balanced accuracy: {balanced_accuracy_score(svc_train_preds, np.array(train_data['Label']))}")
print(f"Training set f1-score: {f1_score(svc_train_preds, np.array(train_data['Label']))}")
print("-" * 50)
print(f"Validation set ROC-AUC score: {roc_auc_score(svc_val_preds, np.array(val_data['Label']))}")
print(f"Validation set balanced accuracy: {balanced_accuracy_score(svc_val_preds, np.array(val_data['Label']))}")
print(f"Validation set f1-score: {f1_score(svc_val_preds, np.array(val_data['Label']))}")

In [None]:
# Confusion matrix of SVC predictions on the validation set
conf_matrix = confusion_matrix(np.array(val_data['Label']), svc_val_preds, labels=[0, 1])
disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=["Normal", "Pneumonia"])
disp.plot()
plt.grid(False)
plt.show()

In [None]:
# Save the best model pipeline
dump(svc_pipe, 'svc_pipeline.joblib') 

### Stacking of the two models

In [None]:
# Load best benchmark CNN model
best_benchmark = PneumoniaModel.load_from_checkpoint(checkpoint_path="lightning_logs/version_10/checkpoints/epoch=1-step=58.ckpt", model=BenchmarkCNNModel(dropout=0.2), lr=0.02)

# Load best vgg16 model
best_vgg16 = PneumoniaModel.load_from_checkpoint(checkpoint_path="lightning_logs/version_10/checkpoints/epoch=1-step=58.ckpt", model=VGG16Model(dropout=0.2), lr=0.02)

# Load SVM pipeline
svc_pipe = load("svc_pipeline.joblib")

#### Preparing the input data for the meta-learner

In [None]:
# Get predictions of Densenet+SVM and VGG16 as input for the meta-learner
def get_meta_data(image_df):
    image_transforms = transforms.Compose([
        transforms.Resize((150,150)),
        transforms.ToTensor()
    ])
    meta_loader = DataLoader(ImageDataset(image_df, transform=image_transforms)
                             , shuffle=False
                             # Batch size doesn't matter here, since we're not training the CNN
                             , batch_size=32
                             , drop_last=False
                             , num_workers=8
                             )
    
    vgg16_preds = vgg16_trainer.predict(best_vgg16, dataloaders=meta_loader)
    vgg16_preds = np.hstack([pred.squeeze(1).detach().cpu().numpy() for pred in vgg16_preds])
    
    svm_features = get_densenet_predictions(meta_loader, densenet_model, device)

    svm_preds = svc_pipe.predict_proba(svm_features)[:, 1]

    return np.concatenate((vgg16_preds.reshape(-1, 1), svm_preds.reshape(-1, 1)), axis=1), np.array(image_df["Label"])

In [None]:
X_train, y_train = get_meta_data(train_data)
X_val, y_val = get_meta_data(val_data)
X_test, y_test = get_meta_data(test_data)

In [None]:
train_dataset = TensorDataset(torch.from_numpy(X_train).unsqueeze(1).to(torch.float32).to(device), torch.from_numpy(y_train).to(torch.float32).to(device))
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=128)

val_dataset = TensorDataset(torch.from_numpy(X_val).unsqueeze(1).to(torch.float32).to(device), torch.from_numpy(y_val).to(torch.float32).to(device))
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=128)

test_dataset = TensorDataset(torch.from_numpy(X_test).unsqueeze(1).to(torch.float32).to(device), torch.from_numpy(y_test).to(torch.float32).to(device))
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=128)

#### Training the meta-learner

In [None]:
pl.seed_everything(42, workers=True)
meta_model = MetaModel().to(device)

meta_model_lightning = PneumoniaModel(meta_model, lr=0.05)
meta_trainer = pl.Trainer(max_epochs=20, check_val_every_n_epoch=1, log_every_n_steps=2, deterministic=True)

In [None]:
meta_trainer.fit(meta_model_lightning, train_dataloaders=train_loader, val_dataloaders=val_loader)

In [None]:
meta_trainer.test(meta_model_lightning, dataloaders=test_loader)

In [None]:
vgg16_preds = vgg16_trainer.predict(best_vgg16, dataloaders=pneumonia_loader.test_dataloader())

In [None]:
vgg16_preds = np.hstack([pred.squeeze(1).detach().cpu().numpy() for pred in vgg16_preds])

In [None]:
meta_preds = meta_trainer.predict(meta_model_lightning, dataloaders=test_loader)
meta_preds = np.hstack([pred.squeeze(1, 2).detach().cpu().numpy() for pred in meta_preds])

## Results

### ROC CURVE

In [None]:
# Calculate ROC curve
fpr_vgg16, tpr_vgg16, _ = roc_curve(y_test, vgg16_preds)
fpr_svc, tpr_svc, _ = roc_curve(y_test, svc_test_preds)
fpr_meta, tpr_meta, thresholds = roc_curve(y_test, meta_preds)

# Calculate ROC-AUC score
roc_vgg16 = roc_auc_score(y_test, vgg16_preds)
roc_svc = roc_auc_score(y_test, svc_test_preds)
roc_meta = roc_auc_score(y_test, meta_preds)

# Visualize ROC curve
plt.figure(figsize=(8, 8))
plt.plot(fpr_vgg16, tpr_vgg16, color='red', lw=2, label=f'VGG16 ROC (AUC = {roc_vgg16:.2f})')
plt.plot(fpr_svc, tpr_svc, color='orange', lw=2, label=f'SVM ROC (AUC = {roc_svc:.2f})')
plt.plot(fpr_meta, tpr_meta, color='blue', lw=2, label=f'Stacking ROC (AUC = {roc_meta:.2f})')

plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - ANN, TCN, LSTM')
plt.legend(loc='lower right')
plt.show()

### SHAP values