In [1]:
import torch
import torch.nn as nn

In [2]:
class Conv(nn.Module):
    """Standard convolution block: Conv2d (no bias) -> BatchNorm2d -> SiLU.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        kernel_size: convolution kernel size, an int or a tuple of ints.
        stride: convolution stride.
        padding: explicit padding. When ``None`` it is derived from the kernel
            size as ``(k - 1) // 2`` per dimension, which keeps the spatial
            size unchanged for odd kernels at stride 1.
        group: number of channel groups, forwarded to ``nn.Conv2d`` as
            ``groups`` (the parameter keeps its original name so existing
            callers are unaffected).
    """
    def __init__(self,in_channels,out_channels,kernel_size=1,stride=1,padding=None,group=1):
        super(Conv,self).__init__()
        # Derive "same"-style padding when the caller did not supply one.
        if padding is None:
            if isinstance(kernel_size,int):
                padding=(kernel_size-1)//2
            else:
                padding=tuple((k-1)//2 for k in kernel_size)

        # nn.Conv2d's keyword is ``groups``; passing ``group=`` raises TypeError.
        # The bias is disabled because BatchNorm2d follows directly and has its own shift.
        self.conv=nn.Conv2d(in_channels,out_channels,kernel_size,stride,padding,groups=group,bias=False)
        self.norm=nn.BatchNorm2d(out_channels)
        self.act=nn.SiLU(inplace=True) # SiLU(x) = x * sigmoid(x)

    def forward(self,x):
        """Apply convolution, batch normalisation and SiLU activation to ``x``."""
        return self.act(self.norm(self.conv(x)))


In [3]:
class Bottleneck(nn.Module):
    """Two-convolution bottleneck with an optional residual connection.

    A 1x1 ``Conv`` maps the input to ``int(out_channels * expansion)`` hidden
    channels, then a 3x3 ``Conv`` maps those to ``out_channels``. Both use the
    SiLU-activated ``Conv`` block.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        shortcut: request a residual connection; it is only applied when
            ``in_channels == out_channels``.
        expansion: ratio of hidden channels to ``out_channels``.
    """
    def __init__(self, in_channels, out_channels, shortcut=True, expansion=0.5):
        super().__init__()

        # Width of the intermediate feature map.
        mid_channels = int(out_channels * expansion)

        self.conv1 = Conv(in_channels, mid_channels, 1, 1)   # 1x1 convolution
        self.conv2 = Conv(mid_channels, out_channels, 3, 1)  # 3x3 convolution
        # The skip connection needs matching channel counts to be addable.
        self.add = shortcut and in_channels == out_channels

    def forward(self, x):
        """Return ``x + f(x)`` when the residual is enabled, otherwise ``f(x)``."""
        transformed = self.conv2(self.conv1(x))
        return x + transformed if self.add else transformed

In [4]:
class C2f(nn.Module):
    """Split / bottleneck-chain / concatenate / fuse block.

    A 1x1 ``Conv`` produces ``2 * hidden_channels`` channels that are split in
    half. The second half is passed through a chain of ``Bottleneck`` blocks.
    Both halves and every bottleneck output are concatenated and fused by a
    final 1x1 ``Conv``.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        num_bottlenecks: number of chained ``Bottleneck`` blocks.
        shortcut: forwarded to each ``Bottleneck``.
        expansion: ratio used to compute ``hidden_channels`` from ``out_channels``.
    """
    def __init__(self, in_channels, out_channels, num_bottlenecks=1, shortcut=False, expansion=0.5):
        super().__init__()
        self.hidden_channels = int(out_channels * expansion)
        width = self.hidden_channels

        # Entry convolution; its output is later split into two equal halves.
        self.conv1 = Conv(in_channels, 2 * width, 1, 1)
        # Bottlenecks keep the channel count (expansion=1.0 inside each block).
        self.bottlenecks = nn.ModuleList(
            [Bottleneck(width, width, shortcut, expansion=1.0) for _ in range(num_bottlenecks)]
        )
        # Fusion input = 2 split halves + one output per bottleneck,
        # i.e. (2 + num_bottlenecks) * hidden_channels channels.
        self.conv2 = Conv((2 + num_bottlenecks) * width, out_channels, 1, 1)

    def forward(self, x):
        """Run the block on ``x`` of shape (B, C, H, W); C must equal ``in_channels``."""
        # Split the entry features in half along the channel axis.
        passthrough, chained = self.conv1(x).chunk(2, dim=1)

        # Collect the untouched half, the chain input, and each chain output.
        branches = [passthrough, chained]
        for block in self.bottlenecks:
            branches.append(block(branches[-1]))

        # Concatenate all branches on the channel axis and fuse them.
        return self.conv2(torch.cat(branches, dim=1))

        

In [5]:
class SPPF(nn.Module):
    """Spatial pyramid pooling (fast variant).

    The same stride-1 max-pool is applied three times in sequence. The reduced
    input and the three pooled maps are concatenated and fused by a 1x1 ``Conv``.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        kernel_size: max-pool kernel size; padding is ``kernel_size // 2`` so an
            odd kernel keeps the spatial size.
    """
    def __init__(self, in_channels, out_channels, kernel_size=5):
        super().__init__()
        reduced = in_channels // 2

        self.conv1 = Conv(in_channels, reduced, 1, 1)
        # Four maps are concatenated: the reduced input plus three pooled versions.
        self.conv2 = Conv(reduced * 4, out_channels, 1, 1)
        self.maxpool = nn.MaxPool2d(kernel_size, stride=1, padding=kernel_size // 2)

    def forward(self, x):
        """Return the fused multi-scale pooled features for ``x``."""
        features = [self.conv1(x)]

        # Each pass pools the previous result, growing the receptive field.
        for _ in range(3):
            features.append(self.maxpool(features[-1]))

        return self.conv2(torch.cat(features, dim=1))

In [6]:
import torch.nn.functional as F

In [7]:
class DFL(nn.Module):
    """Decode per-side bin logits into scalar values (Distribution Focal Loss style).

    Instead of regressing each box side directly, the network predicts logits
    over ``num_bins`` discrete bins. This module applies a softmax over the
    bins and returns the expected bin index, i.e. sum_k(p_k * k).

    Args:
        num_bins: number of bins per box side.
    """
    def __init__(self, num_bins=16):
        super().__init__()
        self.num_bins = num_bins
        # Bin indices [0, 1, ..., num_bins - 1], kept as a buffer so they follow
        # the module across devices without being trained.
        self.register_buffer('project', torch.arange(num_bins, dtype=torch.float))

    def forward(self, x):
        """Convert bin logits to expected values.

        Args:
            x: tensor of shape (B, 4 * num_bins, H, W).

        Returns:
            Tensor of shape (B, 4, H, W).
        """
        b, _, h, w = x.shape

        # (B, 4 * num_bins, H, W) -> (B, 4, num_bins, H, W)
        logits = x.view(b, 4, self.num_bins, h, w)

        # Normalise over the bin axis to get a probability distribution per side.
        probs = F.softmax(logits, dim=2)

        # Expected value of the distribution: probability-weighted bin index.
        bin_index = self.project.view(1, 1, self.num_bins, 1, 1)
        return (probs * bin_index).sum(dim=2)

    

In [8]:

class DecoupledHead(nn.Module):
    """Detection head with separate classification and box-regression branches.

        Input --+--> Conv -> Conv -> cls_pred          (classification)
                |
                +--> Conv -> Conv -> reg_pred -> DFL   (box regression)

    The two branches share no weights, so each task learns its own features.

    Args:
        num_classes: number of object classes (default 80).
        in_channels: number of channels of the input feature map.
        num_bins: number of DFL bins per box side.
    """
    def __init__(self, num_classes=80, in_channels=256, num_bins=16):
        super().__init__()
        self.num_classes = num_classes
        self.num_bins = num_bins

        # Classification branch: two 3x3 Conv blocks, then a 1x1 prediction layer.
        self.cls_convs = nn.Sequential(
            Conv(in_channels, in_channels, 3, 1),
            Conv(in_channels, in_channels, 3, 1),
        )
        self.cls_pred = nn.Conv2d(in_channels, num_classes, 1)

        # Regression branch: same layout, predicting num_bins logits for each of 4 sides.
        self.reg_convs = nn.Sequential(
            Conv(in_channels, in_channels, 3, 1),
            Conv(in_channels, in_channels, 3, 1),
        )
        self.reg_pred = nn.Conv2d(in_channels, 4 * num_bins, 1)

        # Turns the per-side bin distributions into scalar values.
        self.dfl = DFL(num_bins)

    def forward(self, x):
        """Predict class logits and box-side values.

        Args:
            x: feature map of shape (B, in_channels, H, W).

        Returns:
            Tuple ``(cls_output, reg_output)`` with shapes
            (B, num_classes, H, W) and (B, 4, H, W).
        """
        # Classification branch -> (B, num_classes, H, W)
        class_logits = self.cls_pred(self.cls_convs(x))

        # Regression branch -> (B, 4 * num_bins, H, W)
        side_logits = self.reg_pred(self.reg_convs(x))

        # Distribution -> value, giving (B, 4, H, W)
        return class_logits, self.dfl(side_logits)


In [9]:
class AnchorFreeDecoder(nn.Module):
    """Decode anchor-free (left, top, right, bottom) predictions into boxes.

    Each grid cell predicts its distances to the four box sides, measured from
    the cell centre:

        x1 = cx - left      y1 = cy - top
        x2 = cx + right     y2 = cy + bottom

    No preset anchor boxes are used.

    Args:
        num_bins: stored on the module; not used by ``forward``.
    """
    def __init__(self, num_bins=16):
        super().__init__()
        self.num_bins = num_bins

    def forward(self, cls_output, reg_output, stride=32):
        """Decode one feature level.

        Args:
            cls_output: class logits of shape (B, num_classes, H, W).
            reg_output: side distances of shape (B, 4, H, W), in units of ``stride``.
            stride: downsampling factor of this feature level (e.g. 8, 16, 32).

        Returns:
            boxes: (B, H*W, 4) boxes as [x1, y1, x2, y2] in input-image pixels.
            scores: (B, H*W, num_classes) sigmoid class scores.
        """
        batch_size, num_classes, height, width = cls_output.shape
        device = cls_output.device

        # 1. Cell centres. With 'ij' indexing the row index varies along dim 0
        #    and the column index along dim 1, both of shape (H, W).
        rows = torch.arange(height, device=device)
        cols = torch.arange(width, device=device)
        grid_y, grid_x = torch.meshgrid(rows, cols, indexing='ij')
        half_cell = stride / 2
        center_x = grid_x.float() * stride + half_cell
        center_y = grid_y.float() * stride + half_cell

        # 2. Side distances: (B, 4, H, W) -> (B, H, W, 4), scaled to pixels.
        distances = reg_output.permute(0, 2, 3, 1) * stride
        left, top, right, bottom = distances.unbind(dim=-1)

        # 3. Corner coordinates; the (H, W) centres broadcast over the batch.
        corners = [center_x - left, center_y - top, center_x + right, center_y + bottom]
        boxes = torch.stack(corners, dim=-1)  # (B, H, W, 4)

        # 4. Class scores: (B, num_classes, H, W) -> (B, H, W, num_classes).
        scores = torch.sigmoid(cls_output.permute(0, 2, 3, 1))

        # 5. Flatten the spatial grid to (B, H*W, ...).
        boxes = boxes.view(batch_size, -1, 4)
        scores = scores.view(batch_size, -1, num_classes)
        return boxes, scores
        

In [10]:
class YOLOv8Backbone(nn.Module):
    """Feature-extraction backbone producing three feature levels.

    Every stage starts with a stride-2 3x3 ``Conv`` that halves the spatial
    size. For a 640x640 input:

    - stem:   640 -> 320, 64 channels
    - stage1: 320 -> 160, 128 channels
    - stage2: 160 -> 80,  256 channels  -> P3 (stride 8)
    - stage3: 80  -> 40,  512 channels  -> P4 (stride 16)
    - stage4: 40  -> 20,  1024 channels -> P5 (stride 32), followed by SPPF
    """
    def __init__(self):
        super().__init__()

        # Stem: 3 -> 64 channels, stride 2.
        self.stem = Conv(3, 64, kernel_size=3, stride=2)

        # Stage 1: 64 -> 128 channels.
        self.stage1 = nn.Sequential(
            Conv(64, 128, kernel_size=3, stride=2),
            C2f(128, 128, num_bottlenecks=3),
        )

        # Stage 2: 128 -> 256 channels (P3).
        self.stage2 = nn.Sequential(
            Conv(128, 256, kernel_size=3, stride=2),
            C2f(256, 256, num_bottlenecks=6),
        )

        # Stage 3: 256 -> 512 channels (P4).
        self.stage3 = nn.Sequential(
            Conv(256, 512, kernel_size=3, stride=2),
            C2f(512, 512, num_bottlenecks=6),
        )

        # Stage 4: 512 -> 1024 channels (P5); SPPF enlarges the receptive field.
        self.stage4 = nn.Sequential(
            Conv(512, 1024, kernel_size=3, stride=2),
            C2f(1024, 1024, num_bottlenecks=3),
            SPPF(1024, 1024),
        )

    def forward(self, x):
        """Extract multi-scale features.

        Args:
            x: image batch of shape (B, 3, H, W), e.g. (B, 3, 640, 640).

        Returns:
            Tuple ``(p3, p4, p5)``; for a 640x640 input the shapes are
            (B, 256, 80, 80), (B, 512, 40, 40) and (B, 1024, 20, 20).
        """
        stem_out = self.stem(x)        # 1/2 resolution
        c2 = self.stage1(stem_out)     # 1/4 resolution
        p3 = self.stage2(c2)           # 1/8 resolution
        p4 = self.stage3(p3)           # 1/16 resolution
        p5 = self.stage4(p4)           # 1/32 resolution
        return p3, p4, p5

In [11]:
class YOLOv8Neck(nn.Module):
    """Feature-fusion neck with a top-down and a bottom-up pass.

    1. Top-down (FPN-style), deep -> shallow:
       P5 -> upsample x2 -> concat with P4 -> C2f -> intermediate N4
       intermediate N4 -> upsample x2 -> concat with P3 -> C2f -> N3

    2. Bottom-up (PAN-style), shallow -> deep:
       N3 -> stride-2 Conv -> concat with intermediate N4 -> C2f -> N4
       N4 -> stride-2 Conv -> concat with P5 -> C2f -> N5
    """
    def __init__(self):
        super().__init__()

        # Top-down path.
        # P5 -> intermediate N4
        self.upsample1 = nn.Upsample(scale_factor=2, mode='nearest')
        self.c2f_up1 = C2f(1024 + 512, 512, num_bottlenecks=3)
        # intermediate N4 -> N3
        self.upsample2 = nn.Upsample(scale_factor=2, mode='nearest')
        self.c2f_up2 = C2f(512 + 256, 256, num_bottlenecks=3)

        # Bottom-up path. The attribute spelling "dowansample" is kept as-is
        # because attribute names are part of the module's state_dict keys.
        # N3 -> N4
        self.dowansample1 = Conv(256, 256, 3, 2)
        self.c2f_down1 = C2f(256 + 512, 512, num_bottlenecks=3)
        # N4 -> N5
        self.dowansample2 = Conv(512, 512, 3, 2)
        self.c2f_down2 = C2f(512 + 1024, 1024, num_bottlenecks=3)

    def forward(self, p3, p4, p5):
        """Fuse the three backbone levels.

        Args:
            p3: (B, 256, H, W) shallow backbone features, e.g. 80x80.
            p4: (B, 512, H/2, W/2) mid-level backbone features, e.g. 40x40.
            p5: (B, 1024, H/4, W/4) deep backbone features, e.g. 20x20.

        Returns:
            Tuple ``(n3, n4, n5)`` with the same spatial sizes as the inputs
            and 256, 512 and 1024 channels respectively.
        """
        # Top-down: bring deep features up to the P4 resolution and fuse.
        p5_upsampled = self.upsample1(p5)
        mid_topdown = self.c2f_up1(torch.cat([p5_upsampled, p4], dim=1))  # 1024+512 -> 512

        # Continue up to the P3 resolution.
        mid_upsampled = self.upsample2(mid_topdown)
        n3 = self.c2f_up2(torch.cat([mid_upsampled, p3], dim=1))          # 512+256 -> 256

        # Bottom-up: push the fused shallow features back down.
        n3_reduced = self.dowansample1(n3)
        n4 = self.c2f_down1(torch.cat([n3_reduced, mid_topdown], dim=1))  # 256+512 -> 512

        n4_reduced = self.dowansample2(n4)
        n5 = self.c2f_down2(torch.cat([n4_reduced, p5], dim=1))           # 512+1024 -> 1024

        return n3, n4, n5

    

In [12]:
class YOLOv8(nn.Module):
    """YOLOv8-style detector: backbone -> neck -> three decoupled heads -> decoder.

    Args:
        num_classes: number of object classes.
        num_bins: number of DFL bins per box side.
    """
    def __init__(self,num_classes=80,num_bins=16):
        super(YOLOv8,self).__init__()
        self.num_classes=num_classes
        self.num_bins=num_bins

        # Backbone
        self.backbone=YOLOv8Backbone()

        # Neck
        self.neck=YOLOv8Neck()

        # Heads, one per feature level. The "large"/"medium"/"small" suffix
        # follows the feature-map size each head is applied to in forward().
        self.head_large=DecoupledHead(num_classes,in_channels=256,num_bins=num_bins)      # applied to n3 (stride 8), 256 channels
        self.head_medium=DecoupledHead(num_classes,in_channels=512,num_bins=num_bins)     # applied to n4 (stride 16), 512 channels
        self.head_small=DecoupledHead(num_classes,in_channels=1024,num_bins=num_bins)     # applied to n5 (stride 32), 1024 channels

        # Decoder
        self.decoder=AnchorFreeDecoder(num_bins=num_bins)

    def forward(self,x):
        """Run the full network and decode every feature level.

        Args:
            x: image batch of shape (B, 3, H, W), e.g. (B, 3, 640, 640).

        Returns:
            Dict with two lists of three tensors each, ordered stride 8, 16, 32:
                'boxes':  tensors of shape (B, H_l*W_l, 4) as [x1, y1, x2, y2]
                'scores': tensors of shape (B, H_l*W_l, num_classes)
            where H_l x W_l is the feature-map size of that level.
        """
        # Backbone: multi-scale feature extraction
        p3,p4,p5=self.backbone(x)
        # Neck: feature fusion
        n3,n4,n5=self.neck(p3,p4,p5)

        # Heads: per-level predictions
        cls_large,reg_large=self.head_large(n3)     # stride 8
        cls_medium,reg_medium=self.head_medium(n4)  # stride 16
        cls_small,reg_small=self.head_small(n5)     # stride 32

        # Decoder: side distances -> boxes, logits -> scores
        boxes_large,scores_large=self.decoder(cls_large,reg_large,stride=8)
        boxes_medium,scores_medium=self.decoder(cls_medium,reg_medium,stride=16)
        boxes_small,scores_small=self.decoder(cls_small,reg_small,stride=32)

        return {
            'boxes':[boxes_large,boxes_medium,boxes_small],
            'scores':[scores_large,scores_medium,scores_small]
        }

    def predict(self,x,conf_threshold=0.25,iou_threshold=0.45):
        """Run detection with confidence filtering and NMS.

        This method does not switch the module to eval mode or disable
        gradient tracking; callers doing inference should do both themselves.

        Args:
            x: image batch of shape (B, 3, H, W).
            conf_threshold: detections whose best class score is not strictly
                greater than this value are dropped.
            iou_threshold: IoU threshold passed to NMS.

        Returns:
            List of length B. Each element is a dict for one image with N kept
            detections (N may be 0):
                'boxes':   (N, 4) as [x1, y1, x2, y2]
                'scores':  (N,) best class score per detection
                'classes': (N,) class index per detection (int64)
            All tensors live on the same device as the model outputs.
        """
        outputs=self.forward(x)
        # Merge the three levels along the candidate axis.
        boxes=torch.cat(outputs['boxes'],dim=1)   # (B, total_candidates, 4)
        scores=torch.cat(outputs['scores'],dim=1) # (B, total_candidates, num_classes)
        batch_size=boxes.shape[0]
        predictions=[]

        for i in range(batch_size):
            # Best class score and its class id for every candidate.
            max_scores,class_ids=scores[i].max(dim=-1)

            # Confidence filtering.
            keep=max_scores>conf_threshold
            filtered_boxes=boxes[i][keep]
            filtered_scores=max_scores[keep]
            filtered_classes=class_ids[keep]

            # NMS runs only when something survived. Otherwise the already-empty
            # filtered tensors are returned, which keeps the device and dtype
            # consistent with the non-empty case.
            if filtered_boxes.shape[0]>0:
                keep_nms=self._nms(filtered_boxes,filtered_scores,iou_threshold)
                filtered_boxes=filtered_boxes[keep_nms]
                filtered_scores=filtered_scores[keep_nms]
                filtered_classes=filtered_classes[keep_nms]

            predictions.append({
                'boxes':filtered_boxes,
                'scores':filtered_scores,
                'classes':filtered_classes
            })

        return predictions

    def _nms(self, boxes, scores, iou_threshold):
        """Class-agnostic NMS; returns the indices of the boxes to keep.

        Thin wrapper around ``torchvision.ops.nms``. The import is local so
        torchvision is only required when ``predict`` actually needs NMS.
        """
        from torchvision.ops import nms
        return nms(boxes, scores, iou_threshold)

        
    
     