# PyTorch method for car detection

In [3]:
# Import libraries
import os
import shutil
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
import yaml
from skimage.io import imread
import imagesize

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.optim import SGD, AdamW
from torch.optim.lr_scheduler import MultiStepLR, CosineAnnealingLR, OneCycleLR
from torch.cuda.amp import autocast, GradScaler
import torchvision
import torchvision.models.detection as detection
from torchvision import transforms
import albumentations as A
from albumentations.pytorch import ToTensorV2

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, LabelEncoder

from PIL import Image
from IPython.display import Video

In [None]:
class YoloDataset(Dataset):
    """Detection dataset that pairs images with YOLO-format label files.

    Each image ``<name>.<ext>`` in ``img_dir`` is matched with
    ``<name>.txt`` in ``label_dir``. Every non-empty label line must hold
    exactly five whitespace-separated numbers:
    ``class x_center y_center width height`` with the box expressed as
    fractions of the image size. Boxes are converted to absolute
    ``[x1, y1, x2, y2]`` pixel coordinates and class ids are shifted by one.

    Args:
        img_dir: Directory containing the images.
        label_dir: Directory containing the label ``.txt`` files.
        transform: Optional callable invoked as
            ``transform(image=..., bboxes=..., labels=...)`` that returns a
            mapping with ``'image'``, ``'bboxes'`` and ``'labels'`` keys.
        mosaic_prob: Stored on the instance; not used by this class.
        mixup_prob: Stored on the instance; not used by this class.
    """

    # Lower-case file extensions that are treated as images.
    IMAGE_EXTENSIONS = ('.jpg', '.png')

    def __init__(self, img_dir, label_dir, transform=None, mosaic_prob=0.5, mixup_prob=0.2):
        self.img_dir = img_dir
        self.label_dir = label_dir
        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # index -> file mapping (and therefore "image_id") reproducible.
        self.img_files = sorted(
            f for f in os.listdir(img_dir)
            if f.lower().endswith(self.IMAGE_EXTENSIONS)
        )
        self.transform = transform
        self.mosaic_prob = mosaic_prob
        self.mixup_prob = mixup_prob

    def __len__(self):
        """Return the number of images found in ``img_dir``."""
        return len(self.img_files)

    @staticmethod
    def _read_yolo_labels(label_path, img_w, img_h):
        """Parse one YOLO label file into absolute boxes and shifted labels.

        Returns ``(boxes, labels)``; both lists are empty when the file does
        not exist. Boxes are clipped to the image and boxes that end up with
        no area are dropped.

        Raises:
            ValueError: If a non-empty line does not contain exactly five
                numeric fields.
        """
        boxes = []
        labels = []
        if not os.path.exists(label_path):
            return boxes, labels

        with open(label_path, 'r') as f:
            for line_no, line in enumerate(f, start=1):
                fields = line.split()
                if not fields:
                    continue
                if len(fields) != 5:
                    raise ValueError(
                        f"{label_path}:{line_no}: expected 5 fields "
                        f"(class x_center y_center width height), got {len(fields)}"
                    )
                try:
                    cls, x_c, y_c, width, height = map(float, fields)
                except ValueError as err:
                    raise ValueError(
                        f"{label_path}:{line_no}: non-numeric label field"
                    ) from err

                # Convert to absolute corner coordinates, clipped to the
                # image so rounding noise in the labels cannot push a box
                # outside the valid range.
                x1 = min(max((x_c - width / 2) * img_w, 0.0), float(img_w))
                y1 = min(max((y_c - height / 2) * img_h, 0.0), float(img_h))
                x2 = min(max((x_c + width / 2) * img_w, 0.0), float(img_w))
                y2 = min(max((y_c + height / 2) * img_h, 0.0), float(img_h))

                # Zero-area boxes are rejected downstream (torchvision
                # detection models require positive width and height).
                if x2 <= x1 or y2 <= y1:
                    continue

                boxes.append([x1, y1, x2, y2])
                # Shift ids by one; torchvision detection models reserve
                # label 0 for the background class.
                labels.append(int(cls) + 1)

        return boxes, labels

    def load_image_and_labels(self, idx):
        """Load image ``idx`` as an RGB array together with its boxes/labels.

        Returns:
            ``(img, boxes, labels)`` where ``img`` is the array produced by
            ``cv2.imread`` converted from BGR to RGB, ``boxes`` is a list of
            ``[x1, y1, x2, y2]`` in pixels and ``labels`` a list of ints.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image file.
            ValueError: If the label file contains a malformed line.
        """
        img_name = self.img_files[idx]
        img_path = os.path.join(self.img_dir, img_name)
        # splitext only touches the real extension, unlike str.replace which
        # would also rewrite ".jpg"/".png" occurring inside the file name.
        label_name = os.path.splitext(img_name)[0] + '.txt'
        label_path = os.path.join(self.label_dir, label_name)

        # Load image
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        h, w = img.shape[:2]

        # Load labels
        boxes, labels = self._read_yolo_labels(label_path, w, h)

        return img, boxes, labels

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        ``target`` is a dict with ``"boxes"`` (float32, shape ``(N, 4)``),
        ``"labels"`` (int64, shape ``(N,)``) and ``"image_id"``.
        """
        img, boxes, labels = self.load_image_and_labels(idx)
        # Apply augmentations
        if self.transform is not None:
            transformed = self.transform(image=img, bboxes=boxes, labels=labels)
            img = transformed['image']
            boxes = transformed['bboxes']
            labels = transformed['labels']

        # len() instead of truthiness: the transform may hand back an
        # array-like, for which a bare truth test is ambiguous.
        if len(boxes):
            boxes = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        else:
            boxes = torch.empty((0, 4), dtype=torch.float32)
        if len(labels):
            labels = torch.tensor(labels, dtype=torch.int64)
        else:
            labels = torch.empty((0,), dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([idx])
        }

        return img, target

In [13]:
def get_augmentation_transforms(training=True, img_size=800):
    """Build the albumentations pipeline for training or evaluation.

    Args:
        training: When true, return the augmenting pipeline (random resized
            crop, flips, rotation, colour jitter, noise, blur, cutout).
            Otherwise return a plain resize.
        img_size: Side length in pixels of the square output image.
            Defaults to 800, the value that was previously hard-coded.

    Returns:
        An ``A.Compose`` whose bounding boxes use the ``pascal_voc``
        (absolute ``x1, y1, x2, y2``) format with class ids passed through
        the ``labels`` field.
    """
    # Shared tail of both pipelines: normalise, then convert to a tensor.
    # NOTE(review): torchvision detection models document that they expect
    # 0-1 range images and normalise internally; if the output feeds such a
    # model this Normalize may be applied on top of that - confirm.
    normalize_and_tensorize = [
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ]

    if training:
        return A.Compose([
            A.RandomResizedCrop(height=img_size, width=img_size, scale=(0.8, 1.0), ratio=(0.75, 1.33)),
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.1),
            A.Rotate(limit=15, p=0.3),
            A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
            A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20, p=0.3),
            A.GaussNoise(var_limit=(10.0, 50.0), p=0.2),
            A.GaussianBlur(blur_limit=3, p=0.1),
            A.Cutout(num_holes=8, max_h_size=32, max_w_size=32, fill_value=0, p=0.2),
            *normalize_and_tensorize,
        # min_visibility=0.3 is only set on the training pipeline.
        ], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels'], min_visibility=0.3))

    return A.Compose([
        A.Resize(height=img_size, width=img_size),
        *normalize_and_tensorize,
    ], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']))

def build_optimized_model(num_classes, backbone='resnet50', pretrained=True):
    """Build a torchvision Faster R-CNN with a ``num_classes``-way box head.

    Args:
        num_classes: Number of output classes of the box predictor
            (torchvision counts the background as one of them).
        backbone: One of ``'resnet50'``, ``'resnet101'`` or ``'mobilenet'``.
        pretrained: Forwarded to the torchvision builders. For ``'resnet50'``
            and ``'mobilenet'`` it is passed to the ready-made detection
            model builder; for ``'resnet101'`` it is passed to the backbone
            builder only, because torchvision ships no ready-made
            ResNet-101 Faster R-CNN builder.

    Returns:
        The detection model with its box predictor sized for ``num_classes``.

    Raises:
        ValueError: If ``backbone`` is not one of the supported names.
    """
    supported = ('resnet50', 'resnet101', 'mobilenet')
    # Validate first so an unknown name fails with a clear message rather
    # than an unbound-variable error further down.
    if backbone not in supported:
        raise ValueError(
            f"Unsupported backbone: {backbone!r}. Expected one of {supported}."
        )

    if backbone == 'resnet101':
        # torchvision.models.detection has no fasterrcnn_resnet101_fpn, so
        # assemble the detector from a ResNet-101 FPN backbone instead.
        from torchvision.models.detection.backbone_utils import resnet_fpn_backbone

        fpn_backbone = resnet_fpn_backbone(backbone_name='resnet101', pretrained=pretrained)
        return detection.FasterRCNN(fpn_backbone, num_classes=num_classes)

    if backbone == 'resnet50':
        model = detection.fasterrcnn_resnet50_fpn(pretrained=pretrained)
    else:
        model = detection.fasterrcnn_mobilenet_v3_large_fpn(pretrained=pretrained)

    # Replace the stock box predictor with one sized for our classes.
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes)

    return model

In [7]:
class EarlyStopping:
    """Stop training once the validation loss stops improving.

    Call the instance once per epoch with the current validation loss and
    the model. It returns ``True`` when training should stop.

    Args:
        patience: Number of non-improving calls after which to stop.
        min_delta: The loss must drop by more than this amount below the
            best loss to count as an improvement.
        restore_best_weights: When true, the model is reloaded with the
            weights snapshotted at the best loss before ``True`` is returned.
    """

    def __init__(self, patience=7, min_delta=0, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights
        self.best_loss = None
        self.counter = 0
        self.best_weights = None

    def __call__(self, val_loss, model):
        """Record ``val_loss`` and report whether training should stop."""
        if self.best_loss is None:
            # First call: whatever we have is the best so far.
            self.best_loss = val_loss
            self.save_checkpoint(model)
        elif self.best_loss - val_loss > self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
            self.save_checkpoint(model)
        else:
            self.counter += 1

        if self.counter >= self.patience:
            if self.restore_best_weights:
                model.load_state_dict(self.best_weights)
            return True
        return False

    def save_checkpoint(self, model):
        """Snapshot the model's current weights as the best seen so far."""
        # state_dict() hands out tensors that share storage with the live
        # parameters, and dict.copy() is shallow - without cloning, the
        # "best" weights would silently follow every later optimizer step.
        self.best_weights = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
        
def collate_fn(batch):
    """Regroup a batch of samples into one tuple per sample field.

    A batch ``[(a1, b1), (a2, b2)]`` becomes ``((a1, a2), (b1, b2))``; no
    stacking is performed, so the fields may differ in size per sample.
    """
    per_field = zip(*batch)
    return tuple(per_field)

In [8]:
from collections import defaultdict
from tqdm import tqdm
def train_one_epoch(model, optimizer, dataloader, device, scaler, epoch, scheduler=None):
    """Run one training pass over ``dataloader`` with mixed precision.

    The model is called as ``model(imgs, targets)`` and must return a dict
    of loss tensors; their sum is back-propagated through ``scaler``.

    Args:
        model: Module to train; switched to training mode here.
        optimizer: Optimizer stepped once per batch via ``scaler``.
        dataloader: Yields ``(imgs, targets)`` batches, where ``targets`` is
            a sequence of dicts of tensors.
        device: Device the images and target tensors are moved to.
        scaler: Gradient scaler used for backward/step/update.
        epoch: Value shown in the progress-bar description.
        scheduler: Optional LR scheduler. It is stepped here, once per
            batch, only when it is a ``OneCycleLR``.

    Returns:
        ``(avg_loss, avg_loss_components)``: the mean summed loss per batch
        and a dict with the per-batch mean of every loss component.
    """
    model.train()
    running_loss = 0
    component_sums = defaultdict(float)

    # Only OneCycleLR is advanced per batch; other schedulers are left alone.
    step_every_batch = scheduler is not None and isinstance(scheduler, OneCycleLR)

    bar = tqdm(dataloader, desc=f'Epoch {epoch}')

    for batches_done, (imgs, targets) in enumerate(bar, start=1):
        imgs = [image.to(device) for image in imgs]
        targets = [
            {key: value.to(device) for key, value in target.items()}
            for target in targets
        ]

        optimizer.zero_grad()

        with autocast():
            loss_dict = model(imgs, targets)
            losses = sum(loss_dict.values())

        scaler.scale(losses).backward()
        scaler.step(optimizer)
        scaler.update()

        batch_loss = losses.item()
        running_loss += batch_loss

        # Accumulate each loss component separately for logging.
        for name, component in loss_dict.items():
            component_sums[name] += component.item()

        bar.set_postfix({
            'loss': f'{batch_loss:.4f}',
            'avg_loss': f'{running_loss/batches_done:.4f}'
        })

        if step_every_batch:
            scheduler.step()

    num_batches = len(dataloader)
    avg_loss = running_loss / num_batches
    avg_loss_components = {
        name: total / num_batches for name, total in component_sums.items()
    }

    return avg_loss, avg_loss_components

In [9]:
def validate_model(model, dataloader, device):
    """Compute the mean summed loss of ``model`` over ``dataloader``.

    torchvision detection models return the loss dict only while in
    training mode; in eval mode ``model(imgs, targets)`` yields a list of
    predictions, which has no ``.values()``. The forward passes therefore
    run in training mode, with gradients disabled and with batch-norm and
    dropout layers held in eval mode so that validation neither updates
    running statistics nor drops activations.

    NOTE(review): in training mode torchvision's RPN/ROI heads sample
    proposals randomly, so the returned value may vary slightly between
    calls - confirm this is acceptable for early stopping.

    Args:
        model: Module called as ``model(imgs, targets)`` returning a dict
            of loss tensors.
        dataloader: Yields ``(imgs, targets)`` batches, where ``targets`` is
            a sequence of dicts of tensors.
        device: Device the images and target tensors are moved to.

    Returns:
        Total loss divided by ``len(dataloader)``.
    """
    model.train()
    for module in model.modules():
        if isinstance(module, (torch.nn.modules.batchnorm._BatchNorm, torch.nn.Dropout)):
            module.eval()

    total_loss = 0

    try:
        with torch.no_grad():
            for imgs, targets in tqdm(dataloader, desc='Validation'):
                imgs = list(img.to(device) for img in imgs)
                targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

                with autocast():
                    loss_dict = model(imgs, targets)
                    losses = sum(loss for loss in loss_dict.values())

                total_loss += losses.item()
    finally:
        # Leave the model in eval mode, as callers of this function could
        # previously rely on.
        model.eval()

    return total_loss / len(dataloader)

In [10]:
def get_optimizer(model, optimizer_type='sgd', lr=0.005, weight_decay=0.0005):
    """Create the optimizer named by ``optimizer_type`` for ``model``.

    Args:
        model: Module whose ``parameters()`` are optimised.
        optimizer_type: ``'sgd'`` (momentum 0.9) or ``'adamw'``,
            matched case-insensitively.
        lr: Learning rate.
        weight_decay: Weight-decay coefficient.

    Raises:
        ValueError: If ``optimizer_type`` names neither optimizer.
    """
    name = optimizer_type.lower()
    params = model.parameters()

    if name == 'sgd':
        return SGD(params, lr=lr, momentum=0.9, weight_decay=weight_decay)
    if name == 'adamw':
        return AdamW(params, lr=lr, weight_decay=weight_decay)

    raise ValueError(f"Unsupported optimizer: {optimizer_type}")

def get_scheduler(optimizer, scheduler_type='multistep', epochs=100, steps_per_epoch=None, max_lr=0.01):
    """Create the learning-rate scheduler named by ``scheduler_type``.

    Args:
        optimizer: Optimizer whose learning rate is scheduled.
        scheduler_type: ``'multistep'``, ``'cosine'`` or ``'onecycle'``,
            matched case-insensitively.
        epochs: Total number of training epochs.
        steps_per_epoch: Batches per epoch; required for ``'onecycle'``.
        max_lr: Peak learning rate for ``'onecycle'``. Defaults to 0.01,
            the value that was previously hard-coded.

    Returns:
        The scheduler, or ``None`` when ``scheduler_type`` is not one of
        the recognised names.

    Raises:
        ValueError: If ``'onecycle'`` is requested without
            ``steps_per_epoch``.
    """
    kind = scheduler_type.lower()

    if kind == 'multistep':
        # Decay the LR by 10x at 70% and again at 90% of training.
        milestones = [int(epochs * 0.7), int(epochs * 0.9)]
        return MultiStepLR(optimizer, milestones=milestones, gamma=0.1)

    if kind == 'cosine':
        return CosineAnnealingLR(optimizer, T_max=epochs)

    if kind == 'onecycle':
        if steps_per_epoch is None:
            # Fail with an actionable message instead of OneCycleLR's
            # generic complaint about total_steps.
            raise ValueError(
                "steps_per_epoch is required for the 'onecycle' scheduler"
            )
        return OneCycleLR(optimizer, max_lr=max_lr, epochs=epochs, steps_per_epoch=steps_per_epoch)

    return None

In [None]:
def train_model():
    """Train the detector end to end and return it with its loss history.

    Builds the datasets and loaders from ``data/train`` and ``data/val``,
    creates the model, optimizer, scheduler, gradient scaler and early
    stopping from the local ``config`` dict, then trains for up to
    ``config['num_epochs']`` epochs. The checkpoint with the lowest
    validation loss is written to ``best_model.pth``.

    Returns:
        ``(model, train_losses, val_losses)`` where the two lists hold one
        value per completed epoch.
    """
    config = {
        'batch_size': 8,
        'num_epochs': 100,
        'num_classes': 2,
        'learning_rate': 0.01,
        'optimizer': 'adamw',
        'scheduler': 'onecycle',
        'backbone': 'resnet50',
        'patience': 15,
        'img_size': 800,
        'device': 'cuda' if torch.cuda.is_available() else 'cpu'
    }

    device = torch.device(config['device'])
    print(f"Using device: {device}")

    # Pinned memory and gradient scaling only help on CUDA; on the CPU
    # fallback, requesting them merely triggers warnings.
    use_cuda = device.type == 'cuda'

    # Data augmentation
    train_transforms = get_augmentation_transforms(training=True)
    val_transforms = get_augmentation_transforms(training=False)

    # Datasets
    train_dataset = YoloDataset("data/train/images", "data/train/labels", transform=train_transforms)
    val_dataset = YoloDataset("data/val/images", "data/val/labels", transform=val_transforms)

    # Data loaders with optimized settings
    train_loader = DataLoader(
        train_dataset,
        batch_size=config['batch_size'],
        shuffle=True,
        collate_fn=collate_fn,
        num_workers=4,  # Parallel data loading
        pin_memory=use_cuda,  # Faster GPU transfer
        persistent_workers=True  # Keep workers alive
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=config['batch_size'],
        shuffle=False,
        collate_fn=collate_fn,
        num_workers=4,
        pin_memory=use_cuda
    )

    # Model
    model = build_optimized_model(config['num_classes'], backbone=config['backbone'])
    model.to(device)

    # Optimizer
    optimizer = get_optimizer(model, config['optimizer'], config['learning_rate'])

    # Scheduler
    scheduler = get_scheduler(
        optimizer,
        config['scheduler'],
        config['num_epochs'],
        len(train_loader)
    )

    # Mixed precision scaler (a disabled scaler passes losses and optimizer
    # steps straight through)
    scaler = GradScaler(enabled=use_cuda)

    # Early stopping
    early_stopping = EarlyStopping(patience=config['patience'])

    # Training loop
    best_val_loss = float('inf')
    train_losses = []
    val_losses = []

    for epoch in range(config['num_epochs']):
        # Training
        train_loss, loss_components = train_one_epoch(
            model, optimizer, train_loader, device, scaler, epoch, scheduler
        )

        # Validation
        val_loss = validate_model(model, val_loader, device)

        train_losses.append(train_loss)
        val_losses.append(val_loss)

        # Step scheduler (except OneCycleLR which steps during training)
        if scheduler and not isinstance(scheduler, OneCycleLR):
            scheduler.step()

        # Logging
        print(f"Epoch {epoch+1}/{config['num_epochs']}:")
        print(f"  Train Loss: {train_loss:.4f}")
        print(f"  Val Loss: {val_loss:.4f}")
        for component, value in loss_components.items():
            print(f"  {component}: {value:.4f}")
        print(f"  LR: {optimizer.param_groups[0]['lr']:.6f}")
        print("-" * 50)

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_loss': val_loss,
                'config': config
            }, 'best_model.pth')
            print(f"New best model saved with val_loss: {val_loss:.4f}")

        # Early stopping
        if early_stopping(val_loss, model):
            print(f"Early stopping triggered after {epoch+1} epochs")
            break

    print("Training completed!")
    return model, train_losses, val_losses