In [1]:
import os
import glob
import pandas as pd
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import torchvision
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import KFold
import matplotlib.pyplot as plt
import copy

In [3]:
device = torch.device("cuda:5" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cuda:5


In [4]:
class ChestXrayDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img = Image.open(self.image_paths[idx]).convert("RGB")
        if self.transform:
            img = self.transform(img)
        label = self.labels[idx]
        return img, label

In [5]:
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])

In [14]:
train_dir = "data/train_data"
test_dir = "data/test_data"
no_dir  = "synthetic no finding"
inf_dir = "synthetic infiltration"
csv_path = "final data.csv"

In [10]:
df_labels = pd.read_csv(csv_path)

In [8]:
def get_filenames_and_labels(df, folder):
    filenames = df['Image Index'].tolist()
    labels = df['Label'].tolist()
    paths = [os.path.join(folder, fname) for fname in filenames]
    return paths, labels

In [11]:
train_paths, train_labels = get_filenames_and_labels(df_labels[df_labels['Image Index'].isin(os.listdir(train_dir))], train_dir)
test_paths, test_labels = get_filenames_and_labels(df_labels[df_labels['Image Index'].isin(os.listdir(test_dir))], test_dir)

In [13]:
train_ds = ChestXrayDataset(train_paths, train_labels, transform=train_transform)
test_ds  = ChestXrayDataset(test_paths,  test_labels,  transform=train_transform)

train_loader = DataLoader(train_ds, batch_size=16, shuffle=True,  num_workers=4)
test_loader  = DataLoader(test_ds,  batch_size=32, shuffle=False, num_workers=4)

In [15]:
extra_no_paths  = glob.glob(os.path.join(no_dir,  "*.*"))
extra_inf_paths = glob.glob(os.path.join(inf_dir, "*.*"))

In [16]:
extra_no_labels  = [0] * len(extra_no_paths)
extra_inf_labels = [1] * len(extra_inf_paths)

# Training on Original data

In [17]:
kf = KFold(n_splits=5, shuffle=True, random_state=42)
val_scores = []
best_val_acc = 0
best_model = None

In [18]:
for fold, (train_idx, val_idx) in enumerate(kf.split(train_paths)):
    print(f"\nFold {fold+1}")
    fold_train_paths = [train_paths[i] for i in train_idx]
    fold_train_labels = [train_labels[i] for i in train_idx]
    fold_val_paths = [train_paths[i] for i in val_idx]
    fold_val_labels = [train_labels[i] for i in val_idx]

    train_ds = ChestXrayDataset(fold_train_paths, fold_train_labels, transform=train_transform)
    val_ds = ChestXrayDataset(fold_val_paths, fold_val_labels, transform=train_transform)
    train_loader = DataLoader(train_ds, batch_size=16, shuffle=True, num_workers=4)
    val_loader = DataLoader(val_ds, batch_size=32, shuffle=False, num_workers=4)

    model = torchvision.models.resnet18(pretrained=True)
    model.fc = nn.Linear(model.fc.in_features, 2)
    model = model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(10):
        model.train()
        running_corrects = 0
        for imgs, labs in train_loader:
            imgs, labs = imgs.to(device), labs.to(device)
            optimizer.zero_grad()
            outputs = model(imgs)
            loss = criterion(outputs, labs)
            loss.backward()
            optimizer.step()
            running_corrects += (outputs.argmax(1) == labs).sum().item()
        train_acc = running_corrects / len(train_ds)

        model.eval()
        val_corrects = 0
        with torch.no_grad():
            for imgs, labs in val_loader:
                imgs, labs = imgs.to(device), labs.to(device)
                outputs = model(imgs)
                val_corrects += (outputs.argmax(1) == labs).sum().item()
        val_acc = val_corrects / len(val_ds)
        print(f"Epoch {epoch+1} — train_acc: {train_acc:.4f}, val_acc: {val_acc:.4f}")

    val_scores.append(val_acc)
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        best_model = copy.deepcopy(model.state_dict())


Fold 1




Epoch 1 — train_acc: 0.6888, val_acc: 0.7049
Epoch 2 — train_acc: 0.7136, val_acc: 0.7276
Epoch 3 — train_acc: 0.7289, val_acc: 0.7227
Epoch 4 — train_acc: 0.7439, val_acc: 0.7260
Epoch 5 — train_acc: 0.7575, val_acc: 0.7115
Epoch 6 — train_acc: 0.7816, val_acc: 0.7069
Epoch 7 — train_acc: 0.8101, val_acc: 0.7140
Epoch 8 — train_acc: 0.8399, val_acc: 0.7189
Epoch 9 — train_acc: 0.8687, val_acc: 0.7032
Epoch 10 — train_acc: 0.8985, val_acc: 0.7005

Fold 2
Epoch 1 — train_acc: 0.6842, val_acc: 0.7071
Epoch 2 — train_acc: 0.7151, val_acc: 0.7250
Epoch 3 — train_acc: 0.7250, val_acc: 0.7324
Epoch 4 — train_acc: 0.7430, val_acc: 0.7250
Epoch 5 — train_acc: 0.7640, val_acc: 0.7081
Epoch 6 — train_acc: 0.7853, val_acc: 0.7124
Epoch 7 — train_acc: 0.8089, val_acc: 0.6991
Epoch 8 — train_acc: 0.8403, val_acc: 0.7150
Epoch 9 — train_acc: 0.8738, val_acc: 0.7168
Epoch 10 — train_acc: 0.9000, val_acc: 0.6955

Fold 3
Epoch 1 — train_acc: 0.6907, val_acc: 0.7197
Epoch 2 — train_acc: 0.7192, val_acc:

Testing on unseen data with real images

# Training on Original + Synthetic Data

In [20]:
combined_paths = train_paths + extra_no_paths + extra_inf_paths
combined_labels = train_labels + extra_no_labels + extra_inf_labels

kf = KFold(n_splits=5, shuffle=True, random_state=42)
val_scores_syn = []
best_val_acc_syn = 0
best_model_syn = None

In [None]:
for fold, (train_idx, val_idx) in enumerate(kf.split(combined_paths)):
    print(f"\nFold {fold+1}")
    fold_train_paths = [combined_paths[i] for i in train_idx]
    fold_train_labels = [combined_labels[i] for i in train_idx]
    fold_val_paths = [combined_paths[i] for i in val_idx]
    fold_val_labels = [combined_labels[i] for i in val_idx]

    train_ds = ChestXrayDataset(fold_train_paths, fold_train_labels, transform=train_transform)
    val_ds = ChestXrayDataset(fold_val_paths, fold_val_labels, transform=train_transform)
    train_loader = DataLoader(train_ds, batch_size=16, shuffle=True, num_workers=4)
    val_loader = DataLoader(val_ds, batch_size=32, shuffle=False, num_workers=4)

    model_syn = torchvision.models.resnet18(pretrained=True)
    model_syn.fc = nn.Linear(model_syn.fc.in_features, 2)
    model_syn = model_syn.to(device)
    optimizer = optim.Adam(model_syn.parameters(), lr=1e-4)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(10):
        model_syn.train()
        running_corrects = 0
        for imgs, labs in train_loader:
            imgs, labs = imgs.to(device), labs.to(device)
            optimizer.zero_grad()
            outputs = model_syn(imgs)
            loss = criterion(outputs, labs)
            loss.backward()
            optimizer.step()
            running_corrects += (outputs.argmax(1) == labs).sum().item()
        train_acc = running_corrects / len(train_ds)

        model_syn.eval()
        val_corrects = 0
        with torch.no_grad():
            for imgs, labs in val_loader:
                imgs, labs = imgs.to(device), labs.to(device)
                outputs = model_syn(imgs)
                val_corrects += (outputs.argmax(1) == labs).sum().item()
        val_acc = val_corrects / len(val_ds)
        print(f"Epoch {epoch+1} — train_acc: {train_acc:.4f}, val_acc: {val_acc:.4f}")

    val_scores_syn.append(val_acc)
    if val_acc > best_val_acc_syn:
        best_val_acc_syn = val_acc
        best_model_syn = copy.deepcopy(model_syn.state_dict())


Fold 1
Epoch 1 — train_acc: 0.8610, val_acc: 0.8892
Epoch 2 — train_acc: 0.8902, val_acc: 0.8442
Epoch 3 — train_acc: 0.9137, val_acc: 0.8619
Epoch 4 — train_acc: 0.9336, val_acc: 0.8733
Epoch 5 — train_acc: 0.9450, val_acc: 0.8575
Epoch 6 — train_acc: 0.9522, val_acc: 0.8430
Epoch 7 — train_acc: 0.9577, val_acc: 0.8462
Epoch 8 — train_acc: 0.9639, val_acc: 0.8327
Epoch 9 — train_acc: 0.9670, val_acc: 0.8332
Epoch 10 — train_acc: 0.9674, val_acc: 0.8361

Fold 2
Epoch 1 — train_acc: 0.9353, val_acc: 0.9651
Epoch 2 — train_acc: 0.9540, val_acc: 0.9665
Epoch 3 — train_acc: 0.9642, val_acc: 0.9468
Epoch 4 — train_acc: 0.9682, val_acc: 0.9279
Epoch 5 — train_acc: 0.9725, val_acc: 0.9488
Epoch 6 — train_acc: 0.9726, val_acc: 0.9459
Epoch 7 — train_acc: 0.9754, val_acc: 0.9419
Epoch 8 — train_acc: 0.9786, val_acc: 0.9287
Epoch 9 — train_acc: 0.9768, val_acc: 0.9360
Epoch 10 — train_acc: 0.9764, val_acc: 0.9160

Fold 3
Epoch 1 — train_acc: 0.9661, val_acc: 0.9883
Epoch 2 — train_acc: 0.9741, 

# Testing on completely unseen data

The cross validation above also included the synthetic data. Hence, the next step is testing both the models' performance using unseen test data that was set aside and not even used during the GAN's training process.

In [19]:
model.load_state_dict(best_model)
model.eval()
test_corrects = 0
with torch.no_grad():
    for imgs, labs in test_loader:
        imgs, labs = imgs.to(device), labs.to(device)
        outputs = model(imgs)
        test_corrects += (outputs.argmax(1) == labs).sum().item()
test_acc = test_corrects / len(test_loader.dataset)
print(f"Test Accuracy (Original): {test_acc:.4f}")

Test Accuracy (Original): 0.7046


In [29]:
model_syn.load_state_dict(best_model_syn)
model_syn.eval()
test_corrects = 0
with torch.no_grad():
    for imgs, labs in test_loader:
        imgs, labs = imgs.to(device), labs.to(device)
        outputs = model_syn(imgs)
        test_corrects += (outputs.argmax(1) == labs).sum().item()
test_acc = test_corrects / len(test_loader.dataset)
print(f"Test Accuracy (Original+synthetic): {test_acc:.4f}")

Test Accuracy (Original+synthetic): 0.7224
