In [None]:
pip install opendatasets

In [None]:
import json
import os

# Upload the kaggle.json file that you downloaded from Kaggle
from google.colab import files
files.upload()

# Create the .kaggle directory and move the kaggle.json file there
!mkdir ~/.kaggle
!mv kaggle.json ~/.kaggle/

# Set permissions for the kaggle.json file
!chmod 600 ~/.kaggle/kaggle.json

# Set environment variables (optional, opendatasets might handle this)
# You can verify if the authentication works after running the cell below
# os.environ['KAGGLE_USERNAME'] = 'YOUR_KAGGLE_USERNAME'
# os.environ['KAGGLE_KEY'] = 'YOUR_KAGGLE_KEY'

In [None]:
import os
import opendatasets as od
import os
import random
import argparse
from pathlib import Path
from sklearn.model_selection import train_test_split
import numpy as np
from PIL import Image
from sklearn.metrics import accuracy_score, roc_auc_score, confusion_matrix, classification_report
import torch
from torch import nn
from torchvision import transforms, datasets, models
from torch.utils.data import DataLoader, random_split
from tqdm import tqdm

# Download the Kaggle dataset identified by the URL below via opendatasets.
# The configuration cell that follows expects the files under
# ./tuberculosis-tb-chest-xray-dataset relative to the working directory.
dataset_url = 'https://www.kaggle.com/datasets/tawsifurrahman/tuberculosis-tb-chest-xray-dataset'
od.download(dataset_url)

In [None]:
# Optional: uncomment the two lines below to mount Google Drive in Colab.
#from google.colab import drive
#drive.mount('/content/drive')
# Root folder handed to ImageFolder; presumably it holds one sub-folder per
# class -- verify against the downloaded dataset layout.
data_dir = './tuberculosis-tb-chest-xray-dataset/TB_Chest_Radiography_Database'
# Directory where the training cell writes model checkpoints
# (relative to the current working directory).
save_dir = "checkpoints"

In [None]:
# ---------------------------
# Reproducibility & device
# ---------------------------
def set_seed(seed=42):
    """Seed Python's ``random``, NumPy and PyTorch RNGs with the same value.

    When CUDA is available, the generators of all CUDA devices are seeded too.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# ---------------------------
# Utility: metrics
# ---------------------------
def evaluate_model(model, loader):
    """Score a single-logit binary classifier on every batch of ``loader``.

    The model is put in eval mode and run without gradient tracking on the
    module-level ``device``. Each logit is passed through a sigmoid, and a
    probability >= 0.5 is counted as the positive class (label 1).

    Args:
        model: callable module expected to emit one logit per sample; a
            trailing singleton dimension is squeezed away.
        loader: iterable yielding ``(inputs, labels)`` tensor batches. Labels
            are converted with ``.numpy()``, so they are expected on the CPU.

    Returns:
        Tuple ``(accuracy, roc_auc, report)`` where ``report`` is the text
        produced by ``classification_report`` with 4 digits. ``roc_auc`` is
        ``nan`` when ``roc_auc_score`` rejects the inputs with ``ValueError``.

    Note:
        The model is left in eval mode; the caller must call ``model.train()``
        before resuming training.
    """
    model.eval()
    y_true, y_probs, y_pred = [], [], []
    with torch.no_grad():
        for xb, yb in loader:
            xb = xb.to(device)
            logits = model(xb).squeeze(-1).cpu()  # [batch]
            probs = torch.sigmoid(logits).numpy()
            preds = (probs >= 0.5).astype(int)
            y_probs.extend(probs.tolist())
            y_pred.extend(preds.tolist())
            y_true.extend(yb.numpy().tolist())
    acc = accuracy_score(y_true, y_pred)
    try:
        auc = roc_auc_score(y_true, y_probs)
    except ValueError:
        # Only the "metric cannot be computed from these inputs" case is
        # tolerated; any other exception is a real bug and must surface.
        auc = float("nan")
    cls_report = classification_report(y_true, y_pred, digits=4)
    return acc, auc, cls_report

In [None]:
set_seed(42)

# Channel statistics shared by the training and validation pipelines.
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

# Training transform: random crop + flip augmentation.
tf = transforms.Compose([
    transforms.RandomResizedCrop((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    normalize,
])

# Validation transform: deterministic resize only, so metrics (and therefore
# best-checkpoint selection) do not fluctuate with random augmentation.
eval_tf = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    normalize,
])

# Two views over the same folder: identical file order, different transforms.
full_ds = datasets.ImageFolder(data_dir, transform=tf)
eval_ds = datasets.ImageFolder(data_dir, transform=eval_tf)
# Store class_to_idx before splitting
class_to_idx = full_ds.class_to_idx

train_len = int(len(full_ds) * 0.8)
val_len = len(full_ds) - train_len
train_ds, val_split = random_split(
    full_ds, [train_len, val_len], generator=torch.Generator().manual_seed(42)
)
# random_split would make the validation subset inherit the training
# augmentation, so re-index the held-out samples on the un-augmented view.
val_ds = torch.utils.data.Subset(eval_ds, val_split.indices)

train_loader = DataLoader(train_ds, batch_size=16, shuffle=True, num_workers=1)
val_loader = DataLoader(val_ds, batch_size=16, shuffle=False, num_workers=1)

In [None]:
# model: resnet50 -> single logit output
# weights='DEFAULT' starts from pretrained weights;
# weights=None (used here) trains from scratch
model = models.resnet50(weights=None)

for param in model.parameters():
    param.requires_grad = True  # fine-tune all (or set False to freeze)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 1)  # single logit
model = model.to(device)

criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# mode="max" because the monitored quantity is validation AUC (higher is
# better), the same metric that selects the best checkpoint below.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="max", factor=0.5, patience=2)

num_epochs = 2
best_auc = 0.0
os.makedirs(save_dir, exist_ok=True)

for epoch in range(num_epochs):
    model.train()  # evaluate_model() leaves the model in eval mode
    running_loss = 0.0
    for xb, yb in tqdm(train_loader):
        # BCEWithLogitsLoss needs float targets shaped like the logits
        xb, yb = xb.to(device), yb.float().to(device)
        logits = model(xb).squeeze(-1)  # [batch]
        loss = criterion(logits, yb)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # weight by batch size so the epoch average is per-sample
        running_loss += loss.item() * xb.size(0)

    train_loss = running_loss / len(train_loader.dataset)
    val_acc, val_auc, val_report = evaluate_model(model, val_loader)
    # Drive the plateau scheduler with a held-out metric rather than the
    # training loss; skip the step when AUC could not be computed.
    if not np.isnan(val_auc):
        scheduler.step(val_auc)
    print(f"[Epoch {epoch}] train_loss={train_loss:.4f} val_acc={val_acc:.4f} val_auc={val_auc:.4f} \n {val_report}")

    if val_auc > best_auc:
        best_auc = val_auc
        ckpt = os.path.join(save_dir, "tb_resnet50_best.pt")
        torch.save({"model_state": model.state_dict(), "class_to_idx": class_to_idx}, ckpt)
        print(f"  Saved best checkpoint to {ckpt}")

# Final eval on the validation loader.
# NOTE(review): this scores the weights from the last epoch; the best
# checkpoint saved above is not reloaded here.
test_acc, test_auc, test_report = evaluate_model(model, val_loader)
print(f"Final val acc={test_acc:.4f}, auc={test_auc:.4f} \n {test_report}")