### N.B. Run with a GPU device active 

# Modelli YOLO

Import di librerie necessarie

In [None]:
!pip install ultralytics

# YOLOv8m 

In [None]:
from ultralytics import YOLO

# Load the `yolov8m.pt` weights as the starting point for fine-tuning.
model = YOLO('yolov8m.pt')

# Fine-tune on the project dataset.
# `imgsz` must be a single integer for training/validation: Ultralytics only
# accepts a (h, w) pair for predict/export and would otherwise emit a warning
# and collapse the tuple to max(imgsz).
model.train(
    data = "/kaggle/input/new-dataset-for-ml-project-last-version/new-dataset-for-ml-project-last-version/data.yaml",
    imgsz = 640,
    batch = 32,
    optimizer = "Adam",
    lr0 = 1e-3
)

In [None]:
# Score the current model on the "test" split described by the test-set data.yaml.
results = model.val(
    data="/kaggle/input/test-set-for-ml-project/test-set-for-ml-project/data.yaml",
    split="test",
)

## Validation

In [None]:
# Show the metrics dictionary of the `results` object returned by `model.val`.
results.results_dict

# YOLOv10n

In [None]:
from ultralytics import YOLO

# Load the `yolov10n.pt` weights as the starting point for fine-tuning.
model = YOLO('yolov10n.pt')

# Fine-tune on the project dataset.
# `imgsz` must be a single integer for training/validation: Ultralytics only
# accepts a (h, w) pair for predict/export and would otherwise emit a warning
# and collapse the tuple to max(imgsz).
model.train(
    data = "/kaggle/input/new-dataset-for-ml-project-last-version/new-dataset-for-ml-project-last-version/data.yaml",
    imgsz = 640,
    batch = 32,
    optimizer = "Adam",
    lr0 = 1e-3
)

## Validation

In [None]:
# Score the current model on the "test" split described by the test-set data.yaml.
results = model.val(
    data="/kaggle/input/test-set-for-ml-project/test-set-for-ml-project/data.yaml",
    split="test",
)

In [None]:
# Show the metrics dictionary of the `results` object returned by `model.val`.
results.results_dict

# YOLOv11s

In [None]:
from ultralytics import YOLO

# The YOLO11 weights are read from a Kaggle input path: the Ultralytics models
# have to be attached from the official YOLOv11 reference notebook/model
# available on the platform.
model = YOLO('/kaggle/input/yolo11/pytorch/default/1/yolo11s.pt')

# Fine-tune on the project dataset.
# `imgsz` must be a single integer for training/validation: Ultralytics only
# accepts a (h, w) pair for predict/export and would otherwise emit a warning
# and collapse the tuple to max(imgsz).
model.train(
    data = "/kaggle/input/new-dataset-for-ml-project-last-version/new-dataset-for-ml-project-last-version/data.yaml",
    imgsz = 640,
    batch = 32,
    optimizer = "Adam",
    lr0 = 1e-3
)

## Validation

In [None]:
# Score the current model on the "test" split described by the test-set data.yaml.
results = model.val(
    data="/kaggle/input/test-set-for-ml-project/test-set-for-ml-project/data.yaml",
    split="test",
)

In [None]:
# Show the metrics dictionary of the `results` object returned by `model.val`.
results.results_dict

# Faster R-CNN con backbone ResNet50

Import di librerie necessarie

In [None]:
import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from PIL import Image
import cv2


import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision import transforms
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

from pycocotools.coco import COCO

# Progress bar
from tqdm.auto import tqdm

In [None]:
# Run on CUDA when it is available, otherwise fall back to the CPU.
_device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(_device_name)

# Root of the read-only input dataset and of the writable output area.
INPUT_PATH = "/kaggle/input/new-dataset-for-ml-project-coco"
WORKING_PATH = "/kaggle/working"

In [None]:
class COCOCustomDataset(Dataset):
    """Single-class detection dataset backed by a COCO-format annotation file.

    Every annotation is assigned label 1 regardless of its COCO category
    (label 0 is left for the background class).

    Args:
        root_dir: Directory containing the image files named in the
            annotation file.
        annotation_file: Path to the COCO JSON annotation file.
        transforms: Optional callable ``(image, target) -> (image, target)``.
            When omitted, the image is converted to a float CHW tensor
            scaled to [0, 1].
    """

    def __init__(self, root_dir, annotation_file, transforms=None):
        self.root_dir = root_dir
        self.transforms = transforms

        # Load COCO annotations
        self.coco = COCO(annotation_file)

        # Get all image ids
        self.image_ids = list(self.coco.imgs.keys())

        # Get category information
        self.category_ids = list(self.coco.cats.keys())
        self.category_names = [self.coco.cats[cat_id]['name'] for cat_id in self.category_ids]

    def __len__(self):
        """Return the number of images listed in the annotation file."""
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the image at position ``idx``.

        ``target`` is a dict with ``boxes`` (float32, shape (N, 4) in
        [x1, y1, x2, y2] order), ``labels``, ``area``, ``iscrowd`` and
        ``image_id``. ``boxes`` keeps the (0, 4) shape even when the image
        has no usable annotation.
        """
        # Get image info
        img_id = self.image_ids[idx]
        img_info = self.coco.imgs[img_id]
        img_path = os.path.join(self.root_dir, img_info['file_name'])

        # Load image as an HWC RGB array; the context manager closes the file
        # handle deterministically instead of leaving it to the garbage collector.
        with Image.open(img_path) as img_file:
            image = np.array(img_file.convert("RGB"))

        # Get annotations for this image
        ann_ids = self.coco.getAnnIds(imgIds=img_id)
        anns = self.coco.loadAnns(ann_ids)

        boxes = []
        labels = []
        areas = []
        iscrowd = []

        for ann in anns:
            # COCO bbox format: [x, y, width, height]
            # Convert to [x1, y1, x2, y2]
            x, y, w, h = ann['bbox']

            # Skip degenerate boxes: torchvision detection models reject
            # targets whose boxes do not have positive width and height.
            if w <= 0 or h <= 0:
                continue

            boxes.append([x, y, x + w, y + h])

            # Single-class setup: every annotation becomes class 1
            # (0 is the background class), whatever its category_id.
            labels.append(1)

            areas.append(ann.get('area', w * h))
            iscrowd.append(ann.get('iscrowd', 0))

        # Convert to tensors. The reshape keeps the (N, 4) layout for images
        # without annotations, where as_tensor([]) alone would give shape (0,).
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        areas = torch.as_tensor(areas, dtype=torch.float32)
        iscrowd = torch.as_tensor(iscrowd, dtype=torch.int64)

        # Create target dictionary
        target = {
            "boxes": boxes,
            "labels": labels,
            "area": areas,
            "iscrowd": iscrowd,
            "image_id": torch.tensor([img_id])
        }

        # Apply transforms; without any, fall back to a plain tensor conversion
        if self.transforms:
            image, target = self.transforms(image, target)
        else:
            image = torch.from_numpy(image).permute(2, 0, 1).float() / 255.0

        return image, target


def collate_fn(batch):
    """Regroup a batch of ``(image, target)`` samples into two parallel lists.

    Detection samples differ in image size and box count, so they are kept
    as lists rather than being stacked into a single tensor.
    """
    images, targets = (list(column) for column in zip(*batch))
    return images, targets

In [None]:
# Data transforms and augmentation (references/detection/transforms.py)
class Compose:
    """Chain several ``(image, target)`` transforms into a single callable."""

    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, image, target):
        """Feed the pair through every transform in order and return the result."""
        pair = (image, target)
        for step in self.transforms:
            new_image, new_target = step(*pair)
            pair = (new_image, new_target)
        return pair


class ToTensor:
    """Convert an HWC NumPy image into a float CHW tensor divided by 255."""

    def __call__(self, image, target):
        channels_first = torch.from_numpy(image).permute(2, 0, 1)
        scaled = channels_first.float() / 255.0
        return scaled, target


class Normalize:
    """Normalize a CHW image tensor channel by channel: ``(x - mean) / std``.

    Args:
        mean: Per-channel means (defaults to the usual ImageNet statistics).
        std: Per-channel standard deviations (same convention).

    The defaults are immutable tuples, and the values are copied into tuples,
    so instances never share or expose a mutable default list.

    NOTE(review): torchvision detection models already normalize their inputs
    internally; applying this transform in front of such a model normalizes
    the image twice - confirm it is really wanted in the pipeline.
    """

    def __init__(self, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
        self.mean = tuple(mean)
        self.std = tuple(std)

    def __call__(self, image, target):
        """Normalize ``image`` in place and return it with ``target`` untouched."""
        for channel, (channel_mean, channel_std) in enumerate(zip(self.mean, self.std)):
            image[channel] = (image[channel] - channel_mean) / channel_std
        return image, target


class RandomHorizontalFlip:
    """Mirror the image and its boxes left-to-right with probability ``prob``.

    Args:
        prob: Probability of applying the flip to a given sample.
    """

    def __init__(self, prob=0.5):
        self.prob = prob

    def __call__(self, image, target):
        """Return ``(image, target)``, flipped when the random draw succeeds.

        ``target["boxes"]`` is expected in [x1, y1, x2, y2] order and is
        updated in place. A target without any box is left untouched, so
        images with no annotations no longer raise an IndexError.
        """
        if torch.rand(1) < self.prob:
            width = image.shape[-1]
            image = image.flip(-1)
            bbox = target["boxes"]
            if bbox.numel() > 0:
                # New x1 = width - old x2 and new x2 = width - old x1, which
                # keeps x1 < x2 after mirroring.
                bbox[:, [0, 2]] = width - bbox[:, [2, 0]]
            target["boxes"] = bbox
        return image, target


def get_transforms(train = True, normalize = False):
    """Build the ``(image, target)`` transform pipeline for a dataset split.

    Args:
        train: When True, add the random horizontal flip augmentation.
        normalize: When True, append ImageNet mean/std normalization. It is
            off by default because torchvision's Faster R-CNN expects inputs
            in the 0-1 range and applies its own mean/std normalization
            internally; normalizing here as well would normalize every image
            twice. Enable it only for models that do not normalize themselves.

    Returns:
        A ``Compose`` instance applying the selected transforms in order.
    """
    transforms_list = [ToTensor()]

    if train:
        transforms_list.append(RandomHorizontalFlip(0.5))

    if normalize:
        transforms_list.append(Normalize())

    return Compose(transforms_list)

In [None]:
DATASET_NAME = "new-dataset-for-ml-project-coco"

# Image folders of each split; the test split comes from a separate input dataset.
train_dir = f"{INPUT_PATH}/{DATASET_NAME}/train"
valid_dir = f"{INPUT_PATH}/{DATASET_NAME}/valid"
test_dir = "/kaggle/input/test-set-for-ml-project-coco/test"


def _build_split(split_dir, train):
    """Create the dataset of one split from its folder and annotation file."""
    return COCOCustomDataset(
        root_dir=split_dir,
        annotation_file=os.path.join(split_dir, "_annotations.coco.json"),
        transforms=get_transforms(train=train),
    )


# Only the training split uses the training-time transforms.
train_dataset = _build_split(train_dir, train=True)
val_dataset = _build_split(valid_dir, train=False)
test_dataset = _build_split(test_dir, train=False)

In [None]:
BATCH_SIZE = 4
NUM_WORKERS = 2

# Options shared by the three loaders; only the training loader is shuffled.
_loader_options = dict(
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    collate_fn=collate_fn,
)

train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_options)

In [None]:
# Initialize the model
print("Initializing Faster R-CNN with ResNet50 backbone...")


num_classes = 2 # person + background

# `weights="DEFAULT"` replaces the deprecated `pretrained=True` /
# `pretrained_backbone=True` arguments and loads the same pretrained
# detection weights (which already include the backbone).
model = fasterrcnn_resnet50_fpn(
    weights = "DEFAULT",
    progress = True
)

# Swap the box predictor head for one sized for `num_classes` outputs.
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

model.to(device)

In [None]:
# Training hyper-parameters
LEARNING_RATE = 0.001
WEIGHT_DECAY = 0.0005
MOMENTUM = 0.9
NUM_EPOCHS = 30
# NOTE(review): the scheduler multiplies the learning rate by GAMMA every
# STEP_SIZE epochs, i.e. by 0.1 ten times over NUM_EPOCHS = 30 - confirm this
# decay is intended.
STEP_SIZE = 3
GAMMA = 0.1

# SGD over the trainable parameters only, with a step-wise learning-rate decay
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = optim.SGD(params, lr=LEARNING_RATE, momentum=MOMENTUM, weight_decay=WEIGHT_DECAY)

lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=STEP_SIZE, gamma=GAMMA)

In [None]:
# Training functions (reference/detection/engine.py)
def train_one_epoch(model, optimizer, data_loader, device, epoch):
    """Run one optimization pass over ``data_loader``.

    Returns the loss averaged over the number of batches in the loader.
    """
    model.train()
    running_loss = 0
    batch_count = len(data_loader)

    bar = tqdm(data_loader, desc=f"Training Epoch {epoch+1}")

    for step, (images, targets) in enumerate(bar, start=1):
        # Transfer the batch to the training device
        images = [image.to(device) for image in images]
        targets = [
            {key: value.to(device) for key, value in target.items()}
            for target in targets
        ]

        # In train mode the model returns a dict of losses; add them up
        loss_dict = model(images, targets)
        batch_loss = sum(loss_dict.values())

        # Gradient step
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        loss_value = batch_loss.item()
        running_loss += loss_value

        # Show the current and running-average loss on the progress bar
        bar.set_postfix({
            'Loss': f'{loss_value:.4f}',
            'Avg Loss': f'{running_loss/step:.4f}'
        })

    return running_loss / batch_count


def validate_one_epoch(model, data_loader, device, epoch):
    """Compute the average loss over ``data_loader`` without updating weights."""
    # The model must stay in train mode: Faster R-CNN returns the loss dict
    # only in train mode, while in eval mode it returns the list of
    # predictions directly. Gradients are disabled with no_grad below.
    model.train()
    running_loss = 0
    batch_count = len(data_loader)

    with torch.no_grad():
        bar = tqdm(data_loader, desc=f"Validation Epoch {epoch+1}")

        for step, (images, targets) in enumerate(bar, start=1):
            # Transfer the batch to the target device
            images = [image.to(device) for image in images]
            targets = [
                {key: value.to(device) for key, value in target.items()}
                for target in targets
            ]

            # Sum the individual loss terms returned by the model
            loss_dict = model(images, targets)
            batch_loss = sum(loss_dict.values())

            loss_value = batch_loss.item()
            running_loss += loss_value

            # Show the current and running-average loss on the progress bar
            bar.set_postfix({
                'Loss': f'{loss_value:.4f}',
                'Avg Loss': f'{running_loss/step:.4f}'
            })

    return running_loss / batch_count

In [None]:
# Main training loop
print("Starting training...")
print("=" * 60)

# Per-epoch loss history and the lowest validation loss seen so far
train_losses, val_losses = [], []
best_val_loss = float('inf')

# Checkpoints are written below the working directory
checkpoint_dir = f"{WORKING_PATH}/checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)

for epoch in range(NUM_EPOCHS):
    print(f"\n Epoch {epoch+1}/{NUM_EPOCHS}")
    print("-" * 40)

    # One pass over the training data
    train_loss = train_one_epoch(model, optimizer, train_loader, device, epoch)
    train_losses.append(train_loss)

    # One pass over the validation data
    val_loss = validate_one_epoch(model, val_loader, device, epoch)
    val_losses.append(val_loss)

    # Advance the learning-rate schedule and read back the new value
    lr_scheduler.step()
    current_lr = optimizer.param_groups[0]['lr']

    # Epoch summary
    print(f"\n Epoch {epoch+1} Results:")
    print(f" Train Loss: {train_loss:.4f}")
    print(f" Val Loss: {val_loss:.4f}")
    print(f" Learning Rate: {current_lr:.6f}")

    # Keep the weights of the epoch with the lowest validation loss
    improved = val_loss < best_val_loss
    if improved:
        best_val_loss = val_loss
        torch.save(model.state_dict(), f"{checkpoint_dir}/best_model.pth")

# Weights after the last epoch, whether or not they are the best ones
torch.save(model.state_dict(), f"{checkpoint_dir}/final_model.pth")

In [None]:
def evaluate_model(model, data_loader, device, confidence_threshold = 0.5):
    """Run inference over ``data_loader`` and collect per-image results.

    Returns two parallel lists with one entry per image: the predictions
    whose score is at least ``confidence_threshold`` (NumPy ``boxes``,
    ``scores`` and ``labels``) and the ground truth (NumPy ``boxes`` and
    ``labels``).
    """
    model.eval()

    kept_predictions = []
    ground_truths = []

    with torch.no_grad():
        for images, targets in tqdm(data_loader, desc="Evaluating"):
            batch_outputs = model([img.to(device) for img in images])

            for output, target in zip(batch_outputs, targets):
                # Keep only the detections that reach the confidence threshold
                confident = output['scores'] >= confidence_threshold

                kept_predictions.append({
                    key: output[key][confident].cpu().numpy()
                    for key in ('boxes', 'scores', 'labels')
                })
                ground_truths.append({
                    key: target[key].cpu().numpy()
                    for key in ('boxes', 'labels')
                })

    return kept_predictions, ground_truths


def calculate_iou(box1, box2):
    """Return the intersection-over-union of two [x1, y1, x2, y2] boxes.

    Boxes that do not overlap (or only touch along an edge) give 0.0.
    """
    # Calculate intersection coordinates
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])

    # No overlap at all
    if x2 <= x1 or y2 <= y1:
        return 0.0

    intersection = (x2 - x1) * (y2 - y1)

    # Calculate union area
    area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union = area1 + area2 - intersection

    return intersection / union if union > 0 else 0.0


def _count_true_positives(pred_boxes, pred_scores, gt_boxes, iou_threshold):
    """Greedily match one image's predictions to its own ground-truth boxes.

    Predictions are visited by decreasing score; each one claims the
    still-unmatched ground-truth box with the highest IoU, and counts as a
    true positive when that IoU reaches ``iou_threshold``.
    """
    matched_gt = set()
    true_positives = 0

    for pred_idx in np.argsort(pred_scores)[::-1]:
        best_iou = 0.0
        best_gt_idx = -1

        for gt_idx, gt_box in enumerate(gt_boxes):
            if gt_idx in matched_gt:
                continue

            iou = calculate_iou(pred_boxes[pred_idx], gt_box)
            if iou > best_iou:
                best_iou = iou
                best_gt_idx = gt_idx

        # best_gt_idx stays -1 when nothing overlaps; never count that as a
        # match, even with iou_threshold == 0.
        if best_gt_idx >= 0 and best_iou >= iou_threshold:
            true_positives += 1
            matched_gt.add(best_gt_idx)

    return true_positives


def calculate_metrics(predictions, targets, iou_threshold = 0.5):
    """Compute detection precision, recall and F1 over a set of images.

    Args:
        predictions: One dict per image with ``boxes`` and ``scores``.
        targets: One dict per image with the ground-truth ``boxes``;
            paired with ``predictions`` by position.
        iou_threshold: Minimum IoU for a prediction to match a ground truth.

    Returns:
        Dict with ``precision``, ``recall``, ``f1_score`` and the raw
        ``true_positives``, ``false_positives``, ``false_negatives``,
        ``total_predictions`` and ``total_ground_truth`` counts.

    Matching is done image by image: a prediction can only match a
    ground-truth box of the same image. Pooling the boxes of all images
    would let a detection match an object of a different image that happens
    to have similar coordinates, which inflates the scores.
    """
    tp = 0  # True positives
    fp = 0  # False positives
    total_predictions = 0
    total_ground_truth = 0

    for pred, target in zip(predictions, targets):
        pred_boxes = pred['boxes']
        gt_boxes = target['boxes']

        total_predictions += len(pred_boxes)
        total_ground_truth += len(gt_boxes)

        image_tp = _count_true_positives(pred_boxes, pred['scores'], gt_boxes, iou_threshold)
        tp += image_tp
        fp += len(pred_boxes) - image_tp

    fn = total_ground_truth - tp  # False negatives

    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

    return {
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score,
        'true_positives': tp,
        'false_positives': fp,
        'false_negatives': fn,
        'total_predictions': total_predictions,
        'total_ground_truth': total_ground_truth
    }

## Evaluation on the test and validation sets

In [None]:
def _print_metrics(header, metrics):
    """Print the header, a separator line and every entry of a metrics dict."""
    print(header)
    print("-" * 30)
    print(f" Precision: {metrics['precision']:.4f}")
    print(f" Recall: {metrics['recall']:.4f}")
    print(f" F1-Score: {metrics['f1_score']:.4f}")
    print(f" True Positives: {metrics['true_positives']}")
    print(f" False Positives: {metrics['false_positives']}")
    print(f" False Negatives: {metrics['false_negatives']}")
    print(f" Total Predictions: {metrics['total_predictions']}")
    print(f" Total Ground Truth: {metrics['total_ground_truth']}")


# Evaluate the trained model
print("Evaluating model on test set...")
print("=" * 50)

# Restore the checkpoint with the lowest validation loss
best_weights = torch.load(f"{WORKING_PATH}/checkpoints/best_model.pth", map_location = device)
model.load_state_dict(best_weights)

# Test set: predictions at confidence >= 0.5, matched at IoU >= 0.5
test_predictions, test_targets = evaluate_model(model, test_loader, device, confidence_threshold=0.5)
test_metrics = calculate_metrics(test_predictions, test_targets, iou_threshold=0.5)
_print_metrics("\nTest Set Evaluation Results:", test_metrics)

# Validation set, with the same thresholds, for comparison
print("\nEvaluating on validation set for comparison...")
val_predictions, val_targets = evaluate_model(model, val_loader, device, confidence_threshold=0.5)
val_metrics = calculate_metrics(val_predictions, val_targets, iou_threshold=0.5)
_print_metrics("\nValidation Set Evaluation Results:", val_metrics)