In [None]:
import os
import torch
import torchvision.datasets as dset
from torchvision.transforms import v2
from torchvision.transforms import functional as F
from torch.utils.data import DataLoader

import numpy as np
import json
import torchvision.transforms as T
import torch.utils.data as data

def collate_fn(batch):
    return tuple(zip(*batch))

rootDataset = os.path.join('/', 'home', 'dblab', 'coco')

trainAnnoPath = os.path.join(rootDataset, 'annotations', 'instances_train2017.json')
traincategoryPath = os.path.join(rootDataset, 'annotations', 'categories.json')
trainImagePath = os.path.join(rootDataset, 'train2017')

transforms = v2.Compose([
    v2.Resize((480, 640)),
    v2.ToImage(), 
    v2.ToDtype(torch.float32, scale=True)
])

trainDataset = dset.CocoDetection(root=trainImagePath, annFile=trainAnnoPath)

trainDataLoader = DataLoader(trainDataset, batch_size=4, shuffle=True, collate_fn=collate_fn)



In [None]:
class CoCoDataset(data.Dataset):
    def __init__(self, categoryPath, input_transform=None, labels_path=None):
        self.coco = trainDataset
        with open(categoryPath,'r') as load_category:
            self.category_map = json.load(load_category)
        self.input_transform = input_transform
        self.labels_path = labels_path
	
        self.labels = []
        if self.labels_path:
            self.labels = np.load(self.labels_path).astype(np.float64)
            self.labels = (self.labels > 0).astype(np.float64)
        else:
            l = len(self.coco)
            for i in range(l):
                item = self.coco[i]
                print(i)
                categories = self.getCategoryList(item[1])
                label = self.getLabelVector(categories)
                self.labels.append(label)


    def __getitem__(self, index):
        input = self.coco[index][0]
        if self.input_transform:
            input = self.input_transform(input)
        return input, self.labels[index]


    def getCategoryList(self, item):
        categories = set()
        for t in item:
            categories.add(t['category_id'])
        return list(categories)

    def getLabelVector(self, categories):
        label = np.zeros(80)
        label_num = len(categories)
        for c in categories:
            index = self.category_map[str(c)]-1
            label[index] = 1.0 / label_num
        return label


    def __len__(self):
        return len(self.coco)

In [None]:
from torchvision import models

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = models.resnet50(pretrained=True).to(device)

In [None]:
import torch
import torch.nn as nn
from torch import optim

lr = 0.0001
epochs = 5
optimizer = optim.Adam(model.parameters(), lr=lr)
loss_function = nn.CrossEntropyLoss().to(device)

In [None]:
params = {
    'epochs': epochs,
    'optimizer': optimizer,
    'loss_function': loss_function,
    'trainDataloader': trainDataLoader,
    'device': device
}

In [None]:
def train(model, params):
    loss_function=params["loss_function"]
    trainDataloader=params["trainDataloader"]
    device=params["device"]

    for epoch in range(0, epochs):
        for i, data in enumerate(trainDataloader, 0):
            # train dataloader 로 불러온 데이터에서 이미지와 라벨을 분리
            images, labels = data
            images = list(F.to_tensor(image).to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
            # inputs = torch.stack(inputs)
            # inputs = inputs.to(device)
            labels = labels.to(device)

            # 이전 batch에서 계산된 가중치를 초기화
            optimizer.zero_grad() 

            # forward + back propagation 연산
            outputs = model(images)
            train_loss = loss_function(outputs, labels)
            train_loss.backward()
            optimizer.step()
        
        # 학습 결과 출력
        print('Epoch: %d/%d, Train loss: %.6f' % (epoch+1, epochs, train_loss.item()))

In [None]:
train(model, params)

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import CocoDetection
from torchvision.models import resnet50
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
from torch.optim import SGD
import os

# 데이터셋과 어노테이션 디렉토리 경로
data_dir = os.path.join('/', 'home', 'dblab', 'coco')
train_dir = os.path.join(rootDataset, 'train2017')
val_dir = os.path.join(rootDataset, 'val2017')
anno_path = os.path.join(rootDataset, 'annotations', 'instances_train2017.json')

# 변환 및 전처리
transform = transforms.Compose([
    transforms.ToTensor(),
])

# COCO 데이터셋 불러오기
train_dataset = CocoDetection(root=os.path.join(data_dir, train_dir), annFile=anno_path, transform=transform)

def collate_fn(batch):
    return tuple(zip(*batch))

train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)

# 모델 및 손실 함수, 옵티마이저 정의
model = fasterrcnn_resnet50_fpn(pretrained=True)
num_classes = 91  # COCO dataset의 클래스 수
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
model.train()

optimizer = SGD(model.parameters(), lr=0.005, momentum=0.9)

# 학습
num_epochs = 5

for epoch in range(num_epochs):
    for images, targets in train_loader:
        images = list(image.to(device) for image in images)
        print(targets)
        targets = [{k: v.to(device) for k, v in target.items()} for target in targets]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {losses.item()}')

# 학습이 완료된 모델을 저장
torch.save(model.state_dict(), 'fasterrcnn_resnet50_coco.pth')
