In [1]:
import copy
import glob
import os
import shutil

import albumentations as A
import numpy as np
import pandas as pd
import scipy.stats as st
import torch
import torch.nn as nn
import torch.optim as optim
from albumentations.pytorch import ToTensorV2
from PIL import Image
from sklearn.metrics import cohen_kappa_score
from sklearn.model_selection import KFold, train_test_split, ShuffleSplit
from torch.utils.data import Dataset, DataLoader
from torchmetrics import Accuracy, CohenKappa
from torchvision import models, transforms
from tqdm import tqdm

In [2]:
def create_scarce_dataset(image_list, n_samples=150):
    """Draw a random subset of ``image_list`` without replacement.

    Args:
        image_list: 1-D sequence or array to sample from.
        n_samples: Number of entries to draw.

    Returns:
        NumPy array with ``n_samples`` entries of ``image_list``, drawn using
        NumPy's global random state.
    """
    return np.random.choice(image_list, size=n_samples, replace=False)

In [3]:
# Maps the name of an image's parent directory to its binary class label.
labels_key = dict(KL01=0, KL234=1)

In [4]:
class ClassificationDataset(Dataset):
    """Image dataset whose labels are derived from the parent directory name.

    Each entry of ``images`` is a file path. The name of the directory that
    directly contains the file is looked up in ``label_key`` to get the label.

    Args:
        images: Indexable collection of image file paths.
        label_key: Mapping from parent-directory name to a numeric label.
        transform: Optional callable invoked as ``transform(image=ndarray)``
            that returns a mapping with an ``"image"`` entry. ``None``
            disables transformation.
    """

    def __init__(self, images, label_key, transform=None):
        self.images = images
        self.labels = label_key
        self.transforms = transform

    def __len__(self):
        return len(self.images)

    def _label_for(self, path):
        """Return the label of ``path`` based on its parent directory name."""
        # os.path copes with the platform's separator; the previous
        # ``path.split("/")[-2]`` only worked for forward slashes.
        class_dir = os.path.basename(os.path.dirname(path))
        return self.labels[class_dir]

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            Tuple ``(image, label)``: the image as a float tensor and the
            label as a 0-d float tensor.

        Raises:
            KeyError: If the parent directory name is not in the label mapping.
        """
        path = self.images[idx]
        label = self._label_for(path)
        # The context manager closes the underlying file once pixels are read.
        with Image.open(path) as pil_img:
            img = np.array(pil_img.convert("RGB"))
        if self.transforms is not None:
            img = self.transforms(image=img)["image"]
        if not torch.is_tensor(img):
            # Without a tensor-producing transform the image is still an
            # H x W x C ndarray, on which ``.float()`` used to fail. Convert it
            # to a channels-first tensor instead.
            img = torch.from_numpy(np.ascontiguousarray(img)).permute(2, 0, 1)

        return img.float(), torch.tensor(label).float()

In [5]:
def get_model(model_arch="VGG"):
    """Build an ImageNet-pretrained backbone with a single-output head.

    Args:
        model_arch: ``"VGG"`` selects VGG11-BN with its pretrained weights
            frozen; any other value selects ResNet18 with all weights
            trainable.

    Returns:
        The model, moved to the module-level ``device``. Its last layer is an
        ``nn.Linear`` with one output; no sigmoid is applied here.
    """
    if model_arch == "VGG":
        model = models.vgg11_bn(weights=models.VGG11_BN_Weights.DEFAULT)
        # Freeze the pretrained weights. The earlier code assigned to
        # ``requires_grad_`` (the name of a method), which only replaced that
        # attribute and left every parameter trainable.
        for param in model.parameters():
            param.requires_grad = False
        # The head is replaced after freezing so the new layer stays trainable.
        model.classifier[6] = nn.Linear(model.classifier[6].in_features, 1)
    else:
        model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        model.fc = nn.Linear(model.fc.in_features, 1)

    return model.to(device)

In [6]:
# --- Experiment configuration ---
SEED = 42
n_samples = 200  # number of real images drawn from the pooled classes
epochs = 10
lr = 1e-05

# Seed NumPy and PyTorch so sampling, shuffling and weight initialisation
# repeat across "Restart & Run All".
np.random.seed(SEED)
torch.manual_seed(SEED)

# glob returns files in filesystem order, so the lists are sorted to make the
# seeded sampling below select the same files on every run.
# Note: without recursive=True the "**" pattern matches like "*".
KL01_real = np.array(sorted(glob.iglob("/data_vault/hexai/KL01_KL234_Real/KL01/**")))
KL234_real = np.array(sorted(glob.iglob("/data_vault/hexai/KL01_KL234_Real/KL234/**")))

# The subset is drawn from the pooled paths of both classes, so the class
# balance of KL is not controlled.
KL = create_scarce_dataset(np.concatenate([KL01_real, KL234_real]), n_samples=n_samples)

In [None]:
# Display the sampled real-image paths.
KL

In [8]:

# Hold out half of the sampled real images as the final test set.
train, test  = train_test_split(KL, random_state=42, test_size=0.5, shuffle=True)
# Class label of every training path, derived from the parent directory name.
# NOTE(review): ShuffleSplit is not a stratified splitter, so passing these
# labels to it does not stratify the folds. Confirm whether
# StratifiedShuffleSplit was intended.
train_y_strat = [labels_key[img.split("/")[-2]] for img in train]
# The backbones are ImageNet-pretrained, so inputs are scaled with the ImageNet
# mean/std instead of being fed as raw 0-255 pixel values.
augmentations = A.Compose([
    A.Resize(224, 224),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0),
    ToTensorV2(),
])


test_dataset = ClassificationDataset(test, labels_key, transform=augmentations)

test_dataloader = DataLoader(test_dataset, batch_size=32)

device = "cuda" if torch.cuda.is_available() else "cpu"


## ResNet18 on REAL Images ONLY

In [9]:
from tqdm import tqdm
# Binary cross-entropy on probabilities; the training cells apply a sigmoid
# to the model output before calling it.
criterion = nn.BCELoss()
# torchmetrics kappa metric placed on the compute device.
# NOTE(review): the cells visible here score with sklearn's cohen_kappa_score
# instead. Confirm this object is still needed.
cohen_kappa = CohenKappa(task="binary").to(device)
# Name of the backbone used in this notebook. The training cells later rebind
# the name `model` to a network object.
model="resnet18"

In [10]:
# Repeated random train/validation splits of the real training pool
# (random subsampling, not a true k-fold partition).
# A fixed random_state makes every call to kfold_cv.split() produce the same
# splits, so all experiments are validated on identical folds.
test_size = 0.4
kfold_cv = ShuffleSplit(n_splits=5, test_size=test_size, random_state=42)

In [11]:
# Number of validation / training images per split, derived from the
# validation fraction `test_size`.
valid_size = int(len(train) * test_size)
train_size = len(train) - valid_size

In [None]:
# Sanity check: print the number of training and validation indices per split.
for fold_train_idx, fold_valid_idx in kfold_cv.split(train):
    print(len(fold_train_idx), len(fold_valid_idx))

In [None]:
# Display the computed per-split sizes.
train_size, valid_size

In [14]:
# NOTE(review): not referenced by any cell visible here. Confirm it is still
# needed.
step_size = 10

In [None]:
# Baseline: cross-validated fine-tuning on real images only.
kfold_kappas = []  # best validation kappa reached in each fold
best_models = []   # best-epoch model snapshot of each fold
for i, (train_idx, valid_idx) in enumerate(kfold_cv.split(train)):
    print(f"Fold {i}")

    # Datasets and dataloaders for this fold
    train_dataset = ClassificationDataset(train[train_idx], labels_key, transform=augmentations)
    valid_dataset = ClassificationDataset(train[valid_idx], labels_key, transform=augmentations)

    train_dataloader = DataLoader(train_dataset, batch_size=2, shuffle=True)
    valid_dataloader = DataLoader(valid_dataset, batch_size=2, shuffle=False)

    valid_kappas = []
    best_model = None
    best_kappa = -1.

    # Fresh pretrained network and optimizer for every fold. The architecture
    # is named literally: the earlier `get_model(model)` rebound `model` to the
    # network, so from the second fold on a module was passed as the name.
    model = get_model("resnet18").to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    criterion = nn.BCELoss()

    for epoch in range(epochs):
        print(f"Epoch {epoch}")
        model.train()
        running_loss = []

        # Training
        for img, label in tqdm(train_dataloader):
            optimizer.zero_grad()
            img = img.to(device)
            label = label.to(device).float()
            # Squeeze only the trailing output dimension so that a batch of
            # one sample still yields a 1-D tensor matching `label`.
            out = torch.sigmoid(model(img)).squeeze(dim=-1)
            loss = criterion(out, label)
            loss.backward()
            optimizer.step()
            running_loss.append(loss.item())

        train_loss = np.mean(running_loss)
        print(f"Train Loss: {train_loss}")

        # Validation: collect hard predictions (threshold 0.5) and labels as
        # plain Python ints for sklearn.
        model.eval()
        outputs = []
        labels = []
        with torch.no_grad():
            for img, label in valid_dataloader:
                out = torch.sigmoid(model(img.to(device))).squeeze(dim=-1)
                outputs.extend((out > 0.5).int().cpu().tolist())
                labels.extend(label.int().tolist())

        val_kappa = cohen_kappa_score(labels, outputs)
        valid_kappas.append(val_kappa)
        print(f"Val. Kappa: {val_kappa}")

        # Keep a snapshot of the epoch with the highest validation kappa
        if val_kappa > best_kappa:
            best_kappa = val_kappa
            best_model = copy.deepcopy(model)

    # Store this fold's best snapshot (in eval mode) and its score
    best_model.eval()
    best_models.append(best_model)
    print(f"Best Validation Kappa: {best_kappa}")
    kfold_kappas.append(best_kappa)

In [None]:
# Summarise the per-fold best validation kappas.
mean_score = np.mean(kfold_kappas)
# Sample standard deviation (ddof=1): the t-interval below uses n-1 degrees of
# freedom and therefore expects the unbiased estimate. NumPy's default
# (ddof=0) made the interval too narrow.
std_score = np.std(kfold_kappas, ddof=1)
sem_score = std_score/np.sqrt(len(kfold_kappas))

# 95% confidence interval of the mean using the t-distribution
confidence_level = 0.95
degrees_freedom = len(kfold_kappas) - 1
confidence_interval = st.t.interval(confidence_level, degrees_freedom, loc=mean_score, scale=sem_score)

print(f"Mean Score: {mean_score}")
print(f"95% Confidence Interval: {confidence_interval}")

In [None]:
# Display the best validation kappa of each fold.
kfold_kappas

In [None]:
# Hold-out evaluation with the fold model that reached the highest validation kappa.
best_fold = np.argmax(kfold_kappas)
best_model = best_models[best_fold]
# Set eval mode here instead of relying on the training cell having done it.
best_model.eval()
outputs = []
labels = []
with torch.no_grad():
    for img, label in test_dataloader:
        out = torch.sigmoid(best_model(img.to(device))).squeeze(dim=-1)
        # Hard predictions at threshold 0.5; both lists hold plain Python ints.
        outputs.extend((out > 0.5).int().cpu().tolist())
        labels.extend(label.int().tolist())

print(f"Final Test Kappa: {cohen_kappa_score(labels, outputs)}")

## ResNet18 on REAL Images + 50% FAKE

In [19]:
# Size of the synthetic pool drawn per class: the sum of the 0.5x, 1x, 1.5x and
# 2x multiples of train_size, minus train_size.
# NOTE(review): the cells visible here consume at most int(train_size * 2.0)
# images per class. Confirm the larger pool is needed.
total_fake_images = int(sum(train_size * ratio for ratio in (.5, 1, 1.5, 2))) - train_size
KL01_fake = np.array(glob.glob("/data_vault/hexai/SyntheticKneeImages/KL01/**"))
KL234_fake = np.array(glob.glob("/data_vault/hexai/SyntheticKneeImages/KL234/**"))
# One random, non-repeating subset of synthetic images per class
KL01_fake_aug = create_scarce_dataset(KL01_fake, n_samples=total_fake_images)
KL234_fake_aug = create_scarce_dataset(KL234_fake, n_samples=total_fake_images)

In [21]:
# Take the first n_fake synthetic images of EACH class, so all_fake holds
# 2 * n_fake paths, then shuffle them in place.
# NOTE(review): the section heading says "+ 50% FAKE", but the total added is
# 2 * n_fake, which equals train_size. Confirm the ratio is meant per class.
n_fake = int(train_size * 0.5)
all_fake = np.concatenate([KL01_fake_aug[:n_fake], KL234_fake_aug[:n_fake]])
np.random.shuffle(all_fake)

In [None]:
# Cross-validated fine-tuning on real images plus the synthetic subset in `all_fake`.
kfold_kappas = []  # best validation kappa reached in each fold
best_models = []   # best-epoch model snapshot of each fold
for i, (train_idx, valid_idx) in enumerate(kfold_cv.split(train, train_y_strat)):
    print(f"Fold {i}")

    # Training data is the real fold plus the synthetic images. Validation
    # uses real images only. No manual shuffle: the DataLoader reshuffles the
    # training set every epoch.
    train_comb = np.concatenate([train[train_idx], all_fake])
    train_dataset = ClassificationDataset(train_comb, labels_key, transform=augmentations)
    valid_dataset = ClassificationDataset(train[valid_idx], labels_key, transform=augmentations)

    train_dataloader = DataLoader(train_dataset, batch_size=2, shuffle=True)
    valid_dataloader = DataLoader(valid_dataset, batch_size=2, shuffle=False)

    valid_kappas = []
    best_model = None
    best_kappa = -1.

    # Fresh pretrained network and optimizer for every fold. The section
    # heading and the real-only baseline use ResNet18; this cell previously
    # built "VGG", which made the comparison cross-architecture.
    model = get_model("resnet18").to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    criterion = nn.BCELoss()

    for epoch in range(epochs):
        print(f"Epoch {epoch}")
        model.train()
        running_loss = []

        # Training
        for img, label in tqdm(train_dataloader):
            optimizer.zero_grad()
            img = img.to(device)
            label = label.to(device).float()
            # Squeeze only the trailing output dimension so that a batch of
            # one sample still yields a 1-D tensor matching `label`.
            out = torch.sigmoid(model(img)).squeeze(dim=-1)
            loss = criterion(out, label)
            loss.backward()
            optimizer.step()
            running_loss.append(loss.item())

        train_loss = np.mean(running_loss)
        print(f"Train Loss: {train_loss}")

        # Validation: collect hard predictions (threshold 0.5) and labels as
        # plain Python ints for sklearn.
        model.eval()
        outputs = []
        labels = []
        with torch.no_grad():
            for img, label in valid_dataloader:
                out = torch.sigmoid(model(img.to(device))).squeeze(dim=-1)
                outputs.extend((out > 0.5).int().cpu().tolist())
                labels.extend(label.int().tolist())

        val_kappa = cohen_kappa_score(labels, outputs)
        valid_kappas.append(val_kappa)
        print(f"Val. Kappa: {val_kappa}")

        # Keep a snapshot of the epoch with the highest validation kappa
        if val_kappa > best_kappa:
            best_kappa = val_kappa
            best_model = copy.deepcopy(model)

    # Store this fold's best snapshot (in eval mode) and its score
    best_model.eval()
    best_models.append(best_model)
    print(f"Best Validation Kappa: {best_kappa}")
    kfold_kappas.append(best_kappa)

In [None]:
# Summarise the per-fold best validation kappas.
mean_score = np.mean(kfold_kappas)
# Sample standard deviation (ddof=1): the t-interval below uses n-1 degrees of
# freedom and therefore expects the unbiased estimate. NumPy's default
# (ddof=0) made the interval too narrow.
std_score = np.std(kfold_kappas, ddof=1)
sem_score = std_score/np.sqrt(len(kfold_kappas))

# 95% confidence interval of the mean using the t-distribution
confidence_level = 0.95
degrees_freedom = len(kfold_kappas) - 1
confidence_interval = st.t.interval(confidence_level, degrees_freedom, loc=mean_score, scale=sem_score)

print(f"Mean Score: {mean_score}")
print(f"95% Confidence Interval: {confidence_interval}")

In [None]:
# Hold-out evaluation with the fold model that reached the highest validation kappa.
best_fold = np.argmax(kfold_kappas)
best_model = best_models[best_fold]
# Set eval mode here instead of relying on the training cell having done it.
best_model.eval()
outputs = []
labels = []
with torch.no_grad():
    for img, label in test_dataloader:
        out = torch.sigmoid(best_model(img.to(device))).squeeze(dim=-1)
        # Hard predictions at threshold 0.5; both lists hold plain Python ints.
        outputs.extend((out > 0.5).int().cpu().tolist())
        labels.extend(label.int().tolist())

print(f"Final Test Kappa: {cohen_kappa_score(labels, outputs)}")

## ResNet18 on REAL Images + 100% FAKE 

In [26]:
# Take the first n_fake synthetic images of EACH class, so all_fake holds
# 2 * n_fake paths, then shuffle them in place.
# NOTE(review): the section heading says "+ 100% FAKE", but the total added is
# 2 * n_fake, twice train_size. Confirm the ratio is meant per class.
n_fake = int(train_size * 1.0)
all_fake =  np.concatenate([KL01_fake_aug[:n_fake], KL234_fake_aug[:n_fake]])
np.random.shuffle(all_fake)

In [None]:
# Cross-validated fine-tuning on real images plus the synthetic subset in `all_fake`.
kfold_kappas = []  # best validation kappa reached in each fold
best_models = []   # best-epoch model snapshot of each fold
for i, (train_idx, valid_idx) in enumerate(kfold_cv.split(train)):
    print(f"Fold {i}")

    # Training data is the real fold plus the synthetic images. Validation
    # uses real images only. No manual shuffle: the DataLoader reshuffles the
    # training set every epoch.
    train_comb = np.concatenate([train[train_idx], all_fake])
    train_dataset = ClassificationDataset(train_comb, labels_key, transform=augmentations)
    valid_dataset = ClassificationDataset(train[valid_idx], labels_key, transform=augmentations)

    train_dataloader = DataLoader(train_dataset, batch_size=2, shuffle=True)
    valid_dataloader = DataLoader(valid_dataset, batch_size=2, shuffle=False)

    valid_kappas = []
    best_model = None
    best_kappa = -1.

    # Fresh pretrained network and optimizer for every fold. The section
    # heading and the real-only baseline use ResNet18; this cell previously
    # built "VGG", which made the comparison cross-architecture.
    model = get_model("resnet18").to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    criterion = nn.BCELoss()

    for epoch in range(epochs):
        print(f"Epoch {epoch}")
        model.train()
        running_loss = []

        # Training
        for img, label in tqdm(train_dataloader):
            optimizer.zero_grad()
            img = img.to(device)
            label = label.to(device).float()
            # Squeeze only the trailing output dimension so that a batch of
            # one sample still yields a 1-D tensor matching `label`.
            out = torch.sigmoid(model(img)).squeeze(dim=-1)
            loss = criterion(out, label)
            loss.backward()
            optimizer.step()
            running_loss.append(loss.item())

        train_loss = np.mean(running_loss)
        print(f"Train Loss: {train_loss}")

        # Validation: collect hard predictions (threshold 0.5) and labels as
        # plain Python ints for sklearn.
        model.eval()
        outputs = []
        labels = []
        with torch.no_grad():
            for img, label in valid_dataloader:
                out = torch.sigmoid(model(img.to(device))).squeeze(dim=-1)
                outputs.extend((out > 0.5).int().cpu().tolist())
                labels.extend(label.int().tolist())

        val_kappa = cohen_kappa_score(labels, outputs)
        valid_kappas.append(val_kappa)
        print(f"Val. Kappa: {val_kappa}")

        # Keep a snapshot of the epoch with the highest validation kappa
        if val_kappa > best_kappa:
            best_kappa = val_kappa
            best_model = copy.deepcopy(model)

    # Store this fold's best snapshot (in eval mode) and its score
    best_model.eval()
    best_models.append(best_model)
    print(f"Best Validation Kappa: {best_kappa}")
    kfold_kappas.append(best_kappa)

In [None]:
# Display the best validation kappa of each fold.
kfold_kappas

In [None]:
# Hold-out evaluation with the fold model that reached the highest validation kappa.
best_fold = np.argmax(kfold_kappas)
best_model = best_models[best_fold]
# Set eval mode here instead of relying on the training cell having done it.
best_model.eval()
outputs = []
labels = []
with torch.no_grad():
    for img, label in test_dataloader:
        out = torch.sigmoid(best_model(img.to(device))).squeeze(dim=-1)
        # Hard predictions at threshold 0.5; both lists hold plain Python ints.
        outputs.extend((out > 0.5).int().cpu().tolist())
        labels.extend(label.int().tolist())

print(f"Final Test Kappa: {cohen_kappa_score(labels, outputs)}")

## ResNet18 on REAL Images + 150% FAKE

In [31]:
# Take the first n_fake synthetic images of EACH class, so all_fake holds
# 2 * n_fake paths, then shuffle them in place.
# NOTE(review): the section heading says "+ 150% FAKE", but the total added is
# 2 * n_fake, three times train_size. Confirm the ratio is meant per class.
n_fake = int(train_size * 1.5)
all_fake =  np.concatenate([KL01_fake_aug[:n_fake], KL234_fake_aug[:n_fake]])
np.random.shuffle(all_fake)

In [None]:
# Cross-validated fine-tuning on real images plus the synthetic subset in `all_fake`.
kfold_kappas = []  # best validation kappa reached in each fold
best_models = []   # best-epoch model snapshot of each fold
for i, (train_idx, valid_idx) in enumerate(kfold_cv.split(train, train_y_strat)):
    print(f"Fold {i}")

    # Training data is the real fold plus the synthetic images. Validation
    # uses real images only. No manual shuffle: the DataLoader reshuffles the
    # training set every epoch.
    train_comb = np.concatenate([train[train_idx], all_fake])
    train_dataset = ClassificationDataset(train_comb, labels_key, transform=augmentations)
    valid_dataset = ClassificationDataset(train[valid_idx], labels_key, transform=augmentations)

    train_dataloader = DataLoader(train_dataset, batch_size=2, shuffle=True)
    valid_dataloader = DataLoader(valid_dataset, batch_size=2, shuffle=False)

    valid_kappas = []
    best_model = None
    best_kappa = -1.

    # Fresh pretrained network and optimizer for every fold. The section
    # heading and the real-only baseline use ResNet18; this cell previously
    # built "VGG", which made the comparison cross-architecture.
    model = get_model("resnet18").to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    criterion = nn.BCELoss()

    for epoch in range(epochs):
        print(f"Epoch {epoch}")
        model.train()
        running_loss = []

        # Training
        for img, label in tqdm(train_dataloader):
            optimizer.zero_grad()
            img = img.to(device)
            label = label.to(device).float()
            # Squeeze only the trailing output dimension so that a batch of
            # one sample still yields a 1-D tensor matching `label`.
            out = torch.sigmoid(model(img)).squeeze(dim=-1)
            loss = criterion(out, label)
            loss.backward()
            optimizer.step()
            running_loss.append(loss.item())

        train_loss = np.mean(running_loss)
        print(f"Train Loss: {train_loss}")

        # Validation: collect hard predictions (threshold 0.5) and labels as
        # plain Python ints for sklearn.
        model.eval()
        outputs = []
        labels = []
        with torch.no_grad():
            for img, label in valid_dataloader:
                out = torch.sigmoid(model(img.to(device))).squeeze(dim=-1)
                outputs.extend((out > 0.5).int().cpu().tolist())
                labels.extend(label.int().tolist())

        val_kappa = cohen_kappa_score(labels, outputs)
        valid_kappas.append(val_kappa)
        print(f"Val. Kappa: {val_kappa}")

        # Keep a snapshot of the epoch with the highest validation kappa
        if val_kappa > best_kappa:
            best_kappa = val_kappa
            best_model = copy.deepcopy(model)

    # Store this fold's best snapshot (in eval mode) and its score
    best_model.eval()
    best_models.append(best_model)
    print(f"Best Validation Kappa: {best_kappa}")
    kfold_kappas.append(best_kappa)

In [None]:
# Display the best validation kappa of each fold.
kfold_kappas

In [None]:
# Summarise the per-fold best validation kappas.
mean_score = np.mean(kfold_kappas)
# Sample standard deviation (ddof=1): the t-interval below uses n-1 degrees of
# freedom and therefore expects the unbiased estimate. NumPy's default
# (ddof=0) made the interval too narrow.
std_score = np.std(kfold_kappas, ddof=1)
sem_score = std_score/np.sqrt(len(kfold_kappas))

# 95% confidence interval of the mean using the t-distribution
confidence_level = 0.95
degrees_freedom = len(kfold_kappas) - 1
confidence_interval = st.t.interval(confidence_level, degrees_freedom, loc=mean_score, scale=sem_score)

print(f"Mean Score: {mean_score}")
print(f"95% Confidence Interval: {confidence_interval}")

In [None]:
# Hold-out evaluation with the fold model that reached the highest validation kappa.
best_fold = np.argmax(kfold_kappas)
best_model = best_models[best_fold]
# Set eval mode here instead of relying on the training cell having done it.
best_model.eval()
outputs = []
labels = []
with torch.no_grad():
    for img, label in test_dataloader:
        out = torch.sigmoid(best_model(img.to(device))).squeeze(dim=-1)
        # Hard predictions at threshold 0.5; both lists hold plain Python ints.
        outputs.extend((out > 0.5).int().cpu().tolist())
        labels.extend(label.int().tolist())

print(f"Final Test Kappa: {cohen_kappa_score(labels, outputs)}")

## ResNet18 on REAL Images + 200% FAKE 

In [36]:
# Take the first n_fake synthetic images of EACH class, so all_fake holds
# 2 * n_fake paths, then shuffle them in place.
# NOTE(review): the section heading says "+ 200% FAKE", but the total added is
# 2 * n_fake, four times train_size. Confirm the ratio is meant per class.
n_fake = int(train_size * 2.0)
all_fake =  np.concatenate([KL01_fake_aug[:n_fake], KL234_fake_aug[:n_fake]])
np.random.shuffle(all_fake)

In [None]:
# Cross-validated fine-tuning on real images plus the synthetic subset in `all_fake`.
kfold_kappas = []  # best validation kappa reached in each fold
best_models = []   # best-epoch model snapshot of each fold
for i, (train_idx, valid_idx) in enumerate(kfold_cv.split(train, train_y_strat)):
    print(f"Fold {i}")

    # Training data is the real fold plus the synthetic images. Validation
    # uses real images only. No manual shuffle: the DataLoader reshuffles the
    # training set every epoch.
    train_comb = np.concatenate([train[train_idx], all_fake])
    train_dataset = ClassificationDataset(train_comb, labels_key, transform=augmentations)
    valid_dataset = ClassificationDataset(train[valid_idx], labels_key, transform=augmentations)

    train_dataloader = DataLoader(train_dataset, batch_size=2, shuffle=True)
    valid_dataloader = DataLoader(valid_dataset, batch_size=2, shuffle=False)

    valid_kappas = []
    best_model = None
    best_kappa = -1.

    # Fresh pretrained network and optimizer for every fold. The section
    # heading and the real-only baseline use ResNet18; this cell previously
    # built "VGG", which made the comparison cross-architecture.
    model = get_model("resnet18").to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    criterion = nn.BCELoss()

    for epoch in range(epochs):
        print(f"Epoch {epoch}")
        model.train()
        running_loss = []

        # Training
        for img, label in tqdm(train_dataloader):
            optimizer.zero_grad()
            img = img.to(device)
            label = label.to(device).float()
            # Squeeze only the trailing output dimension so that a batch of
            # one sample still yields a 1-D tensor matching `label`.
            out = torch.sigmoid(model(img)).squeeze(dim=-1)
            loss = criterion(out, label)
            loss.backward()
            optimizer.step()
            running_loss.append(loss.item())

        train_loss = np.mean(running_loss)
        print(f"Train Loss: {train_loss}")

        # Validation: collect hard predictions (threshold 0.5) and labels as
        # plain Python ints for sklearn.
        model.eval()
        outputs = []
        labels = []
        with torch.no_grad():
            for img, label in valid_dataloader:
                out = torch.sigmoid(model(img.to(device))).squeeze(dim=-1)
                outputs.extend((out > 0.5).int().cpu().tolist())
                labels.extend(label.int().tolist())

        val_kappa = cohen_kappa_score(labels, outputs)
        valid_kappas.append(val_kappa)
        print(f"Val. Kappa: {val_kappa}")

        # Keep a snapshot of the epoch with the highest validation kappa
        if val_kappa > best_kappa:
            best_kappa = val_kappa
            best_model = copy.deepcopy(model)

    # Store this fold's best snapshot (in eval mode) and its score
    best_model.eval()
    best_models.append(best_model)
    print(f"Best Validation Kappa: {best_kappa}")
    kfold_kappas.append(best_kappa)

In [None]:
# Display the best validation kappa of each fold.
kfold_kappas

In [None]:
# Summarise the per-fold best validation kappas.
mean_score = np.mean(kfold_kappas)
# Sample standard deviation (ddof=1): the t-interval below uses n-1 degrees of
# freedom and therefore expects the unbiased estimate. NumPy's default
# (ddof=0) made the interval too narrow.
std_score = np.std(kfold_kappas, ddof=1)
sem_score = std_score/np.sqrt(len(kfold_kappas))

# 95% confidence interval of the mean using the t-distribution
confidence_level = 0.95
degrees_freedom = len(kfold_kappas) - 1
confidence_interval = st.t.interval(confidence_level, degrees_freedom, loc=mean_score, scale=sem_score)

print(f"Mean Score: {mean_score}")
print(f"95% Confidence Interval: {confidence_interval}")

In [None]:
# Hold-out evaluation with the fold model that reached the highest validation kappa.
best_fold = np.argmax(kfold_kappas)
best_model = best_models[best_fold]
# Set eval mode here instead of relying on the training cell having done it.
best_model.eval()
outputs = []
labels = []
with torch.no_grad():
    for img, label in test_dataloader:
        out = torch.sigmoid(best_model(img.to(device))).squeeze(dim=-1)
        # Hard predictions at threshold 0.5; both lists hold plain Python ints.
        outputs.extend((out > 0.5).int().cpu().tolist())
        labels.extend(label.int().tolist())

print(f"Final Test Kappa: {cohen_kappa_score(labels, outputs)}")