# **FLOATING OBJECT DETECTION**

**About the dataset**


1. Dataset size?
2. Size of images?
3. How many categories?
4. Exist annotation file with no data
5. Six categories: human, wind/sup-board, boat, bouy, sailboat, kayak


**[Download dataset](https://www.kaggle.com/datasets/jangsienicajzkowy/afo-aerial-dataset-of-floating-objects/data)**

In [300]:
import shutil
from pathlib import Path

**Data path**

In [301]:
# Image path of PART 1,2,3
img_path_1 = 'dataset/PART_1/PART_1/images/'
img_path_2 = 'dataset/PART_2/PART_2/images/'
img_path_3 = 'dataset/PART_3/PART_3/images/'

# Categories path
# Categories: human, wind/sup-board, boat, bouy, sailboat, kayak
categories_path = 'dataset/PART_1/PART_1/6categories/'

**Split Data into Train, Test & Validation**

In [302]:
# Split into three parts: the training (67,4% of objects), the test (19,12% of objects),
# and the validation set (13,48% of objects). In order to prevent overfitting of the model to the given data,
# the test set contains selected frames from nine videos that were not used in either the training or validation sets.

# Split image to : dataset/working/images
# Split annotation to: dataset/working/labels

def split_data(file_list, img_path, ann_path, mode):
    #Check if we have our mode folders
    images_working_folder = Path( 'dataset/working/images/'+  mode)
    if not images_working_folder.exists():
        print(f"Path {images_working_folder} does not exit")
        os.makedirs(images_working_folder)

    labels_working_folder = Path('dataset/working/labels/' + mode)
    if not labels_working_folder.exists():
        print(f"Path {labels_working_folder} does not exit")
        os.makedirs(labels_working_folder)

    #Creates the name of our label file from the img name and creates our source file
    for file in file_list:
        name = file.replace('.jpg', '')
        img_src_file = str(img_path) + '/' + name + '.jpg'
        annot_src_file = str(ann_path) + '/' + name + '.txt'
        
        if Path(img_src_file).exists() and Path(annot_src_file).exists():
            #move image
            IMG_DIR = 'dataset/working/images/' + mode
            img_dest_file = str(IMG_DIR) + '/' + name + '.jpg'
            if os.path.isfile(img_src_file) and not Path(img_dest_file).exists():
                shutil.move(img_src_file, img_dest_file)
    
            # Copy annotations
            ANNOT_DIR = 'dataset/working/labels/' + mode
            annot_dest_file = str(ANNOT_DIR) + '/' + name + '.txt'
            if os.path.isfile(annot_src_file) and not Path(annot_dest_file).exists():
                shutil.move(annot_src_file, annot_dest_file)

In [303]:
#Get our images list
train_imgs = 'dataset/PART_1/PART_1/train.txt'
test_imgs = 'dataset/PART_1/PART_1/test.txt'
val_imgs = 'dataset/PART_1/PART_1/validation.txt'
with open(train_imgs, 'r') as f:
    train_img_list = [line.strip() for line in f.readlines()]

with open(test_imgs, 'r') as f:
    test_img_list = [line.strip() for line in f.readlines()]

with open(val_imgs, 'r') as f:
    val_img_list = [line.strip() for line in f.readlines()]

print(train_img_list[0], test_img_list[0], val_img_list[0])

a_102.jpg k2_38.jpg a_101.jpg


In [304]:
# Root path
root_img_path = Path('dataset/images/')
root_ann_path = Path('dataset/annotations/')

#Split Data
split_data(train_img_list, root_img_path, root_ann_path, 'train')
split_data(test_img_list, root_img_path, root_ann_path, 'test')
split_data(val_img_list, root_img_path, root_ann_path, 'val')

In [305]:
import glob
import os
working_image_path = 'dataset/working/images/'
working_labels_path = 'dataset/working/labels/'

# Images
img_test_path = glob.glob(os.path.join(working_image_path + '/test/' , "*.jpg"))
print(f'img_test_path: {len(img_test_path)}')

img_train_path = glob.glob(os.path.join(working_image_path + '/train/' , "*.jpg"))
print(f'img_train_path: {len(img_train_path)}')

img_val_path = glob.glob(os.path.join(working_image_path + '/val/' , "*.jpg"))
print(f'img_val_path: {len(img_val_path)}')

# Labels
label_test_path = glob.glob(os.path.join(working_labels_path + '/test/' , "*.txt"))
print(f'label_test_path: {len(label_test_path)}')

label_train_path = glob.glob(os.path.join(working_labels_path + '/train/' , "*.txt"))
print(f'label_train_path: {len(label_train_path)}')

label_val_path = glob.glob(os.path.join(working_image_path + '/val/' , "*.txt"))
print(f'label_val_path: {len(label_val_path)}')

img_test_path: 514
img_train_path: 2787
img_val_path: 340
label_test_path: 514
label_train_path: 2787
label_val_path: 0


In [306]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt

# 1. Custom Dataset class with handling for empty annotations
class ObjectDetectionDataset(Dataset):
    def __init__(self, images_dir, annotations_dir, classes, transform=None):
        self.images_dir = images_dir
        self.annotations_dir = annotations_dir
        self.transform = transform
        self.classes = classes
        self.num_classes = len(classes)
        self.image_files = [f for f in os.listdir(self.images_dir) if f.endswith('.jpg')]

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        # Load image
        img_name = self.image_files[idx]
        image_path = os.path.join(self.images_dir, img_name)
        image = Image.open(image_path).convert("RGB")
        orig_width, orig_height = image.size
        
        # Store original size for later normalization
        orig_size = (orig_height, orig_width)
        
        # Apply transformations to image
        if self.transform:
            image = self.transform(image)
        
        # Get image dimensions after transformation
        if isinstance(image, torch.Tensor):
            img_height, img_width = image.shape[1:3]
        else:
            img_width, img_height = image.size
            
        # Load annotation
        annotation_name = os.path.splitext(img_name)[0] + '.txt'
        annotation_path = os.path.join(self.annotations_dir, annotation_name)
        
        # Initialize empty tensors for bounding boxes and labels
        boxes = []
        labels = []

        # Parse annotation file if it exists
        if os.path.exists(annotation_path):
            with open(annotation_path, 'r') as f:
                for line in f:
                    data = line.strip().split()
                    if len(data) == 5:
                        class_id = int(data[0])
                        
                        # YOLO format: class_id, x_center, y_center, width, height (normalized)
                        x_center = float(data[1])
                        y_center = float(data[2])
                        width = float(data[3])
                        height = float(data[4])
                        
                        # Convert from YOLO format to pixel coordinates
                        x_min = (x_center - width/2) * orig_width
                        y_min = (y_center - height/2) * orig_height
                        x_max = (x_center + width/2) * orig_width
                        y_max = (y_center + height/2) * orig_height
                        
                        # Clip to image boundaries
                        x_min = max(0, min(x_min, orig_width))
                        y_min = max(0, min(y_min, orig_height))
                        x_max = max(0, min(x_max, orig_width))
                        y_max = max(0, min(y_max, orig_height))
                        
                        # Convert coordinates to the transformed image size
                        x_min = x_min * (img_width / orig_width)
                        y_min = y_min * (img_height / orig_height)
                        x_max = x_max * (img_width / orig_width)
                        y_max = y_max * (img_height / orig_height)
                        
                        boxes.append([x_min, y_min, x_max, y_max])
                        labels.append(class_id)

        # Handle case with no annotations
        if len(boxes) == 0:
            boxes = torch.zeros((0, 4), dtype=torch.float32)
            labels = torch.zeros(0, dtype=torch.int64)
        else:
            boxes = torch.as_tensor(boxes, dtype=torch.float32)
            labels = torch.as_tensor(labels, dtype=torch.int64)

        # Create target dictionary
        target = {
            'boxes': boxes,
            'labels': labels,
            'image_id': torch.tensor([idx]),
            'orig_size': torch.as_tensor(orig_size)
        }

        return image, target

# 2. Custom collate function to handle variable number of objects
def collate_fn(batch):
    images = []
    targets = []

    for image, target in batch:
        images.append(image)
        targets.append(target)

    images = torch.stack(images, 0)
    return images, targets

In [307]:
# 3. Simple CNN-based object detection model with corrected dimensions
from torchvision.models import resnet50
"""
    nn.Module: The base class for all neural network modules in PyTorch.
"""
class ImprovedDetector(nn.Module):
    def __init__(self, num_classes, pretrained=True):
        super(ImprovedDetector, self).__init__()
        
        # Use ResNet50 as backbone with pretrained weights
        self.backbone = resnet50(pretrained=pretrained)
        
        # Remove the classification head
        self.backbone = nn.Sequential(*list(self.backbone.children())[:-2])
        
        # Feature pyramid network components
        self.toplayer = nn.Conv2d(2048, 256, kernel_size=1)
        
        # Lateral connections
        self.latlayer1 = nn.Conv2d(1024, 256, kernel_size=1)
        self.latlayer2 = nn.Conv2d(512, 256, kernel_size=1)
        
        # Smooth layers
        self.smooth1 = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.smooth2 = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        
        # Detection head
        self.detection_head = nn.Sequential(
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )
        
        # Output layers
        self.bbox_pred = nn.Conv2d(256, 4, kernel_size=3, padding=1)
        self.cls_pred = nn.Conv2d(256, num_classes, kernel_size=3, padding=1)
        self.objectness = nn.Conv2d(256, 1, kernel_size=3, padding=1)
        
    def _upsample_add(self, x, y):
        """Upsample and add two feature maps."""
        _, _, H, W = y.size()
        return nn.functional.interpolate(x, size=(H, W), mode='bilinear', align_corners=False) + y
        
    def _make_fpn(self, c3, c4, c5):
        """Build FPN."""
        # Top-down
        p5 = self.toplayer(c5)
        p4 = self._upsample_add(p5, self.latlayer1(c4))
        p3 = self._upsample_add(p4, self.latlayer2(c3))
        
        # Smooth
        p4 = self.smooth1(p4)
        p3 = self.smooth2(p3)
        
        return p3, p4, p5
        
    def forward(self, x):
        # Extract features from backbone
        features = self.backbone(x)
        
        # Get intermediate features
        c3 = features[:, :512, :, :]
        c4 = features[:, 512:1536, :, :]
        c5 = features[:, 1536:, :, :]
        
        # Generate FPN features
        p3, p4, p5 = self._make_fpn(c3, c4, c5)
        
        # Apply detection head to P3 (highest resolution feature map)
        det_features = self.detection_head(p3)
        
        # Generate predictions
        bbox_pred = self.bbox_pred(det_features)
        cls_scores = self.cls_pred(det_features)
        obj_score = torch.sigmoid(self.objectness(det_features))
        
        # Reshape outputs
        batch_size = x.shape[0]
        feature_h, feature_w = p3.shape[2], p3.shape[3]
        
        # Reshape bbox predictions
        bbox_pred = bbox_pred.permute(0, 2, 3, 1).contiguous()
        bbox_pred = bbox_pred.view(batch_size, -1, 4)
        
        # Reshape class predictions
        cls_scores = cls_scores.permute(0, 2, 3, 1).contiguous()
        cls_scores = cls_scores.view(batch_size, -1, cls_scores.shape[1])
        
        # Reshape objectness predictions
        obj_score = obj_score.permute(0, 2, 3, 1).contiguous()
        obj_score = obj_score.view(batch_size, -1, 1)
        
        return {
            'bbox': bbox_pred,
            'cls': cls_scores,
            'objectness': obj_score,
            'feature_size': (feature_h, feature_w)
        }

In [308]:
import torchvision.ops as ops
# 4. Loss function for object detection
class ImprovedDetectionLoss(nn.Module):
    def __init__(self, lambda_coord=1.0, lambda_cls=1.0, lambda_obj=1.0, iou_threshold=0.5):
        super(ImprovedDetectionLoss, self).__init__()
        self.lambda_coord = lambda_coord
        self.lambda_cls = lambda_cls
        self.lambda_obj = lambda_obj
        self.iou_threshold = iou_threshold
        
        # Loss functions
        self.mse = nn.MSELoss(reduction='none')
        self.bce = nn.BCEWithLogitsLoss(reduction='none')
        self.ce = nn.CrossEntropyLoss(reduction='none')
        
    def _generate_anchors(self, feature_size, input_size=(224, 224)):
        """Generate anchor boxes for each position in the feature map."""
        feature_h, feature_w = feature_size
        input_h, input_w = input_size
        
        # Calculate stride
        stride_h = input_h / feature_h
        stride_w = input_w / feature_w
        
        # Generate grid centers
        centers_h = torch.arange(0.5, feature_h, 1.0) * stride_h
        centers_w = torch.arange(0.5, feature_w, 1.0) * stride_w
        
        # Create grid
        centers_h, centers_w = torch.meshgrid(centers_h, centers_w)
        centers = torch.stack((centers_w.flatten(), centers_h.flatten()), dim=1)
        
        # Define anchor sizes (can be tuned for specific dataset)
        # Using a mix of small, medium, and large anchors
        sizes = torch.tensor([[stride_h, stride_w], 
                              [stride_h*2, stride_w*2], 
                              [stride_h, stride_w*2], 
                              [stride_h*2, stride_w]])
        
        # Generate anchors for each center
        anchors = []
        for center in centers:
            for size in sizes:
                x_min = center[0] - size[0] / 2
                y_min = center[1] - size[1] / 2
                x_max = center[0] + size[0] / 2
                y_max = center[1] + size[1] / 2
                anchors.append([x_min, y_min, x_max, y_max])
                
        return torch.tensor(anchors, dtype=torch.float32)
    
    def _calculate_iou(self, boxes1, boxes2):
        """Calculate IoU between two sets of boxes."""
        # Calculate intersection
        x_min = torch.max(boxes1[:, 0].unsqueeze(1), boxes2[:, 0].unsqueeze(0))
        y_min = torch.max(boxes1[:, 1].unsqueeze(1), boxes2[:, 1].unsqueeze(0))
        x_max = torch.min(boxes1[:, 2].unsqueeze(1), boxes2[:, 2].unsqueeze(0))
        y_max = torch.min(boxes1[:, 3].unsqueeze(1), boxes2[:, 3].unsqueeze(0))
        
        intersection = torch.clamp(x_max - x_min, min=0) * torch.clamp(y_max - y_min, min=0)
        
        # Calculate union
        boxes1_area = (boxes1[:, 2] - boxes1[:, 0]) * (boxes1[:, 3] - boxes1[:, 1])
        boxes2_area = (boxes2[:, 2] - boxes2[:, 0]) * (boxes2[:, 3] - boxes2[:, 1])
        
        union = boxes1_area.unsqueeze(1) + boxes2_area.unsqueeze(0) - intersection
        
        return intersection / (union + 1e-6)
    
    def forward(self, predictions, targets, device='cpu'):
        batch_size = predictions['bbox'].shape[0]
        total_anchors = predictions['bbox'].shape[1]
        feature_size = predictions['feature_size']
        
        # Initialize loss components
        bbox_loss = torch.tensor(0.0, device=device)
        cls_loss = torch.tensor(0.0, device=device)
        obj_loss = torch.tensor(0.0, device=device)
        
        for i in range(batch_size):
            # Get predictions for this image
            pred_bbox = predictions['bbox'][i]  # (anchors, 4)
            pred_cls = predictions['cls'][i]    # (anchors, num_classes)
            pred_obj = predictions['objectness'][i].squeeze(-1)  # (anchors)
            
            # Get target for this image
            target = targets[i]
            target_boxes = target['boxes'].to(device)  # (objects, 4)
            target_labels = target['labels'].to(device)  # (objects)
            
            # Generate anchors
            anchors = self._generate_anchors(feature_size).to(device)
            
            # If there are objects in this image
            if len(target_boxes) > 0:
                # Calculate IoU between anchors and ground truth boxes
                ious = self._calculate_iou(anchors, target_boxes)  # (anchors, objects)
                
                # Assign anchors to objects
                max_iou, best_target_idx = ious.max(dim=1)  # (anchors)
                
                # Positive anchors: IoU > threshold
                positive_mask = max_iou > self.iou_threshold
                
                # Objectness loss
                objectness_target = torch.zeros_like(pred_obj)
                objectness_target[positive_mask] = 1.0
                obj_loss += self.bce(pred_obj, objectness_target).mean()
                
                # For positive anchors only:
                if positive_mask.sum() > 0:
                    # Get target boxes and labels for positive anchors
                    pos_best_target_idx = best_target_idx[positive_mask]
                    pos_target_boxes = target_boxes[pos_best_target_idx]
                    pos_target_labels = target_labels[pos_best_target_idx]
                    
                    # Box loss: GIoU loss for better convergence
                    pos_pred_bbox = pred_bbox[positive_mask]
                    bbox_loss += ops.generalized_box_iou_loss(
                        pos_pred_bbox, 
                        pos_target_boxes,
                        reduction='mean'
                    )
                    
                    # Classification loss
                    pos_pred_cls = pred_cls[positive_mask]
                    cls_loss += self.ce(pos_pred_cls, pos_target_labels).mean()
            else:
                # No objects - objectness should be 0
                obj_loss += self.bce(pred_obj, torch.zeros_like(pred_obj)).mean()
                
        # Compute total loss
        total_loss = (
            self.lambda_coord * bbox_loss + 
            self.lambda_cls * cls_loss + 
            self.lambda_obj * obj_loss
        ) / batch_size
        
        # Return individual loss components for monitoring
        return total_loss, {
            'bbox_loss': bbox_loss / batch_size,
            'cls_loss': cls_loss / batch_size,
            'obj_loss': obj_loss / batch_size
        }


In [309]:
# 5. Training function
def train_model(model, train_loader, val_loader=None, criterion=None, optimizer=None, 
                scheduler=None, num_epochs=10, device='cpu'):
    model.to(device)
    best_loss = float('inf')
    
    # Initialize loss history
    history = {
        'train_loss': [],
        'val_loss': [],
        'bbox_loss': [],
        'cls_loss': [],
        'obj_loss': []
    }
    
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 20)
        
        # Training phase
        model.train()
        running_loss = 0.0
        running_bbox_loss = 0.0
        running_cls_loss = 0.0
        running_obj_loss = 0.0
        
        for i, (images, targets) in enumerate(train_loader):
            # Move data to device
            images = images.to(device)
            targets = [{k: v.to(device) if isinstance(v, torch.Tensor) else v 
                     for k, v in t.items()} for t in targets]
            
            # Zero gradients
            optimizer.zero_grad()
            
            # Forward pass
            predictions = model(images)
            
            # Calculate loss
            loss, loss_components = criterion(predictions, targets, device)
            
            # Backward pass and optimize
            loss.backward()
            
            # Clip gradients to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=10.0)
            
            optimizer.step()
            
            # Update statistics
            running_loss += loss.item()
            running_bbox_loss += loss_components['bbox_loss'].item()
            running_cls_loss += loss_components['cls_loss'].item()
            running_obj_loss += loss_components['obj_loss'].item()
            
            # Print stats every 10 batches
            if (i+1) % 10 == 0:
                batch_loss = running_loss / 10
                batch_bbox_loss = running_bbox_loss / 10
                batch_cls_loss = running_cls_loss / 10
                batch_obj_loss = running_obj_loss / 10
                
                print(f'  Batch {i+1}/{len(train_loader)} | '
                      f'Loss: {batch_loss:.4f} | '
                      f'Box: {batch_bbox_loss:.4f} | '
                      f'Cls: {batch_cls_loss:.4f} | '
                      f'Obj: {batch_obj_loss:.4f}')
                
                running_loss = 0.0
                running_bbox_loss = 0.0
                running_cls_loss = 0.0
                running_obj_loss = 0.0
        
        # Validation phase
        if val_loader:
            model.eval()
            val_loss = 0.0
            val_samples = 0
            
            with torch.no_grad():
                for images, targets in val_loader:
                    images = images.to(device)
                    targets = [{k: v.to(device) if isinstance(v, torch.Tensor) else v 
                             for k, v in t.items()} for t in targets]
                    
                    predictions = model(images)
                    loss, _ = criterion(predictions, targets, device)
                    
                    val_loss += loss.item() * images.size(0)
                    val_samples += images.size(0)
            
            val_loss = val_loss / val_samples
            history['val_loss'].append(val_loss)
            
            print(f'  Validation Loss: {val_loss:.4f}')
            
            # Update learning rate based on validation loss
            if scheduler:
                scheduler.step(val_loss)
                
            # Save best model
            if val_loss < best_loss:
                best_loss = val_loss
                torch.save(model.state_dict(), 'best_model.pth')
                print('  New best model saved!')
    
    # Save final model
    torch.save(model.state_dict(), 'final_model.pth')
    print('Training complete')
    
    return model, history

### **Train model**

Create Dataset and DatasetLoader

In [310]:
"""
Args:
    batch_size (int): Number of images processed in one forward/backward pass.
    num_epochs (int): Number of times the model will iterate over the entire dataset.
    learning_rate (float): Step size for updating model weights during optimization.
"""
train_images_dir = 'dataset/working/images/train/'
train_label_dir = 'dataset/working/labels/train/'
val_images_dir = 'dataset/working/images/val/'  # Add validation set if available
val_label_dir = 'dataset/working/labels/val/'
classes = ['human', 'wind/sup-board', 'boat', 'bouy', 'sailboat', 'kayak']
batch_size = 8
num_epochs = 20
learning_rate = 0.0001  # Reduced learning rate
input_size = (224, 224) 
# Set device
"""
Checks if a GPU (cuda) is available and sets the device accordingly. 
If not, it defaults to the CPU.
Reason: GPUs are much faster for deep learning tasks due to their parallel processing capabilities.
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Data transformations
transform = transforms.Compose([
    transforms.Resize(input_size),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Create dataset and dataloader
train_dataset = ObjectDetectionDataset(
    images_dir=train_images_dir,
    annotations_dir=train_label_dir,
    classes=classes,
    transform=transform
)

# Print dataset size
print(f"Dataset size: {len(train_dataset)} images")

# Create validation dataset if directories exist
val_dataset = None
if os.path.exists(val_images_dir) and os.path.exists(val_label_dir):
    val_dataset = ObjectDetectionDataset(
        images_dir=val_images_dir,
        annotations_dir=val_label_dir,
        classes=classes,
        transform=transform
    )
    
 # Print dataset size
print(f"Training dataset size: {len(train_dataset)} images")
if val_dataset:
    print(f"Validation dataset size: {len(val_dataset)} images")

# Use custom collate_fn to handle variable number of objects
"""
Args:
    batch_size (int): Number of images processed in one forward/backward pass.
    shuffle: Shuffles the dataset at the start of each epoch to ensure randomness.
    num_workers (int): Number of subprocesses for data loading. Set to 0 for single-threaded loading.
    collate_fn: A custom function to handle variable numbers of objects in different images.
"""

# Create dataloaders
train_loader = DataLoader(
    train_dataset, 
    batch_size=batch_size,
    shuffle=True,
    num_workers=4 if device.type == 'cuda' else 0,
    collate_fn=collate_fn,
    pin_memory=True if device.type == 'cuda' else False
)
    
val_loader = None
if val_dataset:
    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=4 if device.type == 'cuda' else 0,
        collate_fn=collate_fn,
        pin_memory=True if device.type == 'cuda' else False
    )

# Check first sample to verify data loading
sample_img, sample_target = train_dataset[0]
print(f"Sample image shape: {sample_img.shape}")
print(f"Sample target boxes shape: {sample_target['boxes'].shape}")
print(f'Sample target boxes value: {sample_target['boxes']}')
print(f"Sample target labels: {sample_target['labels'].shape}")
print(f"Sample target labels values: {sample_target['labels']}")

Using device: cpu
Dataset size: 2787 images
Training dataset size: 2787 images
Validation dataset size: 340 images
Sample image shape: torch.Size([3, 224, 224])
Sample target boxes shape: torch.Size([34, 4])
Sample target boxes value: tensor([[4.0279e+01, 6.8652e+01, 4.4771e+01, 7.6948e+01],
        [1.4788e+02, 1.8636e+02, 1.5173e+02, 1.9403e+02],
        [1.3977e+02, 1.6655e+02, 1.4502e+02, 1.7982e+02],
        [4.3721e+01, 1.2994e+02, 4.9612e+01, 1.4176e+02],
        [7.5804e+01, 1.3279e+02, 8.0179e+01, 1.4078e+02],
        [1.1792e+02, 1.0718e+02, 1.2346e+02, 1.1952e+02],
        [1.3420e+02, 1.6141e+02, 1.3904e+02, 1.7189e+02],
        [1.2542e+02, 1.4213e+02, 1.2903e+02, 1.5156e+02],
        [2.1204e+01, 4.7185e+00, 2.7212e+01, 1.3741e+01],
        [1.4467e+01, 1.5556e-01, 1.9483e+01, 4.6148e+00],
        [1.0684e+02, 1.0059e+02, 1.1250e+02, 1.1221e+02],
        [4.2204e+01, 8.7993e+01, 4.7046e+01, 9.8674e+01],
        [7.5571e+01, 1.6733e+02, 7.9363e+01, 1.7282e+02],
        [7.

In [311]:
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Configuration
# 6. Main function to set up and run the training
if __name__ == '__main__':
    
    # Create model, loss function, and optimizer
    # Create model, loss function, and optimizer
    model = ImprovedDetector(num_classes=len(classes))
    criterion = ImprovedDetectionLoss()
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=1e-4)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3, verbose=True)
     
    # Print model summary
    print("\nModel Structure:")
    print(model)
    
    # Train the model
    trained_model, history = train_model(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        criterion=criterion,
        optimizer=optimizer,
        scheduler=scheduler,
        num_epochs=num_epochs,
        device=device
    )
    
    # Try a forward pass with a batch to check dimensions
    # Plot training history
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Train Loss')
    if val_loader:
        plt.plot(history['val_loss'], label='Validation Loss')
    plt.title('Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    plt.plot(history['bbox_loss'], label='Box Loss')
    plt.plot(history['cls_loss'], label='Class Loss')
    plt.plot(history['obj_loss'], label='Object Loss')
    plt.title('Loss Components')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()


Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /Users/truongngo/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100.0%



Model Structure:
ImprovedDetector(
  (backbone): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (4): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsample): Sequential

RuntimeError: Given groups=1, weight of size [256, 2048, 1, 1], expected input[8, 512, 7, 7] to have 2048 channels, but got 512 channels instead

### **Validate model**

In [None]:
def load_model(model_path, num_classes, input_size):
    """
    Load a trained model from a saved state dict
    
    Args:
        model_path (str): Path to the saved model
        num_classes (int): Number of object classes
        input_size (tuple): Input image size (height, width)
    
    Returns:
        model: Loaded model
    """
    model = SimpleDetector(num_classes=num_classes, input_size=input_size)
    model.load_state_dict(torch.load(model_path))
    model.eval()  # Set to evaluation mode
    return model

In [None]:
def preprocess_image(image_path, input_size):
    """
    Preprocess an image for inference
    
    Args:
        image_path (str): Path to the input image
        input_size (tuple): Input size for the model (height, width)
    
    Returns:
        tensor_image: Preprocessed image tensor
        original_image: Original PIL image
        scale_factor: Scale factor between original and resized image
    """
    # Load image
    original_image = Image.open(image_path).convert('RGB')
    original_size = original_image.size  # (width, height)
    
    # Calculate scale factors for converting coordinates back to original image
    scale_x = original_size[0] / input_size[1]
    scale_y = original_size[1] / input_size[0]
    scale_factor = (scale_x, scale_y)
    
    # Apply the same transformations used during training
    transform = transforms.Compose([
        transforms.Resize(input_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    tensor_image = transform(original_image).unsqueeze(0)  # Add batch dimension
    
    return tensor_image, original_image, scale_factor

In [None]:
import torchvision


def detect_objects_model(model, image_tensor, confidence_threshold=0.5, nms_threshold=0.4):
    """
    Perform object detection on an image
    
    Args:
        model: The trained object detection model
        image_tensor: Preprocessed image tensor
        confidence_threshold: Minimum confidence for valid detections
        nms_threshold: Non-maximum suppression threshold
    
    Returns:
        boxes: Detected bounding boxes
        labels: Detected class labels
        scores: Confidence scores
    """
    with torch.no_grad():
        outputs = model(image_tensor)
    
    # Extract predictions
    pred_bbox = outputs['bbox'][0]  # Remove batch dimension
    pred_cls = outputs['cls'][0]
    pred_objectness = outputs['objectness'][0]
    
    # Print shape information for debugging
    print(f"Prediction shapes - bbox: {pred_bbox.shape}, cls: {pred_cls.shape}, objectness: {pred_objectness.shape}")
    
    # Check if pred_bbox has the correct shape (should have 4 values per box)
    if pred_bbox.dim() == 1:
        # If it's a flat tensor, reshape it based on known number of bbox coordinates (4)
        pred_bbox = pred_bbox.view(-1, 4)
        print(f"Reshaped flat bbox to: {pred_bbox.shape}")
    elif pred_bbox.dim() > 2:
        # If pred_bbox has more than 2 dimensions, reshape it
        pred_bbox = pred_bbox.reshape(-1, pred_bbox.shape[-1])
        print(f"Reshaped bbox to: {pred_bbox.shape}")
    
    # Make sure each box has 4 coordinates
    if pred_bbox.shape[1] != 4:
        print(f"Warning: bbox does not have 4 coordinates per box. Current shape: {pred_bbox.shape}")
        # If we can't fix this, we'll filter out invalid boxes later
    
    if pred_cls.dim() > 2:
        # If pred_cls has more than 2 dimensions, reshape it
        pred_cls = pred_cls.reshape(-1, pred_cls.shape[-1])
        print(f"Reshaped cls to: {pred_cls.shape}")
    
    if pred_objectness.dim() > 1:
        # If pred_objectness has more than 1 dimension, flatten it
        pred_objectness = pred_objectness.reshape(-1)
        print(f"Reshaped objectness to: {pred_objectness.shape}")
    
    # Make sure all tensors have consistent first dimension
    min_length = min(pred_bbox.shape[0], pred_cls.shape[0] if pred_cls.dim() > 1 else len(pred_cls), 
                    len(pred_objectness))
    
    print(f"Using min_length: {min_length}")
    
    if pred_bbox.shape[0] > min_length:
        pred_bbox = pred_bbox[:min_length]
    
    if pred_cls.dim() > 1 and pred_cls.shape[0] > min_length:
        pred_cls = pred_cls[:min_length]
    elif pred_cls.dim() == 1 and len(pred_cls) > min_length:
        pred_cls = pred_cls[:min_length]
    
    if len(pred_objectness) > min_length:
        pred_objectness = pred_objectness[:min_length]
    
    # Get class scores
    if pred_cls.dim() > 1:
        # Multi-class case
        class_scores, class_indices = torch.max(pred_cls, dim=1)
    else:
        # Single-class case
        class_scores = pred_cls
        class_indices = torch.zeros_like(class_scores, dtype=torch.long)
    
    # Calculate confidence scores
    confidence_scores = pred_objectness * class_scores
    
    # Filter by confidence threshold
    mask = confidence_scores > confidence_threshold
    
    # Ensure mask length matches the tensors
    if len(mask) != pred_bbox.shape[0]:
        print(f"Warning: Mask length {len(mask)} doesn't match bbox length {pred_bbox.shape[0]}")
        # Adjust mask to match the tensor size
        if len(mask) > pred_bbox.shape[0]:
            mask = mask[:pred_bbox.shape[0]]
        else:
            # If mask is too short, extend it with False values
            padding = torch.zeros(pred_bbox.shape[0] - len(mask), dtype=torch.bool, device=mask.device)
            mask = torch.cat([mask, padding])
    
    # Apply mask
    boxes = pred_bbox[mask]
    labels = class_indices[mask] if len(class_indices) == len(mask) else torch.zeros(sum(mask), dtype=torch.long)
    scores = confidence_scores[mask]
    
    print(f"After filtering: {len(boxes)} boxes remaining")
    
    # Validate that all boxes have 4 coordinates
    valid_boxes_mask = torch.ones(boxes.shape[0], dtype=torch.bool)
    
    if boxes.dim() == 1:  # If boxes somehow ended up as a 1D tensor
        print("Warning: boxes is a 1D tensor. Reshaping...")
        if boxes.shape[0] % 4 == 0:  # If divisible by 4, reshape
            boxes = boxes.reshape(-1, 4)
        else:
            # Can't reshape cleanly, we'll need to filter out bad boxes
            valid_boxes = []
            valid_labels = []
            valid_scores = []
            
            for i in range(len(boxes)):
                if isinstance(boxes[i], torch.Tensor) and boxes[i].numel() == 4:
                    valid_boxes.append(boxes[i])
                    valid_labels.append(labels[i])
                    valid_scores.append(scores[i])
            
            # If we found valid boxes, stack them
            if valid_boxes:
                boxes = torch.stack(valid_boxes)
                labels = torch.stack(valid_labels)
                scores = torch.stack(valid_scores)
            else:
                # No valid boxes, return empty tensors
                return torch.zeros((0, 4)), torch.zeros(0, dtype=torch.long), torch.zeros(0)
    
    # Check if boxes has the correct shape after filtering
    if boxes.shape[1] != 4:
        print(f"Warning: Filtered boxes don't have 4 coordinates. Shape: {boxes.shape}")
        # Create valid boxes mask
        for i in range(boxes.shape[0]):
            if boxes[i].numel() != 4:
                valid_boxes_mask[i] = False
        
        # Apply valid boxes mask
        if not torch.all(valid_boxes_mask):
            boxes = boxes[valid_boxes_mask]
            labels = labels[valid_boxes_mask]
            scores = scores[valid_boxes_mask]
    
    # Apply non-maximum suppression if we have more than one box
    if len(boxes) > 1:
        # Ensure boxes are in correct format for NMS (some models output x,y,w,h instead of x1,y1,x2,y2)
        if boxes.shape[1] == 4:
            # Check if format is x,y,w,h by seeing if x2,y2 values are always larger than x1,y1
            x1y1 = boxes[:, :2]
            x2y2 = boxes[:, 2:]
            if not torch.all(x2y2 > x1y1):
                # Convert from x,y,w,h to x1,y1,x2,y2
                x1y1 = boxes[:, :2]
                wh = boxes[:, 2:]
                x2y2 = x1y1 + wh
                boxes = torch.cat([x1y1, x2y2], dim=1)
                
        try:
            # Apply NMS
            indices = torchvision.ops.nms(boxes, scores, nms_threshold)
            boxes = boxes[indices]
            labels = labels[indices]
            scores = scores[indices]
        except Exception as e:
            print(f"Error during NMS: {e}")
            # If NMS fails, return all detections
            pass
    
    return boxes, labels, scores

In [None]:
from PIL import Image, ImageDraw, ImageFont
from matplotlib.colors import hsv_to_rgb
def draw_detections(image, boxes, labels, scores, classes, scale_factor=(1.0, 1.0)):
    """
    Draw bounding boxes and labels on the image
    
    Args:
        image: PIL image to draw on
        boxes: Detected bounding boxes
        labels: Detected class labels
        scores: Confidence scores
        classes: List of class names
        scale_factor: Scale factor to convert coordinates from model input to original image
    
    Returns:
        annotated_image: Image with annotations
    """
    # Create a copy of the image to draw on
    draw_image = image.copy()
    draw = ImageDraw.Draw(draw_image)
    
    # Try to load a font, use default if not available
    try:
        font = ImageFont.truetype("arial.ttf", 30)
    except IOError:
        font = ImageFont.load_default()
    
    # Generate distinct colors for each class
    colors = {}
    for i in range(len(classes)):
        hue = i / max(1, len(classes) - 1)
        rgb = hsv_to_rgb((hue, 0.7, 0.7))  # Convert HSV to RGB
        colors[i] = (int(rgb[0] * 255), int(rgb[1] * 255), int(rgb[2] * 255))
    
    # Draw each detection
    for i in range(len(boxes)):
        box = boxes[i].tolist()
        
        # Scale box coordinates back to original image size
        x1, y1, x2, y2 = box
        print("Bounding Box:", x1, y1, x2, y2)
        # x1 *= scale_factor[0]
        # y1 *= scale_factor[1]
        # x2 *= scale_factor[0]
        # y2 *= scale_factor[1]
        
        # Ensure box coordinates are integers and within image boundaries
        x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
        
        # Get label and score
        label_idx = int(labels[i].item()) if labels.numel() > 0 else 0
        score = scores[i].item()
        
        # Ensure label_idx is in range
        label_idx = min(label_idx, len(classes) - 1)
        
        # Draw bounding box
        draw.rectangle([(x1, y1), (x2, y2)], outline=colors[label_idx], width=3)
        
        # Prepare and draw label
        class_name = classes[label_idx]
        label_text = f"{class_name}: {score:.2f}"
        print(f'Label: {label_text}')
        
        # Get text size
        try:
            text_size = draw.textbbox((0, 0), label_text, font=font)[2:4]
        except AttributeError:
            # For older PIL versions
            text_size = draw.textsize(label_text, font=font)
        
        # Draw label background
        draw.rectangle(
            [(x1, max(0, y1 - text_size[1] - 4)), (x1 + text_size[0] + 4, y1)],
            fill=colors[label_idx]
        )
        
        # Draw label text
        draw.text((x1 + 2, max(0, y1 - text_size[1] - 2)), label_text, fill="white", font=font)
    
    return draw_image

In [None]:
def visualize_detections(image_path, model_path, classes, input_size, conf_threshold=0.3):
    """
    Main function to load a model and visualize detections
    
    Args:
        image_path (str): Path to the input image
        model_path (str): Path to the saved model
        classes (list): List of class names
        input_size (tuple): Model input size (height, width)
        conf_threshold (float): Confidence threshold for detections
    """
    # Load the model
    model = load_model(model_path, num_classes=len(classes), input_size=input_size)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    print(f"Processing image: {image_path}")
    
    # Preprocess the image
    image_tensor, original_image, scale_factor = preprocess_image(image_path, input_size)
    image_tensor = image_tensor.to(device)
    
    print(f"Input image shape: {image_tensor}")
    print(f"Input image size: {original_image}")
    print(f"Scale image shape: {scale_factor}")
    
    # Detect objects
    try:
        boxes, labels, scores = detect_objects_model(model, image_tensor, conf_threshold)
        print(f"boxes: {boxes}")
        print(f"Detected {len(boxes)} objects")
        
        # Draw detections on image
        annotated_image = draw_detections(original_image, boxes, labels, scores, classes, scale_factor)
         
        # Save the result
        output_path = image_path.replace('.', '_detected.')
        annotated_image.save(output_path)
        print(f"Annotated image saved to {output_path}")

        # Display the result
        plt.figure(figsize=(12, 8))
        plt.imshow(np.array(annotated_image))
        plt.axis('off')
        plt.title('Object Detection Results')
        plt.show()

        return annotated_image
    
    except Exception as e:
        print(f"Error during detection: {e}")
        import traceback
        traceback.print_exc()
        return None

In [None]:
# Replace with your actual values
model_path = 'object_detection_model.pth'
classes = ['human', 'wind/sup-board', 'boat', 'bouy', 'sailboat', 'kayak']
input_size = (224, 224)  # Height, Width

# Example: Process a single image
image_path = 'dataset/working/images/val/a_101.jpg'
result = visualize_detections(image_path, model_path, classes, input_size)