In [1]:
import os
import torch
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.transforms import functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np
from PIL import Image, ImageDraw
import matplotlib.pyplot as plt
from sklearn.metrics import precision_score, recall_score, average_precision_score
import random
import warnings
warnings.filterwarnings("ignore")

In [None]:
import os
import torch

# Configuration parameters
class Config:
    """Central configuration: dataset/output paths, training hyper-parameters, device.

    All values are class attributes evaluated once, when this cell runs.
    """

    # Path configuration
    # The dataset root can be overridden with the HAND_DETECTION_DATA_ROOT
    # environment variable so the notebook is not tied to one machine's
    # absolute path; the original location remains the default.
    DATA_ROOT = os.environ.get(
        "HAND_DETECTION_DATA_ROOT",
        "D:/Code_pytorch/Jintao/hand detection",
    )
    TRAIN_IMAGES = os.path.join(DATA_ROOT, "training_dataset/training_dataset/training_data/images")
    TRAIN_LABELS = os.path.join(DATA_ROOT, "labels_fast_rcnn")
    # NOTE(review): the test paths below are identical to the training paths,
    # so evaluation metrics are computed on data the model was trained on.
    # Point these at a held-out split if one exists.
    TEST_IMAGES = os.path.join(DATA_ROOT, "training_dataset/training_dataset/training_data/images")
    TEST_LABELS = os.path.join(DATA_ROOT, "labels_fast_rcnn")
    OUTPUT_DIR = os.path.join(DATA_ROOT, "output")
    MODEL_SAVE_PATH = os.path.join(OUTPUT_DIR, "faster_rcnn_model.pth")

    # Training parameters
    NUM_CLASSES = 2  # Background + number of target classes
    BATCH_SIZE = 10
    NUM_EPOCHS = 20
    LEARNING_RATE = 0.005  # Initial SGD learning rate
    MOMENTUM = 0.9
    WEIGHT_DECAY = 0.0005

    # Device configuration: use the GPU when CUDA is available, otherwise the CPU
    DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


In [None]:

# Custom dataset class
class HandDataset(Dataset):
    """Detection dataset pairing each image with a same-named ``.txt`` label file.

    Every image is resized to ``IMAGE_SIZE`` and its boxes are rescaled by the
    same factors. Each label line is parsed as five whitespace-separated
    fields ``class_id x_min y_min x_max y_max``; the coordinates are treated
    as pixel positions in the original image. Lines with a different number
    of fields are skipped.

    Args:
        image_dir: Directory containing ``.png`` / ``.jpg`` / ``.jpeg`` images.
        label_dir: Directory containing ``<image stem>.txt`` label files.
        train: When True, apply a random horizontal flip (probability 0.5).
        transforms: Optional callable applied to the image tensor only.
    """

    # Output size as (width, height) in pixels.
    IMAGE_SIZE = (112, 112)
    # File-name suffixes (compared lower-cased) that count as images.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, image_dir, label_dir, train=False, transforms=None):
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.transforms = transforms
        self.train = train
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # idx -> file mapping (and therefore image_id) stable across runs.
        self.image_files = sorted(
            f for f in os.listdir(image_dir)
            if f.lower().endswith(self.IMAGE_EXTENSIONS)
        )

    def __len__(self):
        """Return the number of images found in ``image_dir``."""
        return len(self.image_files)

    @staticmethod
    def _read_annotations(label_path, width_scale, height_scale):
        """Parse one label file into a (K, 4) float box tensor and (K,) int64 labels."""
        boxes = []
        labels = []

        with open(label_path, 'r') as f:
            for line in f:
                parts = line.split()
                if len(parts) != 5:
                    continue

                class_id = int(parts[0])
                x_min, y_min, x_max, y_max = (float(value) for value in parts[1:])

                boxes.append([
                    x_min * width_scale,
                    y_min * height_scale,
                    x_max * width_scale,
                    y_max * height_scale,
                ])
                labels.append(class_id + 1)  # 0 reserved for background

        # reshape(-1, 4) keeps the tensor two-dimensional when there are no
        # boxes: an empty list would otherwise become shape (0,), and the
        # column indexing used for the flip and the area would raise IndexError.
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        return boxes, labels

    def __getitem__(self, idx):
        """Load, resize and annotate the ``idx``-th image.

        Returns:
            ``(image, target)`` where ``image`` is the tensor produced by
            ``F.to_tensor`` (after the optional ``transforms``) and ``target``
            is a dict with keys ``boxes``, ``labels``, ``image_id``, ``area``
            and ``iscrowd``.

        Raises:
            FileNotFoundError: If the image has no matching label file.
        """
        # Load image
        img_name = self.image_files[idx]
        img_path = os.path.join(self.image_dir, img_name)
        image = Image.open(img_path).convert("RGB")
        original_width, original_height = image.size

        # PIL's .size is (width, height); F.resize expects (height, width).
        target_width, target_height = self.IMAGE_SIZE
        image = F.resize(image, (target_height, target_width))

        # Load labels, rescaled into the resized image's coordinate frame
        label_path = os.path.join(self.label_dir, os.path.splitext(img_name)[0] + ".txt")
        boxes, labels = self._read_annotations(
            label_path,
            target_width / original_width,
            target_height / original_height,
        )

        # Data augmentation: random horizontal flip
        if self.train and random.random() < 0.5:
            image = F.hflip(image)
            # Mirroring swaps the roles of the left and right edges:
            # new x_min = W - old x_max, new x_max = W - old x_min.
            boxes[:, [0, 2]] = target_width - boxes[:, [2, 0]]

        image = F.to_tensor(image)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([idx]),
            "area": (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]),
            "iscrowd": torch.zeros((len(boxes),), dtype=torch.int64)
        }

        if self.transforms:
            image = self.transforms(image)

        return image, target


In [None]:
# Create model
def create_model(num_classes):
    """Build a Faster R-CNN detector on a single-scale ResNet-50 feature map.

    Args:
        num_classes: Number of output classes, including the background class.

    Returns:
        A ``FasterRCNN`` model (not yet moved to any device).
    """
    # Load the ResNet-50 backbone pretrained on ImageNet.
    # The boolean `pretrained` argument is deprecated in newer torchvision
    # releases in favour of `weights=`; use the weights enum when this
    # torchvision provides it and fall back to the legacy flag otherwise.
    weights_enum = getattr(torchvision.models, "ResNet50_Weights", None)
    if weights_enum is not None:
        resnet = torchvision.models.resnet50(weights=weights_enum.IMAGENET1K_V1)
    else:
        resnet = torchvision.models.resnet50(pretrained=True)

    # Drop the last two children (global pooling and the fully connected
    # classifier) so the backbone outputs a spatial feature map.
    backbone = torch.nn.Sequential(*list(resnet.children())[:-2])
    # FasterRCNN reads this attribute to size its heads; 2048 is the channel
    # count of ResNet-50's final convolutional stage.
    backbone.out_channels = 2048

    # One tuple of sizes / aspect ratios because the backbone yields a single
    # feature map: 5 sizes x 3 ratios = 15 anchors per location.
    anchor_generator = AnchorGenerator(
        sizes=((16, 32, 64, 128, 256),),
        aspect_ratios=((0.5, 1.0, 2.0),)
    )

    # RoI pooling over the single feature map; a bare tensor returned by the
    # backbone is exposed to the pooler under the name '0'.
    roi_pooler = torchvision.ops.MultiScaleRoIAlign(
        featmap_names=['0'],
        output_size=7,  # Pooled feature map is 7x7
        sampling_ratio=2  # Sampling ratio for RoIAlign
    )

    # NOTE(review): FasterRCNN rescales inputs internally (its min_size /
    # max_size arguments are left at their defaults here) - confirm that is
    # intended given the dataset already resizes images to 112x112.
    model = FasterRCNN(
        backbone,
        num_classes=num_classes,  # Number of object classes (including background)
        rpn_anchor_generator=anchor_generator,
        box_roi_pool=roi_pooler
    )

    return model


In [None]:
# IoU (Intersection over Union) computation function
def calculate_iou(boxA, boxB):
    """Return the Intersection over Union of two ``[x_min, y_min, x_max, y_max]`` boxes.

    Returns 0 when the union area is zero, so degenerate boxes never cause a
    division by zero.
    """
    def _area(box):
        return (box[2] - box[0]) * (box[3] - box[1])

    # Side lengths of the overlap rectangle, clamped at zero for disjoint boxes
    overlap_width = max(0, min(boxA[2], boxB[2]) - max(boxA[0], boxB[0]))
    overlap_height = max(0, min(boxA[3], boxB[3]) - max(boxA[1], boxB[1]))
    intersection = overlap_width * overlap_height

    # Union = sum of both areas minus the part counted twice
    union = _area(boxA) + _area(boxB) - intersection

    if union == 0:
        return 0
    return intersection / union


In [None]:
# Training function (with loss tracking)
def train_model(model, train_loader, optimizer, lr_scheduler, num_epochs):
    """Run the training loop and return the mean loss of every epoch.

    The model is put in training mode once, each batch is moved to
    ``Config.DEVICE``, and the loss components returned by the model are
    summed before back-propagation. The scheduler is stepped once per epoch.

    Returns:
        A list with one average loss value per epoch.
    """
    def _on_device(tensor):
        return tensor.to(Config.DEVICE)

    model.train()
    history = []

    for epoch in range(num_epochs):
        epoch_loss = 0.0

        for batch_images, batch_targets in train_loader:
            batch_images = [_on_device(img) for img in batch_images]
            batch_targets = [
                {key: _on_device(value) for key, value in target.items()}
                for target in batch_targets
            ]

            # In training mode the model returns a dict of loss terms
            loss_terms = model(batch_images, batch_targets)
            total_loss = sum(loss_terms.values())

            optimizer.zero_grad()
            total_loss.backward()
            optimizer.step()

            epoch_loss += total_loss.item()

        # Average over the number of batches in the loader
        epoch_loss = epoch_loss / len(train_loader)
        history.append(epoch_loss)

        lr_scheduler.step()

        print(f"Epoch {epoch+1}/{num_epochs} Loss: {epoch_loss:.4f}")

    return history


In [None]:
# Evaluation function (improved metric computation)
def evaluate(model, test_loader):
    """Run inference over ``test_loader`` and report precision, recall and AP.

    Predictions and ground truths are gathered per image as NumPy arrays and
    handed to ``calculate_metrics``; the resulting metrics are printed and
    returned as ``(precision, recall, ap)``.
    """
    def _as_numpy(tensor):
        return tensor.cpu().numpy()

    model.eval()
    collected_preds = []
    collected_targets = []

    with torch.no_grad():
        for batch_images, batch_targets in test_loader:
            # Only the images go to the device; targets are read on the CPU
            batch_images = [img.to(Config.DEVICE) for img in batch_images]
            batch_outputs = model(batch_images)

            for position, output in enumerate(batch_outputs):
                target = batch_targets[position]

                prediction = (
                    _as_numpy(output['boxes']),
                    _as_numpy(output['labels']),
                    _as_numpy(output['scores']),
                )
                ground_truth = (
                    _as_numpy(target['boxes']),
                    _as_numpy(target['labels']),
                )

                collected_preds.append(prediction)
                collected_targets.append(ground_truth)

    precision, recall, ap = calculate_metrics(collected_preds, collected_targets)
    print(f"Precision: {precision:.4f}, Recall: {recall:.4f}, AP: {ap:.4f}")

    return precision, recall, ap


In [None]:
import numpy as np

# Compute evaluation metrics
def _match_predictions(pred_boxes, gt_boxes, iou_threshold):
    """Greedily match one image's score-sorted predictions to still-unmatched ground truths.

    Returns an int array with 1 for each prediction counted as a true
    positive and 0 for each false positive.
    """
    gt_matched = np.zeros(len(gt_boxes), dtype=bool)
    tp = np.zeros(len(pred_boxes), dtype=int)

    for i, pred_box in enumerate(pred_boxes):
        best_iou = 0.0
        best_gt = -1

        for j, gt_box in enumerate(gt_boxes):
            if gt_matched[j]:  # Each ground truth box can be claimed only once
                continue
            iou = calculate_iou(pred_box, gt_box)
            if iou > best_iou:
                best_iou = iou
                best_gt = j

        if best_gt != -1 and best_iou >= iou_threshold:
            tp[i] = 1
            gt_matched[best_gt] = True

    return tp


def calculate_metrics(preds, targets, iou_threshold=0.5, target_label=1):
    """Compute overall precision, recall and 11-point interpolated AP for one class.

    Args:
        preds: One ``(boxes, labels, scores)`` tuple of arrays per image.
        targets: One ``(boxes, labels)`` tuple of arrays per image, in the
            same order as ``preds``.
        iou_threshold: Minimum IoU for a prediction to match a ground truth.
        target_label: Class label to evaluate. Predictions and ground truths
            carrying any other label are ignored.

    Returns:
        ``(precision, recall, ap)`` as Python floats. All three are 0.0 when
        there are no predictions of the target class.
    """
    scores_list = []
    tp_list = []
    total_gt = 0  # Number of ground truth boxes of the target class

    for (pred_boxes, pred_labels, pred_scores), (gt_boxes, gt_labels) in zip(preds, targets):
        # Restrict both sides to the evaluated class; counting other-class
        # ground truths would inflate the recall denominator and allow
        # cross-class matches.
        keep_pred = np.asarray(pred_labels) == target_label
        pred_boxes = np.asarray(pred_boxes)[keep_pred]
        pred_scores = np.asarray(pred_scores)[keep_pred]
        gt_boxes = np.asarray(gt_boxes)[np.asarray(gt_labels) == target_label]

        # Highest-confidence predictions get first pick of the ground truths
        order = np.argsort(-pred_scores, kind="stable")
        pred_boxes = pred_boxes[order]
        pred_scores = pred_scores[order]

        scores_list.extend(pred_scores)
        tp_list.extend(_match_predictions(pred_boxes, gt_boxes, iou_threshold))
        total_gt += len(gt_boxes)

    # Without any prediction there is no precision/recall curve to build
    # (indexing the last cumulative element would raise IndexError).
    if not scores_list:
        return 0.0, 0.0, 0.0

    # Rank all predictions of the whole dataset by confidence
    ranking = np.argsort(-np.asarray(scores_list), kind="stable")
    tp = np.asarray(tp_list, dtype=np.int64)[ranking]

    cum_tp = np.cumsum(tp)
    cum_fp = np.cumsum(1 - tp)

    # Every prediction is either a TP or an FP, so the denominator is >= 1.
    precision = cum_tp / (cum_tp + cum_fp)
    # Exact division (no epsilon) so that perfect recall really equals 1.0
    # and the t = 1.0 sample below can be reached.
    if total_gt > 0:
        recall = cum_tp / total_gt
    else:
        recall = np.zeros(len(tp))

    # 11-point interpolated AP (PASCAL VOC 2007 style). Thresholds are built
    # by division so they are correctly rounded (e.g. exactly 0.3, 0.6).
    precision_sum = 0.0
    for t in np.arange(11) / 10.0:
        reached = recall >= t
        if reached.any():
            precision_sum += precision[reached].max()
    ap = precision_sum / 11

    return float(precision[-1]), float(recall[-1]), float(ap)


In [None]:
# Visualize prediction results
def visualize_predictions(model, test_loader, num_images=3):
    """Show ground-truth (green) and predicted (red) boxes for the first test batch.

    Only the first batch of ``test_loader`` is used, and at most
    ``num_images`` images from it are displayed, one figure per image.
    """
    GROUND_TRUTH_COLOR = (0, 255, 0)
    PREDICTION_COLOR = (255, 0, 0)

    def _outline(canvas_draw, boxes, color):
        for box in boxes:
            canvas_draw.rectangle(box.tolist(), outline=color, width=2)

    model.eval()
    batch_images, batch_targets = next(iter(test_loader))

    with torch.no_grad():
        batch_images = [img.to(Config.DEVICE) for img in batch_images]
        batch_outputs = model(batch_images)

    shown = min(num_images, len(batch_images))
    for index in range(shown):
        # CHW float tensor -> HWC uint8 array -> PIL image
        pixels = batch_images[index].cpu().permute(1, 2, 0).numpy()
        picture = Image.fromarray((pixels * 255).astype(np.uint8))

        canvas_draw = ImageDraw.Draw(picture)
        _outline(canvas_draw, batch_targets[index]['boxes'], GROUND_TRUTH_COLOR)
        _outline(canvas_draw, batch_outputs[index]['boxes'].cpu().numpy(), PREDICTION_COLOR)

        plt.figure(figsize=(12, 8))
        plt.imshow(picture)
        plt.axis('off')
        plt.title("Predictions (Red) vs Ground Truth (Green)")
        plt.show()


In [None]:
# Plot loss curve
def plot_loss_curve(train_losses):
    """Plot the per-epoch training loss, save it as ``loss_curve.png`` in ``Config.OUTPUT_DIR`` and show it."""
    figure, axes = plt.subplots(figsize=(10, 6))

    axes.plot(train_losses, label='Training Loss')
    axes.set_xlabel('Epoch')
    axes.set_ylabel('Loss')
    axes.set_title('Training Loss Curve')
    axes.legend()

    # Save before showing so the file is written from the fully drawn figure
    output_path = os.path.join(Config.OUTPUT_DIR, 'loss_curve.png')
    figure.savefig(output_path)
    plt.show()


In [None]:
# Main program
def main():
    """Train the detector, save its weights and loss curve, then evaluate and visualise it."""
    def collate_batch(samples):
        # Keep images and targets as tuples rather than stacking them,
        # because the number of boxes differs from image to image.
        return tuple(zip(*samples))

    os.makedirs(Config.OUTPUT_DIR, exist_ok=True)

    # Datasets: only the training set enables augmentation
    train_dataset = HandDataset(Config.TRAIN_IMAGES, Config.TRAIN_LABELS, train=True)
    test_dataset = HandDataset(Config.TEST_IMAGES, Config.TEST_LABELS)

    # Data loaders share everything except shuffling
    shared_loader_options = dict(batch_size=Config.BATCH_SIZE, collate_fn=collate_batch)
    train_loader = DataLoader(train_dataset, shuffle=True, **shared_loader_options)
    test_loader = DataLoader(test_dataset, shuffle=False, **shared_loader_options)

    # Model
    model = create_model(Config.NUM_CLASSES)
    model.to(Config.DEVICE)

    # SGD over the trainable parameters only
    trainable_params = list(filter(lambda p: p.requires_grad, model.parameters()))
    optimizer = torch.optim.SGD(
        trainable_params,
        lr=Config.LEARNING_RATE,
        momentum=Config.MOMENTUM,
        weight_decay=Config.WEIGHT_DECAY,
    )

    # Multiply the learning rate by 0.1 every 5 epochs
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

    # Training
    print("Starting training...")
    train_losses = train_model(model, train_loader, optimizer, lr_scheduler, Config.NUM_EPOCHS)
    plot_loss_curve(train_losses)

    # Persist the weights (state dict only)
    torch.save(model.state_dict(), Config.MODEL_SAVE_PATH)
    print(f"Model saved at {Config.MODEL_SAVE_PATH}")

    # Evaluation
    print("\nEvaluating on the test set...")
    precision, recall, ap = evaluate(model, test_loader)
    print(f"Final evaluation results:\nPrecision: {precision:.4f}, Recall: {recall:.4f}, AP: {ap:.4f}")

    # Visualisation
    print("\nGenerating prediction visualizations...")
    visualize_predictions(model, test_loader)


# Run the full pipeline only when executed as a script / top-level cell
if __name__ == "__main__":
    main()
