In [1]:
# Install pennylane
!pip install pennylane --quiet

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split, Dataset
from collections import Counter
import pennylane as qml
from pennylane import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
from PIL import Image
import warnings
import uuid
import os
import logging

warnings.filterwarnings("ignore")

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Set random seeds for reproducibility
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
qml.numpy.random.seed(seed)

# Device setup
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Custom transform for contrast enhancement
class AdjustContrast:
    def __init__(self, factor=2.0):
        self.factor = factor
    def __call__(self, img):
        return transforms.functional.adjust_contrast(img, self.factor)

# Define transformations with reduced augmentation
data_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    AdjustContrast(2.0),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Define transformations for testing
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    AdjustContrast(2.0),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Custom dataset to handle corrupted files
class RobustImageFolder(Dataset):
    def __init__(self, root, transform=None):
        self.dataset = datasets.ImageFolder(root, transform=None)
        self.transform = transform
        self.valid_indices = []
        self.skipped_files = []
        
        # Verify files
        for idx in range(len(self.dataset)):
            try:
                path, _ = self.dataset.samples[idx]
                with open(path, 'rb') as f:
                    Image.open(f).verify()
                self.valid_indices.append(idx)
            except Exception as e:
                self.skipped_files.append((path, str(e)))
                logger.warning(f"Skipped file {path}: {e}")
        
        logger.info(f"Total files: {len(self.dataset)}, Valid files: {len(self.valid_indices)}, Skipped: {len(self.skipped_files)}")
        if self.skipped_files:
            for path, error in self.skipped_files[:5]:
                logger.info(f"Example skipped file: {path} (Error: {error})")

    def __len__(self):
        return len(self.valid_indices)

    def __getitem__(self, idx):
        actual_idx = self.valid_indices[idx]
        path, target = self.dataset.samples[actual_idx]
        try:
            img = Image.open(path).convert('RGB')
            if self.transform:
                img = self.transform(img)
            return img, target
        except Exception as e:
            logger.error(f"Error loading {path}: {e}")
            raise

# Dataset path
data_dir = "/kaggle/input/mmmmmmm/dataset_blood_group"
print("Checking dataset structure:")
!ls {data_dir}

# Load dataset
try:
    dataset = RobustImageFolder(data_dir, transform=data_transform)
except Exception as e:
    print(f"Failed to load dataset at {data_dir}: {e}")
    raise

class_names = dataset.dataset.classes
print(f"Classes: {class_names}")

# Analyze class distribution
class_counts = Counter(dataset.dataset.targets[i] for i in dataset.valid_indices)
print("Class distribution:", {class_names[i]: count for i, count in class_counts.items()})

# Split into training and validation (80/20)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=0, pin_memory=True)
dataloaders = {'train': train_loader, 'val': val_loader}

# Compute class weights
class_counts_array = np.array([class_counts[i] for i in range(len(class_names))])
class_weights = torch.tensor(1.0 / class_counts_array, dtype=torch.float).to(device)

# Define classical CNN
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1)
        self.bn4 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.dropout = nn.Dropout(0.4)
        self.fc1 = nn.Linear(128 * 28 * 28, 512)
        self.fc2 = nn.Linear(512, 8)
        self.shortcut = nn.Conv2d(64, 128, kernel_size=1, stride=1, padding=0)

    def forward(self, x):
        x1 = self.pool(F.relu(self.bn1(self.conv1(x))))  # 224→112
        x2 = self.pool(F.relu(self.bn2(self.conv2(x1))))  # 112→56
        shortcut = self.shortcut(x2)  # 56→56
        x3 = F.relu(self.bn3(self.conv3(x2)) + shortcut)  # 56→56
        x4 = self.pool(F.relu(self.bn4(self.conv4(x3))))  # 56→28
        x5 = x4.view(-1, 128 * 28 * 28)
        x6 = F.relu(self.fc1(x5))
        x7 = self.dropout(x6)
        x8 = self.fc2(x7)
        return x8

# Define quantum circuit
n_qubits = 4
dev = qml.device('default.qubit', wires=n_qubits)

@qml.qnode(dev, interface='torch')
def quantum_circuit(inputs, weights):
    qml.AmplitudeEmbedding(inputs, wires=range(n_qubits), normalize=True)
    qml.templates.BasicEntanglerLayers(weights, wires=range(n_qubits))
    return [qml.expval(qml.PauliZ(wires=i)) for i in range(n_qubits)]

# Define quantum layer
class QuantumLayer(nn.Module):
    def __init__(self):
        super().__init__()
        weight_shapes = {"weights": (2, n_qubits)}
        self.q_layer = qml.qnn.TorchLayer(quantum_circuit, weight_shapes)

    def forward(self, x):
        return self.q_layer(x)

# Define hybrid CNN
class HybridCNN(nn.Module):
    def __init__(self):
        super(HybridCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1)
        self.bn4 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(2, 2)
        self.flatten = nn.Flatten()
        self.shortcut = nn.Conv2d(64, 128, kernel_size=1, stride=1, padding=0)
        self.fc1 = nn.Linear(128 * 28 * 28, 1024)
        self.fc2 = nn.Linear(1024, 64)
        self.fc_quantum = nn.Linear(64, 16)  # For AmplitudeEmbedding
        self.bn5 = nn.BatchNorm1d(16)
        self.dropout = nn.Dropout(0.3)
        self.q_layer = QuantumLayer()
        self.fc3 = nn.Linear(4, 8)

    def forward(self, x):
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        shortcut = self.shortcut(x)
        x = F.relu(self.bn3(self.conv3(x)) + shortcut)
        x = self.pool(F.relu(self.bn4(self.conv4(x))))
        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc_quantum(x))
        x = self.bn5(x)
        x = self.q_layer(x)
        x = self.fc3(x)
        return x

# Function to train and evaluate
def train_model(model, criterion, optimizer, scheduler, num_epochs, model_name, patience=20):
    train_losses, val_losses = [], []
    train_accs, val_accs = [], []
    best_val_loss = float('inf')
    epochs_no_improve = 0
    
    for epoch in range(num_epochs):
        print(f'\nEpoch {epoch+1}/{num_epochs}')
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
                print("Training...")
            else:
                model.eval()
                print("Validating...")
            running_loss = 0.0
            running_corrects = 0
            try:
                for inputs, labels in dataloaders[phase]:
                    inputs = inputs.to(device)
                    labels = labels.to(device)
                    optimizer.zero_grad()
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)
                        if phase == 'train':
                            loss.backward()
                            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)
                            optimizer.step()
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)
                epoch_loss = running_loss / len(dataloaders[phase].dataset)
                epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
                print(f'{phase.capitalize()} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
                if phase == 'train':
                    train_losses.append(epoch_loss)
                    train_accs.append(epoch_acc.item())
                else:
                    val_losses.append(epoch_loss)
                    val_accs.append(epoch_acc.item())
                    if epoch_loss < best_val_loss:
                        best_val_loss = epoch_loss
                        epochs_no_improve = 0
                        torch.save(model.state_dict(), f'/kaggle/working/best_{model_name}.pkl')
                    else:
                        epochs_no_improve += 1
                    if isinstance(scheduler, optim.lr_scheduler.ReduceLROnPlateau):
                        scheduler.step(epoch_loss)
                    else:
                        scheduler.step()
                    if epochs_no_improve >= patience:
                        print(f"Early stopping at epoch {epoch+1}")
                        break
            except Exception as e:
                logger.error(f"Error in {phase} phase: {e}")
                raise
        if epochs_no_improve >= patience:
            break
    
    # Save final model
    torch.save(model.state_dict(), f'/kaggle/working/{model_name}.pkl')
    print(f'Final model saved as {model_name}.pkl')
    
    # Plot training metrics
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Val Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title(f'{model_name} Loss')
    plt.subplot(1, 2, 2)
    plt.plot(train_accs, label='Train Acc')
    plt.plot(val_accs, label='Val Acc')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title(f'{model_name} Accuracy')
    plt.tight_layout()
    plt.savefig(f'/kaggle/working/{model_name}_metrics.png')
    plt.close()
    
    return train_losses, val_losses, train_accs, val_accs

# Evaluate model with misprediction logging
def evaluate_model(model, dataloader, model_name):
    model.eval()
    all_preds = []
    all_labels = []
    mispredictions = []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            probs = F.softmax(outputs, dim=1)
            _, preds = torch.max(outputs, 1)
            for i in range(len(labels)):
                if preds[i] != labels[i]:
                    mispredictions.append({
                        'True': class_names[labels[i]],
                        'Predicted': class_names[preds[i]],
                        'Probs': {class_names[j]: probs[i][j].item() for j in range(8)}
                    })
                all_preds.append(preds[i].cpu().numpy())
                all_labels.append(labels[i].cpu().numpy())
    
    # Print up to 5 mispredictions
    print(f"\n{model_name} Mispredictions (up to 5):")
    for i, mis in enumerate(mispredictions[:5]):
        print(f"Misprediction {i+1}: True={mis['True']}, Predicted={mis['Predicted']}")
        print(f"Probabilities: {', '.join([f'{k}: {v:.4f}' for k, v in mis['Probs'].items()])}")
    
    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title(f'{model_name} Confusion Matrix')
    plt.savefig(f'/kaggle/working/{model_name}_cm.png')
    plt.close()
    
    report = classification_report(all_labels, all_preds, target_names=class_names, output_dict=True)
    print(f"\n{model_name} Classification Report:")
    print(classification_report(all_labels, all_preds, target_names=class_names))
    return cm, report

# Function to test a single image
def test_single_image(image_path, model, model_name, transform, class_names):
    try:
        image = Image.open(image_path).convert('RGB')
        image = transform(image)
        image = image.unsqueeze(0).to(device)
        model.eval()
        with torch.no_grad():
            outputs = model(image)
            probs = F.softmax(outputs, dim=1)
            _, pred = torch.max(outputs, 1)
            predicted_class = class_names[pred.item()]
            prob_list = {class_names[i]: probs[0][i].item() for i in range(len(class_names))}
        print(f"\n{model_name} Prediction for {image_path}:")
        print(f"Predicted Class: {predicted_class}")
        print(f"Probabilities: {', '.join([f'{k}: {v:.4f}' for k, v in prob_list.items()])}")
        return predicted_class, prob_list
    except Exception as e:
        print(f"Error processing image {image_path}: {e}")
        return None, None

# Debug quantum layer outputs
def debug_quantum_output(model, image_path, transform):
    try:
        model.eval()
        image = Image.open(image_path).convert('RGB')
        image = transform(image).unsqueeze(0).to(device)
        with torch.no_grad():
            x = model.pool(F.relu(model.bn1(model.conv1(x))))
            x = model.pool(F.relu(model.bn2(model.conv2(x))))
            shortcut = model.shortcut(x)
            x = F.relu(model.bn3(model.conv3(x)) + shortcut)
            x = model.pool(F.relu(model.bn4(model.conv4(x))))
            x = model.flatten(x)
            x = F.relu(model.fc1(x))
            x = model.dropout(x)
            x = F.relu(model.fc2(x))
            x = F.relu(model.fc_quantum(x))
            x = model.bn5(x)
            q_out = model.q_layer(x)
            print(f"Quantum layer output for {image_path}: {q_out.cpu().numpy()}")
        return q_out
    except Exception as e:
        print(f"Error debugging quantum output for {image_path}: {e}")
        return None

# Visualize test image
def visualize_image(image_path):
    try:
        image = Image.open(image_path).convert('RGB')
        plt.imshow(image)
        plt.title("Test Image")
        plt.axis('off')
        plt.savefig('/kaggle/working/test_image.png')
        plt.close()
        print("Test image visualized and saved as test_image.png")
    except Exception as e:
        print(f"Error visualizing image {image_path}: {e}")

# Train classical CNN
model_classical = SimpleCNN().to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer_classical = optim.SGD(model_classical.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-4)
scheduler_classical = optim.lr_scheduler.ReduceLROnPlateau(optimizer_classical, mode='min', factor=0.5, patience=5)
train_losses_c, val_losses_c, train_accs_c, val_accs_c = train_model(
    model_classical, criterion, optimizer_classical, scheduler_classical, 
    num_epochs=50, model_name='simple_cnn_blood_group'
)
print("\nEvaluating SimpleCNN on validation set:")
cm_classical, report_classical = evaluate_model(model_classical, val_loader, "SimpleCNN")

# Train hybrid CNN
model_hybrid = HybridCNN().to(device)
optimizer_hybrid = optim.AdamW([
    {'params': model_hybrid.q_layer.parameters(), 'lr': 0.01},
    {'params': [p for n, p in model_hybrid.named_parameters() if 'q_layer' not in n], 'lr': 0.001}
], weight_decay=1e-4)
scheduler_hybrid = optim.lr_scheduler.ReduceLROnPlateau(optimizer_hybrid, mode='min', factor=0.5, patience=5)
train_losses_h, val_losses_h, train_accs_h, val_accs_h = train_model(
    model_hybrid, criterion, optimizer_hybrid, scheduler_hybrid, 
    num_epochs=50, model_name='hybrid_cnn_blood_group'
)
print("\nEvaluating HybridCNN on validation set:")
cm_hybrid, report_hybrid = evaluate_model(model_hybrid, val_loader, "HybridCNN")

# Compare models
results = {
    'Model': ['SimpleCNN', 'HybridCNN'],
    'Val Accuracy': [max(val_accs_c), max(val_accs_h)],
    'Val Loss': [min(val_losses_c), min(val_losses_h)],
    'Macro F1': [report_classical['macro avg']['f1-score'], report_hybrid['macro avg']['f1-score']]
}
print("\nModel Comparison:")
print(pd.DataFrame(results))

# Save comparison
pd.DataFrame(results).to_csv('/kaggle/working/model_comparison.csv', index=False)


ModuleNotFoundError: No module named 'pandas'

In [9]:
# Test single image
image_path = "/kaggle/input/mmmmmmm/dataset_blood_group/B-/cluster_3_1018.BMP"
print(f"\nTesting image: {image_path}")

# Visualize test image
visualize_image(image_path)

# Debug quantum layer for HybridCNN
debug_quantum_output(model_hybrid, image_path, test_transform)

# Load and test with SimpleCNN
try:
    model_classical.load_state_dict(torch.load('/kaggle/working/simple_cnn_blood_group.pkl'))
except FileNotFoundError:
    print("SimpleCNN model file not found. Using the last trained model.")
test_single_image(image_path, model_classical, "SimpleCNN", test_transform, class_names)

# Load and test with HybridCNN
try:
    model_hybrid.load_state_dict(torch.load('/kaggle/working/hybrid_cnn_blood_group.pkl'))
except FileNotFoundError:
    print("HybridCNN model file not found. Using the last trained model.")
test_single_image(image_path, model_hybrid, "HybridCNN", test_transform, class_names)


Testing image: /kaggle/input/mmmmmmm/dataset_blood_group/B-/cluster_3_1018.BMP
Test image visualized and saved as test_image.png
Error debugging quantum output for /kaggle/input/mmmmmmm/dataset_blood_group/B-/cluster_3_1018.BMP: cannot access local variable 'x' where it is not associated with a value

SimpleCNN Prediction for /kaggle/input/mmmmmmm/dataset_blood_group/B-/cluster_3_1018.BMP:
Predicted Class: B-
Probabilities: A+: 0.0000, A-: 0.0001, AB+: 0.0000, AB-: 0.0000, B+: 0.0000, B-: 0.9999, O+: 0.0000, O-: 0.0000

HybridCNN Prediction for /kaggle/input/mmmmmmm/dataset_blood_group/B-/cluster_3_1018.BMP:
Predicted Class: B-
Probabilities: A+: 0.0001, A-: 0.1617, AB+: 0.0007, AB-: 0.0070, B+: 0.0104, B-: 0.8169, O+: 0.0032, O-: 0.0000


('B-',
 {'A+': 8.974920638138428e-05,
  'A-': 0.16171498596668243,
  'AB+': 0.0007081134244799614,
  'AB-': 0.006993262097239494,
  'B+': 0.01041414961218834,
  'B-': 0.8169117569923401,
  'O+': 0.003157068509608507,
  'O-': 1.0802724318637047e-05})