In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Subset
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.transforms as transforms
import torchvision.models as models
from torchvision import datasets
import matplotlib.pyplot as plt
import cv2
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_curve, auc
import seaborn as sns
from tqdm import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2
import time
import copy
import random
import pandas as pd
from torch.optim.lr_scheduler import ReduceLROnPlateau
import warnings
warnings.filterwarnings('ignore')

# Seed every random number generator the notebook touches
def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch RNGs and set the cuDNN flags.

    Args:
        seed: Integer passed unchanged to each seeding function (default 42).
    """
    # cuDNN flags: benchmark off, deterministic on.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
    )
    for seed_fn in seeders:
        seed_fn(seed)


set_seed()

# Pick the compute device: CUDA when PyTorch reports one, otherwise CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Locations of the training and test image folders
dataset_path = r"C:\Users\vishn\OneDrive\Desktop\SUBASH_BTP\archive\dataset\train"
test_path = r"C:\Users\vishn\OneDrive\Desktop\SUBASH_BTP\archive\dataset\test"  # assumed to exist; not checked here

# Class-folder name -> integer label
classes = {
    "fractured": 1,
    "not fractured": 0,
}

# (height, width) every image is resized to before entering the network
image_size = (224, 224)

# Target size shared by both pipelines
target_height, target_width = image_size

# Intensity jitter: OneOf over three colour/brightness transforms, p=0.5
intensity_jitter = A.OneOf(
    [
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2),
        A.RandomGamma(),
        A.HueSaturationValue(),
    ],
    p=0.5,
)

# Non-rigid warps: OneOf over three distortion transforms, p=0.3
geometric_warp = A.OneOf(
    [
        A.ElasticTransform(alpha=120, sigma=120 * 0.05, alpha_affine=120 * 0.03),
        A.GridDistortion(),
        A.OpticalDistortion(distort_limit=2, shift_limit=0.5),
    ],
    p=0.3,
)

# Training pipeline: resize -> random augmentations -> normalise -> tensor
train_transforms = A.Compose([
    A.Resize(target_height, target_width),
    intensity_jitter,
    geometric_warp,
    A.ShiftScaleRotate(
        shift_limit=0.1,
        scale_limit=0.1,
        rotate_limit=15,
        border_mode=cv2.BORDER_CONSTANT,
        p=0.5,
    ),
    A.GaussNoise(p=0.2),
    A.Normalize(mean=[0.5], std=[0.5]),
    ToTensorV2(),
])

# Validation/test pipeline: resize -> normalise -> tensor (no random steps)
val_transforms = A.Compose([
    A.Resize(target_height, target_width),
    A.Normalize(mean=[0.5], std=[0.5]),
    ToTensorV2(),
])

# Custom dataset class with Albumentations
class BoneFractureDataset(Dataset):
    """Binary fracture dataset read from two class sub-folders.

    ``folder_path`` must contain a ``fractured`` directory (label 1) and a
    ``not fractured`` directory (label 0). Files are indexed eagerly in
    ``__init__``; pixels are only read in ``__getitem__``.

    Args:
        folder_path: Directory holding the two class sub-folders.
        transform: Optional callable invoked as ``transform(image=ndarray)``
            that returns a mapping with an ``'image'`` entry (the
            Albumentations calling convention). When ``None`` the raw
            grayscale array from ``cv2.imread`` is returned.

    Raises:
        OSError: From ``os.listdir`` if a class folder is missing, or from
            ``__getitem__`` if an indexed file cannot be decoded.
    """

    # Accepted file extensions, compared case-insensitively.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')
    # (sub-folder name, label) pairs, scanned in this order.
    CLASS_FOLDERS = (("fractured", 1), ("not fractured", 0))

    def __init__(self, folder_path, transform=None):
        self.transform = transform
        self.images = []
        self.labels = []

        for folder_name, label in self.CLASS_FOLDERS:
            class_dir = os.path.join(folder_path, folder_name)
            # os.listdir order is arbitrary; sorting keeps sample indices (and
            # therefore any seeded split) stable across machines and runs.
            for filename in sorted(os.listdir(class_dir)):
                # Lower-case first so that e.g. "scan.JPG" is not silently dropped.
                if filename.lower().endswith(self.IMAGE_EXTENSIONS):
                    self.images.append(os.path.join(class_dir, filename))
                    self.labels.append(label)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = self.images[idx]

        # cv2.imread signals failure by returning None rather than raising.
        image = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if image is None:
            raise OSError(f"Could not read image: {img_path}")

        if self.transform is not None:
            image = self.transform(image=image)['image']

        # Float label tensor, as built for the BCE-style loss used in this file.
        label = torch.tensor(self.labels[idx], dtype=torch.float32)

        return image, label

# Index the images WITHOUT a transform. Attaching train_transforms here would
# make every split (including validation) go through random augmentation and
# normalisation before any per-split transform could run.
train_dataset = BoneFractureDataset(dataset_path, transform=None)

# Split into train and validation sets (80/20)
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_subset, val_subset = random_split(train_dataset, [train_size, val_size])


class TransformDataset(Dataset):
    """Apply a transform on top of another dataset's ``(image, label)`` items.

    Args:
        dataset: Any indexable dataset yielding ``(image, label)`` pairs.
        transform: Optional callable invoked as ``transform(image=ndarray)``
            and returning a mapping with an ``'image'`` entry. When falsy the
            image is passed through untouched.
    """

    def __init__(self, dataset, transform=None):
        self.dataset = dataset
        self.transform = transform

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        image, label = self.dataset[idx]

        # Kept for wrapped datasets that already return tensors: convert back
        # to a NumPy array and drop a leading singleton dimension.
        if isinstance(image, torch.Tensor):
            image = image.numpy()
            if image.shape[0] == 1:
                image = image.squeeze(0)

        if self.transform:
            augmented = self.transform(image=image)
            image = augmented['image']

        return image, label


# Each split gets exactly one pipeline: augmentation for training only.
train_subset = TransformDataset(train_subset, train_transforms)
val_subset = TransformDataset(val_subset, val_transforms)

# Worker processes on Windows are spawned and must re-import the dataset
# classes; classes defined in a notebook cell cannot be re-imported, so fall
# back to in-process loading there.
# NOTE(review): confirm this matches the environment the notebook runs in.
loader_workers = 0 if os.name == "nt" else 4
# Pinned host memory is only useful when batches are copied to a CUDA device.
use_pinned_memory = torch.cuda.is_available()

# Create DataLoaders
train_loader = DataLoader(train_subset, batch_size=16, shuffle=True,
                          num_workers=loader_workers, pin_memory=use_pinned_memory)
val_loader = DataLoader(val_subset, batch_size=16, shuffle=False,
                        num_workers=loader_workers, pin_memory=use_pinned_memory)

# Print dataset sizes
print(f"Train size: {len(train_subset)}")
print(f"Validation size: {len(val_subset)}")

# Define a more advanced CNN model (ResNet-based)
class EnhancedBoneFractureCNN(nn.Module):
    """ResNet18 feature extractor with a single-channel stem and an MLP head.

    The torchvision ResNet18 has its first convolution replaced by a
    1-input-channel convolution and its ``fc`` layer replaced by
    ``nn.Identity``; a four-layer classifier ending in ``Sigmoid`` maps the
    backbone features to one value per sample.

    Args:
        pretrained: Forwarded to ``models.resnet18``. When true, the new
            single-channel stem is initialised from the pretrained RGB stem
            instead of from random weights.
    """

    def __init__(self, pretrained=True):
        super().__init__()

        # Load (optionally pretrained) ResNet18
        self.resnet = models.resnet18(pretrained=pretrained)

        # Swap the 3-channel stem for a 1-channel one with the same geometry.
        rgb_stem = self.resnet.conv1
        gray_stem = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        if pretrained:
            # Summing the RGB filters over the input-channel axis gives the
            # response the original stem would produce for a gray image
            # replicated across three channels, so the pretrained first layer
            # is kept rather than thrown away.
            with torch.no_grad():
                gray_stem.weight.copy_(rgb_stem.weight.sum(dim=1, keepdim=True))
        self.resnet.conv1 = gray_stem

        # Expose the backbone features directly; the head below consumes them.
        num_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Identity()

        # New classifier head: num_features -> 512 -> 128 -> 64 -> 1 -> sigmoid
        self.classifier = nn.Sequential(
            nn.Linear(num_features, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )

        # Xavier-uniform weights and zero biases for the new linear layers
        for m in self.classifier.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)

    def forward(self, x):
        """Return the sigmoid output of the head for input batch ``x``."""
        features = self.resnet(x)
        output = self.classifier(features)
        return output

# Build the network and move it onto the selected device
model = EnhancedBoneFractureCNN(pretrained=True).to(device)

# Dump the module tree
print(model)

# Binary cross-entropy on the model's sigmoid outputs
criterion = nn.BCELoss()

# AdamW over all model parameters
optimizer = optim.AdamW(
    model.parameters(),
    lr=1e-4,
    weight_decay=1e-4,
)

# Plateau scheduler in 'min' mode: factor 0.1, patience 7
# NOTE(review): `verbose` is deprecated in recent PyTorch releases — confirm
# against the installed version.
scheduler = ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.1,
    patience=7,
    verbose=True,
)

# Create a simple CNN model for ensemble (optional)
class SimpleCNN(nn.Module):
    """Three conv/pool stages followed by a three-layer sigmoid classifier.

    Takes single-channel images and returns one value in [0, 1] per sample.
    The classifier expects a 128 x 28 x 28 feature map, which is what the
    conv stack yields for 224 x 224 input; an adaptive pooling step resamples
    the feature map to 28 x 28 so other input sizes are accepted as well.
    For 224 x 224 input that pooling is an identity, and it adds no
    parameters, so state_dict keys are unchanged.
    """

    def __init__(self):
        super().__init__()

        self.conv_layers = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # Fixes the spatial size fed to the classifier regardless of input size.
        self.spatial_pool = nn.AdaptiveAvgPool2d((28, 28))

        self.fc_layers = nn.Sequential(
            nn.Linear(128 * 28 * 28, 512),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(128, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return sigmoid outputs of shape (batch, 1) for image batch ``x``."""
        x = self.conv_layers(x)
        x = self.spatial_pool(x)
        x = torch.flatten(x, 1)  # keep the batch axis, flatten the rest
        x = self.fc_layers(x)
        return x

# Training and validation function
def train_model(model, criterion, optimizer, scheduler, dataloaders, num_epochs=200):
    """Run the train/validation loop and return the best model by validation AUC.

    Every epoch runs a 'train' phase followed by a 'val' phase. Model outputs
    are treated as probabilities and thresholded at 0.5 for accuracy. The
    scheduler is stepped once per epoch with the validation loss. Whenever the
    validation AUC improves, the weights are remembered and written to
    'best_bone_fracture_model.pth'; every 10th epoch a checkpoint is written
    to 'bone_fracture_model_epoch_<N>.pth'.

    Args:
        model: Network to train; inputs are moved to the module-level ``device``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Scheduler called as ``scheduler.step(val_loss)``.
        dataloaders: Mapping with 'train' and 'val' loaders yielding
            ``(inputs, labels)`` batches.
        num_epochs: Number of epochs to run.

    Returns:
        ``(model, history)``: the model with the best-AUC weights loaded, and
        a dict of per-epoch lists 'train_loss', 'val_loss', 'train_acc',
        'val_acc' and 'val_auc'. 'val_auc' holds NaN for epochs whose
        validation labels contain a single class, so all lists stay aligned.
    """
    best_model_wts = copy.deepcopy(model.state_dict())
    best_val_auc = 0.0
    history = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': [], 'val_auc': []}

    for epoch in tqdm(range(num_epochs), desc="Training Epochs"):
        epoch_start = time.time()
        print(f'\nEpoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            model.train(is_train)  # train(False) is the same as eval()
            dataloader = dataloaders[phase]

            running_loss = 0.0
            all_labels = []
            all_probs = []

            for inputs, labels in tqdm(dataloader, desc=phase):
                inputs = inputs.to(device)
                # Add a trailing axis so labels line up with the model output
                # (assumes one scalar label per sample — TODO confirm).
                labels = labels.to(device).unsqueeze(1)

                # Track gradients only while training
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    if is_train:
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # Weight the batch loss by batch size so the epoch figure is a
                # per-sample mean even when the last batch is smaller.
                running_loss += loss.item() * inputs.size(0)

                all_probs.extend(outputs.detach().cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

            epoch_loss = running_loss / len(dataloader.dataset)

            all_labels = np.array(all_labels).flatten()
            all_probs = np.array(all_probs).flatten()
            all_preds = (all_probs >= 0.5).astype(int)

            epoch_acc = accuracy_score(all_labels, all_preds)
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if is_train:
                history['train_loss'].append(epoch_loss)
                history['train_acc'].append(epoch_acc)
                continue

            # ---- validation bookkeeping ----
            history['val_loss'].append(epoch_loss)
            history['val_acc'].append(epoch_acc)

            # AUC is only defined when both classes occur in the labels.
            if len(np.unique(all_labels)) > 1:
                fpr, tpr, _ = roc_curve(all_labels, all_probs)
                val_auc = auc(fpr, tpr)
                print(f'Validation AUC: {val_auc:.4f}')
            else:
                val_auc = float('nan')
                print('Validation AUC: undefined (only one class present)')
            # Always record a value so this list stays the same length as the others.
            history['val_auc'].append(val_auc)

            # The scheduler monitors validation loss, which exists every
            # epoch, so it must not depend on whether AUC could be computed.
            scheduler.step(epoch_loss)

            # NaN compares false here, so an undefined AUC never replaces the best.
            if val_auc > best_val_auc:
                best_val_auc = val_auc
                best_model_wts = copy.deepcopy(model.state_dict())
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'loss': epoch_loss,
                    'auc': val_auc,
                }, 'best_bone_fracture_model.pth')
                print(f'New best model saved! AUC: {val_auc:.4f}')

        epoch_time = time.time() - epoch_start
        print(f'Epoch complete in {epoch_time:.0f}s')
        print()

        # Save checkpoint every 10 epochs ('loss' is the validation loss,
        # because 'val' is the last phase of the epoch).
        if (epoch + 1) % 10 == 0:
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'scheduler_state_dict': scheduler.state_dict(),
                'loss': epoch_loss,
            }, f'bone_fracture_model_epoch_{epoch+1}.pth')

    # Load best model weights
    model.load_state_dict(best_model_wts)
    return model, history

# Kick off training with the two loaders keyed by phase name
dataloaders = {'train': train_loader, 'val': val_loader}
trained_model, history = train_model(model, criterion, optimizer, scheduler, dataloaders, num_epochs=1)

# Draw the three history panels side by side.
# Each entry: (panel title, [(history key, legend label), ...])
history_panels = [
    ('Loss', [('train_loss', 'Train'), ('val_loss', 'Validation')]),
    ('Accuracy', [('train_acc', 'Train'), ('val_acc', 'Validation')]),
    ('Validation AUC', [('val_auc', 'Validation AUC')]),
]

plt.figure(figsize=(16, 6))

for panel_index, (panel_title, curves) in enumerate(history_panels, start=1):
    plt.subplot(1, 3, panel_index)
    for history_key, legend_label in curves:
        plt.plot(history[history_key], label=legend_label)
    plt.title(panel_title)
    plt.xlabel('Epoch')
    plt.legend()

plt.tight_layout()
plt.savefig('training_history.png')
plt.show()



SyntaxError: (unicode error) 'unicodeescape' codec can't decode bytes in position 2-3: truncated \UXXXXXXXX escape (2643812492.py, line 42)

In [2]:
import os

# Quick sanity check of the relative dataset folders
train_dir = './dataset/train'
val_dir = './dataset/val'

print("Train dir exists:", os.path.exists(train_dir))
print("Val dir exists:", os.path.exists(val_dir))

# List the training sub-folders only when the directory is present
if os.path.exists(train_dir):
    train_subfolders = os.listdir(train_dir)
else:
    train_subfolders = []
print("Train subfolders:", train_subfolders)


Train dir exists: False
Val dir exists: False
Train subfolders: []


In [None]:
# Evaluate the model on validation set for detailed metrics
def evaluate_model(model, dataloader):
    """Compute classification metrics for ``model`` over ``dataloader`` and plot them.

    Model outputs are treated as probabilities and thresholded at 0.5.
    Prints the metrics, shows the ROC curve and confusion-matrix heatmap, and
    saves them as 'roc_curve.png' and 'confusion_matrix.png'.

    Args:
        model: Network to evaluate; inputs are moved to the module-level ``device``.
        dataloader: Iterable yielding ``(inputs, labels)`` batches with 0/1 labels.

    Returns:
        Dict with keys 'accuracy', 'precision', 'recall', 'f1', 'auc' and
        'confusion_matrix'. The confusion matrix is always 2x2 (rows and
        columns ordered not-fractured, fractured). 'auc' is NaN, and the ROC
        plot is skipped, when the labels contain a single class.
    """
    model.eval()
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for inputs, labels in tqdm(dataloader, desc='Evaluating'):
            inputs = inputs.to(device)
            labels = labels.to(device).unsqueeze(1)

            outputs = model(inputs)

            all_probs.extend(outputs.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Flatten to 1-D; labels become ints so they match the integer class ids below.
    all_labels = np.array(all_labels).flatten().astype(int)
    all_probs = np.array(all_probs).flatten()
    all_preds = (all_probs >= 0.5).astype(int)

    # zero_division=0 gives a defined score when a class is never predicted.
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, zero_division=0)
    recall = recall_score(all_labels, all_preds, zero_division=0)
    f1 = f1_score(all_labels, all_preds, zero_division=0)

    # ROC/AUC need both classes in the labels (same guard as in train_model).
    has_both_classes = len(np.unique(all_labels)) > 1
    if has_both_classes:
        fpr, tpr, _ = roc_curve(all_labels, all_probs)
        roc_auc = auc(fpr, tpr)
    else:
        fpr = tpr = None
        roc_auc = float('nan')

    # Pin the label order so the matrix stays 2x2 and lines up with the
    # heatmap tick labels even if one class is absent.
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])

    # Print metrics
    print(f'Accuracy: {accuracy:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')
    print(f'AUC: {roc_auc:.4f}')
    print(f'Confusion Matrix:\n{cm}')

    # Plot ROC curve
    if has_both_classes:
        plt.figure(figsize=(8, 6))
        plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
        plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic')
        plt.legend(loc="lower right")
        plt.savefig('roc_curve.png')
        plt.show()
    else:
        print('ROC curve skipped: only one class present in the labels')

    # Plot confusion matrix
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['Not Fractured', 'Fractured'],
                yticklabels=['Not Fractured', 'Fractured'])
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.title('Confusion Matrix')
    plt.savefig('confusion_matrix.png')
    plt.show()

    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'auc': roc_auc,
        'confusion_matrix': cm
    }

# Score the trained model on the validation loader
evaluation_results = evaluate_model(trained_model, val_loader)

# Bundle the weights with their metrics and write them to disk
final_checkpoint = {
    'model_state_dict': trained_model.state_dict(),
    'evaluation_results': evaluation_results,
}
torch.save(final_checkpoint, 'final_bone_fracture_model.pth')

# Function to predict on single image (for future use)
def predict_image(model, image_path):
    """Classify one image file as fractured / not fractured.

    The image is read as grayscale, passed through ``val_transforms``, and
    fed to ``model`` on the module-level ``device``. The model is switched to
    eval mode as a side effect.

    Args:
        model: Network returning a single probability for a one-image batch.
        image_path: Path of the image file to classify.

    Returns:
        Dict with 'prediction' ('Fractured' when the probability is >= 0.5,
        else 'Not Fractured') and 'probability' (float model output).

    Raises:
        OSError: If the file cannot be read or decoded as an image.
    """
    # cv2.imread returns None on failure instead of raising; fail here with a
    # clear message rather than deep inside the transform pipeline.
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if image is None:
        raise OSError(f"Could not read image: {image_path}")

    augmented = val_transforms(image=image)
    # unsqueeze(0) adds the batch axis the model expects.
    image_tensor = augmented['image'].unsqueeze(0).to(device)

    # Prediction
    model.eval()
    with torch.no_grad():
        probability = model(image_tensor).item()

    return {
        'prediction': 'Fractured' if probability >= 0.5 else 'Not Fractured',
        'probability': probability
    }

# Final status message printed at the end of this cell
print("Training and evaluation complete!")

In [None]:
def evaluate_model(model, dataloader):
    """Compute classification metrics for ``model`` over ``dataloader`` and plot them.

    Model outputs are treated as probabilities and thresholded at 0.5.
    Prints the metrics, shows the ROC curve and confusion-matrix heatmap, and
    saves them as 'roc_curve.png' and 'confusion_matrix.png'.

    Args:
        model: Network to evaluate; inputs are moved to the module-level ``device``.
        dataloader: Iterable yielding ``(inputs, labels)`` batches with 0/1 labels.

    Returns:
        Dict with keys 'accuracy', 'precision', 'recall', 'f1', 'auc' and
        'confusion_matrix'. The confusion matrix is always 2x2 (rows and
        columns ordered not-fractured, fractured). 'auc' is NaN, and the ROC
        plot is skipped, when the labels contain a single class.
    """
    model.eval()
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for inputs, labels in tqdm(dataloader, desc='Evaluating'):
            inputs = inputs.to(device)
            labels = labels.to(device).unsqueeze(1)

            outputs = model(inputs)

            all_probs.extend(outputs.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Flatten to 1-D; labels become ints so they match the integer class ids below.
    all_labels = np.array(all_labels).flatten().astype(int)
    all_probs = np.array(all_probs).flatten()
    all_preds = (all_probs >= 0.5).astype(int)

    # zero_division=0 gives a defined score when a class is never predicted.
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, zero_division=0)
    recall = recall_score(all_labels, all_preds, zero_division=0)
    f1 = f1_score(all_labels, all_preds, zero_division=0)

    # ROC/AUC need both classes in the labels (same guard as in train_model).
    has_both_classes = len(np.unique(all_labels)) > 1
    if has_both_classes:
        fpr, tpr, _ = roc_curve(all_labels, all_probs)
        roc_auc = auc(fpr, tpr)
    else:
        fpr = tpr = None
        roc_auc = float('nan')

    # Pin the label order so the matrix stays 2x2 and lines up with the
    # heatmap tick labels even if one class is absent.
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])

    # Print metrics
    print(f'Accuracy: {accuracy:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')
    print(f'AUC: {roc_auc:.4f}')
    print(f'Confusion Matrix:\n{cm}')

    # Plot ROC curve
    if has_both_classes:
        plt.figure(figsize=(8, 6))
        plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
        plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic')
        plt.legend(loc="lower right")
        plt.savefig('roc_curve.png')
        plt.show()
    else:
        print('ROC curve skipped: only one class present in the labels')

    # Plot confusion matrix
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['Not Fractured', 'Fractured'],
                yticklabels=['Not Fractured', 'Fractured'])
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.title('Confusion Matrix')
    plt.savefig('confusion_matrix.png')
    plt.show()

    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'auc': roc_auc,
        'confusion_matrix': cm
    }

# Evaluate the trained model on the validation loader and keep the returned metrics dict
evaluation_results = evaluate_model(trained_model, val_loader)
