In [None]:
import pandas as pd
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from PIL import Image
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import torchvision.transforms as transforms

# -----------------------------
# 1. Setup and Configuration
# -----------------------------

# Set device to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Paths
train_csv_path = 'Train.csv'  # Path to your training CSV
images_dir = 'datasets/dataset/images/train'  # Path to your images directory
model_weights_path = 'custom_cnn_model.pth'  # Path to save/load your model weights

# -----------------------------
# 2. Data Preparation
# -----------------------------

# Load train data
train = pd.read_csv(train_csv_path)

# Drop rows whose image file is missing. This happens BEFORE label encoding so
# the encoded labels stay contiguous (0..K-1) even if every row of some class
# is filtered out; encoding first could leave a label that is larger than the
# number of classes still present in the filtered frame.
# Each distinct Image_ID is checked on disk only once.
valid_images = [
    img_id
    for img_id in train['Image_ID'].unique()
    if os.path.isfile(os.path.join(images_dir, img_id))
]
train = train[train['Image_ID'].isin(valid_images)].reset_index(drop=True)

print(f"Number of training samples after filtering: {len(train)}")

# Encode class labels into numerical format using LabelEncoder
label_encoder = LabelEncoder()
train['class'] = label_encoder.fit_transform(train['class'])

# Split data into training and validation sets (80% train, 20% val),
# stratified on the encoded class so both splits keep the class proportions.
train_df, val_df = train_test_split(
    train,
    test_size=0.2,
    random_state=42,
    stratify=train['class'],
)

print(f"Training samples: {len(train_df)}, Validation samples: {len(val_df)}")

# -----------------------------
# 3. Dataset and DataLoader
# -----------------------------

# Define a Custom Dataset Class for Loading Images and Annotations
class CustomDataset(Dataset):
    """Dataset yielding an image plus its bounding box and class label.

    Each row of ``dataframe`` describes one sample through the columns
    ``Image_ID``, ``xmin``, ``ymin``, ``xmax``, ``ymax`` and ``class``.

    Args:
        dataframe: Table of annotations, indexed positionally via ``iloc``.
        images_dir: Directory that ``Image_ID`` values are joined onto.
        transforms: Callable applied to the loaded PIL image. When ``None``,
            a pipeline consisting only of ``ToTensor()`` is used.
    """

    def __init__(self, dataframe, images_dir, transforms=None):
        self.dataframe = dataframe
        self.images_dir = images_dir
        if transforms is None:
            # The ``transforms`` parameter shadows the module-level torchvision
            # import inside this method, so the module is imported under a
            # different local name to build the default pipeline.
            import torchvision.transforms as tv_transforms
            transforms = tv_transforms.Compose([tv_transforms.ToTensor()])
        self.transforms = transforms

    def __len__(self):
        """Return the number of annotation rows."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``(image, target)`` where ``target`` is a dict with ``'boxes'``
            (float32 tensor ``[xmin, ymin, xmax, ymax]``) and ``'labels'``
            (int64 scalar tensor), or ``(None, None)`` when the image cannot
            be opened.
        """
        # Fetch the row once instead of re-indexing the frame per column.
        row = self.dataframe.iloc[idx]
        image_path = os.path.join(self.images_dir, row['Image_ID'])

        # Load and preprocess the image; the context manager releases the
        # underlying file handle as soon as the pixels are converted.
        try:
            with Image.open(image_path) as raw_image:
                image = raw_image.convert("RGB")
        except Exception as e:
            print(f"Error loading image {image_path}: {e}")
            # NOTE(review): a (None, None) sample needs a collate_fn that
            # filters it out before batching.
            return None, None  # Return None if the image cannot be opened

        # Extract bounding box coordinates
        # NOTE(review): coordinates are returned exactly as stored; they are
        # not rescaled if `transforms` resizes the image - confirm they match
        # the transformed image space.
        bbox = torch.tensor(
            [row[c] for c in ['xmin', 'ymin', 'xmax', 'ymax']],
            dtype=torch.float32
        )

        # Extract class label
        label = torch.tensor(row['class'], dtype=torch.long)

        # Apply transformations
        if self.transforms is not None:
            image = self.transforms(image)

        return image, {'boxes': bbox, 'labels': label}

# Define transformations for the images
transform = transforms.Compose([
    transforms.Resize((256, 256)),  # Resize images to 256x256
    transforms.ToTensor()
])


def collate_skip_failed(batch):
    """Batch samples, dropping those the dataset could not load.

    A sample whose image element is ``None`` marks a failed load. The default
    collate function cannot stack ``None`` values, so such samples are removed
    before collating.

    Args:
        batch: List of ``(image, target)`` samples produced by the dataset.

    Returns:
        The default-collated ``(images, targets)`` batch, or ``(None, None)``
        when every sample in ``batch`` failed to load.
    """
    usable = [sample for sample in batch if sample[0] is not None]
    if not usable:
        return None, None
    return torch.utils.data.dataloader.default_collate(usable)


# Instantiate the training dataset and DataLoader
train_dataset = CustomDataset(dataframe=train_df, images_dir=images_dir, transforms=transform)
train_loader = DataLoader(
    train_dataset,
    batch_size=32,          # Adjust based on your GPU memory
    shuffle=True,           # Shuffle for training
    num_workers=4,          # Number of subprocesses for data loading
    pin_memory=True,        # Copy tensors into CUDA pinned memory
    collate_fn=collate_skip_failed,  # Tolerate samples that failed to load
)

# Instantiate the validation dataset and DataLoader
val_dataset = CustomDataset(dataframe=val_df, images_dir=images_dir, transforms=transform)
val_loader = DataLoader(
    val_dataset,
    batch_size=32,          # Adjust based on your GPU memory
    shuffle=False,          # No need to shuffle for evaluation
    num_workers=4,          # Number of subprocesses for data loading
    pin_memory=True,        # Copy tensors into CUDA pinned memory
    collate_fn=collate_skip_failed,  # Tolerate samples that failed to load
)

# -----------------------------
# 4. Model Definition
# -----------------------------

# Define the CustomCNN model
class CustomCNN(nn.Module):
    """Small CNN with a shared trunk and two heads (bounding box + class).

    Three 3x3 convolutions, each followed by ReLU and 2x2 max-pooling, feed a
    256-unit fully connected layer. Two linear heads then produce four
    bounding-box values and ``num_classes`` class logits.

    Args:
        num_classes: Number of outputs of the classification head.
        input_size: ``(height, width)`` of the images the model will receive.
            It determines the input width of the first fully connected layer.
            Defaults to ``(256, 256)``.
    """

    def __init__(self, num_classes, input_size=(256, 256)):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.input_size = tuple(input_size)
        # Must run after the conv/pool layers exist: it probes them.
        self.flattened_size = self._get_flattened_size()
        self.fc1 = nn.Linear(self.flattened_size, 256)
        self.fc2_bbox = nn.Linear(256, 4)      # Bounding box coordinates [xmin, ymin, xmax, ymax]
        self.fc2_class = nn.Linear(256, num_classes)  # Class prediction

    def _get_flattened_size(self):
        """Return the per-sample feature count produced by the conv trunk."""
        height, width = self.input_size
        # Shape probe only, so no autograd graph is needed.
        with torch.no_grad():
            dummy_output = self._forward_conv(torch.zeros(1, 3, height, width))
        return dummy_output.view(1, -1).size(1)

    def _forward_conv(self, x):
        """Apply the three conv -> ReLU -> max-pool stages."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        return x

    def forward(self, x):
        """Return ``(bbox_output, class_output)`` for a batch of images."""
        x = self._forward_conv(x)
        x = torch.flatten(x, 1)  # Keep the batch dimension, flatten the rest
        x = F.relu(self.fc1(x))
        bbox_output = self.fc2_bbox(x)
        class_output = self.fc2_class(x)
        return bbox_output, class_output

# Size the classification head from the fitted encoder rather than from the
# labels that happen to remain in the frame: every encoded label is an index
# into `label_encoder.classes_`, so this count is always larger than the
# biggest label value.
num_classes = len(label_encoder.classes_)

# Instantiate the model and move to the correct device
model = CustomCNN(num_classes=num_classes).to(device)

# Print the model structure (optional)
print(f"Custom CNN Model:\n{model}")

# -----------------------------
# 5. Training Loop
# -----------------------------

# Define loss functions and optimizer
bbox_loss_fn = nn.MSELoss()
class_loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 3
for epoch in range(num_epochs):
    print(f"\nStarting Epoch {epoch + 1}/{num_epochs}")
    model.train()
    total_bbox_loss, total_class_loss = 0.0, 0.0
    batches_processed = 0  # Batches that actually contributed to the totals

    for batch_idx, (images, targets) in enumerate(tqdm(train_loader, desc="Training")):
        if images is None or targets is None:
            print(f"Skipping batch {batch_idx + 1} due to None values.")
            continue

        # Move data to the correct device
        images = images.to(device)
        bboxes = targets['boxes'].to(device)
        labels = targets['labels'].to(device)

        # Forward pass
        pred_bboxes, pred_labels = model(images)

        # Calculate losses; the two terms are summed with equal weight
        bbox_loss = bbox_loss_fn(pred_bboxes, bboxes)
        class_loss = class_loss_fn(pred_labels, labels)
        total_loss = bbox_loss + class_loss

        # Backward pass and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

        # Accumulate losses
        total_bbox_loss += bbox_loss.item()
        total_class_loss += class_loss.item()
        batches_processed += 1

    # Average over the batches that were really trained on, so skipped batches
    # do not dilute the mean and an empty loader does not divide by zero.
    divisor = max(batches_processed, 1)
    avg_bbox_loss = total_bbox_loss / divisor
    avg_class_loss = total_class_loss / divisor
    print(f'Epoch [{epoch+1}/{num_epochs}], BBox Loss: {avg_bbox_loss:.4f}, Class Loss: {avg_class_loss:.4f}')

# Save the trained model
torch.save(model.state_dict(), model_weights_path)
print(f"\nModel saved to '{model_weights_path}'")

# -----------------------------
# 6. Evaluation Metrics
# -----------------------------

def calculate_iou(box1, box2):
    """
    Calculate Intersection over Union (IoU) between two bounding boxes.
    Boxes are in the format [xmin, ymin, xmax, ymax]

    Returns:
        The IoU as a Python float. 0.0 is returned when the boxes do not
        overlap or when their union has no area (e.g. two degenerate boxes).
    """
    x_left = max(box1[0], box2[0])
    y_top = max(box1[1], box2[1])
    x_right = min(box1[2], box2[2])
    y_bottom = min(box1[3], box2[3])

    if x_right < x_left or y_bottom < y_top:
        return 0.0  # No overlap

    intersection_area = (x_right - x_left) * (y_bottom - y_top)
    box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union_area = box1_area + box2_area - intersection_area

    # Two zero-area boxes reach this point with an empty union; dividing would
    # raise ZeroDivisionError (or yield NaN for NumPy scalars).
    if union_area <= 0:
        return 0.0

    return float(intersection_area / union_area)

def evaluate_classification(model, dataloader, device):
    """
    Report classification quality of `model` over `dataloader`.

    Collects the arg-max class prediction for every sample and prints
    accuracy, weighted precision / recall / F1 and the confusion matrix.
    Nothing is returned.
    """
    model.eval()
    predicted_classes, expected_classes = [], []

    with torch.no_grad():
        progress = tqdm(dataloader, desc="Evaluating Classification")
        for images, targets in progress:
            # A batch without images or targets carries nothing to score
            if images is None or targets is None:
                continue

            images = images.to(device)
            labels = targets['labels'].to(device)

            # The model yields (bbox, class); only the class scores matter here
            _, class_scores = model(images)
            batch_predictions = class_scores.argmax(dim=1)

            predicted_classes.extend(batch_predictions.cpu().numpy())
            expected_classes.extend(labels.cpu().numpy())

    # Aggregate metrics over everything that was collected
    accuracy = accuracy_score(expected_classes, predicted_classes)
    precision, recall, f1, _ = precision_recall_fscore_support(
        expected_classes,
        predicted_classes,
        average='weighted',
        zero_division=0,
    )
    conf_matrix = confusion_matrix(expected_classes, predicted_classes)

    print("\n--- Classification Evaluation ---")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision (Weighted): {precision:.4f}")
    print(f"Recall (Weighted): {recall:.4f}")
    print(f"F1-Score (Weighted): {f1:.4f}")
    print("Confusion Matrix:")
    print(conf_matrix)

def evaluate_bounding_boxes(model, dataloader, device, iou_threshold=0.5):
    """
    Report bounding-box quality of `model` over `dataloader`.

    Computes the IoU between each predicted box and its ground-truth box, then
    prints the mean IoU and the fraction of samples whose IoU reaches
    `iou_threshold`. Nothing is returned.
    """
    model.eval()
    iou_scores = []

    with torch.no_grad():
        for images, targets in tqdm(dataloader, desc="Evaluating Bounding Boxes"):
            # A batch without images or targets carries nothing to score
            if images is None or targets is None:
                continue

            images = images.to(device)
            reference_boxes = targets['boxes'].to(device)

            # Only the box head is relevant here
            predicted_boxes, _ = model(images)

            # IoU is computed on host-side NumPy rows
            predicted_boxes = predicted_boxes.cpu().numpy()
            reference_boxes = reference_boxes.cpu().numpy()

            iou_scores.extend(
                calculate_iou(predicted, reference)
                for predicted, reference in zip(predicted_boxes, reference_boxes)
            )

    # Derive the counters from the collected scores
    total = len(iou_scores)
    matched = sum(1 for score in iou_scores if score >= iou_threshold)

    mean_iou = np.mean(iou_scores) if iou_scores else 0
    precision_at_iou = matched / total if total > 0 else 0

    print("\n--- Bounding Box Evaluation ---")
    print(f"Mean IoU: {mean_iou:.4f}")
    print(f"Precision at IoU >= {iou_threshold}: {precision_at_iou:.4f}")

def _draw_box(ax, box, color, label):
    """Add an unfilled rectangle for `box` ([xmin, ymin, xmax, ymax]) to `ax`."""
    xmin, ymin, xmax, ymax = box
    ax.add_patch(patches.Rectangle(
        (xmin, ymin), xmax - xmin, ymax - ymin,
        linewidth=2, edgecolor=color, facecolor='none', label=label
    ))


def visualize_predictions(model, dataloader, device, label_encoder, num_samples=5):
    """
    Visualize a few samples of ground truth and predicted bounding boxes along with class labels

    Args:
        model: Network returning ``(bbox_output, class_output)`` for a batch.
        dataloader: Iterable of ``(images, targets)`` batches, where ``targets``
            holds ``'boxes'`` and ``'labels'``.
        device: Device the images are moved to before the forward pass.
        label_encoder: Fitted encoder used to turn class indices into names.
        num_samples: Maximum number of figures to show.

    Ground-truth boxes are drawn in green and predictions in red.
    """
    model.eval()
    samples_visualized = 0

    with torch.no_grad():
        for images, targets in dataloader:
            # Stop before paying for another forward pass once enough
            # figures have been shown (also covers num_samples <= 0).
            if samples_visualized >= num_samples:
                return

            # Handle possible None batches
            if images is None or targets is None:
                continue

            images = images.to(device)

            # Forward pass
            pred_bboxes, pred_labels = model(images)
            preds = torch.argmax(pred_labels, dim=1)

            # Move everything to host NumPy for plotting. The labels are
            # converted as well, so inverse_transform never receives a tensor
            # (a CUDA tensor cannot be converted to NumPy implicitly).
            images = images.cpu().numpy()
            true_bboxes = targets['boxes'].cpu().numpy()
            true_labels = targets['labels'].cpu().numpy()
            preds = preds.cpu().numpy()
            pred_bboxes = pred_bboxes.cpu().numpy()

            for img, true_box, true_label, pred_box, pred_label in zip(
                images, true_bboxes, true_labels, pred_bboxes, preds
            ):
                if samples_visualized >= num_samples:
                    return

                fig, ax = plt.subplots(1)
                img = np.transpose(img, (1, 2, 0))  # Convert from (C, H, W) to (H, W, C)
                ax.imshow(img)

                # NOTE(review): boxes are drawn in the coordinates they are
                # given in; confirm they match the displayed image size.
                _draw_box(ax, true_box, 'g', 'Ground Truth')
                _draw_box(ax, pred_box, 'r', 'Prediction')

                # Add Labels
                gt_class = label_encoder.inverse_transform([true_label])[0]
                pred_class = label_encoder.inverse_transform([pred_label])[0]
                ax.set_title(f"GT: {gt_class} | Pred: {pred_class}")

                # Create Legend
                handles = [
                    patches.Patch(color='g', label='Ground Truth'),
                    patches.Patch(color='r', label='Prediction')
                ]
                ax.legend(handles=handles)

                plt.show()
                # Release the figure so repeated calls do not accumulate them
                plt.close(fig)

                samples_visualized += 1

# -----------------------------
# 7. Model Evaluation
# -----------------------------

# Load the trained model weights (already saved in 'custom_cnn_model.pth')
if not os.path.exists(model_weights_path):
    print(f"\nModel weights file '{model_weights_path}' not found. Please ensure the file exists.")
    # `exit()` is a helper meant for the interactive interpreter; raising
    # SystemExit directly works everywhere and reports a failing status.
    raise SystemExit(1)

model.load_state_dict(torch.load(model_weights_path, map_location=device))
model.eval()  # Set model to evaluation mode
print(f"\nModel loaded from '{model_weights_path}' and set to evaluation mode.")

# Evaluate Classification
evaluate_classification(model, val_loader, device)

# Evaluate Bounding Boxes
evaluate_bounding_boxes(model, val_loader, device, iou_threshold=0.5)

# Visualize Predictions (Optional)
visualize_predictions(model, val_loader, device, label_encoder, num_samples=5)


In [None]:
import pandas as pd
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from PIL import Image
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import torchvision.transforms as transforms

# -----------------------------
# 1. Setup and Configuration
# -----------------------------

# Set device to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Paths
train_csv_path = 'Train.csv'  # Path to your training CSV
images_dir = 'datasets/dataset/images/train'  # Path to your images directory
model_weights_path = 'custom_cnn_model.pth'  # Path to save/load your model weights

# -----------------------------
# 2. Data Preparation
# -----------------------------

# Load train data
train = pd.read_csv(train_csv_path)

# Drop rows whose image file is missing. This happens BEFORE label encoding so
# the encoded labels stay contiguous (0..K-1) even if every row of some class
# is filtered out; encoding first could leave a label that is larger than the
# number of classes still present in the filtered frame.
# Each distinct Image_ID is checked on disk only once.
valid_images = [
    img_id
    for img_id in train['Image_ID'].unique()
    if os.path.isfile(os.path.join(images_dir, img_id))
]
train = train[train['Image_ID'].isin(valid_images)].reset_index(drop=True)

print(f"Number of training samples after filtering: {len(train)}")

# Encode class labels into numerical format using LabelEncoder
label_encoder = LabelEncoder()
train['class'] = label_encoder.fit_transform(train['class'])

# Split data into training and validation sets (80% train, 20% val),
# stratified on the encoded class so both splits keep the class proportions.
train_df, val_df = train_test_split(
    train,
    test_size=0.2,
    random_state=42,
    stratify=train['class'],
)

print(f"Training samples: {len(train_df)}, Validation samples: {len(val_df)}")

# -----------------------------
# 3. Dataset and DataLoader
# -----------------------------

# Define a Custom Dataset Class for Loading Images and Annotations
class CustomDataset(Dataset):
    """Dataset yielding an image plus its bounding box and class label.

    Each row of ``dataframe`` describes one sample through the columns
    ``Image_ID``, ``xmin``, ``ymin``, ``xmax``, ``ymax`` and ``class``.

    Args:
        dataframe: Table of annotations, indexed positionally via ``iloc``.
        images_dir: Directory that ``Image_ID`` values are joined onto.
        transforms: Callable applied to the loaded PIL image. When ``None``,
            a pipeline consisting only of ``ToTensor()`` is used.
    """

    def __init__(self, dataframe, images_dir, transforms=None):
        self.dataframe = dataframe
        self.images_dir = images_dir
        if transforms is None:
            # The ``transforms`` parameter shadows the module-level torchvision
            # import inside this method, so the module is imported under a
            # different local name to build the default pipeline.
            import torchvision.transforms as tv_transforms
            transforms = tv_transforms.Compose([tv_transforms.ToTensor()])
        self.transforms = transforms

    def __len__(self):
        """Return the number of annotation rows."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``(image, target)`` where ``target`` is a dict with ``'boxes'``
            (float32 tensor ``[xmin, ymin, xmax, ymax]``) and ``'labels'``
            (int64 scalar tensor).

        Raises:
            RuntimeError: If the image cannot be opened or converted.
        """
        # Fetch the row once instead of re-indexing the frame per column.
        row = self.dataframe.iloc[idx]
        image_path = os.path.join(self.images_dir, row['Image_ID'])

        # Load and preprocess the image; the context manager releases the
        # underlying file handle as soon as the pixels are converted.
        try:
            with Image.open(image_path) as raw_image:
                image = raw_image.convert("RGB")
        except Exception as e:
            # Chain the original error so its traceback is preserved.
            raise RuntimeError(f"Error loading image {image_path}: {e}") from e

        # Extract bounding box coordinates
        # NOTE(review): coordinates are returned exactly as stored; they are
        # not rescaled if `transforms` resizes the image - confirm they match
        # the transformed image space.
        bbox = torch.tensor(
            [row[c] for c in ['xmin', 'ymin', 'xmax', 'ymax']],
            dtype=torch.float32
        )

        # Extract class label
        label = torch.tensor(row['class'], dtype=torch.long)

        # Apply transformations
        if self.transforms is not None:
            image = self.transforms(image)

        return image, {'boxes': bbox, 'labels': label}

# Image preprocessing: resize every image to 256x256, then convert to a tensor
transform = transforms.Compose(
    [transforms.Resize((256, 256)), transforms.ToTensor()]
)

# Training split: shuffled batches of 16, loaded in the main process
# (num_workers=0 keeps debugging simple), pinned for faster host-to-GPU copies
train_dataset = CustomDataset(train_df, images_dir, transforms=transform)
train_loader = DataLoader(
    train_dataset, batch_size=16, shuffle=True, num_workers=0, pin_memory=True
)

# Validation split: same settings, but in a fixed order
val_dataset = CustomDataset(val_df, images_dir, transforms=transform)
val_loader = DataLoader(
    val_dataset, batch_size=16, shuffle=False, num_workers=0, pin_memory=True
)

# -----------------------------
# 4. Model Definition
# -----------------------------

# Define the CustomCNN model
class CustomCNN(nn.Module):
    """Small CNN with a shared trunk and two heads (bounding box + class).

    Three 3x3 convolutions, each followed by ReLU and 2x2 max-pooling, feed a
    256-unit fully connected layer. Two linear heads then produce four
    bounding-box values and ``num_classes`` class logits.

    Args:
        num_classes: Number of outputs of the classification head.
        input_size: ``(height, width)`` of the images the model will receive.
            It determines the input width of the first fully connected layer.
            Defaults to ``(256, 256)``.
    """

    def __init__(self, num_classes, input_size=(256, 256)):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.input_size = tuple(input_size)
        # Must run after the conv/pool layers exist: it probes them.
        self.flattened_size = self._get_flattened_size()
        self.fc1 = nn.Linear(self.flattened_size, 256)
        self.fc2_bbox = nn.Linear(256, 4)      # Bounding box coordinates [xmin, ymin, xmax, ymax]
        self.fc2_class = nn.Linear(256, num_classes)  # Class prediction

    def _get_flattened_size(self):
        """Return the per-sample feature count produced by the conv trunk."""
        height, width = self.input_size
        # Shape probe only, so no autograd graph is needed.
        with torch.no_grad():
            dummy_output = self._forward_conv(torch.zeros(1, 3, height, width))
        return dummy_output.view(1, -1).size(1)

    def _forward_conv(self, x):
        """Apply the three conv -> ReLU -> max-pool stages."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        return x

    def forward(self, x):
        """Return ``(bbox_output, class_output)`` for a batch of images."""
        x = self._forward_conv(x)
        x = torch.flatten(x, 1)  # Keep the batch dimension, flatten the rest
        x = F.relu(self.fc1(x))
        bbox_output = self.fc2_bbox(x)
        class_output = self.fc2_class(x)
        return bbox_output, class_output

# Size the classification head from the fitted encoder rather than from the
# labels that happen to remain in the frame: every encoded label is an index
# into `label_encoder.classes_`, so this count is always larger than the
# biggest label value.
num_classes = len(label_encoder.classes_)

# Instantiate the model and move to the correct device
model = CustomCNN(num_classes=num_classes).to(device)

# Print the model structure (optional)
print(f"Custom CNN Model:\n{model}")

# -----------------------------
# 5. Training Loop
# -----------------------------

# Define loss functions and optimizer
bbox_loss_fn = nn.MSELoss()
class_loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 1
for epoch in range(num_epochs):
    print(f"\nStarting Epoch {epoch + 1}/{num_epochs}")
    model.train()
    total_bbox_loss, total_class_loss = 0.0, 0.0
    batches_processed = 0  # Batches that actually contributed to the totals

    for batch_idx, (images, targets) in enumerate(tqdm(train_loader, desc="Training")):
        # Skip batches where images or targets are None
        if images is None or targets is None:
            print(f"Skipping batch {batch_idx + 1} due to None values.")
            continue

        # Move data to the correct device
        images = images.to(device)
        bboxes = targets['boxes'].to(device)
        labels = targets['labels'].to(device)

        # Forward pass
        pred_bboxes, pred_labels = model(images)

        # Calculate losses; the two terms are summed with equal weight
        bbox_loss = bbox_loss_fn(pred_bboxes, bboxes)
        class_loss = class_loss_fn(pred_labels, labels)
        total_loss = bbox_loss + class_loss

        # Backward pass and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

        # Accumulate losses
        total_bbox_loss += bbox_loss.item()
        total_class_loss += class_loss.item()
        batches_processed += 1

    # Average over the batches that were really trained on, so skipped batches
    # do not dilute the mean and an empty loader does not divide by zero.
    divisor = max(batches_processed, 1)
    avg_bbox_loss = total_bbox_loss / divisor
    avg_class_loss = total_class_loss / divisor
    print(f'Epoch [{epoch+1}/{num_epochs}], BBox Loss: {avg_bbox_loss:.4f}, Class Loss: {avg_class_loss:.4f}')

# Save the trained model
torch.save(model.state_dict(), model_weights_path)
print(f"\nModel saved to '{model_weights_path}'")

# -----------------------------
# 6. Evaluation Metrics
# -----------------------------

def calculate_iou(box1, box2):
    """
    Calculate Intersection over Union (IoU) between two bounding boxes.
    Boxes are in the format [xmin, ymin, xmax, ymax]

    Returns:
        The IoU as a Python float. 0.0 is returned when the boxes do not
        overlap or when their union has no area (e.g. two degenerate boxes).
    """
    x_left = max(box1[0], box2[0])
    y_top = max(box1[1], box2[1])
    x_right = min(box1[2], box2[2])
    y_bottom = min(box1[3], box2[3])

    if x_right < x_left or y_bottom < y_top:
        return 0.0  # No overlap

    intersection_area = (x_right - x_left) * (y_bottom - y_top)
    box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union_area = box1_area + box2_area - intersection_area

    # Two zero-area boxes reach this point with an empty union; dividing would
    # raise ZeroDivisionError (or yield NaN for NumPy scalars).
    if union_area <= 0:
        return 0.0

    return float(intersection_area / union_area)

def evaluate_classification(model, dataloader, device):
    """
    Report classification quality of `model` over `dataloader`.

    Collects the arg-max class prediction for every sample and prints
    accuracy, weighted precision / recall / F1 and the confusion matrix.
    Nothing is returned.
    """
    model.eval()
    predicted_classes, expected_classes = [], []

    with torch.no_grad():
        progress = tqdm(dataloader, desc="Evaluating Classification")
        for images, targets in progress:
            # A batch without images or targets carries nothing to score
            if images is None or targets is None:
                continue

            images = images.to(device)
            labels = targets['labels'].to(device)

            # The model yields (bbox, class); only the class scores matter here
            _, class_scores = model(images)
            batch_predictions = class_scores.argmax(dim=1)

            predicted_classes.extend(batch_predictions.cpu().numpy())
            expected_classes.extend(labels.cpu().numpy())

    # Aggregate metrics over everything that was collected
    accuracy = accuracy_score(expected_classes, predicted_classes)
    precision, recall, f1, _ = precision_recall_fscore_support(
        expected_classes,
        predicted_classes,
        average='weighted',
        zero_division=0,
    )
    conf_matrix = confusion_matrix(expected_classes, predicted_classes)

    print("\n--- Classification Evaluation ---")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision (Weighted): {precision:.4f}")
    print(f"Recall (Weighted): {recall:.4f}")
    print(f"F1-Score (Weighted): {f1:.4f}")
    print("Confusion Matrix:")
    print(conf_matrix)

def evaluate_bounding_boxes(model, dataloader, device, iou_threshold=0.5):
    """
    Report bounding-box quality of `model` over `dataloader`.

    Computes the IoU between each predicted box and its ground-truth box, then
    prints the mean IoU and the fraction of samples whose IoU reaches
    `iou_threshold`. Nothing is returned.
    """
    model.eval()
    iou_scores = []

    with torch.no_grad():
        for images, targets in tqdm(dataloader, desc="Evaluating Bounding Boxes"):
            # A batch without images or targets carries nothing to score
            if images is None or targets is None:
                continue

            images = images.to(device)
            reference_boxes = targets['boxes'].to(device)

            # Only the box head is relevant here
            predicted_boxes, _ = model(images)

            # IoU is computed on host-side NumPy rows
            predicted_boxes = predicted_boxes.cpu().numpy()
            reference_boxes = reference_boxes.cpu().numpy()

            iou_scores.extend(
                calculate_iou(predicted, reference)
                for predicted, reference in zip(predicted_boxes, reference_boxes)
            )

    # Derive the counters from the collected scores
    total = len(iou_scores)
    matched = sum(1 for score in iou_scores if score >= iou_threshold)

    mean_iou = np.mean(iou_scores) if iou_scores else 0
    precision_at_iou = matched / total if total > 0 else 0

    print("\n--- Bounding Box Evaluation ---")
    print(f"Mean IoU: {mean_iou:.4f}")
    print(f"Precision at IoU >= {iou_threshold}: {precision_at_iou:.4f}")

def visualize_predictions(model, dataloader, device, label_encoder, num_samples=5):
    """
    Show ground-truth and predicted boxes, with class names, for a few samples.

    One matplotlib figure is drawn per sample until ``num_samples`` figures
    have been shown: the image, the ground-truth box in green, the predicted
    box in red, and a title naming the true and predicted classes.

    Args:
        model: Network whose forward pass returns ``(boxes, class_scores)``;
            the predicted class is the argmax of the scores along dim 1.
        dataloader: Iterable of ``(images, targets)`` batches; ``targets`` must
            provide ``'boxes'`` and ``'labels'`` tensors. Batches where either
            element is ``None`` are skipped.
        device: Device the images are moved to for the forward pass.
        label_encoder: Fitted encoder whose ``inverse_transform`` maps class
            ids back to class names.
        num_samples: Maximum number of figures to draw.

    Returns:
        None. Figures are displayed with ``plt.show()``.
    """
    model.eval()
    samples_visualized = 0

    with torch.no_grad():
        for images, targets in dataloader:
            # Stop before spending a forward pass on a batch nobody will see
            if samples_visualized >= num_samples:
                return

            # Skip batches where images or targets are None
            if images is None or targets is None:
                continue

            images = images.to(device)

            # Forward pass
            pred_bboxes, pred_labels = model(images)
            preds = torch.argmax(pred_labels, dim=1)

            # Everything used for plotting must live on the host: a tensor
            # left on a CUDA device cannot be converted by NumPy/scikit-learn,
            # which is what inverse_transform does with the labels below.
            images = images.cpu().numpy()
            true_bboxes = targets['boxes'].cpu().numpy()
            true_labels = targets['labels'].cpu().numpy()
            preds = preds.cpu().numpy()
            pred_bboxes = pred_bboxes.cpu().numpy()

            for img, true_box, true_label, pred_box, pred_label in zip(
                images, true_bboxes, true_labels, pred_bboxes, preds
            ):
                if samples_visualized >= num_samples:
                    return

                _, ax = plt.subplots(1)
                # NOTE(review): the image is shown exactly as the dataloader
                # produced it and boxes are drawn in their stored coordinates.
                # If the loader normalises or resizes images, colours and box
                # positions may not line up -- confirm against the transform.
                img = np.transpose(img, (1, 2, 0))  # Convert from (C, H, W) to (H, W, C)
                ax.imshow(img)

                # Ground truth in green, prediction in red
                for box, colour, name in (
                    (true_box, 'g', 'Ground Truth'),
                    (pred_box, 'r', 'Prediction'),
                ):
                    xmin, ymin, xmax, ymax = box
                    ax.add_patch(patches.Rectangle(
                        (xmin, ymin), xmax - xmin, ymax - ymin,
                        linewidth=2, edgecolor=colour, facecolor='none', label=name
                    ))

                # Class names for the title
                gt_class = label_encoder.inverse_transform([true_label])[0]
                pred_class = label_encoder.inverse_transform([pred_label])[0]
                ax.set_title(f"GT: {gt_class} | Pred: {pred_class}")

                # Solid patches give the legend readable colour swatches
                handles = [
                    patches.Patch(color='g', label='Ground Truth'),
                    patches.Patch(color='r', label='Prediction')
                ]
                ax.legend(handles=handles)

                plt.show()

                samples_visualized += 1

# -----------------------------
# 7. Model Evaluation
# -----------------------------

# Restore the trained weights from 'custom_cnn_model.pth' before evaluating.
# The two failure paths (file missing, file unreadable) each print a message
# and call exit(); they are kept as separate branches of one if/else.
if not os.path.exists(model_weights_path):
    print(f"\nModel weights file '{model_weights_path}' not found. Please ensure the file exists.")
    exit()
else:
    try:
        # map_location lets weights saved on another device load onto this one
        model.load_state_dict(torch.load(model_weights_path, map_location=device))
        model.eval()  # Set model to evaluation mode
        print(f"\nModel loaded from '{model_weights_path}' and set to evaluation mode.")
    except Exception as err:
        print(f"\nError loading model weights: {err}")
        exit()

# Classification metrics on the validation loader
evaluate_classification(model, val_loader, device)

# Box metrics on the validation loader, counting IoU >= 0.5 as a match
evaluate_bounding_boxes(model, val_loader, device, iou_threshold=0.5)

# Optional: draw a handful of validation samples with their predictions
visualize_predictions(model, val_loader, device, label_encoder, num_samples=5)


In [None]:
# import pandas as pd
# import os
# import torch
# import torch.nn as nn
# import torch.optim as optim
# from torch.utils.data import Dataset, DataLoader
# import torch.nn.functional as F
# from PIL import Image
# from tqdm import tqdm
# from sklearn.preprocessing import LabelEncoder
# from sklearn.model_selection import train_test_split
# from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
# import numpy as np
# import matplotlib.pyplot as plt
# import matplotlib.patches as patches
# import torchvision.transforms as transforms

# # -----------------------------
# # 1. Setup and Configuration
# # -----------------------------

# # Set device to GPU if available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# print(f"Using device: {device}")

# # Paths
# train_csv_path = 'Train.csv'  # Path to your training CSV
# images_dir = 'datasets/dataset/images/compressed/train'  # Path to your images directory
# model_weights_path = 'custom_cnn_model.pth'  # Path to save/load your model weights

# # -----------------------------
# # 2. Data Preparation
# # -----------------------------

# # Load train data
# train = pd.read_csv(train_csv_path)

# # Encode class labels into numerical format using LabelEncoder
# label_encoder = LabelEncoder()
# train['class'] = label_encoder.fit_transform(train['class'])

# # Check for missing image files and filter them out
# valid_images = [img_id for img_id in train['Image_ID'] if os.path.exists(os.path.join(images_dir, img_id))]
# train = train[train['Image_ID'].isin(valid_images)].reset_index(drop=True)

# print(f"Number of training samples after filtering: {len(train)}")

# # Split data into training and validation sets (80% train, 20% val)
# train_df, val_df = train_test_split(
#     train, 
#     test_size=0.2, 
#     random_state=42, 
#     stratify=train['class']
# )

# print(f"Training samples: {len(train_df)}, Validation samples: {len(val_df)}")

# # -----------------------------
# # 3. Dataset and DataLoader
# # -----------------------------

# # Define a Custom Dataset Class for Loading Images and Annotations
# class CustomDataset(Dataset):
#     def __init__(self, dataframe, images_dir, transforms=None):
#         self.dataframe = dataframe
#         self.images_dir = images_dir
#         self.transforms = transforms if transforms else transforms.Compose([transforms.ToTensor()])

#     def __len__(self):
#         return len(self.dataframe)

#     def __getitem__(self, idx):
#         image_id = self.dataframe.iloc[idx]['Image_ID']
#         image_path = os.path.join(self.images_dir, image_id)
        
#         # Load and preprocess the image
#         try:
#             image = Image.open(image_path).convert("RGB")
#         except Exception as e:
#             raise RuntimeError(f"Error loading image {image_path}: {e}")
        
#         # Extract bounding box coordinates
#         bbox = torch.tensor(
#             [self.dataframe.iloc[idx][c] for c in ['xmin', 'ymin', 'xmax', 'ymax']], 
#             dtype=torch.float32
#         )
        
#         # Extract class label
#         label = torch.tensor(self.dataframe.iloc[idx]['class'], dtype=torch.long)
        
#         # Apply transformations
#         if self.transforms:
#             image = self.transforms(image)
        
#         return image, {'boxes': bbox, 'labels': label}

# # Define transformations for the images
# transform = transforms.Compose([
#     transforms.Resize((256, 256)),  # Resize images to 256x256
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ImageNet mean
#                          std=[0.229, 0.224, 0.225])   # ImageNet std
# ])

# # Instantiate the training dataset and DataLoader
# train_dataset = CustomDataset(dataframe=train_df, images_dir=images_dir, transforms=transform)
# train_loader = DataLoader(
#     train_dataset, 
#     batch_size=32,          # Adjust based on your GPU memory
#     shuffle=True,           # Shuffle for training
#     num_workers=0,          # Set to 0 for easier debugging
#     pin_memory=True         # Copy tensors into CUDA pinned memory
# )

# # Instantiate the validation dataset and DataLoader
# val_dataset = CustomDataset(dataframe=val_df, images_dir=images_dir, transforms=transform)
# val_loader = DataLoader(
#     val_dataset, 
#     batch_size=32,          # Adjust based on your GPU memory
#     shuffle=False,          # No need to shuffle for evaluation
#     num_workers=0,          # Set to 0 for easier debugging
#     pin_memory=True         # Copy tensors into CUDA pinned memory
# )

# # -----------------------------
# # 4. Model Definition
# # -----------------------------

# # Define the CustomCNN model
# class CustomCNN(nn.Module):
#     def __init__(self, num_classes):
#         super(CustomCNN, self).__init__()
#         self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1)
#         self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
#         self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
#         self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
#         self.flattened_size = self._get_flattened_size()
#         self.fc1 = nn.Linear(self.flattened_size, 256)
#         self.fc2_bbox = nn.Linear(256, 4)      # Bounding box coordinates [xmin, ymin, xmax, ymax]
#         self.fc2_class = nn.Linear(256, num_classes)  # Class prediction

#     def _get_flattened_size(self):
#         dummy_input = torch.zeros(1, 3, 256, 256)
#         dummy_output = self._forward_conv(dummy_input)
#         return dummy_output.view(1, -1).size(1)

#     def _forward_conv(self, x):
#         x = self.pool(F.relu(self.conv1(x)))
#         x = self.pool(F.relu(self.conv2(x)))
#         x = self.pool(F.relu(self.conv3(x)))
#         return x

#     def forward(self, x):
#         x = self._forward_conv(x)
#         x = x.view(x.size(0), -1)
#         x = F.relu(self.fc1(x))
#         bbox_output = self.fc2_bbox(x)
#         class_output = self.fc2_class(x)
#         return bbox_output, class_output

# # Define number of classes based on unique labels
# num_classes = len(train['class'].unique())

# # Instantiate the model and move to the correct device
# model = CustomCNN(num_classes=num_classes).to(device)

# # Print the model structure (optional)
# print(f"Custom CNN Model:\n{model}")

# # -----------------------------
# # 5. Training Loop
# # -----------------------------

# # Define loss functions and optimizer

# # IoU Loss Function
# class IoULoss(nn.Module):
#     def __init__(self):
#         super(IoULoss, self).__init__()

#     def forward(self, pred_boxes, target_boxes):
#         # Convert boxes from (xmin, ymin, xmax, ymax) to (x1, y1, x2, y2)
#         pred_boxes = torch.sigmoid(pred_boxes)  # Ensure predictions are between 0 and 1
#         target_boxes = torch.sigmoid(target_boxes)
        
#         # Intersection coordinates
#         x1 = torch.max(pred_boxes[:, 0], target_boxes[:, 0])
#         y1 = torch.max(pred_boxes[:, 1], target_boxes[:, 1])
#         x2 = torch.min(pred_boxes[:, 2], target_boxes[:, 2])
#         y2 = torch.min(pred_boxes[:, 3], target_boxes[:, 3])
        
#         # Intersection area
#         intersection = (x2 - x1).clamp(min=0) * (y2 - y1).clamp(min=0)
        
#         # Areas of the boxes
#         pred_area = (pred_boxes[:, 2] - pred_boxes[:, 0]).clamp(min=0) * \
#                     (pred_boxes[:, 3] - pred_boxes[:, 1]).clamp(min=0)
#         target_area = (target_boxes[:, 2] - target_boxes[:, 0]).clamp(min=0) * \
#                       (target_boxes[:, 3] - target_boxes[:, 1]).clamp(min=0)
        
#         # Union area
#         union = pred_area + target_area - intersection + 1e-6  # Add epsilon to prevent division by zero
        
#         # IoU
#         iou = intersection / union
        
#         # IoU Loss
#         loss = 1 - iou
#         return loss.mean()

# # Initialize IoU Loss
# iou_loss_fn = IoULoss()

# # Cross Entropy Loss for classification
# class_loss_fn = nn.CrossEntropyLoss()

# # Optimizer
# optimizer = optim.Adam(model.parameters(), lr=0.001)

# # Learning Rate Scheduler (Optional)
# scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# # Training loop with validation
# num_epochs = 1  # Adjust as needed

# for epoch in range(num_epochs):
#     print(f"\n=== Epoch {epoch + 1}/{num_epochs} ===")
#     model.train()
#     total_iou_loss, total_class_loss = 0.0, 0.0
#     train_loader_iter = tqdm(train_loader, desc="Training", leave=False)
    
#     for batch_idx, (images, targets) in enumerate(train_loader_iter):
#         # Skip batches where images or targets are None
#         if images is None or targets is None:
#             print(f"Skipping batch {batch_idx + 1} due to None values.")
#             continue

#         # Move data to the correct device
#         images = images.to(device)
#         bboxes = targets['boxes'].to(device)
#         labels = targets['labels'].to(device)

#         # Forward pass
#         pred_bboxes, pred_labels = model(images)

#         # Calculate losses
#         bbox_loss = iou_loss_fn(pred_bboxes, bboxes)
#         class_loss = class_loss_fn(pred_labels, labels)
#         total_loss = bbox_loss + class_loss

#         # Backward pass and optimization
#         optimizer.zero_grad()
#         total_loss.backward()
#         optimizer.step()

#         # Accumulate losses
#         total_iou_loss += bbox_loss.item()
#         total_class_loss += class_loss.item()

#         # Update progress bar
#         train_loader_iter.set_postfix({'IoU Loss': bbox_loss.item(), 'Class Loss': class_loss.item()})

#     avg_iou_loss = total_iou_loss / len(train_loader)
#     avg_class_loss = total_class_loss / len(train_loader)
#     print(f"Training Losses -> IoU Loss: {avg_iou_loss:.4f}, Class Loss: {avg_class_loss:.4f}")

#     # Step the scheduler
#     scheduler.step()

#     # -----------------------------
#     # 6. Validation After Each Epoch
#     # -----------------------------

#     def calculate_iou(box1, box2):
#         """
#         Calculate Intersection over Union (IoU) between two bounding boxes.
#         Boxes are in the format [xmin, ymin, xmax, ymax]
#         """
#         x_left = max(box1[0], box2[0])
#         y_top    = max(box1[1], box2[1])
#         x_right  = min(box1[2], box2[2])
#         y_bottom = min(box1[3], box2[3])

#         if x_right < x_left or y_bottom < y_top:
#             return 0.0  # No overlap

#         intersection_area = (x_right - x_left) * (y_bottom - y_top)
#         box1_area = (box1[2]-box1[0]) * (box1[3]-box1[1])
#         box2_area = (box2[2]-box2[0]) * (box2[3]-box2[1])
#         iou = intersection_area / float(box1_area + box2_area - intersection_area)
#         return iou

#     def evaluate_classification(model, dataloader, device):
#         """
#         Evaluate classification performance: Accuracy, Precision, Recall, F1-Score, Confusion Matrix
#         """
#         model.eval()
#         all_preds = []
#         all_labels = []

#         with torch.no_grad():
#             for images, targets in tqdm(dataloader, desc="Validation Classification", leave=False):
#                 # Skip batches where images or targets are None
#                 if images is None or targets is None:
#                     continue

#                 images = images.to(device)
#                 labels = targets['labels'].to(device)

#                 # Forward pass
#                 _, preds = model(images)  # Assuming model returns (bbox, class)
#                 preds = torch.argmax(preds, dim=1)

#                 all_preds.extend(preds.cpu().numpy())
#                 all_labels.extend(labels.cpu().numpy())

#         # Calculate metrics
#         accuracy = accuracy_score(all_labels, all_preds)
#         precision, recall, f1, _ = precision_recall_fscore_support(
#             all_labels, all_preds, average='weighted', zero_division=0
#         )
#         conf_matrix = confusion_matrix(all_labels, all_preds)

#         print("\n--- Classification Evaluation ---")
#         print(f"Accuracy: {accuracy:.4f}")
#         print(f"Precision (Weighted): {precision:.4f}")
#         print(f"Recall (Weighted): {recall:.4f}")
#         print(f"F1-Score (Weighted): {f1:.4f}")
#         print("Confusion Matrix:")
#         print(conf_matrix)

#     def evaluate_bounding_boxes(model, dataloader, device, iou_threshold=0.5):
#         """
#         Evaluate bounding box predictions: Mean IoU and Precision at IoU threshold
#         """
#         model.eval()
#         all_iou = []
#         matched = 0
#         total = 0

#         with torch.no_grad():
#             for images, targets in tqdm(dataloader, desc="Validation Bounding Boxes", leave=False):
#                 # Skip batches where images or targets are None
#                 if images is None or targets is None:
#                     continue

#                 images = images.to(device)
#                 true_bboxes = targets['boxes'].to(device)

#                 # Forward pass
#                 pred_bboxes, _ = model(images)

#                 # Move tensors to CPU for processing
#                 pred_bboxes = pred_bboxes.cpu().numpy()
#                 true_bboxes = true_bboxes.cpu().numpy()

#                 # Iterate over each prediction and true box
#                 for pred_box, true_box in zip(pred_bboxes, true_bboxes):
#                     iou = calculate_iou(pred_box, true_box)
#                     all_iou.append(iou)
#                     if iou >= iou_threshold:
#                         matched += 1
#                     total += 1

#         # Calculate metrics
#         mean_iou = np.mean(all_iou) if all_iou else 0
#         precision_at_iou = matched / total if total > 0 else 0

#         print("\n--- Bounding Box Evaluation ---")
#         print(f"Mean IoU: {mean_iou:.4f}")
#         print(f"Precision at IoU >= {iou_threshold}: {precision_at_iou:.4f}")

#     # Perform validation
#     print(f"\n=== Validation After Epoch {epoch + 1} ===")
#     evaluate_classification(model, val_loader, device)
#     evaluate_bounding_boxes(model, val_loader, device, iou_threshold=0.5)

#     # -----------------------------
#     # 7. Save the Trained Model
#     # -----------------------------

#     # Save the trained model after each epoch
#     checkpoint_path = f'custom_cnn_model_epoch_{epoch + 1}.pth'
#     torch.save(model.state_dict(), checkpoint_path)
#     print(f"Checkpoint saved to '{checkpoint_path}'")

# # -----------------------------
# # 8. Final Evaluation (Optional)
# # -----------------------------

# # Load the best model (if saved separately)
# # model.load_state_dict(torch.load(best_model_path, map_location=device))
# # model.eval()

# # -----------------------------
# # 9. Visualization of Predictions (Optional)
# # -----------------------------

# def visualize_predictions(model, dataloader, device, label_encoder, num_samples=5):
#     """
#     Visualize a few samples of ground truth and predicted bounding boxes along with class labels
#     """
#     model.eval()
#     samples_visualized = 0

#     with torch.no_grad():
#         for images, targets in dataloader:
#             # Skip batches where images or targets are None
#             if images is None or targets is None:
#                 continue

#             images = images.to(device)
#             true_bboxes = targets['boxes'].to(device)
#             true_labels = targets['labels'].to(device)

#             # Forward pass
#             pred_bboxes, pred_labels = model(images)
#             preds = torch.argmax(pred_labels, dim=1)

#             # Move tensors to CPU for visualization
#             images = images.cpu().numpy()
#             true_bboxes = true_bboxes.cpu().numpy()
#             preds = preds.cpu().numpy()
#             pred_bboxes = pred_bboxes.cpu().numpy()

#             for img, true_box, true_label, pred_box, pred_label in zip(
#                 images, true_bboxes, true_labels, pred_bboxes, preds
#             ):
#                 if samples_visualized >= num_samples:
#                     return

#                 fig, ax = plt.subplots(1)
#                 img = np.transpose(img, (1, 2, 0))  # Convert from (C, H, W) to (H, W, C)
#                 # Unnormalize the image for visualization
#                 img = img * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406])
#                 img = np.clip(img, 0, 1)
#                 ax.imshow(img)

#                 # Plot Ground Truth Bounding Box
#                 gt_xmin, gt_ymin, gt_xmax, gt_ymax = true_box
#                 gt_width = gt_xmax - gt_xmin
#                 gt_height = gt_ymax - gt_ymin
#                 gt_rect = patches.Rectangle(
#                     (gt_xmin, gt_ymin), gt_width, gt_height, 
#                     linewidth=2, edgecolor='g', facecolor='none', label='Ground Truth'
#                 )
#                 ax.add_patch(gt_rect)

#                 # Plot Predicted Bounding Box
#                 pred_xmin, pred_ymin, pred_xmax, pred_ymax = pred_box
#                 pred_width = pred_xmax - pred_xmin
#                 pred_height = pred_ymax - pred_ymin
#                 pred_rect = patches.Rectangle(
#                     (pred_xmin, pred_ymin), pred_width, pred_height, 
#                     linewidth=2, edgecolor='r', facecolor='none', label='Prediction'
#                 )
#                 ax.add_patch(pred_rect)

#                 # Add Labels
#                 gt_class = label_encoder.inverse_transform([true_label])[0]
#                 pred_class = label_encoder.inverse_transform([pred_label])[0]
#                 plt.title(f"GT: {gt_class} | Pred: {pred_class}")

#                 # Create Legend
#                 handles = [
#                     patches.Patch(color='g', label='Ground Truth'),
#                     patches.Patch(color='r', label='Prediction')
#                 ]
#                 plt.legend(handles=handles)

#                 plt.show()

#                 samples_visualized += 1

# # Visualize some predictions after training
# print("\n=== Visualizing Predictions on Validation Set ===")
# visualize_predictions(model, val_loader, device, label_encoder, num_samples=5)

import pandas as pd
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from PIL import Image
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import torchvision.transforms as transforms

# -----------------------------
# 1. Setup and Configuration
# -----------------------------

# Run on CUDA when PyTorch reports it as available, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Input/output locations (relative paths)
train_csv_path = 'Train.csv'  # Annotation CSV used for training
images_dir = 'datasets/dataset/images/compressed/train'  # Folder holding the image files
model_weights_path = 'custom_cnn_model.pth'  # File the model weights are saved to / loaded from

# -----------------------------
# 2. Data Preparation
# -----------------------------

# Read the annotation table
train = pd.read_csv(train_csv_path)

# Swap class names for integer ids; the fitted encoder is kept so ids can be
# mapped back to names later
label_encoder = LabelEncoder()
train['class'] = label_encoder.fit_transform(train['class'])

# Keep only the rows whose image file actually exists under images_dir
valid_images = [
    img_id
    for img_id in train['Image_ID']
    if os.path.exists(os.path.join(images_dir, img_id))
]
train = train.loc[train['Image_ID'].isin(valid_images)].reset_index(drop=True)

print(f"Number of training samples after filtering: {len(train)}")

# 80/20 train/validation split, stratified on the class id and seeded so the
# split is reproducible
train_df, val_df = train_test_split(
    train,
    stratify=train['class'],
    test_size=0.2,
    random_state=42,
)

print(f"Training samples: {len(train_df)}, Validation samples: {len(val_df)}")

# -----------------------------
# 3. Dataset and DataLoader
# -----------------------------

# Define a Custom Dataset Class for Loading Images and Annotations
class CustomDataset(Dataset):
    """Dataset yielding one ``(image, target)`` pair per dataframe row.

    Each row must provide ``Image_ID`` (file name inside ``images_dir``), the
    box columns ``xmin``, ``ymin``, ``xmax``, ``ymax`` and an integer ``class``.
    The box is returned exactly as stored in the dataframe; it is not rescaled
    to follow any resizing done by ``transforms``.

    Args:
        dataframe: Table of annotations, indexed positionally via ``iloc``.
        images_dir: Directory containing the image files.
        transforms: Callable applied to the loaded RGB PIL image. When omitted,
            a ``Compose([ToTensor()])`` pipeline from torchvision is used.
    """

    # Annotation columns, in the order they are packed into the box tensor
    _BBOX_COLUMNS = ('xmin', 'ymin', 'xmax', 'ymax')

    def __init__(self, dataframe, images_dir, transforms=None):
        self.dataframe = dataframe
        self.images_dir = images_dir
        if transforms is None:
            # The parameter name hides the module-level ``transforms`` alias,
            # so the torchvision module is imported here under another name.
            from torchvision import transforms as tv_transforms
            transforms = tv_transforms.Compose([tv_transforms.ToTensor()])
        self.transforms = transforms

    def __len__(self):
        """Return the number of annotation rows."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load the sample at positional index ``idx``.

        Returns:
            ``(image, {'boxes': float32 tensor of 4 values, 'labels': int64
            scalar tensor})`` where ``image`` is the output of ``transforms``.

        Raises:
            RuntimeError: If the image file cannot be opened or decoded.
        """
        # Fetch the row once instead of once per column
        row = self.dataframe.iloc[idx]
        image_path = os.path.join(self.images_dir, row['Image_ID'])

        # Load the image; the context manager closes the file handle once the
        # pixel data has been copied into the converted image
        try:
            with Image.open(image_path) as raw_image:
                image = raw_image.convert("RGB")
        except Exception as e:
            raise RuntimeError(f"Error loading image {image_path}: {e}") from e

        bbox = torch.tensor(
            [row[column] for column in self._BBOX_COLUMNS],
            dtype=torch.float32
        )
        label = torch.tensor(row['class'], dtype=torch.long)

        if self.transforms is not None:
            image = self.transforms(image)

        return image, {'boxes': bbox, 'labels': label}

# Preprocessing applied to every image: fixed 256x256 size, tensor conversion,
# then per-channel normalisation with the ImageNet statistics
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],  # ImageNet mean
        std=[0.229, 0.224, 0.225],   # ImageNet std
    ),
])

# Settings shared by both loaders
_loader_settings = {
    'batch_size': 64,     # Adjust based on your GPU memory
    'num_workers': 0,     # Load in the main process for easier debugging
    'pin_memory': True,   # Copy tensors into CUDA pinned memory
}

# Training split: reshuffled every epoch
train_dataset = CustomDataset(dataframe=train_df, images_dir=images_dir, transforms=transform)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_settings)

# Validation split: fixed order, since evaluation does not need shuffling
val_dataset = CustomDataset(dataframe=val_df, images_dir=images_dir, transforms=transform)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_settings)

# -----------------------------
# 4. Reduced Size Model Definition
# -----------------------------

# # Define the smaller CustomCNN model
# class SmallCustomCNN(nn.Module):
#     def __init__(self, num_classes):
#         super(SmallCustomCNN, self).__init__()
#         # Reduced number of filters
#         self.conv1 = nn.Conv2d(3, 8, kernel_size=3, padding=1)
#         self.conv2 = nn.Conv2d(8, 16, kernel_size=3, padding=1)
#         self.conv3 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
#         self.pool = nn.MaxPool2d(2, 2)
#         # Global Average Pooling to reduce parameters
#         self.global_pool = nn.AdaptiveAvgPool2d((1, 1))
#         # Reduced fully connected layers
#         self.fc1 = nn.Linear(32, 64)
#         self.fc2_bbox = nn.Linear(64, 4)      # Bounding box coordinates [xmin, ymin, xmax, ymax]
#         self.fc2_class = nn.Linear(64, num_classes)  # Class prediction

#     def forward(self, x):
#         x = self.pool(F.relu(self.conv1(x)))  # Output size: (batch, 8, H/2, W/2)
#         x = self.pool(F.relu(self.conv2(x)))  # Output size: (batch, 16, H/4, W/4)
#         x = self.pool(F.relu(self.conv3(x)))  # Output size: (batch, 32, H/8, W/8)
#         x = self.global_pool(x)               # Output size: (batch, 32, 1, 1)
#         x = x.view(x.size(0), -1)             # Flatten to (batch, 32)
#         x = F.relu(self.fc1(x))               # (batch, 64)
#         bbox_output = self.fc2_bbox(x)
#         class_output = self.fc2_class(x)
#         return bbox_output, class_output

# import torch
# import torch.nn as nn
# import torch.nn.functional as F

class SmallCustomCNN(nn.Module):
    """Fully connected network with a box head and a classification head.

    Despite the name, the model contains no convolutional layers: the input is
    flattened and passed through three ReLU + dropout hidden layers
    (512, 256, 128 units) that feed two linear output heads.
    """

    def __init__(self, num_classes, input_size):
        """
        Args:
            num_classes (int): Number of target classes for classification.
            input_size (int): Number of values in one flattened input sample,
                i.e. channels * height * width (3 * 256 * 256 for a 256x256
                RGB image).
        """
        super().__init__()
        # Shared trunk
        self.fc1 = nn.Linear(input_size, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 128)
        # Output heads
        self.fc4_bbox = nn.Linear(128, 4)              # [xmin, ymin, xmax, ymax]
        self.fc4_class = nn.Linear(128, num_classes)   # one score per class

        # Dropout after every hidden layer to limit overfitting
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, x):
        """
        Run the forward pass.

        Args:
            x (torch.Tensor): Input whose first dimension is the batch; all
                remaining dimensions are flattened and must contain
                ``input_size`` values per sample.

        Returns:
            bbox_output (torch.Tensor): Shape (batch_size, 4), raw linear output.
            class_output (torch.Tensor): Shape (batch_size, num_classes), raw scores.
        """
        # reshape (unlike view) also accepts non-contiguous inputs such as
        # permuted or channels-last tensors, copying only when it has to
        x = x.reshape(x.size(0), -1)

        x = self.dropout(F.relu(self.fc1(x)))  # (batch_size, 512)
        x = self.dropout(F.relu(self.fc2(x)))  # (batch_size, 256)
        x = self.dropout(F.relu(self.fc3(x)))  # (batch_size, 128)

        bbox_output = self.fc4_bbox(x)
        class_output = self.fc4_class(x)

        return bbox_output, class_output


# Size the classification head from the fitted encoder, not from the labels
# that remain after filtering: class ids were assigned before rows with missing
# images were dropped, so counting the surviving labels could produce a head
# that is too small for the largest id.
num_classes = len(label_encoder.classes_)

# Instantiate the model and move it to the correct device. One preprocessed
# image is 3 channels x 256 x 256 pixels (see the Resize step), i.e. 196608
# values once flattened.
model = SmallCustomCNN(num_classes=num_classes, input_size=3 * 256 * 256).to(device)

# Print the model structure (optional)
print(f"Small Custom CNN Model:\n{model}")

# -----------------------------
# 5. Training Loop
# -----------------------------

# Define loss functions and optimizer

# IoU Loss Function (same as before)
class IoULoss(nn.Module):
    """
    IoU loss (``1 - IoU``) between predicted and target boxes, averaged over the batch.

    Boxes are compared exactly as given, in ``[xmin, ymin, xmax, ymax]`` order.
    No sigmoid is applied: squashing the ground-truth boxes distorts them (and
    saturates to ~1 for pixel-scale coordinates), and the evaluation and
    plotting code compare raw network outputs with raw targets, so the loss
    must use the same coordinate system.

    Args:
        eps (float): Small constant added to the union to avoid division by zero.
    """

    def __init__(self, eps=1e-6):
        super().__init__()
        self.eps = eps

    def forward(self, pred_boxes, target_boxes):
        """
        Args:
            pred_boxes (torch.Tensor): Predicted boxes, indexed as (batch, 4).
            target_boxes (torch.Tensor): Ground-truth boxes, indexed as (batch, 4).

        Returns:
            torch.Tensor: Scalar mean of ``1 - IoU`` over the batch.
        """
        # Corners of the intersection rectangle
        x1 = torch.max(pred_boxes[:, 0], target_boxes[:, 0])
        y1 = torch.max(pred_boxes[:, 1], target_boxes[:, 1])
        x2 = torch.min(pred_boxes[:, 2], target_boxes[:, 2])
        y2 = torch.min(pred_boxes[:, 3], target_boxes[:, 3])

        # Clamping makes disjoint boxes contribute zero intersection area.
        # NOTE: in that case IoU is identically 0, so the loss is 1 and carries
        # no gradient towards the target box.
        intersection = (x2 - x1).clamp(min=0) * (y2 - y1).clamp(min=0)

        # Clamp side lengths so inverted boxes (xmax < xmin) count as empty
        pred_area = (pred_boxes[:, 2] - pred_boxes[:, 0]).clamp(min=0) * \
                    (pred_boxes[:, 3] - pred_boxes[:, 1]).clamp(min=0)
        target_area = (target_boxes[:, 2] - target_boxes[:, 0]).clamp(min=0) * \
                      (target_boxes[:, 3] - target_boxes[:, 1]).clamp(min=0)

        # eps keeps the ratio finite when both boxes are empty
        union = pred_area + target_area - intersection + self.eps

        iou = intersection / union
        return (1 - iou).mean()

# Number of passes over the training data (adjust as needed)
num_epochs = 1

# Box regression is scored with the IoU loss, classification with cross entropy
iou_loss_fn = IoULoss()
class_loss_fn = nn.CrossEntropyLoss()

# Adam over every model parameter
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Optional schedule: multiply the learning rate by 0.1 every 3 scheduler steps
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Evaluation helpers are defined once, ahead of the epoch loop, instead of
# being re-created on every epoch.

def calculate_iou(box1, box2):
    """
    Calculate Intersection over Union (IoU) between two bounding boxes.

    Args:
        box1 (array-like): [xmin, ymin, xmax, ymax] for the first box.
        box2 (array-like): [xmin, ymin, xmax, ymax] for the second box.

    Returns:
        float: IoU value; 0.0 when the boxes do not overlap or when both
        boxes have zero area.
    """
    x_left = max(box1[0], box2[0])
    y_top = max(box1[1], box2[1])
    x_right = min(box1[2], box2[2])
    y_bottom = min(box1[3], box2[3])

    if x_right < x_left or y_bottom < y_top:
        return 0.0  # No overlap

    intersection_area = (x_right - x_left) * (y_bottom - y_top)
    box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union_area = box1_area + box2_area - intersection_area
    if union_area <= 0:
        # Both boxes are degenerate (zero area); avoid dividing by zero.
        return 0.0
    return intersection_area / float(union_area)


def evaluate_classification(model, dataloader, device):
    """
    Print accuracy, weighted precision/recall/F1 and the confusion matrix.

    Args:
        model (nn.Module): Model returning ``(bbox_output, class_output)``.
        dataloader (DataLoader): Yields ``(images, targets)`` batches where
            ``targets['labels']`` holds the class indices.
        device (torch.device): Device on which the forward pass runs.
    """
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for images, targets in tqdm(dataloader, desc="Validation Classification", leave=False):
            # Skip batches where images or targets are None
            if images is None or targets is None:
                continue

            images = images.to(device)
            labels = targets['labels'].to(device)

            # Only the classification head is needed here
            _, preds = model(images)
            preds = torch.argmax(preds, dim=1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, average='weighted', zero_division=0
    )
    conf_matrix = confusion_matrix(all_labels, all_preds)

    print("\n--- Classification Evaluation ---")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision (Weighted): {precision:.4f}")
    print(f"Recall (Weighted): {recall:.4f}")
    print(f"F1-Score (Weighted): {f1:.4f}")
    print("Confusion Matrix:")
    print(conf_matrix)


def evaluate_bounding_boxes(model, dataloader, device, iou_threshold=0.5):
    """
    Print the mean IoU and the fraction of predictions with IoU >= threshold.

    Args:
        model (nn.Module): Model returning ``(bbox_output, class_output)``.
        dataloader (DataLoader): Yields ``(images, targets)`` batches where
            ``targets['boxes']`` holds one ground-truth box per image.
        device (torch.device): Device on which the forward pass runs.
        iou_threshold (float): Minimum IoU for a prediction to count as matched.
    """
    model.eval()
    all_iou = []
    matched = 0
    total = 0

    with torch.no_grad():
        for images, targets in tqdm(dataloader, desc="Validation Bounding Boxes", leave=False):
            # Skip batches where images or targets are None
            if images is None or targets is None:
                continue

            images = images.to(device)
            true_bboxes = targets['boxes'].to(device)

            # Only the box head is needed here
            pred_bboxes, _ = model(images)

            # IoU is computed per pair on the CPU with plain NumPy values
            pred_bboxes = pred_bboxes.cpu().numpy()
            true_bboxes = true_bboxes.cpu().numpy()

            for pred_box, true_box in zip(pred_bboxes, true_bboxes):
                iou = calculate_iou(pred_box, true_box)
                all_iou.append(iou)
                if iou >= iou_threshold:
                    matched += 1
                total += 1

    mean_iou = np.mean(all_iou) if all_iou else 0
    precision_at_iou = matched / total if total > 0 else 0

    print("\n--- Bounding Box Evaluation ---")
    print(f"Mean IoU: {mean_iou:.4f}")
    print(f"Precision at IoU >= {iou_threshold}: {precision_at_iou:.4f}")


for epoch in range(num_epochs):
    print(f"\n=== Epoch {epoch + 1}/{num_epochs} ===")
    model.train()  # the evaluation helpers switch the model to eval mode
    total_iou_loss, total_class_loss = 0.0, 0.0
    batches_seen = 0  # batches that actually contributed to the totals
    train_loader_iter = tqdm(train_loader, desc="Training", leave=False)

    for batch_idx, (images, targets) in enumerate(train_loader_iter):
        # Skip batches where images or targets are None
        if images is None or targets is None:
            print(f"Skipping batch {batch_idx + 1} due to None values.")
            continue

        # Move data to the correct device
        images = images.to(device)
        bboxes = targets['boxes'].to(device)
        labels = targets['labels'].to(device)

        # Forward pass
        pred_bboxes, pred_labels = model(images)

        # Joint objective: unweighted sum of the box and class losses
        bbox_loss = iou_loss_fn(pred_bboxes, bboxes)
        class_loss = class_loss_fn(pred_labels, labels)
        total_loss = bbox_loss + class_loss

        # Backward pass and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

        # Read each scalar once and reuse it for the totals and the progress bar
        bbox_loss_value = bbox_loss.item()
        class_loss_value = class_loss.item()
        total_iou_loss += bbox_loss_value
        total_class_loss += class_loss_value
        batches_seen += 1

        train_loader_iter.set_postfix({'IoU Loss': bbox_loss_value, 'Class Loss': class_loss_value})

    # Average over the batches that were processed (skipped ones excluded);
    # max() keeps an epoch without any batch from dividing by zero.
    processed = max(batches_seen, 1)
    avg_iou_loss = total_iou_loss / processed
    avg_class_loss = total_class_loss / processed
    print(f"Training Losses -> IoU Loss: {avg_iou_loss:.4f}, Class Loss: {avg_class_loss:.4f}")

    # Step the scheduler once per epoch
    scheduler.step()

    # -----------------------------
    # 6. Validation After Each Epoch
    # -----------------------------
    print(f"\n=== Validation After Epoch {epoch + 1} ===")
    evaluate_classification(model, val_loader, device)
    evaluate_bounding_boxes(model, val_loader, device, iou_threshold=0.5)

    # -----------------------------
    # 7. Save the Trained Model
    # -----------------------------
    # One checkpoint (state_dict only) per epoch
    checkpoint_path = f'custom_cnn_model_epoch_{epoch + 1}.pth'
    torch.save(model.state_dict(), checkpoint_path)
    print(f"Checkpoint saved to '{checkpoint_path}'")

# -----------------------------
# 8. Visualization of Predictions (Optional)
# -----------------------------

def visualize_predictions(model, dataloader, device, label_encoder, num_samples=5):
    """
    Plot ground-truth and predicted boxes with their class names for a few samples.

    Args:
        model (nn.Module): Model returning ``(bbox_output, class_output)``.
        dataloader (DataLoader): Yields ``(images, targets)`` batches with
            ``targets['boxes']`` and ``targets['labels']``.
        device (torch.device): Device on which the forward pass runs.
        label_encoder (LabelEncoder): Used to turn class indices back into names.
        num_samples (int): Maximum number of figures to show.
    """
    model.eval()
    samples_visualized = 0

    # Channel statistics used to undo the normalisation before display
    # (assumes the dataset normalised with these values -- TODO confirm).
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    with torch.no_grad():
        for images, targets in dataloader:
            # Skip batches where images or targets are None
            if images is None or targets is None:
                continue

            images = images.to(device)

            # Forward pass
            pred_bboxes, pred_labels = model(images)
            preds = torch.argmax(pred_labels, dim=1)

            # Everything used for plotting is converted to NumPy on the CPU.
            # The labels are included: handing device tensors to
            # label_encoder.inverse_transform fails when they live on the GPU.
            images = images.cpu().numpy()
            true_bboxes = targets['boxes'].cpu().numpy()
            true_labels = targets['labels'].cpu().numpy()
            preds = preds.cpu().numpy()
            pred_bboxes = pred_bboxes.cpu().numpy()

            for img, true_box, true_label, pred_box, pred_label in zip(
                images, true_bboxes, true_labels, pred_bboxes, preds
            ):
                if samples_visualized >= num_samples:
                    return

                fig, ax = plt.subplots(1)
                img = np.transpose(img, (1, 2, 0))  # Convert from (C, H, W) to (H, W, C)
                # Unnormalize the image for visualization
                img = img * channel_std + channel_mean
                img = np.clip(img, 0, 1)
                ax.imshow(img)

                # Plot Ground Truth Bounding Box
                gt_xmin, gt_ymin, gt_xmax, gt_ymax = true_box
                gt_rect = patches.Rectangle(
                    (gt_xmin, gt_ymin), gt_xmax - gt_xmin, gt_ymax - gt_ymin,
                    linewidth=2, edgecolor='g', facecolor='none', label='Ground Truth'
                )
                ax.add_patch(gt_rect)

                # Plot Predicted Bounding Box
                pred_xmin, pred_ymin, pred_xmax, pred_ymax = pred_box
                pred_rect = patches.Rectangle(
                    (pred_xmin, pred_ymin), pred_xmax - pred_xmin, pred_ymax - pred_ymin,
                    linewidth=2, edgecolor='r', facecolor='none', label='Prediction'
                )
                ax.add_patch(pred_rect)

                # Decode the class indices back to their original names
                gt_class = label_encoder.inverse_transform([true_label])[0]
                pred_class = label_encoder.inverse_transform([pred_label])[0]
                plt.title(f"GT: {gt_class} | Pred: {pred_class}")

                # Create Legend
                handles = [
                    patches.Patch(color='g', label='Ground Truth'),
                    patches.Patch(color='r', label='Prediction')
                ]
                plt.legend(handles=handles)

                plt.show()

                samples_visualized += 1

# Show a handful of validation predictions once training has finished
print("\n=== Visualizing Predictions on Validation Set ===")
visualize_predictions(
    model=model,
    dataloader=val_loader,
    device=device,
    label_encoder=label_encoder,
    num_samples=5,
)




In [None]:
import pandas as pd
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import torchvision.transforms as transforms
import torchvision.models as models
from torch.cuda.amp import GradScaler, autocast

# -----------------------------
# 1. Dataset Definition
# -----------------------------

class CachedDataset(Dataset):
    """
    Dataset serving pre-processed image tensors from disk, each paired with a
    bounding box rescaled to the resized image and an encoded class label.
    """

    def __init__(self, dataframe, cached_dir, original_image_size=(2560, 1440), resized_image_size=(128, 128)):
        """
        Args:
            dataframe (pd.DataFrame): DataFrame containing image IDs, bbox coordinates, and class labels.
            cached_dir (str): Directory where cached image tensors are stored.
            original_image_size (tuple): Original image size as (width, height).
            resized_image_size (tuple): Desired image size as (width, height).
        """
        self.dataframe = dataframe
        self.cached_dir = cached_dir
        self.original_width, self.original_height = original_image_size
        self.resized_width, self.resized_height = resized_image_size
        # torchvision's Resize expects (height, width) while the size tuples
        # here are (width, height), so the order is swapped explicitly.
        # NOTE: __getitem__ serves cached tensors and does not apply this pipeline.
        self.transform = transforms.Compose([
            transforms.Resize((self.resized_height, self.resized_width)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])

    def __len__(self):
        """Return the number of rows in the backing DataFrame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """
        Load the cached tensor for row ``idx`` and build its target dict.

        Returns:
            tuple: ``(image, {'boxes': FloatTensor[4], 'labels': LongTensor scalar})``.
            If the cached tensor cannot be loaded, an all-zero image, an
            all-zero box and label 0 are returned instead of raising.
        """
        # Look the row up once instead of re-indexing the DataFrame per column
        row = self.dataframe.iloc[idx]
        image_id = row['Image_ID']
        cached_path = os.path.join(self.cached_dir, f"{image_id}.pt")
        try:
            image = torch.load(cached_path)
        except Exception as e:
            print(f"Error loading cached image {cached_path}: {e}")
            # Best-effort fallback so one bad file does not abort the epoch.
            # NOTE(review): this dummy sample still contributes to the loss as class 0.
            image = torch.zeros(3, self.resized_height, self.resized_width)
            bbox = torch.zeros(4, dtype=torch.float32)
            label = torch.tensor(0, dtype=torch.long)
            return image, {'boxes': bbox, 'labels': label}

        # Boxes are assumed to be given in original-image pixels; rescale them
        # to the resized image and clamp them to its bounds.
        scale_x = self.resized_width / self.original_width
        scale_y = self.resized_height / self.original_height
        columns = ('xmin', 'ymin', 'xmax', 'ymax')
        scales = (scale_x, scale_y, scale_x, scale_y)
        limits = (self.resized_width, self.resized_height,
                  self.resized_width, self.resized_height)
        bbox_scaled = [
            min(max(row[column] * scale, 0), limit)
            for column, scale, limit in zip(columns, scales, limits)
        ]
        bbox = torch.tensor(bbox_scaled, dtype=torch.float32)

        # Extract class label
        label = torch.tensor(row['class'], dtype=torch.long)

        # Debugging: Print fetched item details
        if idx < 10:  # Limit to first 10 to avoid excessive logging
            print(f"Fetched index {idx}: Image_ID={image_id}, BBox={bbox}, Label={label}")

        return image, {'boxes': bbox, 'labels': label}

# -----------------------------
# 2. Model Definition
# -----------------------------

class CustomCNN(nn.Module):
    """ResNet18 feature extractor with separate box-regression and classification heads."""

    def __init__(self, num_classes):
        """
        Args:
            num_classes (int): Number of outputs of the classification head.
        """
        super().__init__()
        # ImageNet-pretrained ResNet18; its final FC layer is replaced by an
        # identity so the backbone emits the features that used to feed it.
        self.backbone = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
        feature_dim = self.backbone.fc.in_features
        self.backbone.fc = nn.Identity()

        # Shared dropout followed by the two task-specific linear heads
        self.dropout = nn.Dropout(0.5)
        self.fc_bbox = nn.Linear(feature_dim, 4)
        self.fc_class = nn.Linear(feature_dim, num_classes)

    def forward(self, x):
        """Return ``(bbox_output, class_output)`` for the input batch ``x``."""
        shared = self.dropout(self.backbone(x))
        return self.fc_bbox(shared), self.fc_class(shared)

# -----------------------------
# 3. Helper Functions
# -----------------------------

def cache_images(dataframe, images_dir, cached_dir, transform):
    """
    Preprocess and cache images by applying transformations and saving as tensors.

    Images whose cache file already exists are skipped. Failures are printed
    and the image is skipped; nothing is raised for an individual image.

    Args:
        dataframe (pd.DataFrame): DataFrame with an ``Image_ID`` column.
        images_dir (str): Directory where original images are stored.
        cached_dir (str): Directory to save cached image tensors (created if missing).
        transform (callable): Transformations to apply to each RGB PIL image.
    """
    print("Caching images...")
    # torch.save does not create missing parent directories
    os.makedirs(cached_dir, exist_ok=True)

    # Only the image ID is needed, so iterate that column directly
    for image_id in tqdm(dataframe['Image_ID'], total=len(dataframe)):
        cached_path = os.path.join(cached_dir, f"{image_id}.pt")
        if os.path.exists(cached_path):
            continue

        image_path = os.path.join(images_dir, image_id)
        try:
            # The context manager closes the underlying file handle right away
            with Image.open(image_path) as raw_image:
                image = transform(raw_image.convert("RGB"))
            torch.save(image, cached_path)
        except Exception as e:
            print(f"Error caching image {image_id}: {e}")

def calculate_iou(box1, box2):
    """
    Calculate Intersection over Union (IoU) between two bounding boxes.

    Args:
        box1 (array-like): [xmin, ymin, xmax, ymax] for the first box.
        box2 (array-like): [xmin, ymin, xmax, ymax] for the second box.

    Returns:
        float: IoU value; 0.0 when the boxes do not overlap or when both
        boxes have zero area.
    """
    x_left = max(box1[0], box2[0])
    y_top = max(box1[1], box2[1])
    x_right = min(box1[2], box2[2])
    y_bottom = min(box1[3], box2[3])

    if x_right < x_left or y_bottom < y_top:
        return 0.0  # No overlap

    intersection_area = (x_right - x_left) * (y_bottom - y_top)
    box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union_area = box1_area + box2_area - intersection_area

    if union_area <= 0:
        # Reached only when both boxes are degenerate (zero area); without
        # this guard the division below fails or yields NaN.
        return 0.0

    return intersection_area / float(union_area)

def evaluate_classification(model, dataloader, device):
    """
    Report classification metrics for ``model`` over ``dataloader``.

    Prints accuracy, weighted precision/recall/F1 and the confusion matrix.

    Args:
        model (nn.Module): Model returning ``(bbox_output, class_output)``.
        dataloader (DataLoader): Validation DataLoader yielding ``(images, targets)``.
        device (torch.device): Device to perform computations on.
    """
    model.eval()
    predicted, actual = [], []

    progress = tqdm(dataloader, desc="Validation Classification", leave=False)
    with torch.no_grad():
        for batch_images, batch_targets in progress:
            if batch_images is None or batch_targets is None:
                continue

            batch_images = batch_images.to(device, non_blocking=True)
            batch_labels = batch_targets['labels'].to(device, non_blocking=True)

            with autocast():
                # The box head output is not needed for these metrics
                _, class_scores = model(batch_images)
                batch_preds = torch.argmax(class_scores, dim=1)

            predicted.extend(batch_preds.cpu().numpy())
            actual.extend(batch_labels.cpu().numpy())

    accuracy = accuracy_score(actual, predicted)
    precision, recall, f1, _ = precision_recall_fscore_support(
        actual, predicted, average='weighted', zero_division=0
    )
    conf_matrix = confusion_matrix(actual, predicted)

    report_lines = (
        "\n--- Classification Evaluation ---",
        f"Accuracy: {accuracy:.4f}",
        f"Precision (Weighted): {precision:.4f}",
        f"Recall (Weighted): {recall:.4f}",
        f"F1-Score (Weighted): {f1:.4f}",
        "Confusion Matrix:",
    )
    for line in report_lines:
        print(line)
    print(conf_matrix)

def evaluate_bounding_boxes(model, dataloader, device, iou_threshold=0.5):
    """
    Evaluate bounding box predictions on the validation set.

    Prints the mean IoU and the fraction of predictions whose IoU with the
    ground truth is at least ``iou_threshold``.

    Args:
        model (nn.Module): Model returning ``(bbox_output, class_output)``.
        dataloader (DataLoader): Validation DataLoader yielding ``(images, targets)``.
        device (torch.device): Device to perform computations on.
        iou_threshold (float): Threshold to consider a prediction as correct.
    """
    model.eval()
    all_iou = []

    with torch.no_grad():
        for images, targets in tqdm(dataloader, desc="Validation Bounding Boxes", leave=False):
            if images is None or targets is None:
                continue

            images = images.to(device, non_blocking=True)

            with autocast():
                pred_bboxes, _ = model(images)

            # Outputs produced under autocast may be float16; cast to float32
            # so the IoU arithmetic does not lose precision or overflow.
            pred_bboxes = pred_bboxes.float().cpu().numpy()
            # IoU is computed on the CPU, so the targets never need to visit the device
            true_bboxes = targets['boxes'].float().cpu().numpy()

            all_iou.extend(
                calculate_iou(pred_box, true_box)
                for pred_box, true_box in zip(pred_bboxes, true_bboxes)
            )

    total = len(all_iou)
    matched = sum(1 for iou in all_iou if iou >= iou_threshold)
    mean_iou = np.mean(all_iou) if all_iou else 0
    precision_at_iou = matched / total if total > 0 else 0

    print("\n--- Bounding Box Evaluation ---")
    print(f"Mean IoU: {mean_iou:.4f}")
    print(f"Precision at IoU >= {iou_threshold}: {precision_at_iou:.4f}")

def visualize_predictions(model, dataloader, device, label_encoder, num_samples=5):
    """
    Visualize model predictions on a subset of the validation set.

    Each figure shows the de-normalised image with the ground-truth box in
    green, the predicted box in red and both class names in the title.

    Args:
        model (nn.Module): Model returning ``(bbox_output, class_output)``.
        dataloader (DataLoader): Validation DataLoader yielding ``(images, targets)``.
        device (torch.device): Device to perform computations on.
        label_encoder (LabelEncoder): Label encoder to decode class labels.
        num_samples (int): Number of samples to visualize.
    """
    model.eval()
    samples_visualized = 0

    # Channel statistics used to undo the normalisation before display
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    with torch.no_grad():
        for images, targets in dataloader:
            if images is None or targets is None:
                continue

            images = images.to(device, non_blocking=True)

            with autocast():
                pred_bboxes, pred_labels = model(images)
                preds = torch.argmax(pred_labels, dim=1)

            # Everything used for plotting is converted to NumPy on the CPU.
            # The labels are included: handing device tensors to
            # label_encoder.inverse_transform fails when they live on the GPU.
            images = images.cpu().numpy()
            true_bboxes = targets['boxes'].cpu().numpy()
            true_labels = targets['labels'].cpu().numpy()
            preds = preds.cpu().numpy()
            # Outputs produced under autocast may be float16; plot in float32
            pred_bboxes = pred_bboxes.float().cpu().numpy()

            for img, true_box, true_label, pred_box, pred_label in zip(
                images, true_bboxes, true_labels, pred_bboxes, preds
            ):
                if samples_visualized >= num_samples:
                    return

                fig, ax = plt.subplots(1)
                img = np.transpose(img, (1, 2, 0))  # (C, H, W) -> (H, W, C)
                img = img * channel_std + channel_mean
                img = np.clip(img, 0, 1)
                ax.imshow(img)

                # Ground Truth Bounding Box
                gt_xmin, gt_ymin, gt_xmax, gt_ymax = true_box
                gt_rect = patches.Rectangle(
                    (gt_xmin, gt_ymin), gt_xmax - gt_xmin, gt_ymax - gt_ymin,
                    linewidth=2, edgecolor='g', facecolor='none', label='Ground Truth'
                )
                ax.add_patch(gt_rect)

                # Predicted Bounding Box
                pred_xmin, pred_ymin, pred_xmax, pred_ymax = pred_box
                pred_rect = patches.Rectangle(
                    (pred_xmin, pred_ymin), pred_xmax - pred_xmin, pred_ymax - pred_ymin,
                    linewidth=2, edgecolor='r', facecolor='none', label='Prediction'
                )
                ax.add_patch(pred_rect)

                # Decode the class indices back to their original names
                gt_class = label_encoder.inverse_transform([true_label])[0]
                pred_class = label_encoder.inverse_transform([pred_label])[0]
                plt.title(f"GT: {gt_class} | Pred: {pred_class}")

                # Create Legend
                handles = [
                    patches.Patch(color='g', label='Ground Truth'),
                    patches.Patch(color='r', label='Prediction')
                ]
                plt.legend(handles=handles)

                plt.show()

                samples_visualized += 1

def validate_dataset(dataset, num_samples=10):
    """
    Smoke-test a dataset by fetching its leading samples and printing them.

    Any exception raised while fetching or printing a sample is reported and
    the loop moves on to the next sample.

    Args:
        dataset (Dataset): PyTorch Dataset to validate.
        num_samples (int): Number of samples to validate.
    """
    print(f"Validating the first {num_samples} samples of the dataset...")

    # Never index past the end of a dataset shorter than num_samples
    samples_to_check = min(num_samples, len(dataset))

    for sample_no in range(1, samples_to_check + 1):
        try:
            image, targets = dataset[sample_no - 1]
            print(f"Sample {sample_no}:")
            print(f"  Image shape: {image.shape}")
            print(f"  BBox: {targets['boxes']}")
            print(f"  Label: {targets['labels']}")
        except Exception as err:
            print(f"Error in sample {sample_no}: {err}")

# -----------------------------
# 4. Main Function
# -----------------------------

def main():
    """
    Train and evaluate ``CustomCNN`` end to end.

    Steps: load and label-encode ``Train.csv``; drop rows whose image is
    missing or fails PIL verification; make a stratified 80/20 split; cache the
    resized image tensors; train with mixed precision while validating and
    checkpointing after every epoch; finally plot a few validation predictions.
    """
    # -----------------------------
    # 1. Setup and Configuration
    # -----------------------------

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    train_csv_path = 'Train.csv'
    images_dir = 'datasets/dataset/images/compressed/train'
    cached_dir = 'cached_images'  # Directory to store cached tensors
    os.makedirs(cached_dir, exist_ok=True)

    # -----------------------------
    # 2. Data Preparation
    # -----------------------------

    # Load train data
    train = pd.read_csv(train_csv_path)

    # Encode class labels into numerical format using LabelEncoder
    label_encoder = LabelEncoder()
    train['class'] = label_encoder.fit_transform(train['class'])

    # Check each distinct image once for existence and readability
    missing_images = []
    for img_id in train['Image_ID'].unique():
        image_path = os.path.join(images_dir, img_id)
        if not os.path.exists(image_path):
            missing_images.append(img_id)
            continue
        try:
            # The context manager closes the file handle after verification
            with Image.open(image_path) as img:
                img.verify()  # Verify that it's an image
        except Exception:
            missing_images.append(img_id)

    if missing_images:
        print(f"Found {len(missing_images)} missing or corrupted images. Removing them from the dataset.")
        train = train[~train['Image_ID'].isin(missing_images)].reset_index(drop=True)
    else:
        print("All images are present and valid.")

    print(f"Number of training samples after filtering: {len(train)}")

    # Split data into training and validation sets (80% train, 20% val)
    train_df, val_df = train_test_split(
        train,
        test_size=0.2,
        random_state=42,
        stratify=train['class']
    )

    print(f"Training samples: {len(train_df)}, Validation samples: {len(val_df)}")

    # -----------------------------
    # 3. Dataset and DataLoader
    # -----------------------------

    # Image sizes as (width, height)
    original_image_size = (2560, 1440)  # Replace with your actual image size
    resized_image_size = (128, 128)

    # One preprocessing pipeline shared by both splits. Resize expects
    # (height, width), hence the swapped order.
    cache_transform = transforms.Compose([
        transforms.Resize((resized_image_size[1], resized_image_size[0])),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])
    cache_images(train_df, images_dir, cached_dir, transform=cache_transform)
    cache_images(val_df, images_dir, cached_dir, transform=cache_transform)

    # Instantiate Cached Datasets
    train_dataset = CachedDataset(dataframe=train_df, cached_dir=cached_dir,
                                  original_image_size=original_image_size,
                                  resized_image_size=resized_image_size)
    val_dataset = CachedDataset(dataframe=val_df, cached_dir=cached_dir,
                                original_image_size=original_image_size,
                                resized_image_size=resized_image_size)

    # Validate Datasets
    validate_dataset(train_dataset)
    validate_dataset(val_dataset)

    # Determine optimal number of workers
    num_workers = 0  # Set to 0 for initial debugging
    print(f"Using {num_workers} workers for DataLoader.")

    # prefetch_factor only applies when worker processes exist
    prefetch_factor = 2 if num_workers > 0 else None

    loader_kwargs = {
        'batch_size': 128,
        'num_workers': num_workers,
        'pin_memory': False,
        'persistent_workers': False,
    }
    if prefetch_factor is not None:
        # Passed only when set: some DataLoader versions reject an explicit
        # prefetch_factor when num_workers is 0.
        loader_kwargs['prefetch_factor'] = prefetch_factor

    train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
    val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)

    # -----------------------------
    # 4. Model Definition
    # -----------------------------

    num_classes = len(train['class'].unique())

    model = CustomCNN(num_classes=num_classes).to(device)

    # Print the model structure (optional)
    print(f"Custom CNN Model:\n{model}")

    # -----------------------------
    # 5. Training Loop with Optimizations
    # -----------------------------

    class IoULoss(nn.Module):
        """Mean ``1 - IoU`` between predicted and target boxes given as [xmin, ymin, xmax, ymax]."""

        def __init__(self):
            super(IoULoss, self).__init__()

        def forward(self, pred_boxes, target_boxes):
            # No sigmoid: the boxes are compared in the coordinate system of
            # the targets (scaled to the resized image by CachedDataset).

            # Calculate Intersection coordinates
            x1 = torch.max(pred_boxes[:, 0], target_boxes[:, 0])
            y1 = torch.max(pred_boxes[:, 1], target_boxes[:, 1])
            x2 = torch.min(pred_boxes[:, 2], target_boxes[:, 2])
            y2 = torch.min(pred_boxes[:, 3], target_boxes[:, 3])

            # Clamping makes disjoint boxes contribute zero intersection area
            intersection = (x2 - x1).clamp(min=0) * (y2 - y1).clamp(min=0)

            # Clamp side lengths so inverted boxes count as empty
            pred_area = (pred_boxes[:, 2] - pred_boxes[:, 0]).clamp(min=0) * \
                        (pred_boxes[:, 3] - pred_boxes[:, 1]).clamp(min=0)
            target_area = (target_boxes[:, 2] - target_boxes[:, 0]).clamp(min=0) * \
                          (target_boxes[:, 3] - target_boxes[:, 1]).clamp(min=0)

            # Calculate Union area
            union = pred_area + target_area - intersection + 1e-6  # Avoid division by zero

            # Calculate IoU
            iou = intersection / union

            # Calculate IoU Loss
            loss = 1 - iou
            return loss.mean()

    iou_loss_fn = IoULoss()
    class_loss_fn = nn.CrossEntropyLoss()

    # Define optimizer and scheduler
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

    # Initialize GradScaler for mixed precision
    scaler = GradScaler()

    # Define number of epochs
    num_epochs = 10  # Adjust as needed

    # Training loop with validation and checkpointing
    for epoch in range(num_epochs):
        print(f"\n=== Epoch {epoch + 1}/{num_epochs} ===")
        model.train()  # the evaluation helpers switch the model to eval mode
        total_iou_loss, total_class_loss = 0.0, 0.0
        batches_seen = 0  # batches that actually contributed to the totals
        train_loader_iter = tqdm(train_loader, desc="Training", leave=False)

        for batch_idx, (images, targets) in enumerate(train_loader_iter):
            if images is None or targets is None:
                print(f"Skipping batch {batch_idx + 1} due to None values.")
                continue

            images = images.to(device, non_blocking=True)
            bboxes = targets['boxes'].to(device, non_blocking=True)
            labels = targets['labels'].to(device, non_blocking=True)

            optimizer.zero_grad()

            # Forward pass and losses run under autocast (mixed precision)
            with autocast():
                pred_bboxes, pred_labels = model(images)
                bbox_loss = iou_loss_fn(pred_bboxes, bboxes)
                class_loss = class_loss_fn(pred_labels, labels)
                total_loss = bbox_loss + class_loss

            # Scaled backward pass; the scaler drives the optimizer step
            scaler.scale(total_loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # Read each scalar once and reuse it for totals and logging
            bbox_loss_value = bbox_loss.item()
            class_loss_value = class_loss.item()
            total_iou_loss += bbox_loss_value
            total_class_loss += class_loss_value
            batches_seen += 1

            # Log every 10 batches
            if (batch_idx + 1) % 10 == 0:
                print(f"Batch {batch_idx + 1}/{len(train_loader)} - IoU Loss: {bbox_loss_value:.4f}, Class Loss: {class_loss_value:.4f}")

            train_loader_iter.set_postfix({'IoU Loss': bbox_loss_value, 'Class Loss': class_loss_value})

        # Average over the batches that were processed (skipped ones excluded);
        # max() keeps an epoch without any batch from dividing by zero.
        processed = max(batches_seen, 1)
        avg_iou_loss = total_iou_loss / processed
        avg_class_loss = total_class_loss / processed
        print(f"Training Losses -> IoU Loss: {avg_iou_loss:.4f}, Class Loss: {avg_class_loss:.4f}")

        scheduler.step()

        # -----------------------------
        # 6. Validation After Each Epoch
        # -----------------------------

        print(f"\n=== Validation After Epoch {epoch + 1} ===")
        evaluate_classification(model, val_loader, device)
        evaluate_bounding_boxes(model, val_loader, device, iou_threshold=0.5)

        # -----------------------------
        # 7. Save the Trained Model
        # -----------------------------

        # One checkpoint (state_dict only) per epoch
        checkpoint_path = f'custom_cnn_model_epoch_{epoch + 1}.pth'
        torch.save(model.state_dict(), checkpoint_path)
        print(f"Checkpoint saved to '{checkpoint_path}'")

    # -----------------------------
    # 8. Visualization of Predictions (Optional)
    # -----------------------------

    print("\n=== Visualizing Predictions on Validation Set ===")
    visualize_predictions(model, val_loader, device, label_encoder, num_samples=5)

# -----------------------------
# 5. Entry Point
# -----------------------------

# Call main() only when this file is executed directly as a script, so that
# importing the module does not start the training run.
if __name__ == "__main__":
    main()