# 改进的YOLO v1

backbone: ResNet_18

neck: SPP

head

In [7]:
# backbone: ResNet_18
import torch
import torch.nn as nn

class BasicBlock(nn.Module):

    def __init__(self, conv1_in, conv1_out, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        
        self.conv1 = nn.Conv2d(conv1_in, conv1_out, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(conv1_out)
        self.relu = nn.ReLU(inplace=True)
        
        self.conv2 = nn.Conv2d(conv1_out, conv1_out, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(conv1_out)
        
        # downsample是增加x通道数用的，因为下面一层的F(x)可能会增加通道数，前面一层无法直接相加
        self.downsample = downsample

    def forward(self, x):
        identity = x # 上一层的信息

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out) # conv-1有ReLU

        out = self.conv2(out)
        out = self.bn2(out) # 注意到这里conv-2没有ReLU

        if self.downsample is not None:
            identity = self.downsample(x) # 没记错的李沐课里说的是用1x1卷积来增加通道数

        out += identity
        out = self.relu(out) # 注意到，联合上一层信息之后在进行ReLU
        return out


class Backbone(nn.Module):
    def __init__(self):
        super(Backbone, self).__init__()

        # ResNet's Head
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # ResNet Block-1
        self.layer1 = nn.Sequential(BasicBlock(64, 64), 
                                    BasicBlock(64, 64))

        # ResNet Block-2 
        downsample = nn.Sequential(nn.Conv2d(64, 128, kernel_size=1, stride=2, bias=False),
                                   nn.BatchNorm2d(128))

        self.layer2 = nn.Sequential(BasicBlock(64, 128, 2, downsample=downsample), 
                                    BasicBlock(128, 128))

        # ResNet Block-3
        downsample = nn.Sequential(nn.Conv2d(128, 256, kernel_size=1, stride=2, bias=False),
                                   nn.BatchNorm2d(256))
        self.layer3 = nn.Sequential(BasicBlock(128, 256, 2, downsample=downsample), 
                                    BasicBlock(256, 256))

        # ResNet Block-4
        downsample = nn.Sequential(nn.Conv2d(256, 512, kernel_size=1, stride=2, bias=False),
                                   nn.BatchNorm2d(512))
        self.layer4 = nn.Sequential(BasicBlock(256, 512, 2, downsample=downsample), 
                                    BasicBlock(512, 512))
        

    def forward(self, x):
        C_1 = self.conv1(x)
        C_1 = self.bn1(C_1)
        C_1 = self.relu(C_1)
        C_1 = self.maxpool(C_1)

        C_2 = self.layer1(C_1)
        C_3 = self.layer2(C_2)
        C_4 = self.layer3(C_3)
        C_5 = self.layer4(C_4)

        return C_5



# model = Backbone()
# #model.load_state_dict(torch.load('resnet18.pth'), strict=False)
# # print(model) # 打印模型的网络结构
# pre = torch.load('resnet18.pth') # 下载地址：https://download.pytorch.org/models/resnet18-5c106cde.pth
# pre = [k for k, v in pre.items()]
# # print(pre)
# model.state_dict().keys()
# model.load_state_dict(torch.load('resnet18.pth'), strict=False)

In [8]:
# neck: SPP
import torch
import torch.nn as nn

class Neck(nn.Module):
    def __init__(self):
        super(Neck, self).__init__()


    def forward(self, x):
        # 注：max_pool并没有可以学习的参数
        # backbone ouput size: 13*13*512
        # output size: (13-5+2*2)/1 + 1 = 13, channel=512
        x_1 = torch.nn.functional.max_pool2d(x, 5, stride=1, padding=2)
        # output size: (13-9+2*4)/1 + 1 = 13, channel=512
        x_2 = torch.nn.functional.max_pool2d(x, 9, stride=1, padding=4)
        # output size: (13-13+2*6)/1 + 1 = 13, channel=512
        x_3 = torch.nn.functional.max_pool2d(x, 13, stride=1, padding=6)
        # output size: 13*13, channel=512+512+512+512=2048
        x = torch.cat([x, x_1, x_2, x_3], dim=1) 

        # SPP -> Detection Head还需要用1*1卷积降维到512个Channel
        spp_to_head = nn.Sequential(nn.Conv2d(2048, 512, 1),
                                    nn.BatchNorm2d(512),
                                    nn.LeakyReLU(0.1, inplace=True))

        return spp_to_head(x) 

In [9]:
# Detection Head
class Head(nn.Module):
    def __init__(self):
        super(Head, self).__init__()
        # SPP output size: 13*13*512
        # size: 13*13, channel: 256
        head1 = nn.Sequential(nn.Conv2d(512, 256, 1, stride=1),
                                    nn.BatchNorm2d(256),
                                    nn.LeakyReLU(0.1, inplace=True))

        # size: (13-3+2*1)/1 + 1 = 13, channel: 512
        head2 = nn.Sequential(nn.Conv2d(256, 512, 3, stride=1, padding=1),
                                    nn.BatchNorm2d(512),
                                    nn.LeakyReLU(0.1, inplace=True))

        # size: 13*13, channel: 256
        head3 = nn.Sequential(nn.Conv2d(512, 256, 1, stride=1),
                                    nn.BatchNorm2d(256),
                                    nn.LeakyReLU(0.1, inplace=True))


        # size: (13-3+2*1)/1 + 1 = 13, channel: 512
        head4 = nn.Sequential(nn.Conv2d(256, 512, 3, stride=1, padding=1),
                                    nn.BatchNorm2d(512),
                                    nn.LeakyReLU(0.1, inplace=True))

        pred = nn.Conv2d(512, 1 + 20 + 4, 1) # output [1, 25, 13, 13]

        self.head = nn.Sequential(head1, head2, head3, head4, pred)

    def forward(self, x):
        return self.head(x)

In [10]:
# YOLOv1 Model
class YOLOv1(nn.Module):
    def __init__(self):
        super(YOLOv1, self).__init__()

        self.backbone = Backbone()
        self.backbone.load_state_dict(torch.load('resnet18.pth'), strict=False)
        self.neck = Neck()
        self.head = Head()

    def forward(self, x):
        
        out = self.backbone(x)
        out = self.neck(out)
        out = self.head(out) 
        
        return out

model = YOLOv1()
# print(model)

In [11]:
# Load Data
import torch.utils.data as data
import xml.etree.ElementTree as ET # 读取XML文件需要导入的模块（Python 3.X）
import cv2
import Augmentations
import numpy as np
import torch

class Dataset(data.Dataset):
    def __init__(self):
        self.path_train_txt = '/Users/lan/Downloads/VOCdevkit/VOC2007/ImageSets/Main/train.txt'
        self.path_train_images = '/Users/lan/Downloads/VOCdevkit/VOC2007/JPEGImages/%s.jpg'
        self.path_train_annotations = '/Users/lan/Downloads/VOCdevkit/VOC2007/Annotations/%s.xml'
        self.index_images = [] # ['index_1', 'index_2', ...]
        for line in open(self.path_train_txt):
            self.index_images.append(line.strip())
    
    def __len__(self):
        return len(self.index_images)

    def __getitem__(self, index):
        """ return (image, ground_truth) """

        self.image_index = self.index_images[index]
        self.ground_truth_xml = ET.parse(self.path_train_annotations % self.image_index).getroot()
        self.image = cv2.imread(self.path_train_images % self.image_index)
        height, width, channels = self.image.shape

        # xml标注文件中每个Object都有其class和[xmin, ymin, xmax, ymax]数据，我们需要读取出来
        VOC_CLASSES_NAME = ('aeroplane', 'bicycle', 'bird', 'boat',
                            'bottle', 'bus', 'car', 'cat', 'chair',
                            'cow', 'diningtable', 'dog', 'horse',
                            'motorbike', 'person', 'pottedplant',
                            'sheep', 'sofa', 'train', 'tvmonitor')
        self.class_name_to_index = dict(zip(VOC_CLASSES_NAME, range(len(VOC_CLASSES_NAME))))
        self.xmin, self.ymin, self.xmax, self.ymax, self.index_class_name = 0, 0, 0, 0, 0
        self.ground_truth = []

        # 从xml标注文件中读取[[xmin, ymin, xmax, ymax, index_class_name], ... ]
        for item in self.ground_truth_xml.iter('object'):
            
            class_name = item.find('name').text.lower().strip()
            self.index_class_name = self.class_name_to_index[class_name]

            bounding_box = item.find('bndbox')
            # 因为图像会进行归一化，所以矩形框也要跟着变
            # -1是因为xml文件中的坐标是从1开始的，而图像坐标是从左上角的0开始
            self.xmin = (int(bounding_box.find('xmin').text) - 1) / width # 要不要float我还不确定
            self.ymin = (int(bounding_box.find('ymin').text) - 1) / height
            self.xmax = (int(bounding_box.find('xmax').text) - 1) / width
            self.ymax = (int(bounding_box.find('ymax').text) - 1) / height

            # [[xmin, ymin, xmax, ymax, index_class_name], ... ]
            self.ground_truth.append([self.xmin, self.ymin, self.xmax, self.ymax, self.index_class_name])
        
        # 对图像做数据增强
        if len(self.ground_truth) == 0:
            self.ground_truth = np.zeros([1, 5]) # 确定是否有目标
        else:
            self.ground_truth = np.array(self.ground_truth) # 有的话转成np.array数组
        
        transform = Augmentations.SSDAugmentation(416) # 训练数据的大小416*416，可选择输入640
        self.image, self.bounding_box, self.index_class_name = transform(self.image, 
                                                                         self.ground_truth[:, :4],
                                                                         self.ground_truth[:, 4])
        
        # cv.imread-> BGR -> RGB，与img[:,:,::-1]是等价的
        self.image = self.image[:, :, (2, 1, 0)]                                                                            
        
        # bounding_box: (1,4), index_class_name:(1,) 
        # np.expand_dims(index_class_name, axis=1):(1,1)
        # np.hstack后得到(1,5)
        self.ground_truth = np.hstack((self.bounding_box, np.expand_dims(self.index_class_name, axis=1)))
        
        # permute(2, 0, 1)把通道数放到前面去(H,W,C)->(C,H,W)，相当于img = img.transpose(2, 0, 1)
        self.image = torch.from_numpy(self.image).permute(2, 0, 1)

        

        return self.image, self.ground_truth, height, width 
        # 一次返回一张图片的数据和标签
        # 注意此时self.ground_truth是一个np.array对象，标签也变成了float
        # 举例验证的代码
        # data = Dataset()
        # a = data.__getitem__(1)
        # data.ground_truth
        # array([[ 0.43127962,  0.20664207,  0.65402844,  0.71217712, 14.        ],
        #        [ 0.13744076,  0.26568266,  0.87914692,  1.        , 12.        ]])


def collate_fn(data_batch): 
    # 这个data_batch是什么呢？[dataset[0],dataset[1],...,dataset[batch_size-1]]
    # dataset[0]其实就是调用了__getitem__()方法取出一个img和一个target，组成的一个tuple
    # sample[0]对应img，sample[1]对应target或者说label，ground truth
    ground_truth = []
    images = []
    for item in data_batch:
        images.append(item[0])
        ground_truth.append(torch.FloatTensor(item[1]))

    # 把数据
    batch_size = len(ground_truth)
    ground_truth = [label.tolist() for label in ground_truth]
    
    # 这个正样本的形状跟模型输出不一样，在loss计算时进行处理
    ground_truth_tmp = np.zeros([batch_size, 13, 13, 1+1+4+1])

    for batch_index in range(batch_size):
        for bbox in ground_truth[batch_index]:
            class_index = int(bbox[-1])
            xmin, ymin, xmax, ymax = bbox[:-1]

            # 计算bbox的中心点
            c_x = (xmax + xmin) / 2 * 416
            c_y = (ymax + ymin) / 2 * 416
            box_w = (xmax - xmin) * 416
            box_h = (ymax - ymin) * 416
            
            if box_w < 1e-4 or box_h < 1e-4:
                break    # print('Not a valid data !!!')

            # 计算中心点所在的网格坐标
            c_x_s = c_x / 32
            c_y_s = c_y / 32
            grid_x = int(c_x_s)
            grid_y = int(c_y_s)

            # 计算中心点偏移量和宽高的标签
            tx = c_x_s - grid_x
            ty = c_y_s - grid_y
            tw = np.log(box_w)
            th = np.log(box_h)

            # 计算边界框位置参数的损失权重
            weight = 2.0 - (box_w / 416) * (box_h / 416)

            if grid_x < 13 and grid_y < 13:
                    ground_truth_tmp[batch_index, grid_y, grid_x, 0] = 1.0 # 有边界框的才会被置为1.0哦，后知后觉～_～
                    ground_truth_tmp[batch_index, grid_y, grid_x, 1] = class_index
                    ground_truth_tmp[batch_index, grid_y, grid_x, 2:6] = np.array([tx, ty, tw, th])
                    ground_truth_tmp[batch_index, grid_y, grid_x, 6] = weight

    # 注意啊，这个时候13*13的矩形就被拉成了长度为169的向量了，这是为了后面方便计算loss
    ground_truth_tmp = ground_truth_tmp.reshape(batch_size, -1, 1+1+4+1) 
    
    # torch.stack(images, 0)就是实现(batch_size, H, W)
    return torch.stack(images, 0), torch.from_numpy(ground_truth_tmp).float()



In [12]:
# 测试dataloader能否正常使用
data = Dataset()
dataloader = torch.utils.data.DataLoader(data,
                                         batch_size=1, 
                                         shuffle=True, 
                                         collate_fn=collate_fn,
                                         #num_workers=2, # 该参数去掉能正常使用
                                         pin_memory=True)
i=0
for iter_i, (images, ground_truth) in enumerate(dataloader):
    print(images[0].shape)
    print(ground_truth.shape)
    print(ground_truth[0])
    for label in ground_truth:
        label.tolist()
        print(torch.tensor([1,2]).tolist())

    if i==0:
        break 

torch.Size([3, 416, 416])
torch.Size([1, 169, 7])
tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])
[1, 2]


  mode = random.choice(self.sample_options)


In [13]:
# 测试模型的输出是否正确
data = Dataset()
a = data.__getitem__(1)
data.ground_truth
img = a[0]
img = torch.unsqueeze(img, dim=0)
img.shape
model(img).shape
img = model.backbone(img)
print(img.shape)
img = model.neck(img)
print(img.shape)
img = model.head(img)
print(img.shape)


torch.Size([1, 512, 13, 13])
torch.Size([1, 512, 13, 13])
torch.Size([1, 25, 13, 13])


In [31]:
# loss
import torch.nn as nn

def loss(model_output, ground_truth):
    batch_size = model_output.size(0)

    # [B, C, H, W] -> [B, C, H*W] -> [B, H*W, C]
    model_output = model_output.view(batch_size, 1 + 20 + 4, -1).permute(0, 2, 1)

    # 从model_output分离出objectness的置信度预测、类别class预测、bbox的txtytwth预测  
    pred_conf = model_output[:, :, 0]
    pred_class = model_output[:, :, 1 : (1 + 20)]
    pred_class = pred_class.permute(0, 2, 1)
    pred_txtytwth = model_output[:, :, (1 + 20):]
    pred_txty = pred_txtytwth[:, :, :2]
    pred_twth = pred_txtytwth[:, :, 2:]
    

    # 从ground_truth分离出objectness的置信度、类别class、bbox的txtytwth，以及平衡不同大小的边界框的权重weight 
    conf = ground_truth[:, :, 0]
    class_index = ground_truth[:, :, 1].long() # .long(): convet to int
    txty = ground_truth[:, :, 2:4]
    twth = ground_truth[:, :, 4:6]
    bbox_weight = ground_truth[:, :, 6]

    # 计算pred_conf与conf之间的MSE_loss，因为需要设置正样本和负样本之间的权重，所以不能调包
    # 为啥要clamp我表示不理解
    pred_conf_clamp = torch.clamp(torch.sigmoid(pred_conf), min=1e-4, max=1.0 - 1e-4)
    positive = (conf==1.0).float()
    negtive = (conf==0.0).float()
    positive_loss = positive * (pred_conf_clamp - conf) ** 2
    negtive_loss = negtive * (pred_conf_clamp) ** 2
    conf_loss = 5.0 * positive_loss + 1.0 * negtive_loss
    conf_loss = torch.sum(conf_loss) / batch_size # reduction='mean'

    
    class_loss_function = nn.CrossEntropyLoss(reduction='none')
    class_loss = class_loss_function(pred_class, class_index)
    class_loss = torch.sum(class_loss * conf) / batch_size

    txty_loss_function = nn.BCEWithLogitsLoss(reduction='none')
    txty_loss = txty_loss_function(pred_txty, txty)
    txty_loss = torch.sum(txty_loss, dim=-1) # tx_loss + ty_loss
    txty_loss = torch.sum(txty_loss * conf * bbox_weight) # batch个样本的loss
    txty_loss = txty_loss / batch_size # batch个样本的mean loss

    twth_loss_function = nn.MSELoss(reduction='none')
    twth_loss = twth_loss_function(pred_twth, twth)
    twth_loss = torch.sum(twth_loss, dim=-1) # tw_loss + th_loss
    twth_loss = torch.sum(twth_loss * conf * bbox_weight) # batch个样本的loss
    twth_loss = twth_loss / batch_size # batch个样本的mean loss

    bbox_loss = txty_loss + twth_loss

    total_loss = conf_loss + class_loss + bbox_loss

    return conf_loss, class_loss, bbox_loss, total_loss

In [43]:
# 模型训练框架
import torch.optim as optim
def train():
    model = YOLOv1()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.0001)
    dataloader = torch.utils.data.DataLoader(Dataset(), batch_size=32, shuffle=True, collate_fn=collate_fn)
    
    for epoch in range(1):
        for iter, (images, ground_truth) in enumerate(dataloader):
        
            model_output = model(images)
            conf_loss, class_loss, bbox_loss, total_loss = loss(model_output, ground_truth)
            print('conf_loss:', conf_loss.item(), 'class_loss:', class_loss.item(), 'bbox_loss:', bbox_loss.item(), 'total_loss:', total_loss.item())
            total_loss.backward()
            optimizer.step()
            optimizer.zero_grad()

train()

  mode = random.choice(self.sample_options)


conf_loss: 51.57884979248047 class_loss: 7.090444564819336 bbox_loss: 188.95045471191406 total_loss: 247.6197509765625
conf_loss: 44.375701904296875 class_loss: 8.135169982910156 bbox_loss: 133.64157104492188 total_loss: 186.15243530273438
conf_loss: 37.05461120605469 class_loss: 6.3460493087768555 bbox_loss: 99.28950500488281 total_loss: 142.69017028808594
conf_loss: 31.25122833251953 class_loss: 6.0319414138793945 bbox_loss: 155.6922149658203 total_loss: 192.9753875732422
conf_loss: 27.641010284423828 class_loss: 7.335943222045898 bbox_loss: 256.82501220703125 total_loss: 291.8019714355469
conf_loss: 23.417400360107422 class_loss: 5.3550238609313965 bbox_loss: 186.68858337402344 total_loss: 215.4610137939453
conf_loss: 22.41053009033203 class_loss: 6.73112678527832 bbox_loss: 146.82516479492188 total_loss: 175.96682739257812
conf_loss: 20.7586612701416 class_loss: 7.52053165435791 bbox_loss: 198.9384765625 total_loss: 227.21766662597656
conf_loss: 17.132646560668945 class_loss: 6.240

KeyboardInterrupt: 

In [39]:
model = YOLOv1()
model.backbone.load_state_dict(torch.load('resnet18.pth'), strict=False)
#model.load_state_dict(torch.load('resnet18.pth'), strict=False)

_IncompatibleKeys(missing_keys=['conv1.bias'], unexpected_keys=['fc.weight', 'fc.bias'])

In [None]:
# 模型预测
def predict(image):
    model = YOLOv1()
    model.load_state_dict()
    model_output = model(image)
    

