# Faster R-CNN 完整实现

## 目录
1. 导入库和数据准备
2. 骨干网络 (ResNet Backbone)
3. 区域建议网络 (RPN)
4. Fast R-CNN 检测头
5. Faster R-CNN完整模型
6. 训练和推理


In [None]:
# ============================================================
# 第1部分：导入必要的库
# ============================================================

# PyTorch核心库 - 深度学习的基础框架
import torch

# nn模块包含神经网络的各种层(卷积、全连接等)
import torch.nn as nn

# F模块包含函数式API(激活函数、损失函数等)
import torch.nn.functional as F

# 数据加载工具
from torch.utils.data import DataLoader, Dataset

# torchvision是PyTorch的计算机视觉工具库
import torchvision
from torchvision import transforms

# 目标检测专用操作：非极大值抑制(NMS)和ROI对齐
from torchvision.ops import nms, roi_align

# 数值计算库
import numpy as np

# 绘图库
import matplotlib.pyplot as plt

# Select the compute device, preferring a CUDA GPU when one is available.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"使用设备: {device}")


## 1. 数据集准备

将CIFAR-10分类数据集适配为目标检测格式。

In [None]:
# ============================================================
# CIFAR-10目标检测数据集适配器
# 将分类数据集转换为检测格式(为每张图创建边界框)
# ============================================================

class CIFAR10Detection(Dataset):
    """Expose CIFAR-10 classification samples in a detection-style format.

    CIFAR-10 has no box annotations, so every sample gets a single synthetic
    ground-truth box that covers the whole image. This exists only to
    exercise the Faster R-CNN pipeline end to end.
    """

    def __init__(self, root, train=True, transform=None):
        """Load (downloading if necessary) the wrapped CIFAR-10 split.

        Args:
            root: Directory the dataset is stored in or downloaded to.
            train: True for the training split, False for the test split.
            transform: Optional transform applied to every image.
        """
        self.cifar10 = torchvision.datasets.CIFAR10(
            root=root,
            train=train,
            download=True,
            transform=transform,
        )
        # Class names, taken from the wrapped dataset.
        self.classes = self.cifar10.classes

    def __len__(self):
        """Return the number of samples in the wrapped dataset."""
        return len(self.cifar10)

    def __getitem__(self, idx):
        """Return one sample in detection format.

        Args:
            idx: Index of the sample.

        Returns:
            img: The (transformed) image as produced by the wrapped dataset.
            target: Dict with "boxes" (float32, [1, 4], x1/y1/x2/y2) and
                "labels" (int64, [1]) shifted by +1 so 0 stays free for
                the background class.
        """
        img, label = self.cifar10[idx]

        # Derive the box from the actual image size instead of assuming
        # 32x32, so resizing transforms still yield a full-image box.
        if torch.is_tensor(img):
            height, width = img.shape[-2], img.shape[-1]
        else:
            # Non-tensor images are assumed to be PIL images, whose `size`
            # attribute is (width, height).
            width, height = img.size

        boxes = torch.tensor([[0, 0, width, height]], dtype=torch.float32)
        labels = torch.tensor([label + 1], dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
        }
        return img, target


# Image transform: ToTensor converts images to tensors scaled from [0, 255] to [0, 1].
transform = transforms.Compose([
    transforms.ToTensor(),
])


def detection_collate(batch):
    """Regroup a list of (image, target) samples into (images, targets).

    Targets are kept as a tuple of dicts instead of being stacked, because
    different images may carry a different number of boxes. A named
    module-level function (rather than a lambda) can be shared by both
    loaders and can be pickled if worker processes are enabled later.
    """
    return tuple(zip(*batch))


# Build the training and test datasets.
train_dataset = CIFAR10Detection(root='./data', train=True, transform=transform)
test_dataset = CIFAR10Detection(root='./data', train=False, transform=transform)

# Build the data loaders; only the training loader shuffles.
train_loader = DataLoader(
    train_dataset,
    batch_size=2,
    shuffle=True,
    collate_fn=detection_collate,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=2,
    shuffle=False,
    collate_fn=detection_collate,
)

print(f"训练集大小: {len(train_dataset)}")
print(f"测试集大小: {len(test_dataset)}")
print(f"类别: {train_dataset.classes}")


## 2. 骨干网络

ResNet-50作为特征提取骨干网络。

In [None]:
# ============================================================
# ResNet Bottleneck块 - ResNet的基本构建单元
# 使用1x1->3x3->1x1卷积结构，降低计算量
# ============================================================

class Bottleneck(nn.Module):
    """ResNet Bottleneck结构
    
    结构: 1x1卷积(降维) -> 3x3卷积(特征提取) -> 1x1卷积(升维)
    expansion=4表示输出通道是中间通道的4倍
    """
    expansion = 4  # 通道扩展系数
    
    def __init__(self, in_channels, out_channels, stride=1):
        """
        Args:
            in_channels: 输入通道数
            out_channels: 中间层通道数(输出为out_channels*4)
            stride: 步长，用于下采样
        """
        super(Bottleneck, self).__init__()
        
        # 第一个1x1卷积：降维，减少计算量
        self.conv1 = nn.Conv2d(in_channels, out_channels, 
                               kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        
        # 第二个3x3卷积：核心特征提取
        self.conv2 = nn.Conv2d(out_channels, out_channels, 
                               kernel_size=3, stride=stride, 
                               padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        # 第三个1x1卷积：升维到out_channels*4
        self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion,
                               kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
        
        # 残差连接的下采样（当维度不匹配时）
        self.downsample = None
        if stride != 1 or in_channels != out_channels * self.expansion:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * self.expansion,
                         kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * self.expansion)
            )
    
    def forward(self, x):
        """前向传播"""
        identity = x  # 保存输入用于残差连接
        
        # 主路径: conv1 -> bn1 -> relu -> conv2 -> bn2 -> relu -> conv3 -> bn3
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        
        # 残差连接：如果维度不匹配，需要下采样
        if self.downsample is not None:
            identity = self.downsample(x)
        
        # 残差相加后激活
        out = F.relu(out + identity)
        return out


print("Bottleneck模块定义完成")


In [None]:
# ============================================================
# ResNet骨干网络 - 用于特征提取
# 配置[3,4,6,3]对应ResNet-50
# ============================================================

class ResNetBackbone(nn.Module):
    """ResNet feature extractor (ResNet-50 layout with the default arguments).

    Runs a stem (7x7 stride-2 convolution + stride-2 max-pool) followed by
    four stages and returns the output of the last stage, i.e. a feature
    map down-sampled 32x relative to the input.
    """

    def __init__(self, block=Bottleneck, layers=(3, 4, 6, 3)):
        """Build the stem and the four stages.

        Args:
            block: Residual block class; must expose an ``expansion``
                attribute and accept ``(in_channels, out_channels, stride)``.
            layers: Number of blocks in each of the four stages. The
                default (3, 4, 6, 3) is the ResNet-50 configuration. A tuple
                is used so the default cannot be mutated between calls.
        """
        super().__init__()
        # Channel count fed into the next block; updated by _make_layer.
        self.in_channels = 64

        # Stem: 7x7 stride-2 convolution followed by a stride-2 max-pool.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2,
                               padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Stage 1 keeps the resolution; stages 2-4 each halve it.
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage as a sequence of ``blocks`` residual blocks.

        Args:
            block: Residual block class.
            out_channels: Inner channel count passed to every block.
            blocks: Number of blocks in the stage.
            stride: Stride of the first block; the rest use stride 1.

        Returns:
            An ``nn.Sequential`` holding the stage.
        """
        # Only the first block may change resolution / channel count.
        stage = [block(self.in_channels, out_channels, stride)]
        self.in_channels = out_channels * block.expansion
        stage.extend(
            block(self.in_channels, out_channels) for _ in range(1, blocks)
        )
        return nn.Sequential(*stage)

    def forward(self, x):
        """Extract features.

        Args:
            x: Input images, [B, 3, H, W].

        Returns:
            Last-stage feature map. With the default Bottleneck blocks this
            is [B, 2048, H/32, W/32].
        """
        x = F.relu(self.bn1(self.conv1(x)))  # stride 2
        x = self.maxpool(x)                   # stride 4

        x = self.layer1(x)  # stride 4
        x = self.layer2(x)  # stride 8
        x = self.layer3(x)  # stride 16
        x = self.layer4(x)  # stride 32

        return x


print("ResNet骨干网络定义完成")


## 3. 区域建议网络 (RPN)

RPN用于生成候选区域(proposals)。

In [None]:
# ============================================================
# RPN - 区域建议网络
# 在特征图上滑动窗口，预测每个锚框是否包含目标
# ============================================================

class RPN(nn.Module):
    """Region proposal network head.

    A shared 3x3 convolution followed by two sibling 1x1 convolutions that
    emit, for every spatial location and every anchor, two
    background/foreground logits and four box-regression offsets.
    """

    def __init__(self, in_channels=2048, num_anchors=9):
        """Create the head.

        Args:
            in_channels: Channels of the incoming feature map.
            num_anchors: Anchors per spatial location.
        """
        super().__init__()
        hidden = 512

        # Shared 3x3 convolution that keeps the spatial size.
        self.conv = nn.Conv2d(in_channels, hidden, kernel_size=3,
                              stride=1, padding=1)
        # Objectness branch: 2 logits per anchor.
        self.cls_logits = nn.Conv2d(hidden, num_anchors * 2, kernel_size=1)
        # Regression branch: 4 offsets per anchor.
        self.bbox_pred = nn.Conv2d(hidden, num_anchors * 4, kernel_size=1)

    def forward(self, x):
        """Run the head on a feature map.

        Args:
            x: Feature map, [B, C, H, W].

        Returns:
            Tuple ``(cls_logits, bbox_pred)`` of shapes
            [B, num_anchors*2, H, W] and [B, num_anchors*4, H, W].
        """
        shared = F.relu(self.conv(x))
        return self.cls_logits(shared), self.bbox_pred(shared)


print("RPN网络定义完成")


In [None]:
# ============================================================
# 锚框生成和处理函数
# ============================================================

def generate_anchors(base_size=16, ratios=(0.5, 1, 2), scales=(8, 16, 32)):
    """Build the base anchors, centred on the origin.

    One anchor is produced per (ratio, scale) pair, iterating ratios in the
    outer loop and scales in the inner loop. For a pair the anchor has
    width ``base_size * scale * sqrt(ratio)`` and height
    ``base_size * scale / sqrt(ratio)``, so ``ratio`` is width / height.

    Args:
        base_size: Base side length that the scales multiply.
        ratios: Width/height ratios. Defaults are tuples so they cannot be
            mutated between calls; any sequence is accepted.
        scales: Multipliers applied to ``base_size``.

    Returns:
        float32 tensor of shape [len(ratios) * len(scales), 4] holding
        [x1, y1, x2, y2] rows; [0, 4] when either sequence is empty.
    """
    anchors = []
    for ratio in ratios:
        root = np.sqrt(ratio)
        for scale in scales:
            w = base_size * scale * root
            h = base_size * scale / root
            anchors.append([-w / 2, -h / 2, w / 2, h / 2])
    # reshape keeps the result two-dimensional even when no anchors exist.
    return torch.tensor(anchors, dtype=torch.float32).reshape(-1, 4)


def shift_anchors(feature_map_size, stride=16):
    """Tile the base anchors over every cell of a feature map.

    Args:
        feature_map_size: Feature-map size as (H, W).
        stride: Spacing, in input-image pixels, between neighbouring cells.

    Returns:
        float32 tensor of shape [H * W * A, 4], where A is the number of
        base anchors returned by ``generate_anchors()``. Cells are visited
        row by row and the A anchors of a cell are adjacent.
    """
    base = generate_anchors()

    rows, cols = feature_map_size[0], feature_map_size[1]
    offsets_y = torch.arange(0, rows) * stride
    offsets_x = torch.arange(0, cols) * stride
    grid_y, grid_x = torch.meshgrid(offsets_y, offsets_x, indexing='ij')

    # One (dx, dy, dx, dy) row per cell: both corners move by the same offset.
    flat_x = grid_x.flatten()
    flat_y = grid_y.flatten()
    shifts = torch.stack((flat_x, flat_y, flat_x, flat_y), dim=1).float()

    # Broadcast [1, A, 4] + [H*W, 1, 4] -> [H*W, A, 4], then flatten.
    tiled = base[None, :, :] + shifts[:, None, :]
    return tiled.view(-1, 4)


def clip_boxes(boxes, img_size):
    """Clamp boxes to the image rectangle, in place.

    Args:
        boxes: Boxes as [N, 4] rows of [x1, y1, x2, y2]; modified in place.
        img_size: Image size as (H, W).

    Returns:
        The same ``boxes`` tensor, with x coordinates limited to [0, W] and
        y coordinates limited to [0, H].
    """
    height, width = img_size[0], img_size[1]
    # Columns are x1, y1, x2, y2, so the upper bounds alternate W, H, W, H.
    for column, upper in enumerate((width, height, width, height)):
        boxes[:, column] = boxes[:, column].clamp(min=0, max=upper)
    return boxes


print("锚框生成函数定义完成")


In [None]:
# ============================================================
# 候选区域生成
# ============================================================

def apply_deltas_to_anchors(deltas, anchors, scale_clamp=4.135166556742356):
    """Decode regression offsets relative to anchors into boxes.

    The centre offsets are scaled by the anchor size and added to the
    anchor centre; the size offsets are log-scale factors applied to the
    anchor width and height.

    Args:
        deltas: Offsets, [N, 4], as (dx, dy, dw, dh).
        anchors: Reference boxes, [N, 4], as [x1, y1, x2, y2].
        scale_clamp: Upper bound applied to dw and dh before
            exponentiation so that large raw network outputs cannot
            overflow to infinite boxes. The default equals
            log(1000 / 16), i.e. a box can grow at most 62.5x. Pass None
            to disable clamping.

    Returns:
        Decoded boxes, [N, 4], as [x1, y1, x2, y2], in the dtype of
        ``deltas``.
    """
    # Anchor centre and size.
    widths = anchors[:, 2] - anchors[:, 0]
    heights = anchors[:, 3] - anchors[:, 1]
    ctr_x = anchors[:, 0] + 0.5 * widths
    ctr_y = anchors[:, 1] + 0.5 * heights

    dx, dy, dw, dh = deltas[:, 0], deltas[:, 1], deltas[:, 2], deltas[:, 3]
    if scale_clamp is not None:
        dw = dw.clamp(max=scale_clamp)
        dh = dh.clamp(max=scale_clamp)

    # Decoded centre and size.
    pred_ctr_x = dx * widths + ctr_x
    pred_ctr_y = dy * heights + ctr_y
    pred_w = torch.exp(dw) * widths
    pred_h = torch.exp(dh) * heights

    # Back to corner format.
    pred_boxes = torch.stack(
        [
            pred_ctr_x - 0.5 * pred_w,
            pred_ctr_y - 0.5 * pred_h,
            pred_ctr_x + 0.5 * pred_w,
            pred_ctr_y + 0.5 * pred_h,
        ],
        dim=1,
    )
    return pred_boxes.to(deltas.dtype)


def generate_proposals(cls_logits, bbox_pred, anchors, img_size,
                       nms_thresh=0.7, pre_nms_topk=6000, post_nms_topk=1000):
    """Turn RPN outputs into per-image region proposals.

    For every image: decode the offsets against the anchors, clip the boxes
    to the image, drop boxes narrower or shorter than 1 pixel, keep the
    ``pre_nms_topk`` highest-scoring boxes, run NMS, and keep at most
    ``post_nms_topk`` survivors.

    Args:
        cls_logits: RPN classification output, [N, A*2, H, W]; after
            reshaping, index 1 of each pair is treated as the foreground
            logit.
        bbox_pred: RPN regression output, [N, A*4, H, W].
        anchors: Anchors, [H*W*A, 4], ordered to match the reshaped
            predictions (location-major, anchor-minor).
        img_size: Image size as (H, W), used for clipping.
        nms_thresh: IoU threshold for NMS.
        pre_nms_topk: Maximum number of boxes kept before NMS.
        post_nms_topk: Maximum number of boxes kept after NMS.

    Returns:
        List with one [K_i, 4] tensor of proposals per image.
    """
    batch_size = cls_logits.size(0)

    # [N, C, H, W] -> [N, H*W*A, 2] and [N, H*W*A, 4].
    cls_logits = cls_logits.permute(0, 2, 3, 1).reshape(batch_size, -1, 2)
    bbox_pred = bbox_pred.permute(0, 2, 3, 1).reshape(batch_size, -1, 4)

    # Foreground probability of every anchor.
    objectness = F.softmax(cls_logits, dim=2)[:, :, 1]

    proposals = []
    for i in range(batch_size):
        scores = objectness[i]
        deltas = bbox_pred[i]

        # Decode and clip to the image.
        proposals_i = apply_deltas_to_anchors(deltas, anchors)
        proposals_i = clip_boxes(proposals_i, img_size)

        # Drop degenerate boxes (width or height below one pixel).
        keep = (proposals_i[:, 2] - proposals_i[:, 0] >= 1) & \
               (proposals_i[:, 3] - proposals_i[:, 1] >= 1)
        proposals_i = proposals_i[keep]
        scores = scores[keep]

        # Keep only the best-scoring candidates before NMS.
        if len(scores) > pre_nms_topk:
            _, topk_idx = scores.topk(pre_nms_topk)
            proposals_i = proposals_i[topk_idx]
            scores = scores[topk_idx]

        # torchvision's nms is documented to return the kept indices in
        # decreasing score order, so slicing afterwards keeps the best ones.
        keep = nms(proposals_i, scores, nms_thresh)
        proposals_i = proposals_i[keep]

        if len(proposals_i) > post_nms_topk:
            proposals_i = proposals_i[:post_nms_topk]

        proposals.append(proposals_i)

    return proposals


print("候选区域生成函数定义完成")


## 4. Fast R-CNN 检测头

对每个候选区域进行分类和边界框回归。

In [None]:
# ============================================================
# Fast R-CNN检测头
# 对每个候选区域进行分类和精确定位
# ============================================================

class FastRCNNHead(nn.Module):
    """Fast R-CNN detection head.

    Takes pooled 7x7 ROI features, passes them through two fully connected
    layers and emits per-ROI class logits and per-class box offsets.
    """

    def __init__(self, in_channels=2048, num_classes=11):
        """Create the head.

        Args:
            in_channels: Channels of the pooled ROI features.
            num_classes: Number of classes including background.
        """
        super().__init__()

        # Two fully connected layers on the flattened 7x7 ROI features.
        self.fc6 = nn.Linear(in_channels * 7 * 7, 1024)
        self.fc7 = nn.Linear(1024, 1024)

        # Classification output.
        self.cls_score = nn.Linear(1024, num_classes)

        # Box regression output: 4 offsets per class.
        self.bbox_pred = nn.Linear(1024, num_classes * 4)

    def forward(self, x):
        """Classify and regress every ROI.

        Args:
            x: ROI features, [N, C, 7, 7]. N may be 0 and the tensor does
               not have to be contiguous.

        Returns:
            Tuple ``(cls_score, bbox_pred)`` of shapes [N, num_classes]
            and [N, num_classes*4].
        """
        # flatten (unlike view(N, -1)) also works for non-contiguous inputs
        # and for an empty batch of ROIs.
        x = torch.flatten(x, start_dim=1)

        x = F.relu(self.fc6(x))
        x = F.relu(self.fc7(x))

        return self.cls_score(x), self.bbox_pred(x)


print("Fast R-CNN检测头定义完成")


## 5. Faster R-CNN 完整模型

In [None]:
# ============================================================
# Faster R-CNN 完整模型
# 整合骨干网络、RPN和检测头
# ============================================================

class FasterRCNN(nn.Module):
    """Faster R-CNN detector assembled from the components in this file.

    Pipeline:
    1. Backbone (ResNetBackbone) extracts a stride-32 feature map.
    2. RPN scores and regresses anchors; generate_proposals turns that
       into per-image proposals.
    3. ROI Align pools a 7x7 feature for every proposal.
    4. FastRCNNHead classifies every ROI.

    Simplifications that remain: the training loss only contains the
    ROI-head classification term (no RPN losses and no box-regression
    loss), and inference returns the raw proposals with their best class
    (possibly background) without applying the head's box offsets or a
    final NMS.
    """

    def __init__(self, num_classes=11):
        """Create the detector.

        Args:
            num_classes: Number of classes including background.
        """
        super().__init__()

        self.backbone = ResNetBackbone()
        self.rpn = RPN()
        self.head = FastRCNNHead(num_classes=num_classes)
        self.num_classes = num_classes

    def forward(self, images, targets=None):
        """Dispatch to the training or inference path.

        Args:
            images: Batched images, [B, 3, H, W].
            targets: Per-image dicts with "boxes" and "labels"; only used
                in training mode.

        Returns:
            ``{"loss": tensor}`` when the module is in training mode and
            targets are given, otherwise a list with one
            ``{"boxes", "labels", "scores"}`` dict per image.
        """
        if self.training and targets is not None:
            return self._forward_train(images, targets)
        return self._forward_test(images)

    def _features_and_proposals(self, images):
        """Run backbone + RPN and return (features, per-image proposals)."""
        features = self.backbone(images)
        rpn_cls, rpn_bbox = self.rpn(features)

        # The backbone down-samples by 32, hence stride=32 for the anchors.
        anchors = shift_anchors(features.size()[2:], stride=32)
        anchors = anchors.to(features.device)

        proposals = generate_proposals(
            rpn_cls, rpn_bbox, anchors, images.size()[2:]
        )
        return features, proposals

    @staticmethod
    def _assign_labels(rois, target, fg_iou_thresh=0.5):
        """Label each ROI with the class of its best-overlapping GT box.

        ROIs whose best IoU is below ``fg_iou_thresh`` (or images without
        ground truth) get label 0, the background class.
        """
        # torchvision is already a dependency of this file; the import is
        # local so the file-level import list does not have to change.
        from torchvision.ops import box_iou

        labels = torch.zeros(len(rois), dtype=torch.int64, device=rois.device)
        gt_boxes = target["boxes"]
        if len(rois) == 0 or len(gt_boxes) == 0:
            return labels

        best_iou, best_gt = box_iou(rois, gt_boxes.to(rois.dtype)).max(dim=1)
        foreground = best_iou >= fg_iou_thresh
        labels[foreground] = target["labels"][best_gt[foreground]]
        return labels

    def _forward_train(self, images, targets):
        """Training forward pass; returns ``{"loss": scalar tensor}``.

        Raises:
            ValueError: If the number of targets differs from the batch
                size.
        """
        if len(targets) != images.size(0):
            raise ValueError(
                f"expected {images.size(0)} targets, got {len(targets)}"
            )

        features, proposals = self._features_and_proposals(images)

        # Proposals are treated as constants for the ROI head. An image
        # without any proposal falls back to a single full-image ROI (in
        # the feature dtype, as roi_align requires) so the loss stays
        # well-defined.
        img_h, img_w = images.size()[2:]
        full_image = features.new_tensor([[0.0, 0.0, img_w, img_h]])
        rois = [p.detach() if len(p) > 0 else full_image for p in proposals]

        # One box tensor per image so every image of the batch is pooled
        # from its own feature map.
        roi_features = roi_align(
            features, rois,
            output_size=(7, 7),
            spatial_scale=1/32.0
        )

        cls_score, _ = self.head(roi_features)

        # ROI classification loss. RPN and box-regression losses are not
        # implemented in this simplified model.
        labels = torch.cat(
            [self._assign_labels(r, t) for r, t in zip(rois, targets)]
        )
        loss = F.cross_entropy(cls_score, labels)

        return {"loss": loss}

    def _forward_test(self, images):
        """Inference forward pass; returns one result dict per image."""
        features, proposals = self._features_and_proposals(images)

        counts = [len(p) for p in proposals]
        if sum(counts) == 0:
            # Nothing to pool: return empty, correctly typed results.
            return [
                {
                    "boxes": p,
                    "labels": torch.empty(0, dtype=torch.int64, device=p.device),
                    "scores": p.new_empty(0),
                }
                for p in proposals
            ]

        roi_features = roi_align(
            features, proposals,
            output_size=(7, 7),
            spatial_scale=1/32.0
        )

        cls_score, _ = self.head(roi_features)

        # Best class and its probability for every ROI.
        scores = F.softmax(cls_score, dim=1)
        max_scores, labels = scores.max(dim=1)

        # ROI outputs are concatenated in image order; split them back.
        return [
            {"boxes": boxes, "labels": image_labels, "scores": image_scores}
            for boxes, image_labels, image_scores in zip(
                proposals, labels.split(counts), max_scores.split(counts)
            )
        ]


print("Faster R-CNN模型定义完成")


## 6. 训练和推理

In [None]:
# ============================================================
# 创建模型和优化器
# ============================================================

# Build the detector (10 object classes + 1 background class) and move it
# to the selected device.
model = FasterRCNN(num_classes=11).to(device)

# SGD with momentum and weight decay.
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=0.001,
    momentum=0.9,
    weight_decay=0.0005,
)

print(f"模型参数量: {sum(map(torch.Tensor.numel, model.parameters())):,}")


In [None]:
# ============================================================
# 训练循环（演示版）
# ============================================================

print("开始训练...")
# Put the model in training mode so forward() takes the training path.
model.train()

# Demo only: run a single optimisation step on the first batch.
for images, targets in train_loader:
    # The collate function yields tuples; stack the images into one batch
    # tensor and move images and target tensors to the compute device.
    images = torch.stack(images).to(device)
    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
    
    # Forward pass; the model returns a dict with a "loss" entry.
    loss_dict = model(images, targets)
    loss = loss_dict["loss"]
    
    # Backward pass and parameter update.
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    print(f"训练损失: {loss.item():.4f}")
    break  # stop after the first batch

print("训练完成!")


In [None]:
# ============================================================
# 推理演示
# ============================================================

print("开始推理...")
# Put the model in evaluation mode so forward() takes the inference path.
model.eval()

# No gradients are needed for inference.
with torch.no_grad():
    for images, targets in test_loader:
        # Stack the tuple of images into one batch tensor on the device.
        images = torch.stack(images).to(device)
        
        # Run inference; the model returns a list of result dicts.
        outputs = model(images)
        
        # Print the number of boxes and the first five labels / scores
        # of every returned result.
        for i, output in enumerate(outputs):
            print(f"\n图像 {i+1}:")
            print(f"  检测框数量: {len(output['boxes'])}")
            if len(output['labels']) > 0:
                print(f"  预测类别: {output['labels'][:5].tolist()}")
                print(f"  置信度: {output['scores'][:5].tolist()}")
        break  # demo: only the first batch

print("\n推理完成!")


## 总结

### Faster R-CNN的核心组件:
1. **骨干网络**: ResNet-50提取特征图
2. **RPN**: 在每个位置生成锚框并预测目标存在概率
3. **ROI Align**: 将不同大小的候选区域特征池化为固定尺寸
4. **检测头**: 对每个ROI进行分类和边界框回归

### 训练流程:
- RPN损失：分类+回归
- Fast R-CNN损失：分类+回归
- 端到端联合优化