In [2]:
# 데이터세트 클래스 선언

import os
import torch
from PIL import Image
from pycocotools.coco import COCO
from torch.utils.data import Dataset

class COCODataset(Dataset):
    def __init__(self, root, train, transform=None): # root:경로, train:train 데이터셋 불러오기(False이면 검증용 불러옴)
        super().__init__()
        directory = "train" if train else "val"
        annotations = os.path.join(root, "annotations", f"{directory}_annotations.json") # annotation 디렉터리에 있는 json 파일 경로를 설정

        self.coco = COCO(annotations) # 이미지와 어노테이션 정보를 불러오기 전에 학습에 사용되는 카테고리 정보를 불러옴
        self.iamge_path = os.path.join(root, directory)
        self.transform = transform

        self.categories = self._get_categories()
        self.data = self._load_data() # 이미지와 어노테이션 정보를 불러옴

    # self.coco 인스턴스의 cats 속성에서 카테고리 정보를 불러올 수 있음
    # cats 속성은 딕셔너리 구조를 가지며, 상위 카테고리, 카테고리 ID, 카테고리 정보를 포함한다.
    def _get_categories(self):
        categories = {0: "background"} # 모델 추론 시 카테고리 정보를 확인하기 위해 사용
        for category in self.coco.cats.values():
            categories[category["id"]] = category["name"]
        return categories # {0: 'background', 1: 'cat', 2: 'dog'}
    
    # 각 이미지의 아이디, 박스, 라벨을 갖는 타겟 값과 이미지를 리스트로 반환
    def _load_data(self):
        data = []
        # 어노테이션 JSON 파일의 이미지 정보를 순차적으로 반환, 어노테이션 정보는 이미지 ID와 매핑될 수 있으므로 이미지 ID(_id)를 추출
        for _id in self.coco.imgs:
            file_name = self.coco.loadImgs(_id)[0]["file_name"] # 입력된 이미지 ID를 받아 어노테이션 정보를 반환      
            image_path = os.path.join(self.iamge_path, file_name) # 이미지 ID를 받아 이미지의 경로 저장
            image = Image.open(image_path).convert("RGB") # 이미지를 불러옴, convert("RGB")는 이미지 데이터를 RGB 색상 모드로 변환

            boxes = []
            labels = []
            # getAnnIds : image ID를 input으로 그에 해당하는 annotation ID를 return, loadAnns() : Annotation Id를 input으로 annotation dict 전체(상세정보)를 return
            anns = self.coco.loadAnns(self.coco.getAnnIds(_id)) # json 파일에서 annotations 부분을 불러옴

            """"annotations": [{"segmentation": [[374.46, 310.42, 386.68, ..., 376.35, 310.86]], "area": 2243.7513000000004, "iscrowd": 0, "image_id": 495357, "bbox": [337.02, 244.46, 66.47, 66.75], "category_id": 2, "id": 1727},"""
            # annotation 부분에서 bbox와 id를 받아옴
            for ann in anns:
                x, y, w, h = ann["bbox"] # 왼쪽 상단 모서리(x,y), 너비, 높이
                
                boxes.append([x, y, x + w, y + h]) # 왼쪽 상단 모서리, 오른쪽 하단 모서리 좌표로 변겨
                labels.append(ann["category_id"])

            target = {
            "image_id": torch.LongTensor([_id]),
                "boxes": torch.FloatTensor(boxes),
                "labels": torch.LongTensor(labels)
            }
            data.append([image, target])
        return data

    def __getitem__(self, index):
        image, target = self.data[index]
        if self.transform:
            image = self.transform(image)
        return image, target

    def __len__(self):
        return len(self.data)

In [3]:
# 데이터로더

from torchvision import transforms
from torch.utils.data import DataLoader

# 배치 내 이미지를 튜플의 형태로 반환
# 예를 들어, 배치 크기가 4이고, 각 데이터 포인트가 이미지와 타겟을 포함한다면, batch는 [(image1, target1), (image2, target2), (image3, target3), (image4, target4)] 형태
# zip(*batch)를 실행하면 [(image1, image2, image3, image4), (target1, target2, target3, target4)]와 같이 이미지들과 타겟들이 각각 그룹화
def collator(batch):
    return tuple(zip(*batch))

transform = transforms.Compose(
    [
        transforms.PILToTensor(),
        transforms.ConvertImageDtype(dtype=torch.float)
    ]
)

train_dataset = COCODataset("../datasets/coco", train=True, transform = transform)
test_dataset = COCODataset("../datasets/coco", train=False, transform = transform)

train_dataloader = DataLoader(
    train_dataset, batch_size=4, shuffle=True, drop_last=True, collate_fn=collator
)

test_dataloader = DataLoader(
    test_dataset, batch_size=1, shuffle=True, drop_last=True, collate_fn=collator
)

loading annotations into memory...
Done (t=0.03s)
creating index...
index created!
loading annotations into memory...
Done (t=0.00s)
creating index...
index created!


In [4]:
# 백본 및 모델 구조 정의

from torchvision import models
from torchvision import ops
from torchvision.models.detection import rpn
from torchvision.models.detection import FasterRCNN

# 백본 모델은 VGG-16 모델을 사용
backbone = models.vgg16(weights="VGG16_Weights.IMAGENET1K_V1").features 
backbone.out_channels = 512

# Faster R-CNN 모델은 2stage
# 영역 제안 네트워크
anchor_generator = rpn.AnchorGenerator( # 객체 위치 후보군을 생성
    sizes=((32, 64, 128, 256, 512),),
    aspect_ratios=((0.5, 1.0, 2.0),)
)
# 관심 영역 풀링 : 영역 제안 네트워크에서 생성한 객체 후보군을 입력으로 받아 후보군 내의 특징 맵 영역을 일정한 크기의 고정된 영역으로 샘플링
roi_pooler = ops.MultiScaleRoIAlign( # RoI align 기능이 포함되어있음, MultiScaleRoIalign 클래스는 다양한 스케일의 특징 맵을 입력으로 받아, 각 관심 영역 후보군을 해당 스케일의 특징맵에 맞게 샘플링해 고정된 크기의 관심 영역 특징 맵을 생성한다. 이렇게 생성된 특징 맵은 분류 계층의 입력으로 사용된다.
    featmap_names=["0"], # VGG의 특징맵은 sequential 형태이다. 따라서 첫 번째 특징 추출 계층을 사용하기 위해 "0"으로 설정
    output_size=(7, 7),
    sampling_ratio=2 # 관심 영역을 샘플링하기 위해 2*2 크기의 그리드를 사용
)

device = "cuda" if torch.cuda.is_available() else "mps"
model = FasterRCNN(
    backbone=backbone,
    num_classes=3,
    rpn_anchor_generator=anchor_generator,
    box_roi_pool=roi_pooler
).to(device)

In [5]:
# 최적화 함수 및 학습률 스케일러
from torch import optim

params = [p for p in model.parameters() if p.requires_grad] # 학습이 가능한 매개변수만 params 변수에 저장
optimizer = optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005) # params 변수에 확률적 경사 하강법 적용
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1) # 지정된 주기마다 학습률을 감소시켜 학습률이 너무 크게 줄어들거나 작아지는 것을 방지

{0: 'background', 1: 'cat', 2: 'dog'}


In [19]:
# Faster R-CNN 미세 조정

# Faster R-CNN 학습 과정이라고 생각하면 됨
for epoch in range(5):
    cost = 0.0
    for idx, (images, targets) in enumerate(train_dataloader):
        images = list(image.to(device) for image in images) # 리스트 간소화로 to(device)
        targets = [{k: v.to(device) for k,v in t.items()} for t in targets] # 딕셔너리 간소화로 to(device)

        loss_dict = model(images, targets) # lossclassifier, lossboxreg, lossobjectness, lossrpnboxreg 총 4개의 손실 값이 나옴
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        cost += losses

    lr_scheduler.step() # 모든 배치를 처리 후에 스케쥴러를 한 단계 업데이트
    cost = cost / len(train_dataloader)
    print(f"Epoch : {epoch+1 : 4d}, Cost : {cost : .3f}")

RuntimeError: [srcBuf length] > 0 INTERNAL ASSERT FAILED at "/Users/runner/work/pytorch/pytorch/pytorch/aten/src/ATen/native/mps/OperationUtils.mm":341, please report a bug to PyTorch. Placeholder tensor is empty!

In [6]:
# 모델 추론 및 시각화

import numpy as np
from PIL import Image
from matplotlib import pyplot as plt
from torchvision.transforms.functional import to_pil_image

def draw_bbox(ax, box, text, color):
    ax.add_path(
        plt.Rectangle(
            xy=(box[0],box[1]),
            width=box[2]-box[0],
            height=box[3]-box[1],
            fill=False,
            edgecolor=color,
            linewidth=2,
        )
    )
    ax.annotate(
        text=text,
        xy=(box[0]-5, box[1]-5),
        color=color,
        weight="bold",
        fontsize=13,
    )

threshold = 0.5
categories = test_dataset.categories # {0: 'background', 1: 'cat', 2: 'dog'}
with torch.no_grad():
    model.eval()
    for images, targets in test_dataloader:
        images = [image.to(device) for image in images]
        outputs = model(images)
        ###outputs = [{'boxes': tensor([[x1, y1, x2, y2], ...]), 'labels': tensor([label1, label2, ...]), 'scores': tensor([score1, score2, ...]) # 각 탐지에 대한 신뢰도 점수}]

        boxes = outputs[0]["boxes"].to("cpu").numpy()
        labels = outputs[0]["labels"].to("cpu").numpy()
        scores = outputs[0]["scores"].to("cpu").numpy()

        boxes = boxes[scores>=threshold].astype(np.int32)
        labels = labels[scores>=threshold]
        scores = scores[scores>=threshold]

        fig = plt.figure(figsize=(8,8))
        ax = fig.add_subplot(1,1,1)
        plt.imshow(to_pil_image(images[0]))

        for box, label, score in zip(boxes, labels, scores):
            draw_bbox(ax, box, f"{categories[label]}-{score:.4f}","red")

        tboxes = targets[0]["boxes"].numpy()
        tlabels = targets[0]["labels"].numpy()
        for box, labels in zip(tboxes, tlabels):
            draw_bbox(ax, box, f"{categories[label]}","blue")

        plt.show()

Precision(정밀도) : TP/(TP+FP), 모델이 참이라고 예측한 것 중 얼마나 맞았는지

Recall(재현율) : TP/(TP+FN), 모델이 실제 참값인 데이터를 얼마나 잘 찾아냈는지


In [None]:
# 모델 평가

import numpy as np
from pycocotools.cocoeval import COCOeval

with torch.no_grad():
    model.eval()
    coco_detections = []
    for images, targets in test_dataloader : 
        images = [img.to(device) for img in images]
        outputs = model(images)

        for i in range(len(targets)):
            image_id = targets[i]["image_id"].data.cpu().numpy().tolist()[0]
            boxes = outputs[i]["boxes"].data.cpu().numpy()
            boxes[:,2] = boxes[:,2]-boxes[:,0]
            boxes[:,3] = boxes[:,3]-boxes[:,1]
            scores = outputs[i]["scores"].data.cpu().numpy()
            labels = outputs[i]["labels"].data.cpu().numpy()

            for instance_id in range(len(boxes)):
                box = boxes[instance_id, :].tolist()
                prediction = np.array(
                    [
                        image_id,
                        box[0],
                        box[1],
                        box[2],
                        box[3],
                        float(scores[instance_id]),
                        int(labels[instance_id])
                    ]
                )
                coco_detections.append(prediction)

    coco_detections = np.asarray(coco_detections)
    coco_gt = test_dataloader.dataset.coco_detections
    coco_dt = coco_gt.loadRes(coco_detections)
    coco_evaluator = COCOeval(coco_gt, coco_dt, iouTypr="bbox")
    coco_evaluator.evaluate()
    coco_evaluator.accumulate()
    coco_evaluator.summarize()
