In [17]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, WeightedRandomSampler
import numpy as np

In [21]:
from torch.utils.data import Dataset, WeightedRandomSampler


class PTDataset(Dataset):
    """Binary nodule / non-nodule dataset backed by pre-saved ``.pt`` files.

    ``root_dir`` must contain two sub-folders, ``nodules`` (label 1) and
    ``non_nodules`` (label 0). Every file ending in ``.pt`` directly inside
    those folders becomes one sample; other files are ignored.

    Args:
        root_dir: Directory holding the ``nodules`` / ``non_nodules`` folders.
        oversample: When True, build a ``WeightedRandomSampler`` that draws
            each sample with probability inversely proportional to the size
            of its class, and expose it as ``self.sampler``.

    Raises:
        FileNotFoundError: If either class folder is missing (from ``os.listdir``).
        ValueError: If ``oversample`` is True but no ``.pt`` files were found.
    """

    # Folder name -> integer class label. Insertion order fixes the scan order.
    CLASS_TO_LABEL = {"nodules": 1, "non_nodules": 0}

    def __init__(self, root_dir, oversample=False):
        self.root_dir = root_dir
        self.files = []   # list of (file_path, label) pairs
        self.labels = []  # labels only, parallel to self.files

        for class_name, label in self.CLASS_TO_LABEL.items():
            folder = os.path.join(root_dir, class_name)
            # os.listdir returns entries in arbitrary order; sorting keeps
            # sample indices stable from one run to the next.
            for file_name in sorted(os.listdir(folder)):
                if file_name.endswith(".pt"):
                    self.files.append((os.path.join(folder, file_name), label))
                    self.labels.append(label)

        # Oversampling to balance classes
        self.sampler = self._build_sampler() if oversample else None

    def _build_sampler(self):
        """Return a sampler that draws every class with equal total probability."""
        if not self.labels:
            raise ValueError(
                f"No .pt files found under {self.root_dir!r}; "
                "cannot build an oversampling sampler."
            )

        labels = torch.tensor(self.labels, dtype=torch.long)
        # minlength keeps the weight vector indexable by every known label,
        # even when one class has no samples.
        class_counts = torch.bincount(labels, minlength=len(self.CLASS_TO_LABEL))
        # An absent class yields an infinite weight here, but it is never
        # selected below because no sample carries that label.
        class_weights = 1.0 / class_counts.float()
        sample_weights = class_weights[labels]
        return WeightedRandomSampler(
            sample_weights.tolist(), num_samples=len(self.labels), replacement=True
        )

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        file_path, label = self.files[idx]
        # NOTE(review): torch.load unpickles the file; only point this dataset
        # at trusted data. map_location="cpu" lets files that were saved from
        # a GPU load on CPU-only machines; callers move batches to the
        # training device themselves.
        tensor_image = torch.load(file_path, map_location="cpu")  # Load .pt file
        return tensor_image, torch.tensor(label, dtype=torch.long)


In [22]:
# Dataset location and mini-batch size
data_dir = "dataset_split"
batch_size = 32

# One dataset per split; only the training split builds the class-balancing sampler
train_dataset = PTDataset(os.path.join(data_dir, "train"), oversample=True)
val_dataset = PTDataset(os.path.join(data_dir, "val"))
test_dataset = PTDataset(os.path.join(data_dir, "test"))

# The training loader draws indices from the weighted sampler, so shuffle stays
# off (DataLoader treats `sampler` and `shuffle=True` as mutually exclusive).
dataloaders = {
    "train": DataLoader(
        train_dataset,
        sampler=train_dataset.sampler,
        shuffle=False,
        batch_size=batch_size,
    )
}
# The evaluation loaders walk their datasets sequentially, in index order.
dataloaders.update(
    (split_name, DataLoader(split_dataset, batch_size=batch_size, shuffle=False))
    for split_name, split_dataset in (("val", val_dataset), ("test", test_dataset))
)

print("✅ Data loaders ready with oversampling applied!")

✅ Data loaders ready with oversampling applied!


In [26]:
# NOTE(review): every line in this cell is commented out, so nothing here runs.
# As a result no active cell in the visible notebook creates `model`, yet the
# training cell below calls `model.to(device)`; on a fresh kernel that cell
# fails with NameError until the model construction is restored.
# NOTE(review): `class_weights` used in the weighted loss below is only a local
# variable inside PTDataset.__init__ in the visible cells, so that line would
# need its own class-weight computation before it could be re-enabled.
# from timm import create_model

# # Load Swin Transformer Model
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = create_model("swin_tiny_patch4_window7_224", pretrained=True, num_classes=2)
# model.to(device)

# # Loss Function (Weighted)
# criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))

# # Optimizer
# optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [25]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=10, device=None):
    """Train ``model`` on ``dataloaders["train"]`` and print per-epoch metrics.

    Args:
        model: Module mapping a batch of inputs to per-class scores of shape
            (batch, num_classes); predictions are taken as the arg-max over
            dimension 1.
        dataloaders: Mapping with a ``"train"`` entry that supports ``len()``
            and yields ``(inputs, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        num_epochs: Number of full passes over the training loader.
        device: Device the batches are moved to. Defaults to the device of the
            model's first parameter (CPU for a parameter-free model), so the
            function does not rely on a notebook-level ``device`` variable.

    Raises:
        ValueError: If an epoch sees no training samples.

    The reported loss is the mean of the per-batch loss values; the reported
    accuracy is the fraction of correctly classified samples in the epoch.
    """
    if device is None:
        # Follow the model instead of a global: batches must live on the same
        # device as the weights for the forward pass to work.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    train_loader = dataloaders["train"]

    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        if total == 0:
            # Without this guard the averages below divide by zero.
            raise ValueError('dataloaders["train"] produced no samples; nothing to train on.')

        epoch_loss = running_loss / len(train_loader)
        epoch_acc = correct / total
        print(
            f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}"
        )

    print("✅ Training complete!")


In [27]:
# NOTE(review): no active cell in the visible notebook creates `model` (the only
# construction shown is in the commented-out Swin cell above), so on a fresh
# kernel this cell raises NameError. Create or load the model before running it.

# Seed PyTorch's global RNG so the weighted sampler's draws, and with them the
# training run, are repeatable (GPU kernels may still add some nondeterminism).
SEED = 42
torch.manual_seed(SEED)

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define loss & optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# Train the model
train_model(model, dataloaders, criterion, optimizer, num_epochs=10)

Epoch 1/10, Loss: 0.6393, Accuracy: 0.6728
Epoch 2/10, Loss: 0.2323, Accuracy: 0.9228
Epoch 3/10, Loss: 0.0290, Accuracy: 0.9938
Epoch 4/10, Loss: 0.0059, Accuracy: 1.0000
Epoch 5/10, Loss: 0.0032, Accuracy: 1.0000
Epoch 6/10, Loss: 0.0011, Accuracy: 1.0000
Epoch 7/10, Loss: 0.0006, Accuracy: 1.0000
Epoch 8/10, Loss: 0.0009, Accuracy: 1.0000
Epoch 9/10, Loss: 0.0009, Accuracy: 1.0000
Epoch 10/10, Loss: 0.0007, Accuracy: 1.0000
✅ Training complete!


In [28]:
from sklearn.metrics import classification_report


def evaluate_model(model, dataloader, dataset_type="Validation", device=None):
    """Run ``model`` over ``dataloader`` and print a per-class classification report.

    Args:
        model: Module mapping a batch of inputs to per-class scores of shape
            (batch, num_classes); the arg-max over dimension 1 is the prediction.
        dataloader: Iterable of ``(inputs, labels)`` batches, with label 0
            meaning "Non-Nodule" and label 1 meaning "Nodule".
        dataset_type: Name of the split, used only in the printed heading.
        device: Device the inputs are moved to. Defaults to the device of the
            model's first parameter (CPU for a parameter-free model), so the
            function does not rely on a notebook-level ``device`` variable.

    The model is switched to eval mode and is left in that mode on return.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            # Only the inputs need to be on the model's device; labels are
            # just collected for the report.
            outputs = model(inputs.to(device))
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().tolist())
            all_labels.extend(labels.cpu().tolist())

    print(f"\n📊 {dataset_type} Classification Report:\n")
    print(
        classification_report(
            all_labels,
            all_preds,
            # Pin both classes so the two target names always line up, even
            # when a split (or the predictions) contain a single class.
            labels=[0, 1],
            target_names=["Non-Nodule", "Nodule"],
            # A class that is never predicted has undefined precision; report
            # it as 0 explicitly instead of emitting a warning per average.
            zero_division=0,
        )
    )


# Print per-class metrics for the two held-out splits
evaluate_model(model, dataloader=dataloaders["val"], dataset_type="Validation")
evaluate_model(model, dataloader=dataloaders["test"], dataset_type="Test")


📊 Validation Classification Report:

              precision    recall  f1-score   support

  Non-Nodule       0.00      0.00      0.00         3
      Nodule       0.96      0.98      0.97        66

    accuracy                           0.94        69
   macro avg       0.48      0.49      0.49        69
weighted avg       0.91      0.94      0.93        69


📊 Test Classification Report:

              precision    recall  f1-score   support

  Non-Nodule       0.00      0.00      0.00         4
      Nodule       0.94      1.00      0.97        66

    accuracy                           0.94        70
   macro avg       0.47      0.50      0.49        70
weighted avg       0.89      0.94      0.92        70



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
