In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pickle
import numpy as np
import torch
from torch.utils.data import Dataset, random_split, DataLoader, Subset
from scipy.ndimage import rotate
import torch.nn.functional as F
import pandas as pd

In [None]:
# -----------------------------
# 1Ô∏è‚É£ Set paths for already uploaded files
# -----------------------------
train_pkl = '/content/drive/MyDrive/Code/train_classification.pkl'
test_pkl  = '/content/drive/MyDrive/Code/test_classification.pkl'

import os
print("Train exists:", os.path.exists(train_pkl))
print("Test exists:", os.path.exists(test_pkl))


In [None]:
class ACDCClassificationDataset(Dataset):
    def __init__(self, pkl_path, use_both_phases=True, target_shape=(128,128,8), mean=None, std=None):
        with open(pkl_path, 'rb') as f:
            self.data = pickle.load(f)
        self.ids = list(self.data.keys())
        self.use_both = use_both_phases
        self.target_shape = target_shape
        self.mean = mean
        self.std = std

        label_map = {'NOR':0,'MINF':1,'DCM':2,'HCM':3,'RV':4}
        self.labels = [label_map.get(self.data[i]['Label'], -1) for i in self.ids]

    def __len__(self):
        return len(self.ids)

    def _resize_volume(self, vol):
        # vol: [H, W, D]
        vol_t = torch.tensor(vol[np.newaxis, np.newaxis, ...], dtype=torch.float32)  # [1,1,H,W,D]
        resized = torch.nn.functional.interpolate(vol_t, size=self.target_shape, mode='trilinear', align_corners=False)
        return resized.squeeze(0).squeeze(0).numpy()  # remove only first two singleton dims

    def __getitem__(self, idx):
        pid = self.ids[idx]
        sample = self.data[pid]

        # Resize ED and ES volumes
        ed = self._resize_volume(sample['ED'].astype(np.float32))  # [128,128,8]
        es = self._resize_volume(sample['ES'].astype(np.float32))  # [128,128,8]

        # Normalization
        if self.mean is not None and self.std is not None:
            ed = (ed - self.mean) / (self.std + 1e-8)
            es = (es - self.mean) / (self.std + 1e-8)

        # Stack phases
        vol = np.stack([ed, es], axis=0) if self.use_both else ed[np.newaxis, ...]  # [2,H,W,D] or [1,H,W,D]

        # Reorder axes to C √ó D √ó H √ó W for PyTorch 3D CNN
        vol = np.transpose(vol, (0, 3, 1, 2))  # [C, D, H, W]

        label = self.labels[idx]
        return torch.tensor(vol, dtype=torch.float32), torch.tensor(label, dtype=torch.long)


In [None]:
# -----------------------------
# Compute mean and std (training set only)
# -----------------------------
full_dataset_tmp = ACDCClassificationDataset(train_pkl, use_both_phases=True, target_shape=(128,128,8))
all_vols = []
for pid in full_dataset_tmp.ids:
    sample = full_dataset_tmp.data[pid]
    all_vols.append(sample['ED'].astype(np.float32).flatten())
    all_vols.append(sample['ES'].astype(np.float32).flatten())
all_vols = np.concatenate(all_vols)
mean = all_vols.mean()
std = all_vols.std()
print(f"Training mean: {mean:.4f}, std: {std:.4f}")

In [None]:
# -----------------------------
# Split dataset (train/val/test)
# -----------------------------
target_shape = (128,128,8)

# Full dataset with normalization for training
full_dataset = ACDCClassificationDataset(
    train_pkl,
    use_both_phases=True,
    target_shape=target_shape,
    mean=mean,
    std=std
)

# Split train/validation
train_size = int(0.8 * len(full_dataset))   # 80% train
val_size = len(full_dataset) - train_size  # 20% validation
train_dataset, val_dataset = random_split(
    full_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42)
)

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=2)
val_loader   = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=2)

# Test dataset (no augmentation)
test_dataset = ACDCClassificationDataset(
    test_pkl,
    use_both_phases=True,
    target_shape=target_shape,
    mean=mean,
    std=std
)
test_loader  = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=2)

print(f"Train samples: {len(train_dataset)}, Val samples: {len(val_dataset)}, Test samples: {len(test_dataset)}")


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import tqdm
import time
import matplotlib.pyplot as plt

# ==========================
# 3D CNN Model
# ==========================
class model_3DCNN(nn.Module):
    def __init__(self, num_classes=5, in_channels=2):
        super(model_3DCNN, self).__init__()
        self.conv1 = nn.Conv3d(in_channels, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm3d(32)
        self.pool1 = nn.MaxPool3d(2)

        self.conv2 = nn.Conv3d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm3d(64)
        self.pool2 = nn.MaxPool3d(2)

        self.conv3 = nn.Conv3d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm3d(128)
        self.pool3 = nn.MaxPool3d(2)

        self.conv4 = nn.Conv3d(128, 256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm3d(256)
        self.pool4 = nn.AdaptiveAvgPool3d(1)

        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool1(x)

        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool2(x)

        x = F.relu(self.bn3(self.conv3(x)))
        x = self.pool3(x)

        x = F.relu(self.bn4(self.conv4(x)))
        x = self.pool4(x)

        x = x.flatten(start_dim=1)
        x = self.dropout(x)
        x = self.fc(x)
        return x

# ==========================
# Device, Model, Loss, Optimizer
# ==========================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model_3DCNN(num_classes=5, in_channels=2).to(device)

criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = optim.AdamW(model.parameters(), lr=3e-, weight_decay=1e-4)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=30)

# ==========================
# Metrics storage
# ==========================
history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}
num_epochs = 100

# ==========================
# Training Loop
# =========================
metrics_csv = "/content/training_history.csv"

# ==========================
# Training Loop with CSV logging
# ==========================
for epoch in range(1, num_epochs + 1):
    start_time = time.time()

    # --- Training ---
    model.train()
    running_loss, correct, total = 0.0, 0, 0

    for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch}/{num_epochs}", leave=False):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * labels.size(0)
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)

    train_loss = running_loss / total
    train_acc = correct / total

    # --- Validation ---
    model.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            val_loss += loss.item() * labels.size(0)
            _, predicted = outputs.max(1)
            val_correct += predicted.eq(labels).sum().item()
            val_total += labels.size(0)

    val_loss /= val_total
    val_acc = val_correct / val_total

    # Scheduler step
    scheduler.step(val_loss)

    # Store metrics in dictionary
    history["train_loss"].append(train_loss)
    history["val_loss"].append(val_loss)
    history["train_acc"].append(train_acc)
    history["val_acc"].append(val_acc)

    # Save metrics to CSV
    df = pd.DataFrame({
        "epoch": list(range(1, epoch + 1)),
        "train_loss": history["train_loss"],
        "val_loss": history["val_loss"],
        "train_acc": history["train_acc"],
        "val_acc": history["val_acc"]
    })
    df.to_csv(metrics_csv, index=False)

    elapsed = time.time() - start_time
    print(f"Epoch {epoch}/{num_epochs} | "
          f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f} | "
          f"Time: {elapsed:.1f}s")

print(f"\nTraining history saved to {metrics_csv}")

In [None]:
# ==========================
# üîπ Plot training history with accuracy limit
# ==========================
import matplotlib.pyplot as plt

# Loss plot
plt.figure(figsize=(12,5))
plt.subplot(1,2,1)
plt.plot(history['train_loss'], label='Train Loss', color='blue')
plt.plot(history['val_loss'], label='Validation Loss', color='red')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training vs Validation Loss')
plt.legend()
plt.grid(True)

# Accuracy plot
plt.subplot(1,2,2)
plt.plot(history['train_acc'], label='Train Accuracy', color='blue')
plt.plot(history['val_acc'], label='Validation Accuracy', color='red')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training vs Validation Accuracy')
plt.ylim(0, 1)  # ‚úÖ Set accuracy limits from 0 to 1
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()


In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt
import torch

# Make sure the model is in evaluation mode
model.eval()

all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:  # your test dataloader
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# ==========================
# Test Accuracy
# ==========================
correct = sum([p == t for p, t in zip(all_preds, all_labels)])
total = len(all_labels)
test_acc = correct / total
print(f"Test Accuracy: {test_acc:.4f}")

# ==========================
# Confusion Matrix
# ==========================
cm = confusion_matrix(all_labels, all_preds)
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Confusion Matrix - Test Set")
plt.show()

# ==========================
# Classification Report
# ==========================
report = classification_report(all_labels, all_preds, target_names=['Class 0','Class 1','Class 2','Class 3','Class 4'])
print("Classification Report:\n", report)


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Class labels
labels = ["Normal", "DCM", "HCM", "MINF", "ARV"]

# Metrics from classification report
precision = [0.759, 0.707, 0.709, 0.733, 0.814]
recall = [0.820, 0.750, 0.730, 0.740, 0.790]
f1 = [0.788, 0.728, 0.719, 0.736, 0.802]

# Set bar positions
x = np.arange(len(labels))
width = 0.25

# Create plot
plt.figure(figsize=(8, 5))
bars1 = plt.bar(x - width, precision, width, label='Precision')
bars2 = plt.bar(x, recall, width, label='Recall')
bars3 = plt.bar(x + width, f1, width, label='F1-score')


# Labels and formatting
plt.xlabel("Group", fontsize=11)
plt.ylabel("Score", fontsize=11)
plt.title("Classification Metrics per Class-using Image", fontsize=13)
plt.xticks(x, labels)
plt.ylim(0, 1)
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.functional as F

# ==========================
# ROC Curve Evaluation
# ==========================
model.eval()
all_labels = []
all_probs = []

with torch.no_grad():
    for inputs, labels in val_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        probs = F.softmax(outputs, dim=1).cpu().numpy()
        all_probs.append(probs)
        all_labels.append(labels.cpu().numpy())

# Concatenate predictions and labels
all_labels = np.concatenate(all_labels)
all_probs = np.concatenate(all_probs, axis=0)

# Binarize labels for ROC computation
num_classes = 5
class_names = ["Normal", "DCM", "HCM", "MINF", "ARV"]
y_true_bin = label_binarize(all_labels, classes=np.arange(num_classes))

# Compute ROC curve and AUC for each class
fpr, tpr, roc_auc = {}, {}, {}
for i in range(num_classes):
    fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], all_probs[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# ==========================
# Plot ROC Curves
# ==========================
plt.figure(figsize=(8, 6))
for i, label in enumerate(class_names):
    plt.plot(fpr[i], tpr[i], lw=2, label=f"{label} (AUC = {roc_auc[i]:.2f})")

plt.plot([0, 1], [0, 1], "k--", lw=1)
plt.title("ROC Curve - 3D CNN (Validation Set)")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.legend(loc="lower right")
plt.grid(True, linestyle="--", alpha=0.6)
plt.tight_layout()
plt.show()