In [1]:
import torch

# Random test data: 8 images, each with 5 ground-truth boxes whose labels are
# drawn from [0, 4), plus fake model outputs for 100 queries over 92 classes.
# NOTE(review): no random seed is set in this cell, so values differ per run.
targets = [
    dict(
        class_labels=torch.randint(0, 4, (5, )),
        boxes=torch.rand((5, 4)),
    ) for _ in range(8)
]

outputs = dict(
    logits=torch.randn((8, 100, 92)),
    pred_boxes=torch.rand((8, 100, 4)),
)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
from scipy.optimize._lsap import linear_sum_assignment


class DetrHungarianMatcher:
    """Minimum-cost one-to-one matcher between predicted and ground-truth boxes.

    For every image the matcher builds a cost matrix between all predictions
    (queries) and that image's targets and solves the assignment problem with
    ``linear_sum_assignment``. The total cost is a weighted sum of

    * the negated predicted probability of the target class,
    * the L1 distance between the box coordinates, and
    * the negated generalized IoU of the boxes.

    Args:
        class_cost: weight of the classification term (default 1).
        bbox_cost: weight of the L1 box term (default 5).
        giou_cost: weight of the generalized-IoU term (default 2).
    """

    def __init__(self, class_cost=1, bbox_cost=5, giou_cost=2):
        self.class_cost = class_cost
        self.bbox_cost = bbox_cost
        self.giou_cost = giou_cost

    def _box_iou_naive(self, boxes1, boxes2):
        """Loop-based reference implementation of :meth:`box_iou`.

        Kept only to make the formula easy to follow; it evaluates one pair
        of boxes at a time and is therefore slow. Boxes are ``x1, y1, x2, y2``.
        """
        inter = []
        union = []
        area = []
        for box1 in boxes1:
            for box2 in boxes2:
                # Intersection area.
                x1 = torch.max(box1[0], box2[0])
                y1 = torch.max(box1[1], box2[1])
                x2 = torch.min(box1[2], box2[2])
                y2 = torch.min(box1[3], box2[3])

                w = (x2 - x1).clamp(min=0)
                h = (y2 - y1).clamp(min=0)

                inter.append(w * h)

                # Union area = area1 + area2 - intersection.
                w1 = box1[2] - box1[0]
                h1 = box1[3] - box1[1]
                w2 = box2[2] - box2[0]
                h2 = box2[3] - box2[1]

                s1 = w1 * h1
                s2 = w2 * h2

                union.append(s1 + s2 - (w * h))

                # Area of the smallest box enclosing both boxes.
                x1 = torch.min(box1[0], box2[0])
                y1 = torch.min(box1[1], box2[1])
                x2 = torch.max(box1[2], box2[2])
                y2 = torch.max(box1[3], box2[3])

                w = (x2 - x1).clamp(min=0)
                h = (y2 - y1).clamp(min=0)

                area.append(w * h)

        inter = torch.stack(inter).reshape(len(boxes1), len(boxes2))
        union = torch.stack(union).reshape(len(boxes1), len(boxes2))
        area = torch.stack(area).reshape(len(boxes1), len(boxes2))

        # First term: plain IoU in [0, 1], larger means more overlap.
        # Second term: (enclosing area - union) / enclosing area, also in
        # [0, 1]; it is the share of the enclosing box covered by neither
        # box, so smaller is better.
        # The result is IoU with an extra penalty for boxes that are far apart.
        return (inter / union) - (area - union) / area

    def box_iou(self, boxes1, boxes2):
        """Pairwise generalized IoU between two sets of ``x1, y1, x2, y2`` boxes.

        Args:
            boxes1: tensor of shape ``[N, 4]``.
            boxes2: tensor of shape ``[M, 4]``.

        Returns:
            Tensor of shape ``[N, M]`` holding
            ``IoU - (enclosing_area - union) / enclosing_area`` per pair.
        """
        area1 = (boxes1[:, 2] - boxes1[:, 0]) * (boxes1[:, 3] - boxes1[:, 1])
        area2 = (boxes2[:, 2] - boxes2[:, 0]) * (boxes2[:, 3] - boxes2[:, 1])

        # Intersection: max of the top-left corners, min of the bottom-right.
        top_left = torch.max(boxes1[:, :2].unsqueeze(1), boxes2[:, :2])
        bottom_right = torch.min(boxes1[:, 2:].unsqueeze(1), boxes2[:, 2:])
        wh = (bottom_right - top_left).clamp(min=0)
        inter = wh[:, :, 0] * wh[:, :, 1]

        union = area1.unsqueeze(1) + area2 - inter

        # Smallest enclosing box: min of the top-left, max of the bottom-right.
        top_left = torch.min(boxes1[:, :2].unsqueeze(1), boxes2[:, :2])
        bottom_right = torch.max(boxes1[:, 2:].unsqueeze(1), boxes2[:, 2:])
        wh = (bottom_right - top_left).clamp(min=0)
        area = wh[:, :, 0] * wh[:, :, 1]

        return (inter / union) - (area - union) / area

    def xywh_to_x1y1x2y2(self, boxes):
        """Convert ``[N, 4]`` boxes from (center x, center y, w, h) to corners."""
        x = boxes[:, 0]
        y = boxes[:, 1]
        w = boxes[:, 2]
        h = boxes[:, 3]

        x1 = x - 0.5 * w
        y1 = y - 0.5 * h
        x2 = x + 0.5 * w
        y2 = y + 0.5 * h

        return torch.stack([x1, y1, x2, y2], dim=-1)

    @torch.no_grad()
    def __call__(self, outputs, targets):
        """Match predictions to targets image by image.

        Args:
            outputs: dict with ``'logits'`` of shape
                ``[batch, queries, classes]`` and ``'pred_boxes'`` of shape
                ``[batch, queries, 4]`` in (cx, cy, w, h) format.
            targets: one dict per image with ``'class_labels'`` (``[n_i]``
                integer tensor) and ``'boxes'`` (``[n_i, 4]``, same format).

        Returns:
            A list with one ``(prediction_indices, target_indices)`` pair of
            int64 tensors per image.
        """
        batch_size, num_queries = outputs['logits'].shape[:2]

        # [batch, queries, classes] -> [batch * queries, classes]
        probs = outputs['logits'].flatten(0, 1).softmax(-1)

        # [batch, queries, 4] -> [batch * queries, 4]
        pred_boxes = outputs['pred_boxes'].flatten(0, 1)

        # Targets of all images concatenated: [total] and [total, 4].
        class_labels = torch.cat([t['class_labels'] for t in targets])
        target_boxes = torch.cat([t['boxes'] for t in targets])

        # Classification cost: negated probability of each target's class.
        # [batch * queries, classes] -> [batch * queries, total]
        cost_class = -probs[:, class_labels]

        # Box cost: L1 distance between every prediction and every target.
        # [batch * queries, 4], [total, 4] -> [batch * queries, total]
        cost_bbox = torch.cdist(pred_boxes, target_boxes, p=1)

        # GIoU cost, negated because a larger GIoU is a better match.
        cost_giou = -self.box_iou(self.xywh_to_x1y1x2y2(pred_boxes),
                                  self.xywh_to_x1y1x2y2(target_boxes))

        cost = (self.bbox_cost * cost_bbox + self.class_cost * cost_class +
                self.giou_cost * cost_giou)

        # [batch * queries, total] -> [batch, queries, total]. The last
        # dimension is given explicitly so this also works with zero targets.
        cost = cost.view(batch_size, num_queries, cost.shape[-1]).cpu()

        # Each image only competes for its own targets, i.e. its own slice of
        # columns in the concatenated cost matrix.
        sizes = [len(t['boxes']) for t in targets]

        indices = []
        for i, columns in enumerate(cost.split(sizes, dim=-1)):
            # [queries, n_i] cost matrix of image i; find the cheapest
            # one-to-one assignment.
            index_row, index_col = linear_sum_assignment(columns[i].numpy())
            indices.append((torch.as_tensor(index_row, dtype=torch.int64),
                            torch.as_tensor(index_col, dtype=torch.int64)))

        return indices


# Build the matcher and run it on the random test data. The cell output is a
# list with one (prediction indices, target indices) pair per image.
matcher = DetrHungarianMatcher()

matcher(outputs, targets)

[(tensor([ 7,  8, 35, 92, 97]), tensor([0, 1, 4, 2, 3])),
 (tensor([35, 36, 44, 59, 61]), tensor([2, 1, 0, 4, 3])),
 (tensor([10, 31, 37, 54, 63]), tensor([4, 0, 2, 3, 1])),
 (tensor([ 2, 21, 39, 55, 84]), tensor([3, 2, 1, 4, 0])),
 (tensor([12, 15, 19, 20, 23]), tensor([2, 0, 3, 1, 4])),
 (tensor([ 7, 48, 58, 63, 87]), tensor([0, 4, 2, 1, 3])),
 (tensor([13, 40, 60, 85, 90]), tensor([0, 4, 1, 2, 3])),
 (tensor([15, 23, 35, 68, 83]), tensor([3, 1, 2, 0, 4]))]

In [3]:
class DetrLoss(torch.nn.Module):
    """DETR training loss: classification, L1 box and generalized-IoU terms.

    Predictions are first matched to targets with ``matcher``; every
    unmatched prediction is trained towards the extra "no object" class,
    whose index is ``num_classes``.

    Args:
        matcher: callable ``matcher(outputs, targets)`` returning one
            ``(prediction_indices, target_indices)`` pair per image; it must
            also provide ``box_iou`` and ``xywh_to_x1y1x2y2``. Defaults to a
            fresh ``DetrHungarianMatcher()``.
        num_classes: number of object classes, excluding "no object"
            (default 91, so logits are expected to have 92 channels).
        eos_coef: cross-entropy weight of the "no object" class (default 0.1).
    """

    def __init__(self, matcher=None, num_classes=91, eos_coef=0.1):
        super().__init__()
        self.matcher = DetrHungarianMatcher() if matcher is None else matcher
        self.num_classes = num_classes

        # Per-class weights; the last ("no object") class is down-weighted
        # because most predictions are background.
        empty_weight = torch.ones(num_classes + 1)
        empty_weight[-1] = eos_coef
        self.register_buffer('empty_weight', empty_weight)

    @staticmethod
    def _matched_positions(indices):
        """Return (batch index, query index) tensors of all matched predictions."""
        batch_idx = torch.cat([
            torch.full_like(rows, i) for i, (rows, _) in enumerate(indices)
        ])
        query_idx = torch.cat([rows for rows, _ in indices])
        return batch_idx, query_idx

    def loss_labels(self, outputs, targets, indices):
        """Weighted cross-entropy between the logits and the matched labels.

        Args:
            outputs: dict with ``'logits'`` of shape ``[batch, queries, classes]``.
            targets: one dict per image with ``'class_labels'``.
            indices: matcher result, one (prediction, target) index pair per image.

        Returns:
            Scalar loss tensor.
        """
        logits = outputs['logits']

        # Every prediction defaults to "no object": [batch, queries].
        target_classes = torch.full(logits.shape[:2],
                                    self.num_classes,
                                    dtype=torch.int64,
                                    device=logits.device)

        # Matched predictions receive the label of their assigned target.
        batch_idx, query_idx = self._matched_positions(indices)
        matched_labels = torch.cat([
            t['class_labels'][cols] for t, (_, cols) in zip(targets, indices)
        ])
        target_classes[batch_idx, query_idx] = matched_labels.to(
            device=logits.device, dtype=torch.int64)

        # cross_entropy expects the class dimension second:
        # [batch, queries, classes] -> [batch, classes, queries]
        return torch.nn.functional.cross_entropy(logits.transpose(1, 2),
                                                 target_classes,
                                                 weight=self.empty_weight)

    def loss_boxes(self, outputs, targets, indices):
        """L1 and generalized-IoU losses over the matched box pairs.

        Args:
            outputs: dict with ``'pred_boxes'`` of shape ``[batch, queries, 4]``
                in (cx, cy, w, h) format.
            targets: one dict per image with ``'boxes'`` and ``'class_labels'``.
            indices: matcher result, one (prediction, target) index pair per image.

        Returns:
            Tuple ``(loss_bbox, loss_giou)`` of scalar tensors, each divided
            by the total number of target boxes (at least 1).
        """
        # Gather matched pairs by indexing; this also yields empty [0, 4]
        # tensors (and zero losses) when nothing was matched.
        batch_idx, query_idx = self._matched_positions(indices)
        boxes_output = outputs['pred_boxes'][batch_idx, query_idx]
        boxes_target = torch.cat(
            [t['boxes'][cols] for t, (_, cols) in zip(targets, indices)],
            dim=0).to(boxes_output.device)

        # Normalise by the number of target boxes, avoiding division by zero.
        num_boxes = max(sum(len(t['class_labels']) for t in targets), 1)

        # L1 distance between each matched pair of boxes.
        loss_bbox = torch.nn.functional.l1_loss(boxes_output,
                                                boxes_target,
                                                reduction='sum') / num_boxes

        # box_iou returns the full pairwise matrix; only the diagonal holds
        # the matched pairs. A larger GIoU is better, so the loss is 1 - GIoU.
        giou = self.matcher.box_iou(
            self.matcher.xywh_to_x1y1x2y2(boxes_output),
            self.matcher.xywh_to_x1y1x2y2(boxes_target))
        loss_giou = (1 - giou.diag()).sum() / num_boxes

        return loss_bbox, loss_giou

    def forward(self, outputs, targets):
        """Match predictions to targets and return all loss terms.

        Returns:
            Dict with the keys ``'loss_ce'``, ``'loss_bbox'`` and ``'loss_giou'``.
        """
        indices = self.matcher(outputs, targets)

        losses = {}
        losses['loss_ce'] = self.loss_labels(outputs, targets, indices)

        loss_bbox, loss_giou = self.loss_boxes(outputs, targets, indices)
        losses['loss_bbox'] = loss_bbox
        losses['loss_giou'] = loss_giou

        return losses


# Build the loss module and evaluate it on the random test data. The cell
# output is a dict with the keys 'loss_ce', 'loss_bbox' and 'loss_giou'.
criterion = DetrLoss()

criterion(outputs, targets)

{'loss_ce': tensor(4.9638),
 'loss_bbox': tensor(0.4050),
 'loss_giou': tensor(0.7186)}

In [4]:
def test():
    """Compute the losses with the DETR classes shipped in ``transformers``.

    Builds the library's ``DetrHungarianMatcher`` and ``DetrLoss`` with fixed
    cost weights and class settings, then applies the loss to the
    notebook-level ``outputs`` and ``targets``.

    Returns:
        Whatever the library criterion returns for ``(outputs, targets)``.
    """
    from transformers.models.detr.modeling_detr import DetrLoss, DetrHungarianMatcher

    # Local names are kept distinct from the notebook-level `matcher` and
    # `criterion` objects.
    reference_matcher = DetrHungarianMatcher(class_cost=1,
                                             bbox_cost=5,
                                             giou_cost=2)
    reference_criterion = DetrLoss(
        matcher=reference_matcher,
        num_classes=91,
        eos_coef=0.1,
        losses=['labels', 'boxes', 'cardinality'],
    )

    return reference_criterion(outputs, targets)


# Run the `transformers` implementation on the same data for comparison.
test()

{'loss_ce': tensor(4.9638),
 'loss_bbox': tensor(0.4050),
 'loss_giou': tensor(0.7186),
 'cardinality_error': tensor(94.1250)}