In [1]:
import torch
import torchvision
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
import pathlib
import numpy as np
import pandas as pd
import xml.etree.ElementTree as ET
import torch.optim as optim

from PIL import Image
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torch.utils.data import Dataset
from torchvision import datasets, transforms, models
from torchvision.transforms import ToTensor
from matplotlib import pyplot as plt

In [2]:
from torchvision.io import read_image
from torch.utils.data import DataLoader, SubsetRandomSampler, random_split
from torch import nn
import torchvision.transforms as transforms
from torch.cuda.amp import autocast

In [None]:
# Fetch Pascal VOC through torchvision (downloaded into root_dir when absent).
# year may be "2007" or "2012"; image_set may be "train", "val", "trainval"
# (2007 additionally offers "test"). Images are converted to tensors.
root_dir = "./data"
voc_dataset = torchvision.datasets.VOCDetection(
    root_dir,
    year="2012",
    image_set="train",
    download=True,
    transform=transforms.ToTensor(),
)

In [3]:
class VOCDataset(Dataset):
    """Pascal VOC detection dataset read directly from a VOC year folder.

    ``root_dir`` must contain ``JPEGImages/``, ``Annotations/`` and
    ``ImageSets/Main/<image_set>.txt``. Each item is ``(image, target)`` where
    ``target`` is a dict with ``"boxes"`` (float32, shape ``(N, 4)`` as
    ``[xmin, ymin, xmax, ymax]``) and ``"labels"`` (int64, shape ``(N,)``).

    Args:
        root_dir: Path to the VOC year folder (e.g. ``.../VOCdevkit/VOC2012``).
        image_set: Name of the split file in ``ImageSets/Main`` (without ``.txt``).
        transform: Optional callable applied to the RGB PIL image.
    """

    # Class name -> integer label stored in target["labels"].
    # NOTE(review): torchvision detection models conventionally reserve label 0
    # for background, which collides with "aeroplane" here. Shifting these to
    # 1..20 also requires the model head to have 21 outputs, so both places
    # must be changed together - confirm and fix jointly.
    CLASS_TO_IDX = {
        "aeroplane": 0, "bicycle": 1, "bird": 2, "boat": 3, "bottle": 4,
        "bus": 5, "car": 6, "cat": 7, "chair": 8, "cow": 9,
        "diningtable": 10, "dog": 11, "horse": 12, "motorbike": 13, "person": 14,
        "pottedplant": 15, "sheep": 16, "sofa": 17, "train": 18, "tvmonitor": 19
    }

    def __init__(self, root_dir, image_set="train", transform=None):
        self.root_dir = root_dir
        self.image_set = image_set
        self.transform = transform
        self.image_dir = pathlib.Path(root_dir, "JPEGImages")
        self.annotation_dir = pathlib.Path(root_dir, "Annotations")
        self.split_file = pathlib.Path(root_dir, "ImageSets", "Main", f"{image_set}.txt")

        with open(self.split_file, "r") as f:
            # Skip blank lines so a trailing newline cannot yield an empty id.
            self.image_ids = [line.strip() for line in f if line.strip()]

    def __len__(self):
        return len(self.image_ids)

    def __getitem__(self, idx):
        image_id = self.image_ids[idx]
        image_path = pathlib.Path(self.image_dir, f"{image_id}.jpg")
        ann_path = pathlib.Path(self.annotation_dir, f"{image_id}.xml")

        # convert() returns a fully loaded copy, so the file can be closed here.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")
        boxes, labels = self.parse_voc_xml(ann_path)

        if self.transform:
            image = self.transform(image)

        return image, self._build_target(boxes, labels)

    @staticmethod
    def _build_target(boxes, labels):
        """Pack box/label lists into tensors; boxes are always shaped (N, 4)."""
        return {
            # reshape keeps the (N, 4) layout even when there are no boxes.
            "boxes": torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            "labels": torch.tensor(labels, dtype=torch.int64),
        }

    def parse_voc_xml(self, xml_path):
        """
        Parse Pascal VOC XML annotation file.

        Returns:
            (boxes, labels): ``boxes`` is a list of ``[xmin, ymin, xmax, ymax]``
            integer lists and ``labels`` the matching ``CLASS_TO_IDX`` values.
            Objects whose name is not in ``CLASS_TO_IDX`` are skipped.
        """
        root = ET.parse(xml_path).getroot()

        boxes = []
        labels = []
        for obj in root.findall("object"):
            label = obj.find("name").text
            if label not in self.CLASS_TO_IDX:
                continue

            bbox = obj.find("bndbox")
            # int(float(...)) also accepts decimal strings such as "48.0",
            # which a bare int() rejects with ValueError.
            box = [int(float(bbox.find(tag).text)) for tag in ("xmin", "ymin", "xmax", "ymax")]

            # Append both together so the two lists can never get out of step.
            boxes.append(box)
            labels.append(self.CLASS_TO_IDX[label])

        return boxes, labels



In [4]:
# Image preprocessing pipeline; for now it only converts the image to a tensor.
# TODO: add more complex transformations (rotations / reflections)
transform = transforms.Compose(
    [
        transforms.ToTensor(),
    ]
)

In [5]:
# Build the dataset from the VOC2012 folder under <cwd>/data/VOCdevkit.
# TODO: Check that the first 10 xml annotations match the first 10 JPEG images
current_dir = pathlib.Path.cwd().resolve()
voc_path = current_dir / "data" / "VOCdevkit" / "VOC2012"

dataset = VOCDataset(root_dir=voc_path, transform=transform)

In [6]:
# Create a 70 / 15 / 15 split for train, validation and test.
SPLIT_SEED = 42  # fixed so the same images land in each subset on every run

dataset_size = len(dataset)

train_size = int(0.7 * dataset_size)
val_size = int(0.15 * dataset_size)
# The test subset takes the remainder so the three sizes always sum to dataset_size.
test_size = dataset_size - train_size - val_size

# A seeded generator makes the split reproducible across kernel restarts;
# without it, random_split draws from the global RNG and the subsets change.
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

In [7]:
def collate_fn(batch):
    """Transpose a batch of samples into one tuple per field.

    A list of ``(image, target)`` pairs becomes ``(images, targets)``, each a
    tuple, so images of different sizes are never stacked into one tensor.
    """
    per_field = zip(*batch)
    return tuple(per_field)
# One loader per subset; only the training loader shuffles.
train_dataloader = DataLoader(
    train_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=collate_fn,
)
val_dataloader = DataLoader(
    val_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=collate_fn,
)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=collate_fn,
)

In [8]:
# Number of outputs for the replacement box-predictor head.
# NOTE(review): torchvision detection heads conventionally count background as
# class 0. With 20 outputs and dataset labels 0-19 there is no dedicated
# background slot - confirm whether this should be 21 with labels shifted to
# 1-20 (the dataset's label map would have to change at the same time).
num_classes = 20

# Faster R-CNN (ResNet-50 FPN) initialised from the default pre-trained weights.
weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
model = fasterrcnn_resnet50_fpn(weights=weights)

# Input width of the existing classification layer; the new head must match it.
in_features = model.roi_heads.box_predictor.cls_score.in_features

# Swap the pre-trained head for a fresh one sized for num_classes.
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

In [45]:
def train_loop(dataloader, model, loss_fn, optimizer, device="cpu"):
    """
    Model training loop
    """
    model.train()
    size = len(dataloader.dataset)
    total_loss = 0

    for images, targets in dataloader:
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        with autocast(enabled=True):
            loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        
        optimizer.zero_grad()
        losses.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        
        total_loss += losses.item()
    print(f"Avg Loss: {total_loss/len(dataloader)}")
    # for images, targets in dataloader:
    #     for img in images:
    #         if torch.isnan(img).any() or torch.isinf(img).any():
    #             print("NaN or Inf detected in image")
    #     images = list(img.to(device) for img in images)
    #     targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

    #     optimizer.zero_grad()
    #     loss_dict = model(images, targets)
    #     loss = sum(loss for loss in loss_dict.values())
        
    #     loss.backward()
    #     nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
    #     optimizer.step()
        
    #     total_loss += loss.item()

    # print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss:.4f}")



In [46]:
def test_loop():
    """Placeholder for the evaluation loop; not implemented yet."""
    return None

In [47]:
# criterion is handed to train_loop as loss_fn, which does not use it: the
# detection model returns its own loss dict.
criterion = nn.CrossEntropyLoss()
# Adam over every model parameter.
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

In [48]:
# Cuda management
# Guarded so the notebook still runs on machines without a usable GPU; the
# device-selection cell below falls back to CPU in that case.
if torch.cuda.is_available():
    torch.cuda.init()
    torch.cuda.synchronize()

In [49]:
# Pick the GPU when one is usable, otherwise the CPU, and move the model there.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model.to(device)
print(f"CUDA Available: {torch.cuda.is_available()}")

CUDA Available: True


In [None]:
# Train for num_epochs passes over the training loader.
num_epochs = 1
for epoch in range(num_epochs):
    print(f"Epoch: [{epoch+1}/{num_epochs}]")
    train_loop(
        dataloader=train_dataloader,
        model=model,
        loss_fn=criterion,
        optimizer=optimizer,
        device=device,
    )

Epoch: [1/1]


  with autocast(enabled=True):


In [None]:
# Clone the YOLOv5 repository and install its requirements (plus comet_ml)
# into the kernel's environment.
# NOTE(review): %cd changes the kernel's working directory to ./yolov5 for
# every later cell, so relative paths used further down (e.g. "./data",
# "./model_output") will resolve inside the clone - confirm this is intended.
# NOTE(review): the install is unpinned, so results may differ between runs.
!git clone https://github.com/ultralytics/yolov5  # clone
%cd yolov5
%pip install -qr requirements.txt comet_ml  # install

import torch
# presumably resolves to the cloned repo's `utils` package now that cwd is yolov5/ - TODO confirm
import utils
display = utils.notebook_init()  # checks

In [None]:
# Run the repository's val.py with the yolov5s.pt weights on coco.yaml at
# image size 640; --half presumably enables half-precision inference (verify
# against val.py's help). Relies on the working directory being the yolov5 clone.
!python val.py --weights yolov5s.pt --data coco.yaml --img 640 --half

In [None]:
import os
import torch
import torch.utils.data
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.datasets import VOCDetection
from torchvision import transforms
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import numpy as np
from PIL import Image

# Locations: dataset root and the folder that receives checkpoints and figures.
data_path = "./data"  # Change this to your data directory
output_path = "./model_output"  # Path to save models
os.makedirs(output_path, exist_ok=True)

# Training hyper-parameters.
batch_size = 4
num_epochs = 10
learning_rate = 0.005

# Prefer the GPU when available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Label names indexed by class id; index 0 is the background class.
voc_classes = [
    'background',
    'aeroplane', 'bicycle', 'bird', 'boat', 'bottle',
    'bus', 'car', 'cat', 'chair', 'cow',
    'diningtable', 'dog', 'horse', 'motorbike', 'person',
    'pottedplant', 'sheep', 'sofa', 'train', 'tvmonitor',
]

# Helper function to transform VOC annotations to the format expected by Faster R-CNN
def transform_voc_target(target):
    """Convert a parsed VOC annotation dict into a Faster R-CNN target.

    Args:
        target: Dict with an ``'annotation'`` entry whose ``'object'`` value
            lists the annotated objects; each object has a ``'name'`` and a
            ``'bndbox'`` with ``xmin`` / ``ymin`` / ``xmax`` / ``ymax``.

    Returns:
        ``{"boxes": float32 tensor (N, 4), "labels": int64 tensor (N,)}`` with
        labels taken from the position of the class name in ``voc_classes``.

    Raises:
        ValueError: If an object's name is not in ``voc_classes``.
    """
    objects = target['annotation'].get('object', [])
    # Tolerate a lone object given as a dict rather than a one-element list.
    if isinstance(objects, dict):
        objects = [objects]

    boxes = []
    labels = []
    for obj in objects:
        bbox = obj['bndbox']
        boxes.append([float(bbox[key]) for key in ('xmin', 'ymin', 'xmax', 'ymax')])

        # Map class name to index
        labels.append(voc_classes.index(obj['name']))

    # reshape keeps the (N, 4) layout even when there are no objects.
    boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
    labels = torch.as_tensor(labels, dtype=torch.int64)

    return {"boxes": boxes, "labels": labels}

# Custom dataset class that applies transformations and formats targets
class VOCDataset(torch.utils.data.Dataset):
    """Wrap an indexable ``(image, annotation)`` dataset for Faster R-CNN.

    Each raw annotation is converted with ``transform_voc_target`` and the
    optional ``transforms`` callable is applied to the image.

    Note: this definition replaces the ``VOCDataset`` class defined earlier in
    the notebook, which has a different constructor.
    """

    def __init__(self, dataset, transforms=None):
        self.transforms = transforms
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        image, raw_target = self.dataset[idx]
        formatted_target = transform_voc_target(raw_target)

        if self.transforms is None:
            return image, formatted_target
        return self.transforms(image), formatted_target

# Define data transformations
# NOTE(review): torchvision's detection models document inputs in the 0-1 range
# and apply their own normalisation internally, so Normalize here may
# normalise twice - confirm. visualize_predictions and run_inference_on_image
# both assume this normalisation, so all three must be changed together.
data_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


def detection_collate_fn(batch):
    """Group a batch of (image, target) pairs into (images, targets) tuples.

    Defined as a named function rather than a lambda so that DataLoader worker
    processes can pickle it when the platform starts workers with "spawn".
    """
    return tuple(zip(*batch))


# Load the VOC2012 dataset (downloaded on first use). Transforms are applied
# by the VOCDataset wrapper below, not by VOCDetection itself.
print("Loading datasets...")
train_voc_raw = VOCDetection(
    root=data_path,
    year="2012",
    image_set="train",
    download=True,
    transform=None,
)

val_voc_raw = VOCDetection(
    root=data_path,
    year="2012",
    image_set="val",
    download=True,
    transform=None,
)

# Wrap the raw datasets so targets come out in Faster R-CNN format.
train_dataset = VOCDataset(train_voc_raw, transforms=data_transform)
val_dataset = VOCDataset(val_voc_raw, transforms=data_transform)

# Create data loaders; only the training loader shuffles.
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4,
    collate_fn=detection_collate_fn,
)

val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=4,
    collate_fn=detection_collate_fn,
)

print(f"Dataset sizes: Train={len(train_dataset)}, Val={len(val_dataset)}")

# Function to get the Faster R-CNN model
def get_faster_rcnn_model(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) with a fresh box-predictor head.

    The network starts from the default pre-trained weights; only the box
    predictor is replaced, sized for ``num_classes`` outputs.
    """
    detector = fasterrcnn_resnet50_fpn(weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT)

    # The new head must accept the same feature width as the one it replaces.
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)

    return detector

# One output per entry in voc_classes (background included); move to device.
model = get_faster_rcnn_model(num_classes=len(voc_classes))
model.to(device)

# SGD with momentum over the trainable parameters; the learning rate is
# multiplied by 0.1 every 3 scheduler steps.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=learning_rate,
    momentum=0.9,
    weight_decay=5e-4,
)
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1,
)

# Training function
def train_one_epoch(model, optimizer, data_loader, device):
    """Train ``model`` for one pass over ``data_loader``.

    The model is called as ``model(images, targets)`` in training mode and
    must return a dict of loss tensors; their sum is back-propagated.

    Returns:
        The summed loss averaged over the number of batches.
    """
    model.train()
    running_loss = 0

    for batch_images, batch_targets in data_loader:
        device_images = [img.to(device) for img in batch_images]
        device_targets = [
            {key: value.to(device) for key, value in tgt.items()}
            for tgt in batch_targets
        ]

        batch_loss = sum(model(device_images, device_targets).values())

        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()

    return running_loss / len(data_loader)

# Validation function
def evaluate(model, data_loader, device):
    """Compute the mean summed loss of ``model`` over ``data_loader``.

    torchvision detection models return the loss dict only in training mode;
    in eval mode they return predictions, which have no ``.values()``. The
    model is therefore switched to training mode for the forward passes
    (gradients disabled) and its previous mode is restored afterwards.

    NOTE: layers with train-time behaviour (dropout, non-frozen batch norm)
    stay in that behaviour during this pass.

    Args:
        model: Detection model returning a dict of losses in training mode.
        data_loader: Iterable of ``(images, targets)`` batches.
        device: Device the batch tensors are moved to.

    Returns:
        Average of the per-batch summed loss as a float, or ``nan`` when the
        loader yields no batches.
    """
    was_training = model.training
    model.train()
    total_loss = 0.0
    num_batches = 0

    try:
        with torch.no_grad():
            for images, targets in data_loader:
                images = [image.to(device) for image in images]
                targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

                loss_dict = model(images, targets)
                losses = sum(loss for loss in loss_dict.values())

                total_loss += losses.item()
                num_batches += 1
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    return total_loss / num_batches if num_batches else float("nan")

# Visualization function
def visualize_predictions(model, dataset, idx=0, threshold=0.5):
    """Draw the model's detections for ``dataset[idx]`` and save them as a PNG.

    The image is de-normalised with the same mean/std used by the data
    transform, every detection scoring above ``threshold`` is drawn as a red
    box with its class name and score, and the figure is written to
    ``detection_result.png`` inside ``output_path``.
    """
    model.eval()
    img, _ = dataset[idx]

    # Forward pass on a single-image batch.
    with torch.no_grad():
        prediction = model([img.to(device)])[0]

    # CHW tensor -> HWC array, undo the normalisation, clamp to a valid range.
    channel_std = np.array([0.229, 0.224, 0.225])
    channel_mean = np.array([0.485, 0.456, 0.406])
    canvas = img.cpu().permute(1, 2, 0).numpy()
    canvas = np.clip(canvas * channel_std + channel_mean, 0, 1)

    fig, ax = plt.subplots(1, figsize=(12, 8))
    ax.imshow(canvas)

    det_boxes = prediction['boxes'].cpu().numpy()
    det_scores = prediction['scores'].cpu().numpy()
    det_labels = prediction['labels'].cpu().numpy()

    for box, score, label in zip(det_boxes, det_scores, det_labels):
        if not score > threshold:
            continue

        left, top, right, bottom = box[0], box[1], box[2], box[3]
        ax.add_patch(
            patches.Rectangle(
                (left, top), right - left, bottom - top,
                linewidth=2, edgecolor='r', facecolor='none'
            )
        )

        # Caption sits just above the box's top-left corner.
        class_name = voc_classes[label]
        ax.text(
            left, top - 5, f'{class_name}: {score:.2f}',
            color='white', bbox=dict(facecolor='red', alpha=0.5)
        )

    ax.set_title('Faster R-CNN Detection Result')
    ax.axis('off')
    save_path = os.path.join(output_path, 'detection_result.png')
    fig.savefig(save_path, bbox_inches='tight')
    plt.close(fig)
    print(f"Saved visualization to {save_path}")

# Train for num_epochs, validating and checkpointing after every epoch.
print("Starting training...")
train_losses, val_losses = [], []

for epoch in range(num_epochs):
    # One optimisation pass, then advance the learning-rate schedule.
    train_loss = train_one_epoch(model, optimizer, train_loader, device)
    train_losses.append(train_loss)
    lr_scheduler.step()

    # Loss on the validation set.
    val_loss = evaluate(model, val_loader, device)
    val_losses.append(val_loss)

    print(f"Epoch {epoch+1}/{num_epochs}: Train Loss={train_loss:.4f}, Val Loss={val_loss:.4f}")

    # Everything needed to resume from this epoch.
    checkpoint = {
        'epoch': epoch + 1,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': lr_scheduler.state_dict(),
        'train_loss': train_loss,
        'val_loss': val_loss
    }
    checkpoint_path = os.path.join(output_path, f'faster_rcnn_epoch_{epoch+1}.pth')
    torch.save(checkpoint, checkpoint_path)
    print(f"Saved checkpoint to {checkpoint_path}")

# Final weights only (no optimizer or scheduler state).
final_model_path = os.path.join(output_path, 'faster_rcnn_final.pth')
torch.save(model.state_dict(), final_model_path)
print(f"Training complete! Final model saved to {final_model_path}")

# Train and validation loss per epoch, saved as loss_curves.png in output_path.
fig, ax = plt.subplots(figsize=(10, 6))
epoch_axis = range(1, num_epochs + 1)
ax.plot(epoch_axis, train_losses, label='Train Loss')
ax.plot(epoch_axis, val_losses, label='Validation Loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.set_title('Training and Validation Loss')
ax.legend()
ax.grid(True)
fig.savefig(os.path.join(output_path, 'loss_curves.png'))
plt.close(fig)
print(f"Saved loss curves to {os.path.join(output_path, 'loss_curves.png')}")

# Best-effort sanity check: visualise the first validation image. A failure is
# reported rather than raised so the rest of the notebook still runs.
print("Running inference on a sample image...")
try:
    visualize_predictions(model=model, dataset=val_dataset, idx=0)
    print("Done!")
except Exception as exc:
    print(f"Error during visualization: {exc}")

# Function to load and run inference on a new image
def run_inference_on_image(model, image_path, threshold=0.5):
    """Detect objects in the image file at ``image_path``.

    The image is loaded as RGB, converted to a tensor and normalised, then
    passed through ``model`` in eval mode on the global ``device``.

    Returns:
        Dict with ``'boxes'`` and ``'scores'`` (NumPy arrays) and
        ``'class_names'`` (list of names from ``voc_classes``), restricted to
        detections whose score exceeds ``threshold``.
    """
    preprocess = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    image_tensor = preprocess(Image.open(image_path).convert("RGB"))

    # Single-image batch, no gradients.
    model.eval()
    with torch.no_grad():
        prediction = model([image_tensor.to(device)])[0]

    det_boxes = prediction['boxes'].cpu().numpy()
    det_scores = prediction['scores'].cpu().numpy()
    det_labels = prediction['labels'].cpu().numpy()

    # Boolean mask of the detections that clear the threshold.
    keep = det_scores > threshold

    return {
        'boxes': det_boxes[keep],
        'scores': det_scores[keep],
        'class_names': [voc_classes[label] for label in det_labels[keep]]
    }

# Closing usage hint for the reader.
for message in (
    "\nTraining and evaluation complete!",
    "To use the trained model for inference on a new image:",
    "results = run_inference_on_image(model, 'path/to/your/image.jpg')",
):
    print(message)