In [1]:
import os
from typing import Tuple, List

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import transforms, datasets

import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report

ModuleNotFoundError: No module named 'torch'

In [None]:
# ---------------------------------------------------------------------------
# Run configuration. Everything a reader might tune lives in this cell.
# ---------------------------------------------------------------------------

# Dataset locations (one sub-folder per class, as consumed by ImageFolder in
# get_dataloaders_from_dirs). The defaults are the original machine-specific
# paths; set DR_TRAIN_DIR / DR_VAL_DIR in the environment to run on another
# machine without editing the notebook.
TRAIN_DIR = os.environ.get("DR_TRAIN_DIR", r"C:\Users\sudeepta\Desktop\DR_project\augmented_resized_V2\train")
VAL_DIR   = os.environ.get("DR_VAL_DIR", r"C:\Users\sudeepta\Desktop\DR_project\augmented_resized_V2\val")

IMAGE_SIZE = 224      # side length (pixels) of the crops fed to the model
BATCH_SIZE = 32       # samples per batch for both loaders
PROJ_DIM = 256        # default backbone output size and gated-projection width
NUM_CLASSES = 5       # number of logits produced by the classification head

# Three-stage schedule used by main(); a stage with 0 epochs is skipped.
HEAD_EPOCHS = 20      # stage 1: backbones frozen, projections + head trained
PARTIAL_EPOCHS = 0    # stage 2: last backbone sections unfrozen (LR * 0.1)
FINE_EPOCHS = 30      # stage 3: every parameter trained (LR * 0.01)
LR = 1e-3             # base learning rate (stage 1)

DEVICE_STR = 'cuda'  # use 'cpu' to force CPU
SAVE_PATH = 'hybrid_checkpoint.pth'   # where the best checkpoint is written
OUT_DIR = 'outputs'                   # destination for plots / confusion matrix

In [2]:
class ConvBlock(nn.Module):
    """Conv2d -> BatchNorm2d -> ReLU building block.

    Args:
        in_ch: number of input channels.
        out_ch: number of output channels.
        kernel: convolution kernel size.
        stride: convolution stride.
        padding: zero padding applied by the convolution.
        groups: number of convolution groups (forwarded to ``nn.Conv2d``).

    The convolution carries no bias term; the batch norm that follows it has
    its own learnable shift.
    """

    def __init__(self, in_ch, out_ch, kernel=3, stride=1, padding=1, groups=1):
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels=in_ch,
            out_channels=out_ch,
            kernel_size=kernel,
            stride=stride,
            padding=padding,
            groups=groups,
            bias=False,
        )
        self.bn = nn.BatchNorm2d(num_features=out_ch)
        self.act = nn.ReLU(inplace=True)

    def forward(self, x):
        """Apply convolution, normalisation and activation in that order."""
        features = self.conv(x)
        normalised = self.bn(features)
        return self.act(normalised)

class VisionNet(nn.Module):
    """Convolutional backbone: a stem plus three two-block stages.

    The feature map is globally average-pooled to a 512-vector and linearly
    projected to ``num_features`` values per image. Four of the convolutions
    use stride 2 (two in the stem, one at the start of stage 2 and stage 3).

    Args:
        in_channels: channels of the input image tensor.
        num_features: size of the returned feature vector.
    """

    def __init__(self, in_channels=3, num_features=PROJ_DIM):
        super().__init__()

        def two_block_stage(c_in, c_out, stride=1):
            # First block changes width (and optionally resolution),
            # second block keeps both.
            return nn.Sequential(
                ConvBlock(c_in, c_out, stride=stride),
                ConvBlock(c_out, c_out),
            )

        stem_layers = [
            ConvBlock(in_channels, 32, kernel=3, stride=2),
            ConvBlock(32, 32),
            ConvBlock(32, 64, stride=2),
        ]
        self.stem = nn.Sequential(*stem_layers)
        self.stage1 = two_block_stage(64, 128)
        self.stage2 = two_block_stage(128, 256, stride=2)
        self.stage3 = two_block_stage(256, 512, stride=2)
        self.pool = nn.AdaptiveAvgPool2d(1)
        self._out_dim = num_features
        self.project = nn.Linear(512, self._out_dim)

    def forward(self, x):
        """Return a ``(batch, num_features)`` tensor for a batch of images."""
        for stage in (self.stem, self.stage1, self.stage2, self.stage3):
            x = stage(x)
        pooled = self.pool(x).flatten(start_dim=1)
        return self.project(pooled)

    def feature_dim(self):
        """Size of the feature vector produced by ``forward``."""
        return self._out_dim

class VisionMamba(nn.Module):
    """Lightweight convolutional backbone built from grouped ``ConvBlock``s.

    Despite the class name, the layers defined here are plain and grouped
    convolutions only. The ``spp`` section is a 1x1 convolution followed by a
    3x3 convolution. The result is globally average-pooled to a 192-vector and
    linearly projected to ``num_features`` values per image.

    Args:
        in_channels: channels of the input image tensor.
        num_features: size of the returned feature vector.
    """

    def __init__(self, in_channels=3, num_features=PROJ_DIM):
        super().__init__()
        stem_layers = [
            ConvBlock(in_channels, 24, stride=2),
            ConvBlock(24, 24),
        ]
        self.stem = nn.Sequential(*stem_layers)

        grouped_layers = [
            ConvBlock(24, 48, groups=24, stride=2),   # 24 groups, 2 outputs each
            ConvBlock(48, 48, groups=48),             # one filter per channel
            ConvBlock(48, 96, stride=2),
            ConvBlock(96, 96),
        ]
        self.depthwise = nn.Sequential(*grouped_layers)

        widen_layers = [
            ConvBlock(96, 192, kernel=1, padding=0),
            ConvBlock(192, 192),
        ]
        self.spp = nn.Sequential(*widen_layers)

        self.pool = nn.AdaptiveAvgPool2d(1)
        self._out_dim = num_features
        self.project = nn.Linear(192, self._out_dim)

    def forward(self, x):
        """Return a ``(batch, num_features)`` tensor for a batch of images."""
        for section in (self.stem, self.depthwise, self.spp):
            x = section(x)
        pooled = self.pool(x).flatten(start_dim=1)
        return self.project(pooled)

    def feature_dim(self):
        """Size of the feature vector produced by ``forward``."""
        return self._out_dim

class CondenseBlock(nn.Module):
    """Grouped 1x1 convolution followed by a per-channel spatial convolution.

    Each convolution is bias-free and followed by BatchNorm2d and ReLU.

    Args:
        in_ch: number of input channels.
        out_ch: number of output channels.
        kernel: kernel size of the per-channel (``dw``) convolution; its
            padding is ``kernel // 2``.
        stride: stride of the per-channel convolution.
        groups: number of groups of the 1x1 (``pw``) convolution.
    """

    def __init__(self, in_ch, out_ch, kernel=3, stride=1, groups=4):
        super().__init__()
        # Channel-mixing step.
        self.pw = nn.Conv2d(in_ch, out_ch, kernel_size=1, groups=groups, bias=False)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.act1 = nn.ReLU(inplace=True)
        # Spatial step: groups == channels, so every channel is filtered alone.
        self.dw = nn.Conv2d(
            out_ch,
            out_ch,
            kernel_size=kernel,
            stride=stride,
            padding=kernel // 2,
            groups=out_ch,
            bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_ch)
        self.act2 = nn.ReLU(inplace=True)

    def forward(self, x):
        """Run the 1x1 step, then the per-channel spatial step."""
        mixed = self.act1(self.bn1(self.pw(x)))
        filtered = self.dw(mixed)
        return self.act2(self.bn2(filtered))

class CondenseNetSimple(nn.Module):
    """Backbone made of one ``ConvBlock`` and three ``CondenseBlock``s.

    Only the leading ``ConvBlock`` uses stride 2; the condense blocks keep the
    spatial resolution and widen the channels 32 -> 64 -> 128 -> 256. The
    result is globally average-pooled and linearly projected to
    ``num_features`` values per image.

    Args:
        in_channels: channels of the input image tensor.
        num_features: size of the returned feature vector.
    """

    def __init__(self, in_channels=3, num_features=PROJ_DIM):
        super().__init__()
        layers = [ConvBlock(in_channels, 32, stride=2)]
        # (input channels, output channels, groups of the 1x1 convolution)
        for c_in, c_out, n_groups in ((32, 64, 4), (64, 128, 4), (128, 256, 8)):
            layers.append(CondenseBlock(c_in, c_out, groups=n_groups))
        self.stem = nn.Sequential(*layers)
        self.pool = nn.AdaptiveAvgPool2d(1)
        self._out_dim = num_features
        self.project = nn.Linear(256, self._out_dim)

    def forward(self, x):
        """Return a ``(batch, num_features)`` tensor for a batch of images."""
        feature_map = self.stem(x)
        pooled = self.pool(feature_map).flatten(start_dim=1)
        return self.project(pooled)

    def feature_dim(self):
        """Size of the feature vector produced by ``forward``."""
        return self._out_dim

class GatedProjection(nn.Module):
    """Linear projection scaled by a learned scalar gate.

    The input is projected (Linear -> BatchNorm1d -> ReLU). A small two-layer
    network ending in a sigmoid then turns the projected vector into a single
    value per sample, which multiplies the whole projected vector.

    Args:
        in_dim: size of the incoming feature vector.
        proj_dim: size of the projected (and returned) vector.
    """

    def __init__(self, in_dim, proj_dim=PROJ_DIM):
        super().__init__()
        self.fc = nn.Linear(in_dim, proj_dim)
        self.bn = nn.BatchNorm1d(proj_dim)
        self.act = nn.ReLU(inplace=True)
        # Bottleneck width of the gate network, never narrower than 8 units.
        gate_hidden = max(8, proj_dim // 8)
        gate_layers = [
            nn.Linear(proj_dim, gate_hidden),
            nn.ReLU(inplace=True),
            nn.Linear(gate_hidden, 1),
            nn.Sigmoid(),
        ]
        self.gate = nn.Sequential(*gate_layers)

    def forward(self, x):
        """Return the projected features multiplied by their gate value."""
        projected = self.act(self.bn(self.fc(x)))
        gate_value = self.gate(projected)   # one value per sample, broadcast below
        return projected * gate_value

class HybridFusionModel(nn.Module):
    """Three-branch classifier that fuses backbone features by concatenation.

    Every backbone sees the same input. Each backbone output goes through its
    own ``GatedProjection``; the three projected vectors are concatenated and
    classified by a two-layer head with dropout.

    Args:
        backboneA, backboneB, backboneC: feature extractors exposing
            ``feature_dim()`` and returning one vector per sample.
        proj_dim: width of each gated projection.
        num_classes: number of output logits.
    """

    def __init__(self, backboneA, backboneB, backboneC, proj_dim=PROJ_DIM, num_classes=NUM_CLASSES):
        super().__init__()
        self.backA = backboneA
        self.backB = backboneB
        self.backC = backboneC
        self.projA = GatedProjection(self.backA.feature_dim(), proj_dim)
        self.projB = GatedProjection(self.backB.feature_dim(), proj_dim)
        self.projC = GatedProjection(self.backC.feature_dim(), proj_dim)
        head_layers = [
            nn.Linear(3 * proj_dim, 512),   # input is the three projections side by side
            nn.ReLU(inplace=True),
            nn.Dropout(0.4),
            nn.Linear(512, num_classes),
        ]
        self.head = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``."""
        backbones = (self.backA, self.backB, self.backC)
        projections = (self.projA, self.projB, self.projC)
        raw_features = [backbone(x) for backbone in backbones]
        gated = [project(feat) for project, feat in zip(projections, raw_features)]
        return self.head(torch.cat(gated, dim=1))

NameError: name 'nn' is not defined

In [3]:
def get_dataloaders_from_dirs(train_dir: str, val_dir: str, image_size: int = IMAGE_SIZE, batch_size: int = BATCH_SIZE, num_workers: int = 4) -> Tuple[DataLoader, DataLoader]:
    """Build the training and validation loaders from two class-per-folder trees.

    Args:
        train_dir: root folder of the training images (one sub-folder per class).
        val_dir: root folder of the validation images, same layout.
        image_size: side length of the square crops handed to the model.
        batch_size: samples per batch for both loaders.
        num_workers: worker processes used by each loader.

    Returns:
        ``(train_loader, val_loader)``. The training loader shuffles and applies
        random crop / flip / colour jitter; the validation loader is
        deterministic (resize to ``1.14 * image_size``, then centre crop).
    """
    # These values match the commonly used ImageNet channel statistics.
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

    train_tf = transforms.Compose([
        transforms.RandomResizedCrop(image_size),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(0.2, 0.2, 0.2, 0.02),
        transforms.ToTensor(),
        normalize,
    ])
    val_tf = transforms.Compose([
        transforms.Resize(int(image_size * 1.14)),
        transforms.CenterCrop(image_size),
        transforms.ToTensor(),
        normalize,
    ])

    train_ds = datasets.ImageFolder(train_dir, transform=train_tf)
    val_ds = datasets.ImageFolder(val_dir, transform=val_tf)

    # Pinned host memory only helps host-to-GPU copies; asking for it without
    # CUDA just produces a warning.
    pin = torch.cuda.is_available()

    # The model contains BatchNorm1d layers (GatedProjection), which raise in
    # training mode when a batch holds a single sample. Drop the trailing batch
    # only in the one case where it would have exactly one sample; otherwise
    # every sample is still used, as before.
    n_train = len(train_ds)
    drop_trailing_single = n_train > batch_size and n_train % batch_size == 1

    train_loader = DataLoader(
        train_ds,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=pin,
        drop_last=drop_trailing_single,
    )
    val_loader = DataLoader(
        val_ds,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=pin,
    )
    return train_loader, val_loader

# -------------------- Training utilities ------------------------------
def save_checkpoint(state, fname=SAVE_PATH):
    """Serialise ``state`` to ``fname`` with ``torch.save``.

    Args:
        state: object to store (``main`` passes a dict holding the epoch, the
            model ``state_dict`` and the validation accuracy).
        fname: destination path; an existing file is overwritten.
    """
    torch.save(obj=state, f=fname)

def evaluate(model, loader, device):
    """Compute mean cross-entropy loss and accuracy of ``model`` over ``loader``.

    The model is switched to eval mode and left there; gradients are disabled
    for the duration of the pass.

    Args:
        model: module mapping a batch of inputs to class logits.
        loader: iterable of ``(inputs, labels)`` batches.
        device: device the batches are moved to before the forward pass.

    Returns:
        ``(mean_loss, accuracy)``, both averaged per sample. When ``loader``
        yields no samples the averages are undefined and ``(nan, nan)`` is
        returned instead of raising ``ZeroDivisionError``.
    """
    model.eval()
    loss_fn = nn.CrossEntropyLoss()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for imgs, labels in loader:
            imgs = imgs.to(device)
            labels = labels.to(device)
            logits = model(imgs)
            batch_size = imgs.size(0)
            # loss_fn returns the batch mean; re-weight by batch size so a
            # smaller final batch does not skew the per-sample average.
            loss_sum += loss_fn(logits, labels).item() * batch_size
            n_correct += (logits.argmax(dim=1) == labels).sum().item()
            n_seen += batch_size

    if n_seen == 0:
        return float('nan'), float('nan')
    return loss_sum / n_seen, n_correct / n_seen

def train_one_epoch(model, loader, optimizer, device, epoch):
    """Run one optimisation pass over ``loader`` and report loss / accuracy.

    The model is switched to train mode. For every batch the gradients are
    reset, cross-entropy loss is back-propagated and ``optimizer`` takes one
    step. Accuracy is measured on the logits produced before that step.

    Args:
        model: module mapping a batch of inputs to class logits.
        loader: iterable of ``(inputs, labels)`` batches.
        optimizer: optimiser holding the parameters to update.
        device: device the batches are moved to.
        epoch: epoch number, used only in the printed summary line.

    Returns:
        ``(mean_loss, accuracy)``, both averaged per sample. When ``loader``
        yields no samples the averages are undefined and ``(nan, nan)`` is
        returned (and printed) instead of raising ``ZeroDivisionError``.
    """
    model.train()
    loss_fn = nn.CrossEntropyLoss()
    loss_sum = 0.0
    n_seen = 0
    n_correct = 0
    for imgs, labels in loader:
        imgs = imgs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(imgs)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_size = imgs.size(0)
        # Batch-mean loss re-weighted by batch size -> per-sample average.
        loss_sum += loss.item() * batch_size
        n_seen += batch_size
        n_correct += (outputs.argmax(dim=1) == labels).sum().item()

    if n_seen == 0:
        avg_loss, acc = float('nan'), float('nan')
    else:
        avg_loss, acc = loss_sum / n_seen, n_correct / n_seen
    print(f"Epoch {epoch} Train Loss: {avg_loss:.4f} Acc: {acc:.4f}")
    return avg_loss, acc

# -------------------- Plotting & confusion ----------------------------
def plot_metrics(train_losses: List[float], val_losses: List[float], train_accs: List[float], val_accs: List[float], out_dir: str):
    """Save the loss and accuracy curves as PNG files in ``out_dir``.

    Two figures are written: ``loss_curve.png`` and ``acc_curve.png``. Each
    shows the training and validation series against the 1-based epoch index.
    ``out_dir`` is created if it does not exist.

    Args:
        train_losses, val_losses: per-epoch loss values.
        train_accs, val_accs: per-epoch accuracy values.
        out_dir: directory receiving the two images.
    """
    os.makedirs(out_dir, exist_ok=True)
    epoch_axis = range(1, len(train_losses) + 1)

    def _save_curve(train_series, val_series, y_label, title, file_name):
        # One figure per metric; closed right after saving so figures do not
        # accumulate in memory.
        fig, ax = plt.subplots()
        for series in (train_series, val_series):
            ax.plot(epoch_axis, series, linestyle='-', marker='o')
        ax.set_xlabel('Epoch')
        ax.set_ylabel(y_label)
        ax.set_title(title)
        ax.legend(['train', 'val'])
        ax.grid(True)
        fig.tight_layout()
        fig.savefig(os.path.join(out_dir, file_name))
        plt.close(fig)

    _save_curve(train_losses, val_losses, 'Loss', 'Training & Validation Loss', 'loss_curve.png')
    _save_curve(train_accs, val_accs, 'Accuracy', 'Training & Validation Accuracy', 'acc_curve.png')
    print('Saved plots to', out_dir)

def compute_and_save_confusion(model, loader, device, class_names: List[str], out_dir: str):
    """Print and save the confusion matrix of ``model`` over ``loader``.

    The model is switched to eval mode, predictions are collected for every
    batch, then the confusion matrix and a classification report are printed
    and the matrix is saved as ``confusion_matrix.png`` in ``out_dir``.

    Args:
        model: module mapping a batch of inputs to class logits.
        loader: iterable of ``(inputs, labels)`` batches.
        device: device the inputs are moved to.
        class_names: display names; position ``i`` names class index ``i``.
        out_dir: directory receiving the image (created if missing).
    """
    model.eval()
    preds_all: List[int] = []
    labels_all: List[int] = []
    with torch.no_grad():
        for imgs, labels in loader:
            logits = model(imgs.to(device))
            preds_all.extend(logits.argmax(dim=1).cpu().tolist())
            # .cpu() is a no-op for CPU labels and keeps this working if a
            # loader ever yields labels that already live on another device.
            labels_all.extend(labels.cpu().tolist())

    # Pin the label set to the full class list. Without it, a class that never
    # appears in the labels or predictions shrinks the matrix, which misaligns
    # the axis ticks and makes classification_report reject target_names.
    label_ids = list(range(len(class_names)))
    cm = confusion_matrix(labels_all, preds_all, labels=label_ids)
    print('Confusion matrix:\n', cm)
    print('Classification report:\n', classification_report(labels_all, preds_all, labels=label_ids, target_names=class_names))

    os.makedirs(out_dir, exist_ok=True)
    plt.figure(figsize=(8,6))
    plt.imshow(cm, interpolation='nearest')
    plt.title('Confusion matrix'); plt.colorbar()
    tick_marks = np.arange(len(class_names))
    plt.xticks(tick_marks, class_names, rotation=45); plt.yticks(tick_marks, class_names)
    plt.ylabel('True label'); plt.xlabel('Predicted label'); plt.tight_layout()
    cm_path = os.path.join(out_dir, 'confusion_matrix.png')
    plt.savefig(cm_path); plt.close()
    print('Saved confusion matrix to', cm_path)

NameError: name 'IMAGE_SIZE' is not defined

In [4]:
def _set_requires_grad(module, flag):
    """Turn gradient tracking on or off for every parameter of ``module``."""
    for param in module.parameters():
        param.requires_grad = flag


def _trainable_parameters(model):
    """Return the parameters of ``model`` that currently require gradients."""
    return [p for p in model.parameters() if p.requires_grad]


def _run_stage(model, train_loader, val_loader, optimizer, device, epochs, stage_name, history, best_val_acc):
    """Train and validate for each epoch in ``epochs``; checkpoint on improvement.

    Metrics are appended to the lists in ``history``. Returns the updated best
    validation accuracy.
    """
    for epoch in epochs:
        tr_loss, tr_acc = train_one_epoch(model, train_loader, optimizer, device, epoch)
        val_loss, val_acc = evaluate(model, val_loader, device)
        history['train_loss'].append(tr_loss)
        history['train_acc'].append(tr_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        print(f"Validation after {stage_name} epoch {epoch}: loss {val_loss:.4f}, acc {val_acc:.4f}")
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            save_checkpoint({'epoch': epoch, 'model_state': model.state_dict(), 'acc': val_acc}, SAVE_PATH)
    return best_val_acc


def main():
    """Build the hybrid model and train it with the three-stage schedule.

    Stage 1 trains the gated projections and the head with all backbones
    frozen. Stage 2 (only when ``PARTIAL_EPOCHS > 0``) also unfreezes the last
    section of each backbone at ``LR * 0.1``. Stage 3 trains every parameter
    at ``LR * 0.01``. Epoch numbers run continuously across stages. The
    checkpoint at ``SAVE_PATH`` is rewritten whenever validation accuracy
    improves. Finally the metric curves and a confusion matrix are written to
    ``OUT_DIR``.
    """
    device = torch.device(DEVICE_STR if torch.cuda.is_available() else 'cpu')
    print('Using device:', device)

    visionnet = VisionNet(num_features=PROJ_DIM)
    visionmamba = VisionMamba(num_features=PROJ_DIM)
    condensenet = CondenseNetSimple(num_features=PROJ_DIM)

    model = HybridFusionModel(visionnet, visionmamba, condensenet, proj_dim=PROJ_DIM, num_classes=NUM_CLASSES)
    model.to(device)

    print("Train dir:", TRAIN_DIR)
    print("Val dir:  ", VAL_DIR)

    train_loader, val_loader = get_dataloaders_from_dirs(TRAIN_DIR, VAL_DIR, IMAGE_SIZE, BATCH_SIZE)
    # Take the names from the dataset itself so they are guaranteed to line up
    # with the label indices it assigned, instead of re-deriving them from a
    # separate directory listing.
    class_names = list(train_loader.dataset.classes)

    history = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': []}
    best_val_acc = 0.0

    # Stage 1: freeze backbones, train head-only
    for backbone in (model.backA, model.backB, model.backC):
        _set_requires_grad(backbone, False)
    for trainable in (model.projA, model.projB, model.projC, model.head):
        _set_requires_grad(trainable, True)

    optimizer = torch.optim.AdamW(_trainable_parameters(model), lr=LR, weight_decay=1e-4)

    print(f"Head-only training for {HEAD_EPOCHS} epochs...")
    best_val_acc = _run_stage(
        model, train_loader, val_loader, optimizer, device,
        range(1, HEAD_EPOCHS + 1), 'head', history, best_val_acc,
    )

    # Stage 2: partial unfreeze if requested
    if PARTIAL_EPOCHS > 0:
        print(f"Partial unfreeze for {PARTIAL_EPOCHS} epochs...")
        for section in (model.backA.stage3, model.backB.spp, model.backC.stem[-1]):
            _set_requires_grad(section, True)

        # A fresh optimizer is needed so the newly unfrozen parameters are included.
        optimizer = torch.optim.AdamW(_trainable_parameters(model), lr=LR * 0.1, weight_decay=1e-4)
        best_val_acc = _run_stage(
            model, train_loader, val_loader, optimizer, device,
            range(HEAD_EPOCHS + 1, HEAD_EPOCHS + PARTIAL_EPOCHS + 1), 'partial', history, best_val_acc,
        )

    # Stage 3: full fine-tune
    print(f"Full fine-tune for {FINE_EPOCHS} epochs...")
    _set_requires_grad(model, True)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LR * 0.01, weight_decay=1e-5)

    start_epoch = HEAD_EPOCHS + PARTIAL_EPOCHS + 1
    best_val_acc = _run_stage(
        model, train_loader, val_loader, optimizer, device,
        range(start_epoch, start_epoch + FINE_EPOCHS), 'fine', history, best_val_acc,
    )

    print("Training complete. Best val acc:", best_val_acc)

    # Save plots and confusion matrix
    plot_metrics(history['train_loss'], history['val_loss'], history['train_acc'], history['val_acc'], OUT_DIR)
    # NOTE(review): the confusion matrix below is computed with the weights of
    # the final epoch, not with the best checkpoint stored at SAVE_PATH.
    # Confirm that this is intended.
    compute_and_save_confusion(model, val_loader, device, class_names, OUT_DIR)

# Run the whole pipeline when this cell/script is executed directly; nothing
# happens when the code is imported as a module.
if __name__ == "__main__":
    main()

NameError: name 'torch' is not defined