# 物体検出　RetinaNet
+ 1段階検出器の有名なNetwork
+ CNNを使用する

## インポート

In [None]:
# Standard
import os
import sys
import glob
from collections import deque
import random
import copy
from copy import deepcopy
from typing import Callable, Sequence, Tuple, Union
import json

In [None]:
# 3rd-party
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from tqdm import tqdm

In [None]:
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torch import optim

# torchvision
import torchvision
from torchvision import transforms as T
from torchvision.transforms import functional as TF
from torchvision.utils import draw_bounding_boxes
from torchvision.ops import sigmoid_focal_loss, batched_nms
from torchvision.ops.misc import FrozenBatchNorm2d

# Coco
from pycocotools.cocoeval import COCOeval


## Google Drive マウント

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## IoUの計算方法
+ Intersection over Union

In [None]:
'''
boxes1: 矩形集合, [矩形数, 4 (min_x, min_y, max_x, max_y)]
boxes2: 矩形集合, [矩形数, 4 (min_x, min_y, max_x, max_y)]
'''
def calc_iou(boxes1: torch.Tensor,
             boxes2: torch.Tensor,
             ):
    # boxes1の第一軸をunsqueezeしてブロードキャスト計算可能にする
    # [矩形数, 2 (min_x, min_y) or (max_x, max_y)] -> [矩形数, 1, 2]
    # [矩形数, 1, 2]と[矩形数, 2]の演算が[矩形数, 矩形数, 2]になる

    # 積集合の左上の座標を取得
    intersection_left_top = torch.maximum(
        boxes1[:, :2].unsqueeze(dim=1), # [矩形数, 1, 2(min_x, min_y)]
        boxes2[:, :2], # [矩形数, 2(min_x, min_y)]
    )

    # 積集合の右下の座標を取得
    intersection_right_down = torch.minimum(
        boxes1[:, 2:].unsqueeze(dim=1), # [矩形数, 1, 2(max_x, max_y)])
        boxes2[:, 2:], # [矩形数, 2(max_x, max_y)]
    )

    # 重なる領域の面積を計算
    # 不適の場合, 幅と高さの値が負数になるので, 0でクリッピングする
    intersection_width_height = \
     (intersection_right_down - intersection_left_top).clamp(min=0)

    # [矩形数, 矩形数, 2] -> [矩形数, 矩形数, 1]
    intersection_areas = intersection_width_height.prod(dim=2)

    # それぞれの矩形の面積を計算 [矩形数, 1]
    # boxes1's (max_x - min_x) * (max_y - min_y)
    areas1 = (boxes1[:, 2] - boxes1[:, 0]) * (boxes1[:, 3] - boxes1[:, 1])
    # boxes2's (max_x - min_x) * (max_y - min_y)
    areas2 = (boxes2[:, 2] - boxes2[:, 0]) * (boxes2[:, 3] - boxes2[:, 1])

    # 和集合の面積を計算
    # [矩形数, 1, 1(面積)] +
    # [矩形数, 1(矩形数)] =
    # [矩形数, 矩形数, 1]
    union_areas = areas1.unsqueeze(dim=1) + areas2 - intersection_areas

    ious = intersection_areas / union_areas

    return ious, union_areas

## xmin,ymin,xmax,ymax <-> cx,cy,w,h 変換

In [None]:
'''
boxes: 矩形集合, [矩形数(任意), 4 (xmin,ymin,xmax,ymax)]
'''
def convert_to_xywh(boxes: torch.Tensor):
    wh = boxes[..., 2:] - boxes[..., :2]
    xy = boxes[..., :2] + wh / 2
    boxes = torch.cat((xy, wh), dim=-1)
    return boxes

'''
boxes: 矩形集合, [矩形数(任意), 4 (cx,cy,w,h)]
'''
def convert_to_xyxy(boxes: torch.Tensor):
    xymin = boxes[..., :2] - boxes[..., 2:] / 2
    xymax = boxes[..., 2:] + xymin
    boxes = torch.cat((xymin, xymax), dim=-1)
    return boxes

## データセットの分割関数

In [None]:
'''
データセットを分割するための2つの排反なインデックス集合を生成する関数
dataset: 分割対象のデータセット
ratio: 1つ目のセットに含めるデータ量の割合
random_seed: シード値
'''
def generate_subset(dataset: Dataset,
                    ratio: float,
                    random_seed: int=0):
    # サブセットの大きさ
    size = int(len(dataset) * ratio)

    indices = list(range(len(dataset)))

    # シャッフル
    random.seed(random_seed)
    random.shuffle(indices)

    indices1, indices2 = indices[:size], indices[size:]

    return indices1, indices2

## データセット COCO2014
+ 約33万枚の画像データ
+ インターネットから集められた日常画像
+ 物体検出用のアノテーションは80クラスの外接矩形
+ その他, 作者によるセグメンテーション, 姿勢推定など
+ 2014年時点 学習用8万枚, 検証用4万枚 テスト用4万枚


## val2014.zipをダウンロード
+ 検証用のみ使う
+ 1万枚
+ URL: http://images.cocodataset.org/zips/val2014.zip

### GoogleDriveに保存先を変更

In [None]:
!wget http://images.cocodataset.org/zips/val2014.zip

In [None]:
# !ls -al

+ unarアプリ(zip解凍)をインストール
+ WindowsでZip化したファイルも文字化けしない

In [None]:
# !apt install unar

In [None]:
import shutil
shutil.rmtree('val2014')

謎の解凍バグがあるので使わないことにした. '24/3/31

In [None]:
!unzip val2014.zip

In [None]:
# お試し
from IPython.display import Image, display
ws_dir: str = os.getcwd()
file_path: str = os.path.join(ws_dir, f"val2014/COCO_val2014_000000441814.jpg")
display(Image(file_path))

## アノテーションデータのLoad
+ 人と車の2種類
+ instances_val2014_small.json

In [None]:
# !unar val2014.zip

## 独自のCocoDetection データセットクラス
+ 画像変形時に正解矩形も追従させる柔軟性を持つ

In [None]:
class CocoDetection(torchvision.datasets.CocoDetection):
    '''
    物体検出用COCOデータセット読み込みクラス
    img_directory: 画像ファイルが保存されているディレクトリパス
    anno_file: アノテーションファイル
    transforms: データ拡張と整形を行うクラスインスタンス
    '''
    def __init__(self,
                 img_directory: str,
                 anno_file: str,
                 transform: Callable=None,
                 ):
        super().__init__(img_directory, anno_file)

        self.transform = transform

        # カテゴリIDに欠番があるため、それを埋めてクラスIDを割り当て
        self.classes = []
        # 元々のクラスIDと新しく割り当てたクラスIDを相互に変換するためのマッピング
        self.coco_to_pred = {}
        self.pred_to_coco = {}
        for i, category_id in enumerate(sorted(self.coco.cats.keys())):
            self.classes.append(self.coco.cats[category_id]['name'])
            # category_id: 欠番のある1から始まるCocoラベル
            # i: 0から始まる再割り当てラベル
            self.coco_to_pred[category_id] = i
            self.pred_to_coco[i] = category_id

    '''
    データ取得関数
    idx: サンプルを示すインデックス
    '''
    def __getitem__(self,
                    idx: int,
                    ):
        # print('__getitem__のオーバーロード')

        # imgはPIL.Image
        pil_img, target = super().__getitem__(idx)

        # 親クラスのコンストラクタでself.idsに
        # 画像IDが格納されているのでそれを取得
        img_id = self.ids[idx]

        # 物体の集合を1つの矩形でアノテーションしている物を除外
        # アノテーション(jsonに`iscrowd`がないもの or iscrowd==1のもの)
        target = [obj for obj in target
                  if 'iscrowd' not in obj or obj['iscrowd'] == 0]

        # 学習用に該当画像に映る物体のクラスIDと矩形を取得
        # クラスIDはコンストラクタで新規に割り当てたIDに変換
        classes = torch.tensor(
            [self.coco_to_pred[obj['category_id']] for obj in target],
            dtype=torch.int64)
        boxes = torch.tensor(
            [obj['bbox'] for obj in target],
            dtype=torch.float32)

        # 矩形が0個の時, boxes.shape == [0]となってしまうため,
        # 第一軸に4を追加して軸数と第二軸の次元を合わせる
        if boxes.shape[0] == 0:
            boxes = torch.zeros((0, 4)) # torch.size((0,4))

        width, height = pil_img.size # PIL.Image
        # min_x, min_y, width, height => min_x, min_y, max_x, max_y
        boxes[:, 2:] += boxes[:, :2] # (正解矩形数, 4)

        # 矩形が画像領域内に収まるように値をクリッピング
        boxes[:, ::2] = boxes[:, ::2].clamp(min=0, max=width)
        boxes[:, 1::2] = boxes[:, 1::2].clamp(min=0, max=height)

        # ↑ ここまでで、1枚の画像から正解矩形データを複数枚抜き出したことになる

        # 学習のための正解データを用意
        # クラスIDや矩形など渡すものが多岐にわたるので、辞書で用意
        target = {
            'image_id': torch.tensor(img_id, dtype=torch.int64),
            'classes': classes,
            'boxes': boxes,
            'size': torch.tensor((width, height), dtype=torch.int64),
            'orig_size': torch.tensor((width, height), dtype=torch.int64),
            'orig_img': torch.tensor(np.asarray(pil_img))
        }

        # print('type(target) at __getitem__ of CocoDetection', type(target))

        # データ拡張と整形
        if self.transform is not None:
            pil_img, target = self.transform(pil_img, target)

        return pil_img, target

    '''
    モデルで予測されたクラスIDからCOCOのクラスIDに変換する関数
    label: 予測されたクラスID
    '''
    def to_coco_label(self, label: int):
        return self.pred_to_coco[label]



In [None]:
# お試し
boxes = torch.zeros((0, 4)) # (4,)ではない
print(boxes.size())
print(boxes.shape)
print(boxes.numpy().shape)
print(boxes.numpy())
import numpy as np
vec1 = np.zeros((4,))
print(vec1.shape)
print(vec1)
vec2 = np.zeros((0,4))
print(vec2.shape)
print(vec2)

## データ拡張

### 水平反転

In [None]:
class RandomHorizontalFlip:
    def __init__(self, prob: float=0.5):
        self.prob = prob

    def __call__(self,
                 img: Image,
                 target: dict,
                 ):
        pil_img = img
        if random.random() < self.prob:
            pil_img = TF.hflip(pil_img)

        # 正解矩形をx軸方向に反転
        # 制約式: max_x - min_x = width
        # min_x -> width - max_x
        # max_x -> width - min_x
        width = pil_img.size[0]
        # print("type(target)", type(target))
        # print('target', target)
        target['boxes'][:, [0, 2]] = width - target['boxes'][:, [2, 0]]

        return pil_img, target

お試し

In [None]:
# my_coco_ds = COCODetection(

# )

### クロップ

In [None]:
class RandomSizeCrop:
    '''
    scale: 切り抜き前に対する切り抜き後の画像面積の下限と上限
    ratio: 切り抜き後の画像のアスペクト比の下限と上限
    '''
    def __init__(self,
                 scale: Sequence[float],
                 ratio: Sequence[float],
                 ):
        self.scale = scale
        self.ratio = ratio

    '''
    無作為に画像を切り抜く
    '''
    def __call__(self,
                 pil_img: Image,
                 target: dict,
                 ):
        width, height = pil_img.size

        # 切り抜く領域の左上の座標と幅および高さを取得
        # 切り抜く領域はscaleとratioの下限と上限に従う
        top, left, cropped_height, cropped_width = \
            T.RandomResizedCrop.get_params(pil_img,
                                          self.scale,
                                          self.ratio)
        # 左上の座標と幅および高さで指定した領域を切り抜き
        pil_img = TF.crop(pil_img, top, left, cropped_height, cropped_width)

        # 原点がx=left,y=topになるように矩形座標を平行移動
        # (min_x, min_y, max_x, max_y)
        target['boxes'][:, ::2] -= left # min_x, max_x
        target['boxes'][:, 1::2] -= top # min_y, max_y

        # 矩形の座標が切り抜き後に領域外に出る場合は座標をクリップする
        target['boxes'][:, ::2] = \
            target['boxes'][:, ::2].clamp(min=0) # min_x, max_x
        target['boxes'][:, 1::2] = \
            target['boxes'][:, 1::2].clamp(min=0) # min_y, max_y
        target['boxes'][:, ::2] = \
            target['boxes'][:, ::2].clamp(max=cropped_width) # min_x, max_x
        target['boxes'][:, 1::2] = \
            target['boxes'][:, 1::2].clamp(max=cropped_height) # min_y, max_y

        # 幅と高さが0より大きくなる矩形のみを保持(max_x > min_x & max_y > min_y)
        # (矩形数=画像内のインスタンス数, 1)
        keep = (target['boxes'][:, 2] > target['boxes'][:, 0]) & \
               (target['boxes'][:, 3] > target['boxes'][:, 1]) # マスク
        target['classes'] = target['classes'][keep]
        target['boxes'] = target['boxes'][keep]

        # 切り抜き後の画像の大きさを保持
        target['size'] = torch.tensor([cropped_width, cropped_height], dtype=torch.int64)

        return pil_img, target

お試し

### リサイズ

In [None]:
class RandomResize:
    '''
    無作為に画像をアスペクト比を保持してリサイズするクラス
    min_sizes: 短辺の長さの候補、この中から無作為に長さを抽出
    max_size :  長辺の長さの最大値
    '''
    def __init__(self, min_sizes: Sequence[int], max_size: int):
        self.min_sizes = min_sizes
        self.max_size = max_size


    def _get_target_size(self,
                         min_size: int,
                         max_size: int,
                         target: int,
                         ):
        # アスペクト比を保持して短辺をtargetに合わせる
        max_size = int(max_size * target / min_size)
        min_size = target

        # 長辺がmax_sideを超えている場合
        # アスペクト比を保持して長辺をmax_sizeに合わせる
        # このとき, 短辺は, (self.max_size / max_size)倍する
        # つまり, min_sideはtargetから更に短くなる
        if max_size > self.max_size:
            min_size = int(min_size * self.max_size / max_size)
            max_size = self.max_size

        return min_size, max_size

    def __call__(self,
                 pil_img: Image,
                 target: dict,
                 ):
        # 短編の長さを候補の中から無作為に抽出
        min_size = random.choice(self.min_sizes)

        width, height = pil_img.size

        # リサイズ後の大きさを取得
        # 幅と高さのどちらが短編であるか場合分け
        if width < height:
            resized_width, resized_height = self._get_target_size(
                min_size=width, max_size=height, target=min_size)
        else:
            resized_height, resized_width = self._get_target_size(
                min_size=height, max_size=width, target=min_size)

        # 指定した大きさに画像をリサイズ
        pil_img = TF.resize(pil_img, (resized_height, resized_width))

        # 正解矩形をリサイズ前後のスケールに合わせて変更
        ratio = resized_width / resized_height
        target['boxes'] *= ratio

        # リサイズ後の画像の大きさを保存
        target['size'] = torch.tensor(
            [resized_width, resized_height], dtype=torch.int64
        )

        return pil_img, target

お試し

## 画像整形

In [None]:
class ToTensor:
    # PIL -> Tensor
    def __call__(self,
                 img: Image,
                 target: dict,
                 ):
        img = TF.to_tensor(img)
        return img, target

お試し

## 画像を標準化

In [None]:
class Normalize:
    # mean(r,g,b)
    # std (r,g,b)
    def __init__(self,
                 mean: Sequence[float],
                 std: Sequence[float],
                 ):
        self.mean = mean
        self.std = std

    def __call__(self,
                 img: torch.Tensor,
                 target: dict,
                 ):
        img = TF.normalize(img,
                          mean=self.mean,
                          std=self.std,
                          )
        return img, target

お試し

## ランダム選択

In [None]:
class RandomSelect:
    # transform1: データ拡張1
    # transform2: データ拡張2
    # prob: データ拡張1が適用される確率
    def __init__(self,
                 transform1: Callable,
                 transform2: Callable,
                 prob: float=0.5,
                 ):
        self.transform1 = transform1
        self.transform2 = transform2
        self.prob = prob

    def __call__(self,
                 img: Image,
                 target: dict,
                 ):
        if random.random() < self.prob:
            return self.transform1(img, target)

        return self.transform2(img, target)

## Compose

In [None]:
class Compose:
    def __init__(self,
                 transforms: Sequence[Callable],
                 ):
        self.transforms = transforms

    def __call__(self,
                 img: Image,
                 target: dict,
                 ):
        for transform in self.transforms:
            img, target = transform(img, target) # オリジナル アノテーション矩形も変形が必要

        return img, target

# RetinaNet アーキテクチャ

## Pretrained ResNet Backborn by ImageNet

### FrozenResidualBlock

In [None]:
class FrozenResidualBlock(nn.Module):
    def __init__(self,
                 in_channels: int,
                 out_channels: int,
                 stride: int = 1,
                 ):
        super().__init__()

        # 残差接続
        # conv -> batchnorm -> activation -> conv -> batchnorm
        self.conv1 = nn.Conv2d(in_channels,
                               out_channels,
                               kernel_size=3,
                               stride=stride,
                               padding=1,
                               bias=False,
                               )
        self.bn1 = FrozenBatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels,
                               out_channels,
                               kernel_size=3,
                               padding=1,
                               bias=False,
                               )
        self.bn2 = FrozenBatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        # strideが1より大きいときにスキップ接続と残差接続から得られる
        # 特徴量マップの高さと幅をあわせるために、別途畳み込みを用意
        self.downsample = None
        if stride > 1:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels,
                          out_channels,
                          kernel_size=1,
                          stride=stride,
                          bias=False,
                          ),
                FrozenBatchNorm2d(out_channels)
            )

    def forward(self,
                x: torch.Tensor,
                ):
        # x : (B,C,H,W)
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            x = self.downsample(x)

        # 残差接続とスキップコネクションの合流
        out += x

        out = self.relu(out)

        return out

### FrozenResNet18

In [None]:
class FrozenResNet18(nn.Module):
    def __init__(self,
                 ):
        super().__init__()

        # Entrance layer
        self.conv1 = nn.Conv2d(in_channels=3,
                               out_channels=64,
                               kernel_size=7,
                               stride=2,
                               padding=3,
                               bias=False,
                               )
        self.bn1 = FrozenBatchNorm2d(num_features=64)
        self.relu = nn.ReLU(inplace=True)

        self.max_pool = nn.MaxPool2d(kernel_size=3,
                                     stride=2,
                                     padding=1,
                                     )

        # 1層目
        self.layer1 = nn.Sequential(
            FrozenResidualBlock(in_channels=64, out_channels=64),
            FrozenResidualBlock(in_channels=64, out_channels=64),
        )

        # 2層目
        self.layer2 = nn.Sequential(
            FrozenResidualBlock(in_channels=64, out_channels=128, stride=2),
            FrozenResidualBlock(in_channels=128, out_channels=128),
        )

        # 3層目
        self.layer3 = nn.Sequential(
            FrozenResidualBlock(in_channels=128, out_channels=256, stride=2),
            FrozenResidualBlock(in_channels=256, out_channels=256),
        )

        # 4層目
        self.layer4 = nn.Sequential(
            FrozenResidualBlock(in_channels=256, out_channels=512, stride=2),
            FrozenResidualBlock(in_channels=512, out_channels=512),
        )

    def forward(self,
                x: torch.Tensor,
                ):
        # x : (B,C,H,W)

        # Entrance layer
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.max_pool(x)

        x = self.layer1(x)
        c3 = self.layer2(x)
        c4 = self.layer3(c3)
        c5 = self.layer4(c4)

        return c3, c4, c5

## 特徴量ピラミッドネットワーク

In [None]:
class FeaturePyramidNetwork(nn.Module):
    # num_features: 出力特徴量のチャネル数

    def __init__(self,
                 num_features: int=256,
                 ):
        super().__init__()

        # 特徴ピラミッドネットワークから出力される階層レベル
        # バックボーンネットワークの最終層の特徴マップを第５階層とし、
        # 縮小方向に第6, 7層の2つの特徴マップを生成
        # 拡大方向に第3, 4層の2つの特徴マップを生成
        self.levels = (3,4,5,6,7)

        # 縮小方向の特徴抽出
        self.p6 = nn.Conv2d(in_channels=512,
                            out_channels=num_features,
                            kernel_size=3,
                            stride=2, # より大域的な特徴量を抽出
                            padding=1,
                            )

        self.p7_relu = nn.ReLU(inplace=True)
        self.p7 = nn.Conv2d(in_channels=num_features,
                            out_channels=num_features,
                            kernel_size=3,
                            stride=2,
                            padding=1,
                            )

        # 拡大方向の特徴抽出
        self.p5_1 = nn.Conv2d(in_channels=512,
                              out_channels=num_features,
                              stride=1,
                              kernel_size=1,
                              padding=0,
                              ) # (K,S,P)=(1,1,0) -> same
        self.p5_2 = nn.Conv2d(in_channels=num_features,
                              out_channels=num_features,
                              kernel_size=3,
                              stride=1,
                              padding=1,
                              ) # (K,S,P)=(3,1,1) -> half
        self.p4_1 = nn.Conv2d(in_channels=256,
                              out_channels=num_features,
                              kernel_size=1,
                              stride=1,
                              padding=0,
                              ) # (K,S,P)=(1,1,0) -> same
        self.p4_2 = nn.Conv2d(in_channels=256,
                              out_channels=num_features,
                              kernel_size=3,
                              stride=1,
                              padding=1,
                              ) # (K,S,P)=(3,1,1) -> half
        self.p3_1 = nn.Conv2d(in_channels=128,
                              out_channels=num_features,
                              kernel_size=1,
                              stride=1,
                              padding=0,
                              ) # (K,S,P)=(1,1,0) -> same
        self.p3_2 = nn.Conv2d(in_channels=num_features,
                              out_channels=num_features,
                              kernel_size=3,
                              stride=1,
                              padding=1,
                              ) # (K,S,P)=(3,1,1) -> half

    def forward(self,
                c3: torch.Tensor,
                c4: torch.Tensor,
                c5: torch.Tensor,
                ):
        # 縮小方向
        p6 = self.p6(c5)
        p7 = self.p7_relu(p6)
        p7 = self.p7(p7) # (B, C=num_features, H=c5_H/2, W=c5_W/2)

        # 拡大方向
        p5 = self.p5_1(c5)
        p5_up = F.interpolate(p5, scale_factor=2) # 特徴マップの縦横サイズを2倍にする
        p5 = self.p5_2(p5)

        p4 = self.p4_1(c4)
        p4_up = F.interpolate(p4, scale_factor=2)
        p4 = self.p4_2(p4)

        p3 = self.p3_1(c3)
        p3_up = F.interpolate(p3, scale_factor=2)
        p3 = self.p3_2(p3_up)

        return p3, p4, p5, p6, p7

## 検出ヘッドネットワーク

In [None]:
class DetectionHead(nn.Module):
    # 検出ヘッド(分類や矩形の回帰に使用する)
    # num_channels_per_anchor: 1アンカーに必要な出力チャネル数(特徴量数)
    # num_anchors: アンカー数
    # num_features: 入力及び中間特徴量のチャネル数
    def __init__(self,
                 num_channels_per_anchor: int,
                 num_anchors: int=9,
                 num_features: int=256,
                 ):
        super().__init__()

        self.num_anchors = num_anchors

        # 特徴ピラミッドネットワークの特徴マップを分類や回帰専用の
        # 特徴マップに変換するための畳み込みブロック
        self.conv_blocks = nn.ModuleList([
            nn.Sequential(nn.Conv2d(in_channels=num_features,
                                    out_channels=num_features,
                                    kernel_size=3,
                                    stride=1,
                                    padding=1),
                          nn.ReLU(inplace=True)) for _ in range(4)])

        # 検出ヘッドの出力チャネル数を設定する
        # 分類ヘッドに使用する場合, アンカーボックス数 x 物体クラス数
        # 矩形ヘッドに使用する場合, アンカーボックス数 x 4 (cx,cy,w,h)
        self.out_conv = nn.Conv2d(
            in_channels=num_features,
            out_channels=num_anchors * num_channels_per_anchor,
            kernel_size=3,
            stride=1,
            padding=1,
        ) # (K,S,P) = (3,1,1) -> half

    def forward(self,
                x: torch.Tensor, # (B,C,H,W)
                ):
        # convを4回実行
        for i in range(4):
            x = self.conv_blocks[i](x)
        x = self.out_conv(x)

        bs, c, h, w = x.shape

        # 後処理に備えて予測結果を並べ替える
        # permute関数(参照でない)
        # (B,C,H,W) -> (B,H,W,C)
        x = x.permute(0, 2, 3, 1)
        # 第一軸に全画素の予測結果を並べる
        # (B,H,W,C) -> (B, H*W*anchors, classes+4[x,y,w,h])
        x = x.reshape(bs, w * h * self.num_anchors, -1)

        '''
        | clazz + 4 [cx,cy,w,h] |
        | ... |
        | ... |
        ↓ H*W*ancors
        '''


        return x



## アンカーボックス生成器

In [None]:
class AnchorGenerator:
    '''
    検出の基準となるアンカーボックスを生成するクラス
    levels: 入力特徴マップの階層
    '''
    def __init__(self,
                 levels: int,
                 ):
        # 用意するアンカーボックスのアスペクト比(ハイパーパラメータ)
        ratios: torch.tensor = torch.tensor([0.5, 1.0, 2.0])

        # 用意するアンカーボックスの基準となる大きさに
        # 対するスケール(ハイパーパラメータ)
        scales = torch.tensor([2 ** 0, 2 ** (1/3), 2 ** (2/3)])

        # 1つのアスペクト比に対して全スケールのアンカーボックスを
        # 用意するので、アンカーボックスの数は
        # アスペクト比の数 * スケール数
        self.num_anchors = ratios.shape[0] * scales.shape[0]

        # 各階層の特徴マップでの1画素の移動量が入力画像での何画素の
        # 移動になるかを表す数値
        # 2**N のスケールで縮小するので, 1画素の移動量入力画像では2**N倍される
        self.strides = [2 ** level for level in levels]

        self.anchors = []
        for level in levels:
            # 現階層における基準となる正方形のアンカーボックスの1辺の長さ
            # 深い階層のアンカーボックスには大きい物体の
            # 検出を担当させるため, 基準の長さを長く設定
            base_length = 2 ** (level + 2)
            # 0: 2**2 = 4
            # 1: 2**3 = 8
            # 2: 2**4 = 16
            # 3: 2**5 = 32
            # 4: 2**6 = 64
            # 5: 2**7 = 128
            # 6: 2**8 = 256
            # 7: 2**9 = 512

            # アンカーボックスの1辺の長さをスケール
            scaled_lengths = base_length * scales
            # アンカーボックスが正方形の場合の面積を計算
            anchor_areas = scaled_lengths ** 2 # (3,)

            # アスペクト比(ratio=height/ratio)に応じて辺の長さを変更
            # width * height = width * (width * ratio) = area
            # width = (area / ratio) ** 0.5
            # unsqueezeとブロードキャストにより
            # アスペクト比 * スケール数の数のアンカーボックスの幅と高さを生成
            # (3,) * (3,1) = (3, 3)
            # e.g
            # a = [1, 3.5, 6]
            # b = [[0.5], [1], [2]]
            # a*b = [
            #  [ 0.5, 1.75, 3],
            #  [ 1, 3.5, 6],
            #  [ 2, 7, 12]
            # ]
            anchor_widths = (anchor_areas / ratios.unsqueeze(1)) ** 0.5
            anchor_heights = anchor_widths * ratios.unsqueeze(1) # (3,3)

            # (3,3) -> (9,)
            anchor_widths = anchor_widths.flatten()
            anchor_heights = anchor_heights.flatten()

            # アンカーボックスの中心を原点(0,0)としたときの
            # x_min, y_min, x_max, y_maxのオフセット
            anchor_x_mins = - 0.5 * anchor_widths
            anchor_y_mins = - 0.5 * anchor_heights
            anchor_x_maxs = 0.5 * anchor_widths
            anchor_y_maxs = 0.5 * anchor_heights

            level_anchors = torch.stack(
                (anchor_x_mins, anchor_y_mins, anchor_x_maxs, anchor_y_maxs),
                dim=1) # (4, 9)

            self.anchors.append(level_anchors)

    # 関数内で勾配計算をさせないことを明示
    @torch.no_grad()
    def generate(self,
                 feature_sizes: Sequence[torch.Size],
                 ):
        '''
        アンカーボックス生成関数 :  b 入力画像座標上
        feature_sizes: 入力される複数の特徴マップぞれぞれの大きさ
        '''
        anchors = []
        # stride: (L, 1)
        # level_anchors: (L,9,4)
        # feature_size: (L, H_p[i]],W_p[i]) i= 1,...,L
        for stride, level_anchors, feature_size in zip(self.strides, self.anchors, feature_sizes):
            # 現階層の特徴マップの大きさ
            height, width = feature_size

            # 入力画像の画素の移動量を表すstridesを使って
            # 特徴マップの画素の位置 -> 入力画僧の画素の位置に変換
            # (画像の中心位置を計算するために0.5を加算)
            # x_at_in = 2^l * (x + 0.5), y_at_in = 2^l * (y + 0.5)
            xs = (torch.arange(width) + 0.5) * stride # 入力画像上
            ys = (torch.arange(height) + 0.5) * stride # 入力画像上

            # 入力画像座標上のグリッド(x,y)
            grid_x, grid_y = torch.meshgrid(xs, ys, indexing='xy')

            grid_x = grid_x.flatten() # (W_p[i]*H_p[i]),)
            grid_y = grid_y.flatten() # (H_p[i]*W_p[i],)

            # 各画像の中心位置にアンカーボックスの
            # x_min,y_min,x_max,y_maxのオフセットを加算
            anchor_x_mins = (grid_x.unsqueeze(1) + level_anchors[:, 0]).flatten()
            anchor_y_mins = (grid_y.unsqueeze(1) + level_anchors[:, 1]).flatten()
            anchor_x_maxs = (grid_x.unsqueeze(1) + level_anchors[:, 2]).flatten()
            anchor_y_maxs = (grid_y.unsqueeze(1) + level_anchors[:, 3]).flatten()

            # 第1軸を追加してx_min, y_min, x_max, y_maxを連結
            level_anchors = torch.stack(
                (anchor_x_mins, anchor_y_mins, anchor_x_maxs, anchor_y_maxs),
                dim=1
            ) # (H_p[i]*W_p[i]*9, 4)
            anchors.append(level_anchors) # list[(H_p[i]*W_p[i]*9, 4), ...] = (L,(H_p[i]*W_p[i]*9, 4)

        # 全階層のアンカーボックスを連結
        anchors = torch.cat(anchors, dim=0) # (sum(H_p[i]*W_p[i]*9), 4)

        return anchors


## RetinaNet全体

In [None]:
class RetinaNet(nn.Module):
    '''
    RetinaNetモデル(backborn=ResNet18)
    num_classes: 物体クラス数
    '''
    def __init__(self,
                 num_classes: int,
                 ):
        super().__init__()

        # バックボーン(基礎特徴抽出)
        self.backbone = FrozenResNet18()

        # 特徴量の精錬
        self.fpn = FeaturePyramidNetwork()

        # アンカーボックス生成器
        self.anchor_generator = AnchorGenerator(self.fpn.levels)

        # 分類及び矩形ヘッド
        #　検出ヘッドはすべての特徴マップで共有
        self.class_head = DetectionHead(
            num_channels_per_anchor=num_classes,
            num_anchors=self.anchor_generator.num_anchors,
        )

        # num_channels_per_anchor=4は
        # (x_diff, y_diff, w_diff, h_diff)を推論するため
        self.box_head = DetectionHead(
            num_channels_per_anchor=4,
            num_anchors=self.anchor_generator.num_anchors,
        )

        self._reset_parameters()

    '''
    パラメータの初期化
    '''
    def _reset_parameters(self):
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight,
                                        mode='fan_out',
                                        nonlinearity='relu')
        # 分類ヘッドの出力にシグモイドを適用して各クラスの確率を出力
        # 学習開始時の確率が0.01になるようにパラメータを初期化
        prior = torch.tensor(0.01)
        nn.init.zeros_(self.class_head.out_conv.weight)
        nn.init.constant_(self.class_head.out_conv.bias,
                          -((1.0 - prior) / prior).log())

        # 学習開始時のアンカーボックスの中心位置の移動が0,
        # 大きさが1倍となるように矩形ヘッドを初期化
        nn.init.zeros_(self.box_head.out_conv.weight)
        nn.init.zeros_(self.box_head.out_conv.bias)


    '''
    準伝搬
    x: 入力画像 (B, C, H, W)
    '''
    def forward(self, x: torch.Tensor):
        cs = self.backbone(x)
        ps = self.fpn(*cs) # p3,p4,p5,p6,p7

        # 各特徴量マップ(p3,p4,p5,p6,p7)の各画素に対して
        # 9個のアンカーボックスが割り当てられている.

        # p3,p4,p5,p6,p7に対して
        # 分類ヘッドと矩形ヘッドを適用(パラメータ共有)
        class_head_out_list = list(map(self.class_head, ps))
        box_head_out_list = list(map(self.box_head, ps))

        '''各特徴量マップに対するヘッドの結果を連結'''
        # [(B, H_p3*W_p3*anchors, classes), ..., (B, H_p7*W_p7*anchors, classes)]
        preds_class = torch.cat(class_head_out_list, dim=1) # (B, sum(H_p[i]*W_p[i]*anchor)), classes)
        preds_box = torch.cat(box_head_out_list, dim=1) # (B, sum(H_p[i]*W_p[i]*anchor)), 4)

        '''アンカーボックスを生成'''
        feature_sizes = [p.shape[2:] for p in ps] #[(H_p3,W_p3),(H_p4,W_p4),(H_p5,W_p5),(H_p6,W_p6),(H_p7,W_p7)]
        anchors = self.anchor_generator.generate(feature_sizes) # (levels=5, H_p[i]*W_p[i]*9, 4)
        anchors = anchors.to(x.device) # cpu -> cuda

        return preds_class, preds_box, anchors

    '''
    モデルパラメータが保持されているデバイスを返す関数
    '''
    def get_device(self):
        return self.backbone.conv1.weight.device

## 後処理
+ アンカーボックスと予測誤差の統合
+ 余分な検出矩形の除去

アンカーボックスと予測誤差の統合

In [None]:
'''
preds_class : 検出矩形のクラス
              (B, sum(H_p[i]*W_p[i]*anchor))=アンカーボックスの数, classes)
preds_box : 検出矩形のアンカーボックスからの誤差
              (B, sum(H_p[i]*W_p[i]*anchor))=アンカーボックスの数, 4)
anchors : アンカーボックス
              (sum(H_p[i]*W_p[i]*9)=アンカーボックスの数, 4)
targets: ラベル
conf_threshold : 信頼度のしきい値
nms_threshold : NMSのIoUしきい値
'''
@torch.no_grad()
def post_process(preds_class: torch.Tensor,
                 preds_box: torch.Tensor,
                 anchors: torch.Tensor,
                 targets: dict,
                 conf_threshold: float = 0.05,
                 nms_threshold: float = 0.5,
                 ):
    '''矩形の整形'''
    batch_size = preds_class.shape[0]

    anchors_xywh = convert_to_xywh(anchors)

    # 中心座標の予測をスケール不変にするため
    # 予測値をアンカーボックスの大きさでスケールする
    # ネットワークの出力=誤差と見なす.
    # 中心 -> (xr,yr) = (xa+xp*wa, ya+yp*ha)
    # 幅高 -> (wr,hr) = (wa*exp(wp), ha*exp(hp))
    # -> log(wr,hr) = (log(wp)+wp, log(hp)+hp) つまり誤差はlog空間の値
    preds_box[:, :, :2] = anchors_xywh[:, :2] + preds_box[:, :, :2] * anchors_xywh[:, :2] # (left,top)
    preds_box[:, :, 2:] = preds_box[:, :, 2:].exp() * anchors_xywh[:, 2:] # (w,h)

    preds_box = convert_to_xyxy(preds_box)

    '''矩形の除去'''

    # 物体クラスの予測確率をシグモイド関数で計算
    # RetinaNetでは背景クラスは存在せず、
    # 背景を表す場合は、全ての物体クラスの予測確率が低くなるように実装されている
    preds_class = preds_class.sigmoid()

    # 画像ごとの処理
    scores = []
    labels = []
    boxes = []
    for img_preds_class, img_preds_box, img_targets in zip(
        preds_class, preds_box, targets
    ):
        # 検出矩形が画像内に収まるように座標をクリップ
        img_preds_box[:, ::2] = img_preds_box[:, ::2].clamp(
            min=0, max=img_targets['size'][0]) # (xmin,ymin)
        img_preds_box[:, 1::2] = img_preds_box[:, 1::2].clamp(
            min=0, max=img_targets['size'][1]) # (xmax,ymax)

        # 検出矩形は入力画像の大きさに合わせたものになっているので、
        # 元々の画像に合わせて検出矩形をスケールする
        img_preds_box *= img_targets['orig_size'][0] / img_targets['size'][0]

        # 物体クラスのスコアとクラスIDを取得
        img_preds_score, img_preds_label = img_preds_class.max(dim=1)

        # 信頼度がしきい値より高い検出矩形のみを残す
        keep = img_preds_score > conf_threshold
        img_preds_score = img_preds_score[keep]
        img_preds_label = img_preds_label[keep]
        img_preds_box = img_preds_box[keep]

        # クラス毎にNMSを適用
        keep_indices = batched_nms(img_preds_box,
                                   img_preds_score,
                                   img_preds_label,
                                   nms_threshold)

        scores.append(img_preds_score[keep_indices])
        labels.append(img_preds_label[keep_indices])
        boxes.append(img_preds_box[keep_indices])

    return scores, labels, boxes


## 損失関数

In [None]:
'''
preds_class : 検出矩形のクラス
(B, sum(H_p[i]*W_p[i]*anchor))=アンカーボックスの数, classes)

preds_box : 検出矩形のアンカーボックスからの誤差
(B, sum(H_p[i]*W_p[i]*anchor))=アンカーボックスの数, 4)

anchors : アンカーボックス
(sum(H_p[i]*W_p[i]*9)=アンカーボックスの数, 4)

targets: ラベル
list[target(dict), ...]

iou_lower_threshold : 検出矩形と正解矩形をマッチさせるか決める下限値
iou_higher_threshold : 検出矩形と正解矩形をマッチさせるか決める上限値
'''
def loss_func(preds_class: torch.Tensor,
              preds_box: torch.Tensor,
              anchors: torch.Tensor,
              targets: dict,
              iou_lower_threshold: float = 0.4,
              iou_higher_threshold: float = 0.5,
              ):
    anchors_xywh = convert_to_xywh(anchors)

    # 画像ごとに目的関数を計算
    loss_class = preds_class.new_tensor(0)
    loss_box = preds_class.new_tensor(0)
    for img_preds_class, img_preds_box, img_targets in zip(
            preds_class, preds_box, targets):

        # i) 現在の画像に対する正解矩形がないとき
        if img_targets['classes'].shape[0] == 0:
            # 全ての物体クラスの確率が0となるように
            # (背景クラスとして分類されるように)ラベルを作成
            targets_class = torch.zeros_like(img_preds_class)
            # https://pytorch.org/vision/stable/generated/torchvision.ops.sigmoid_focal_loss.html#torchvision.ops.sigmoid_focal_loss
            loss_class += sigmoid_focal_loss(
                img_preds_class, targets_class, reduction='sum')

            continue

        # 各画素のアンカーボックスと正解矩形のIoUを計算し,
        # 各アンカーボックスに対して最大のIoUを持つ正解矩形を抽出
        ious, _ = calc_iou(anchors, img_targets['boxes'])
        # ious: (アンカーボックス数, 正解矩形数)
        ious_max, ious_argmax = ious.max(dim=1) # (anchors, 1), (anchors, 1)

        # 分類ラベルを-1で初期化
        # IoUが下限値と上限値にあるアンカーボックスは
        # ラベルを-1として損失を計算しないようにする
        targets_class = torch.full_like(img_preds_class, -1) # (anchors, classes)

        # ii) IoUが下限値以下は, 背景(確率0)=[0,...,0]とする
        # print("")
        # print('type(ious_max)', type(ious_max))
        # print('ious_max.size()', ious_max.size())
        ious_lower_masks = ious_max < iou_lower_threshold
        # print('ious_lower_masks', ious_lower_masks)
        # print('ious_max < iou_lower_threshold @ size', (ious_max < iou_lower_threshold).size())
        targets_class[ious_lower_masks] = 0
        # print('targets_class', targets_class)

        # iii) IoUが上限値以上は, 陽性のアンカーボックスとして分類回帰の対象にする
        positive_masks = ious_max > iou_higher_threshold # (anchors, 1)
        num_positive_anchors = positive_masks.sum()

        # 陽性のアンカーボックスについて、マッチした正解矩形が示す
        # 物体クラスの確率を1, それ以外を0にする. e.g. [0,0,1,...,0]
        targets_class[positive_masks] = 0 # クラス確率を初期化
        assigned_classes = img_targets['classes'][ious_argmax] # (正解矩形数, classes)
        targets_class[positive_masks,
                      assigned_classes[positive_masks]] = 1 # クラスラベルの列に1を立てる

        # iv) IoUが下限値と上限値の間にある(つまり背景orクラス割り当てが不明瞭)な
        # アンカーボックスについては, 分類の損失計算を行わない
        targets_masks = targets_class != -1
        valid_losses = targets_masks * sigmoid_focal_loss(img_preds_class, targets_class)
        # ここでは, num_positive_anchors == 0のケースがあるので, 0割エラーを回避する.
        # この場合, valid_lossesは全て0の多次元配列なので, valid_losses.sum()もゼロ
        loss_class += valid_losses.sum() / num_positive_anchors.clamp(min=1)

        # 陽性のアンカーボックスが一つも存在しない場合
        # 矩形の誤差計算はしない
        if num_positive_anchors == 0:
            continue

        # 各アンカーボックスとマッチした正解矩形を抽出
        assigned_boxes = img_targets['boxes'][ious_argmax] # (正解矩形数, 4[xmin,ymin,xmax,ymax])
        assigned_boxes_xywh = convert_to_xywh(assigned_boxes)

        ''' アンカーボックスとマッチした正解矩形との誤差計算 '''
        targets_box = torch.zeros_like(img_preds_box) # (anchors, 4)

        # 中心位置の誤差はアンカーボックスの大きさでスケールする
        # (xy_target - xy_anchor) / wh_anchor
        targets_box[:, :2] = \
         (assigned_boxes_xywh[:, :2] - anchors_xywh[:, :2]) / anchors_xywh[:, 2:]

        # 矩形の幅と高さはアンカーボックスに対するスケールのLOG空間で予測
        targets_box[:, 2:] = (assigned_boxes_xywh[:,2:] / anchors_xywh[:, 2:]).log()

        # Smooth L1 誤差 (L1誤差とL2誤差の組み合わせ)を矩形回帰に使用
        loss_box += F.smooth_l1_loss(img_preds_box[positive_masks],
                                     targets_box[positive_masks],
                                     beta=1/9)
    # lossをバッチサイズで割る
    batch_size = preds_class.shape[0]
    loss_class = loss_class / batch_size
    loss_box = loss_box / batch_size

    return loss_class, loss_box




## ハイパーパラメータの設定

In [None]:
class ConfigTrainEval:
    def __init__(self):
        self.img_directory = "/content/val2014"
        self.anno_file = "/content/drive/MyDrive/ColabNotebooks/Book_DL_Image_Recognition/data/coco2014/instances_val2014_small.json"
        self.save_file = "/content/drive/MyDrive/ColabNotebooks/Book_DL_Image_Recognition/model/retinanet.pth"
        self.val_ratio = 0.2
        self.num_epochs = 50
        self.lr_drop = 45
        self.val_interval = 5
        self.lr = 1e-5
        self.clip = 0.1 # 勾配クリップ上限
        self.moving_avg = 100 # 移動平均で計算する損失と正確度の値の数
        self.batch_size = 8
        self.num_workers = 2
        self.device = 'cuda'

## お試し

In [None]:
# お試し

# データ拡張・整形クラスの設定
min_sizes = (480, 512, 544, 576, 608)

train_transforms = Compose((
    RandomHorizontalFlip(),
    RandomSelect(
        RandomResize(min_sizes, max_size=1024),
        Compose((
            RandomSizeCrop(scale=(0.8, 1.0),
                            ratio=(0.75, 1.333)),
            RandomResize(min_sizes, max_size=1024),
        ))
    ),
    ToTensor(),
    # ImageNetの平均と標準偏差
    Normalize(mean=(0.485, 0.456, 0.406),
                std=(0.229, 0.224, 0.225)),
))


naive_trn_coco_ds = CocoDetection(
    img_directory="/content/val2014",
    anno_file="/content/drive/MyDrive/ColabNotebooks/Book_DL_Image_Recognition/data/coco2014/instances_val2014_small.json",
    # transform=train_transforms
)

naive_record = naive_trn_coco_ds[0]
naive_target: dict = naive_record[1]
print(naive_target['orig_img'].size())

transform_trn_coco_ds = CocoDetection(
    img_directory="/content/val2014",
    anno_file="/content/drive/MyDrive/ColabNotebooks/Book_DL_Image_Recognition/data/coco2014/instances_val2014_small.json",
    transform=train_transforms
)

transform_record = transform_trn_coco_ds[0]
transform_target = transform_record[1]
print(transform_target['orig_img'].size())

# 学習と評価

In [None]:
def train_eval():
    config = ConfigTrainEval()

    # データ拡張・整形クラスの設定
    min_sizes = (480, 512, 544, 576, 608)

    train_transforms = Compose((
        RandomHorizontalFlip(),
        RandomSelect(
            RandomResize(min_sizes, max_size=1024),
            Compose((
                RandomSizeCrop(scale=(0.8, 1.0),
                               ratio=(0.75, 1.333)),
                RandomResize(min_sizes, max_size=1024),
            ))
        ),
        ToTensor(),
        # ImageNetの平均と標準偏差
        Normalize(mean=(0.485, 0.456, 0.406),
                    std=(0.229, 0.224, 0.225)),
    ))

    test_transforms = Compose((
        # テストは短編最大で実行
        RandomResize((min_sizes[-1],), max_size=1024),
        ToTensor(),
        # ImageNetの平均と標準偏差
        Normalize(mean=(0.485, 0.456, 0.406),
                    std=(0.229, 0.224, 0.225)),
    ))

    # データセット
    train_dataset = CocoDetection(
        img_directory=config.img_directory,
        anno_file=config.anno_file,
        transform=train_transforms,
    )
    val_dataset = CocoDetection(
        img_directory=config.img_directory,
        anno_file=config.anno_file,
        transform=test_transforms,
    )

    # Subset samplerの生成
    val_set, train_set = generate_subset(train_dataset, config.val_ratio)

    print(f"学習セットのサンプル数: {len(train_set)}")
    print(f"検証セットのサンプル数: {len(val_set)}")

    # 学習時にランダムにサンプルするためのサンプラー
    train_sampler = SubsetRandomSampler(train_set)

    # Dataloader
    train_loader = DataLoader(
        train_dataset, batch_size=config.batch_size,
        num_workers=config.num_workers, sampler=train_sampler,
        collate_fn=collate_func,
    )
    val_loader = DataLoader(
        val_dataset, batch_size=config.batch_size,
        num_workers=config.num_workers, sampler=val_set,
        collate_fn=collate_func,
    )

    # RetinaNet
    model = RetinaNet(len(train_dataset.classes))
    # ResNet18をImageNetの学習済みモデルで初期化
    # 最後の全結合層が無いなどのモデルの改変を許容するためにstrict=False
    model.backbone.load_state_dict(torch.hub.load_state_dict_from_url(
        'https://download.pytorch.org/models/resnet18-5c106cde.pth'
    ), strict=False)

    model.to(config.device)

    optimizer = optim.AdamW(model.parameters(), lr=config.lr)

    scheduler = optim.lr_scheduler.MultiStepLR(
        optimizer, milestones=[config.lr_drop], gamma=0.1
    )

    # Epochループ
    for epoch in range(config.num_epochs):
        model.train()

        with tqdm(train_loader) as pbar:
            pbar.set_description(f"[エポック {epoch + 1}]")

            # 移動平均計算用
            losses_class = deque()
            losses_box = deque()
            losses = deque()

            # 学習
            for imgs, targets in pbar:
                imgs = imgs.to(model.get_device())
                targets = [{
                    k: v.to(model.get_device())
                    for k, v in target.items()
                } for target in targets]

                optimizer.zero_grad()

                preds_class, preds_box, anchors = model(imgs)

                loss_class, loss_box = loss_func(
                    preds_class, preds_box, anchors, targets
                )
                loss = loss_class + loss_box

                loss.backward()

                # 勾配全体のL2ノルムが上限を超えるとき上限値でクリッピング
                torch.nn.utils.clip_grad_norm_(
                    model.parameters(), config.clip
                )

                optimizer.step()

                losses_class.append(loss_class.item())
                losses_box.append(loss_box.item())
                losses.append(loss.item())
                if len(losses) > config.moving_avg:
                    losses_class.popleft()
                    losses_box.popleft()
                    losses.popleft()
                pbar.set_postfix({
                    'loss': torch.Tensor(losses).mean().item(),
                    'loss_class': torch.Tensor(losses_class).mean().item(),
                    'loss_box': torch.Tensor(losses_box).mean().item()
                })

            # スケジューラでエポック数をカウント
            scheduler.step()

            # パラメータを保存
            torch.save(model.state_dict(), config.save_file)

            # 検証
            if (epoch + 1) % config.val_interval == 0:
                evaluate(val_loader, model, loss_func)



## サンプルからミニバッチを生成するcollate関数

In [None]:
'''
batch: CocoDetectionからサンプルした複数枚の画像とラベルをまとめたもの
'''
def collate_func(batch: Sequence[Tuple[Union[torch.Tensor, dict]]]):
    # ミニバッチの中の画像で最大の高さと幅を取得
    max_height = 0
    max_width = 0
    for img, _ in batch:
        height, width = img.shape[1:]
        max_height = max(max_height, height)
        max_width = max(max_width, width)

    # バックボーンネットワークで特徴マップの解像度を下げるときに
    # 切捨てが起きないように入力の幅と高さを32の倍数にしておく
    # もし32の倍数でない場合, バックボーンネットワークの特徴マップと
    # 特徴ピラミッドネットワークのUpScalingでできた特徴マップの
    # 大きさに不整合が生じ, 加算できなくなる
    height = (max_height + 31) // 32 * 32
    width = (max_width + 31) // 32 * 32

    # 画像を一つのテンソルにまとめる
    # ラベルはリストに集約
    imgs = batch[0][0].new_zeros((len(batch), 3, height, width))
    targets = []
    for i, (img, target) in enumerate(batch):
        height, width = img.shape[1:]
        imgs[i, :, :height, :width] = img # パディング領域は0埋め

        targets.append(target)

    return imgs, targets

## 評価関数

In [None]:
def evaluate(data_loader: DataLoader,
             model: nn.Module,
             loss_func: Callable,
             conf_threshold: float = 0.05,
             nms_threshold: float = 0.5):
    model.eval()

    losses_class = []
    losses_box = []
    losses = []
    preds = []
    img_ids = []
    for imgs, targets in tqdm(data_loader, desc='[Validation]'):
        with torch.no_grad():
            imgs = imgs.to(model.get_device())
            targets = [{
                k: v.to(model.get_device())
                for k, v in target.items()
            } for target in targets]

            preds_class, preds_box, anchors = model(imgs)

            loss_class, loss_box = loss_func(
                preds_class, preds_box, anchors, targets
            )

            loss = loss_class + loss_box

            losses_class.append(loss_class)
            losses_box.append(loss_box)
            losses.append(loss)

        # 後処理により最終的な検出矩形を取得
        scores, labels, boxes = post_process(
            preds_class, preds_box, anchors, targets,
            conf_threshold=conf_threshold,
            nms_threshold=nms_threshold,
        )

        for img_scores, img_labels, img_boxes, img_targets in zip(
            scores, labels, boxes, targets
        ):
            img_ids.append(img_targets['image_id'].item())

            # 評価のためにCocoの元々の矩形表現である
            # xmin, ymin, width, heightに変換
            img_boxes[:, 2:] -= img_boxes[:, :2]

            for score, label, box in zip(
                img_scores, img_labels, img_boxes
            ):
                # COCO評価用のデータの保存
                preds.append({
                    'image_id': img_targets['image_id'].item(),
                    'category_id': data_loader.dataset.to_coco_label(label.item()),
                    'score': score.item(),
                    'bbox': box.to('cpu').numpy().tolist()
                })

    # 平均値
    loss_class = torch.stack(losses_class).mean().item()
    loss_box = torch.stack(losses_box).mean().item()
    loss = torch.stack(losses).mean().item()
    print(f"Validation loss = {loss: .3f}, "
          f"class loss = {loss_class: .3f}, "
          f"box loss = {loss_box: .3f} ")

    if len(preds) == 0:
        print('Nothing detected, skip evaluation')
        return

    # COCOevalクラスを使って評価するには検出結果を
    # jsonファイルに出力する必要があるため, jsonファイルに一時保存
    with open('tmp.json', 'w') as f:
        json.dump(preds, f)

    # 一時保存sたい結果をCOCOクラスを使って読み込み
    coco_results = data_loader.dataset.coco.loadRes('tmp.json')

    # COCOevalクラスを使って評価
    coco_eval = COCOeval(
        data_loader.dataset.coco,
        coco_results,
        'bbox'
    )
    coco_eval.params.imgIds = img_ids
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()




## 学習と評価

In [None]:
train_eval()

# デモ

## デモにおけるハイパーパラメータの設定

In [None]:
class ConfigDemo:
    def __init__(self):
        self.img_directory = "/content/drive/MyDrive/ColabNotebooks/Book_DL_Image_Recognition/data/object_detection"
        self.load_file = "/content/drive/MyDrive/ColabNotebooks/Book_DL_Image_Recognition/model/retinanet.pth"
        self.classes = ['person', 'car']
        self.device = 'cuda'
        self.conf_threshold = 0.5
        self.nms_threshold = 0.5

## デモ用の関数

In [None]:
from pathlib import Path
from torchvision.utils import draw_bounding_boxes

def demo():
    config = ConfigDemo()

    transforms = Compose((
        RandomResize((608,), max_size=1024),
        ToTensor(),
        Normalize(mean=(0.485, 0.456, 0.406),
                    std=(0.229, 0.224, 0.225)),

    ))

    # 学習済みモデルパラメータを読み込み
    model = RetinaNet(len(config.classes))
    model.load_state_dict(torch.load(config.load_file))
    model.to(config.device)
    model.eval()

    for img_path in Path(config.img_directory).iterdir():
        img_orig = Image.open(img_path)
        width, height = img_orig.size

        # データを整形を適用するために
        # ダミーデータをラベルの作成
        target = {
            'classes': torch.zeros((0,), dtype=torch.int64),
            'boxes': torch.zeros((0, 4), dtype=torch.float32),
            'size': torch.tensor((width, height), dtype=torch.int64),
            'orig_size': torch.tensor((width, height), dtype=torch.int64),
        }

        # データ整形
        img, target = transforms(img_orig, target)
        imgs, targets = collate_func([(img, target)])

        with torch.no_grad():
            imgs = imgs.to(model.get_device())
            targets = [{ k: v.to(model.get_device())
                            for k, v in target.items()
                        } for target in targets ]

            preds_class, preds_box, anchors = model(imgs)

            # 後処理
            scores, labels, boxes = post_process(
                preds_class, preds_box, anchors, targets,
                conf_threshold = config.conf_threshold,
                nms_threshold = config.nms_threshold
            )

            # 描画用の画像を用意
            img = torch.tensor(np.asarray(img_orig))
            img = img.permute(2, 0, 1)

            # クラスIDをクラス名に変換
            labels = [config.classes[label] for label in labels[0]]

            # 矩形を描画
            img = draw_bounding_boxes(
                img, boxes[0], labels, colors='red',
                font='LiberationSans-Regular.ttf',
                font_size=42, width=4,
            )
            img = img.permute(1,2,0)
            img = img.to('cpu').numpy()
            img = Image.fromarray(img)
            display(img)


## デモの実行

In [None]:

demo()