In [None]:
import os
import torch
import torch.nn as nn
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from sklearn.model_selection import StratifiedKFold
from sklearn.svm import SVC
from sklearn.metrics import precision_score, recall_score, f1_score
from torchinfo import summary
from tqdm.notebook import tqdm  # Use notebook version for Colab
import numpy as np
from typing import List, Dict, Tuple

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
class EfficientNetSVM(nn.Module):
    def __init__(self, base_model, device):
        super().__init__()
        self.features = nn.Sequential(*list(base_model.children())[:-1])
        self.svm = SVC(kernel='rbf', C=1, gamma=0.0001, probability=True) 
        self.device = device
        self.is_fitted = False

    def get_features(self, x):
        with torch.no_grad():
            features = self.features(x)
            features = features.squeeze(-1).squeeze(-1)
            return features.cpu().numpy()

    def fit_svm(self, dataloader):
        self.eval()
        all_features = []
        all_labels = []
        
        print("Extracting features for SVM training...")
        for batch, (X, y) in enumerate(tqdm(dataloader)):
            X = X.to(self.device)
            features = self.get_features(X)
            all_features.append(features)
            all_labels.extend(y.cpu().numpy())

        X_train = np.concatenate(all_features, axis=0)
        print("Fitting SVM...")
        self.svm.fit(X_train, all_labels)
        self.is_fitted = True
        print("SVM training completed!")

    # def forward(self, x):
    #     if not self.is_fitted:
    #         raise RuntimeError("SVM must be fitted before making predictions")
    #     features = self.get_features(x)
    #     probabilities = self.svm.predict_proba(features)
    #     return torch.from_numpy(probabilities).float().to(self.device)
    def forward(self, x):
        if not self.is_fitted:
            raise RuntimeError("Random Forest must be fitted before making predictions")
        features = self.get_features(x)
        # Get only the positive class probability for binary classification
        probabilities = self.svm.predict_proba(features)[:, 1]
        return torch.from_numpy(probabilities).float().to(self.device).unsqueeze(1)


In [None]:
# def test_step(model, dataloader, loss_fn, device):
#     model.eval()
#     test_loss, test_acc = 0, 0
#     all_labels, all_preds = [], []

#     with torch.inference_mode():
#         for batch, (X, y) in enumerate(dataloader):
#             X, y = X.to(device), y.to(device)
#             y_pred = model(X)
#             loss = loss_fn(y_pred, y)
#             test_loss += loss.item()
#             y_pred_class = y_pred.argmax(dim=1)
#             test_acc += (y_pred_class == y).sum().item() / len(y_pred_class)
#             all_labels.extend(y.cpu().tolist())
#             all_preds.extend(y_pred_class.cpu().tolist())

#     test_loss /= len(dataloader)
#     test_acc /= len(dataloader)
#     return test_loss, test_acc, all_labels, all_preds
def test_step(model, dataloader, loss_fn, device):
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    all_labels, all_preds = [], []
    
    with torch.inference_mode():
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)
            # Get predictions (shape: [batch_size, 1])
            y_pred = model(X)
            
            # Calculate loss
            loss = loss_fn(y_pred, y.float().unsqueeze(1))
            test_loss += loss.item()
            
            # Calculate accuracy
            y_pred_class = (y_pred > 0.5).float()
            correct += (y_pred_class.squeeze(1) == y).sum().item()
            total += y.size(0)
            
            # Store predictions and labels for metrics
            all_labels.extend(y.cpu().tolist())
            all_preds.extend(y_pred_class.squeeze(1).cpu().tolist())
    
    test_loss /= len(dataloader)
    test_acc = correct / total
    return test_loss, test_acc, all_labels, all_preds


def train_svm_step(model, dataloader, device):
    model.fit_svm(dataloader)
    return 0.0, 0.0

In [None]:
# Full Dataset
# def create_dataloaders_with_cross_validation(
#     dataset_dir: str,
#     transform: transforms.Compose,
#     batch_size: int,
#     num_splits: int = 5,
#     num_workers: int = 2  # Reduced for Colab
# ):
#     print(f"Loading dataset from {dataset_dir}")
#     full_dataset = datasets.ImageFolder(dataset_dir, transform=transform)
#     print(f"Total images: {len(full_dataset)}")
#     print(f"Classes: {full_dataset.classes}")
    
#     skf = StratifiedKFold(n_splits=num_splits, shuffle=True, random_state=42)
#     train_dataloaders = []
#     test_dataloaders = []
#     class_names = full_dataset.classes
    
#     for fold, (train_indices, test_indices) in enumerate(skf.split(range(len(full_dataset)), full_dataset.targets)):
#         print(f"Fold {fold + 1}: Train size = {len(train_indices)}, Test size = {len(test_indices)}")
#         train_dataset = torch.utils.data.Subset(full_dataset, train_indices)
#         test_dataset = torch.utils.data.Subset(full_dataset, test_indices)
        
#         train_dataloader = DataLoader(
#             train_dataset,
#             batch_size=batch_size,
#             shuffle=True,
#             num_workers=num_workers,
#             pin_memory=True,
#         )
#         test_dataloader = DataLoader(
#             test_dataset,
#             batch_size=batch_size,
#             shuffle=False,
#             num_workers=num_workers,
#             pin_memory=True,
#         )
        
#         train_dataloaders.append(train_dataloader)
#         test_dataloaders.append(test_dataloader)
        
#     return train_dataloaders, test_dataloaders, class_names

In [None]:
#half Dataset
def create_dataloaders_with_cross_validation(
    dataset_dir: str,
    transform: transforms.Compose,
    batch_size: int,
    sampling_ratio: float = 0.5,  # New parameter for sampling ratio
    num_splits: int = 5,
    num_workers: int = 2
):
    print(f"Loading dataset from {dataset_dir}")
    full_dataset = datasets.ImageFolder(dataset_dir, transform=transform)
    print(f"Total images: {len(full_dataset)}")
    print(f"Classes: {full_dataset.classes}")
    
    # Get indices for each class
    class_indices = {i: [] for i in range(len(full_dataset.classes))}
    for idx, (_, label) in enumerate(full_dataset):
        class_indices[label].append(idx)
    
    # Randomly sample indices from each class
    sampled_indices = []
    for class_idx, indices in class_indices.items():
        n_samples = int(len(indices) * sampling_ratio)
        sampled_indices.extend(np.random.choice(indices, size=n_samples, replace=False))
    
    # Shuffle the sampled indices
    np.random.shuffle(sampled_indices)
    
    # Create a subset of the dataset with only sampled indices
    sampled_dataset = torch.utils.data.Subset(full_dataset, sampled_indices)
    sampled_targets = [full_dataset.targets[i] for i in sampled_indices]
    
    print(f"Original dataset size: {len(full_dataset)}")
    print(f"Sampled dataset size: {len(sampled_dataset)} ({sampling_ratio*100}%)")
    
    # Initialize StratifiedKFold
    skf = StratifiedKFold(n_splits=num_splits, shuffle=True, random_state=42)
    train_dataloaders = []
    test_dataloaders = []
    
    # Perform cross-validation on sampled dataset
    for fold, (train_indices, test_indices) in enumerate(skf.split(range(len(sampled_dataset)), sampled_targets)):
        print(f"Fold {fold + 1}: Train size = {len(train_indices)}, Test size = {len(test_indices)}")
        
        # Create train and test datasets
        train_dataset = torch.utils.data.Subset(sampled_dataset, train_indices)
        test_dataset = torch.utils.data.Subset(sampled_dataset, test_indices)
        
        # Create dataloaders
        train_dataloader = DataLoader(
            train_dataset,
            batch_size=batch_size,
            shuffle=True,
            num_workers=num_workers,
            pin_memory=True,
        )
        test_dataloader = DataLoader(
            test_dataset,
            batch_size=batch_size,
            shuffle=False,
            num_workers=num_workers,
            pin_memory=True,
        )
        
        train_dataloaders.append(train_dataloader)
        test_dataloaders.append(test_dataloader)
        
    return train_dataloaders, test_dataloaders, full_dataset.classes


In [None]:
def train_with_cross_validation_svm(model, train_dataloaders, test_dataloaders, loss_fn, device):
    all_results = []

    for split in range(len(train_dataloaders)):
        print(f"\nTraining Split {split+1}/{len(train_dataloaders)}")
        results = {
            "train_loss": [], "train_acc": [],
            "test_loss": [], "test_acc": [],
            "precision": [], "recall": [], "f1": []
        }
        
        train_loss, train_acc = train_svm_step(model, train_dataloaders[split], device)
        test_loss, test_acc, all_labels, all_preds = test_step(
            model, test_dataloaders[split], loss_fn, device
        )

        precision = precision_score(all_labels, all_preds, average="weighted")
        recall = recall_score(all_labels, all_preds, average="weighted")
        f1 = f1_score(all_labels, all_preds, average="weighted")

        print(
            f"Split: {split+1} | "
            f"test_acc: {test_acc:.4f} | "
            f"precision: {precision:.4f} | "
            f"recall: {recall:.4f} | "
            f"f1: {f1:.4f}"
        )

        for metric, value in zip(
            ["train_loss", "train_acc", "test_loss", "test_acc", "precision", "recall", "f1"],
            [train_loss, train_acc, test_loss, test_acc, precision, recall, f1]
        ):
            results[metric].append(value)

        all_results.append(results)

    return all_results

In [None]:
# For Changing Trainable Layer
# pretrained_efficientnet_weights = torchvision.models.EfficientNet_B0_Weights.DEFAULT
# pretrained_efficientnet = torchvision.models.efficientnet_b0(weights=pretrained_efficientnet_weights).to(device)
pretrained_efficientnet_weights = torchvision.models.EfficientNet_V2_S_Weights.DEFAULT

# 2. Setup EfficientNet model instance with pretrained weights
pretrained_efficientnet = torchvision.models.efficientnet_v2_s(weights=pretrained_efficientnet_weights).to(device)

# Freeze base parameters
for parameter in pretrained_efficientnet.parameters():
    parameter.requires_grad = False

# for layer in list(pretrained_efficientnet.children())[-3:]:
#     for param in layer.parameters():
#         param.requires_grad = True

# Get transforms
pretrained_efficientnet_transforms = pretrained_efficientnet_weights.transforms()

# Dataset path - adjust this to your Google Drive path
dataset_dir = '/kaggle/input/cmid-dataset/CMID'  # Adjust this path
class_names = ['defective', 'non defective']


In [None]:
# Half Dataset
train_dataloader_pretrained, test_dataloader_pretrained, class_names = create_dataloaders_with_cross_validation(
    dataset_dir=dataset_dir,
    transform=pretrained_efficientnet_transforms,
    batch_size=32,
    num_splits=5,
    sampling_ratio=0.5
)

In [None]:
# Full Dataset
# train_dataloader_pretrained, test_dataloader_pretrained, class_names = create_dataloaders_with_cross_validation(
#     dataset_dir=dataset_dir,
#     transform=pretrained_efficientnet_transforms,
#     batch_size=32,
#     num_splits=5
# )

In [None]:
model = EfficientNetSVM(pretrained_efficientnet, device).to(device)

# Loss function
# loss_fn = torch.nn.CrossEntropyLoss()
loss_fn = nn.BCELoss()

# Train and evaluate
results = train_with_cross_validation_svm(
    model=model,
    train_dataloaders=train_dataloader_pretrained,
    test_dataloaders=test_dataloader_pretrained,
    loss_fn=loss_fn,
    device=device
)

In [None]:
splits_f1 = [np.mean(split_result["f1"]) for split_result in results]
mean_f1 = np.mean(splits_f1)
std_f1 = np.std(splits_f1)
print(f"\nFinal Results:")
print(f"Mean F1 Score: {mean_f1:.4f} ± {std_f1:.4f}")