파일 경로는 사용자 환경에 맞추어 수정해야 합니다.

현재는 구글 드라이브 내부에 datasets, models, submissions 폴더가 있다고 가정합니다. (아래 코드에서는 이 폴더들을 `/content/datasets`, `/content/models`, `/content/submissions` 경로로 참조합니다.)


- datasets: 대회 측에서 제공한 CT 데이터셋이 들어갑니다.

- models: 해당 코드를 통해 훈련된 최대 성능의 모델이 저장됩니다.

- submissions: 대회에 제출하기 위해 작성한 csv 파일을 저장합니다.


In [2]:
!git clone https://github.com/rwightman/pytorch-image-models

Cloning into 'pytorch-image-models'...
remote: Enumerating objects: 11718, done.[K
remote: Counting objects: 100% (627/627), done.[K
remote: Compressing objects: 100% (229/229), done.[K
remote: Total 11718 (delta 439), reused 525 (delta 387), pack-reused 11091[K
Receiving objects: 100% (11718/11718), 20.83 MiB | 27.35 MiB/s, done.
Resolving deltas: 100% (8598/8598), done.


In [3]:
import shutil

# Copy the timm package out of the cloned repository so it can be imported
# as a top-level module. dirs_exist_ok=True lets this cell be re-run without
# raising FileExistsError when /content/timm already exists.
shutil.copytree("/content/pytorch-image-models/timm", "/content/timm", dirs_exist_ok=True)

'/content/timm'

In [None]:
import os
import torch
import torch.nn as nn
from torch import optim
from torchvision import utils
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import pandas as pd
from PIL import Image
from timm.models import convnext
import random
import torch.nn.functional as F
import csv
import time
import skimage, torchvision

# Single seed shared by every random number generator used in this notebook.
random_seed = 0

# Seed, in turn: Python's `random`, NumPy, PyTorch (CPU), the current GPU,
# and every GPU (relevant when more than one is in use).
for seed_fn in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed,
    torch.cuda.manual_seed_all,
):
    seed_fn(random_seed)

# Keep the cuDNN autotuner off; when enabled it can speed up runs whose
# input tensors always have the same size.
torch.backends.cudnn.benchmark = False
# Ask cuDNN to use deterministic algorithms only.
torch.backends.cudnn.deterministic = True

##########################################################################
################################ Training ################################
##########################################################################

# Build a ConvNeXt-Femto network with pretrained weights and replace its
# final fully connected layer with a fresh 5-output linear layer.
convnext_ = convnext.convnext_femto(pretrained=True)
FC_in_features = convnext_.head.fc.in_features
convnext_.head.fc = nn.Linear(FC_in_features, 5)

print(convnext_)

model = convnext_

# Training environment: use the GPU when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

model.to(device)

# Root folder of the dataset; the dataset classes below prepend it to every
# image path.
dir = '/content/datasets/ChestCT/'

# Augmentation pipeline: resize to 300x300, take a random 288x288 crop, apply
# random horizontal/vertical flips and a random perspective warp (each with
# probability 0.5), then convert the image to a tensor.
augmentation_steps = [
    transforms.Resize(size=(300, 300)),
    transforms.RandomCrop(size=288),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomPerspective(distortion_scale=0.5, p=0.5),
    transforms.ToTensor(),
]
torchvision_transform = transforms.Compose(augmentation_steps)

class TrainDataset(Dataset):
    """Labelled training images listed in a CSV file.

    The first CSV column is read as image file names and the second column
    as labels. Images are loaded from
    ``dir + "ChestCT_Train/CT_Train_images/"``.
    """

    def __init__(self, file_path, transform=torchvision_transform):
        """Read the CSV at ``file_path`` and remember the transform to apply."""
        table = pd.read_csv(file_path)
        self.transform = transform
        self.path = table.iloc[:, 0].values
        self.y = table.iloc[:, 1].values
        self.imgs = list(self.path)

    def __len__(self):
        """Return the number of listed images."""
        return len(self.imgs)

    def __getitem__(self, index):
        """Return ``(image, label)`` for ``index``; the image is converted to RGB."""
        img_path = dir + "ChestCT_Train/CT_Train_images/" + self.imgs[index]
        image = Image.open(img_path).convert("RGB")
        label = self.y[index]
        # A falsy transform (e.g. None) means the PIL image is returned as-is.
        if not self.transform:
            return image, label
        return self.transform(image), label
    
class TestDataset(Dataset):
    """Test images listed in a CSV file.

    The first CSV column is read as image file names; the second column is
    stored as ``y`` and returned alongside each image. Images are loaded
    from ``dir + "ChestCT_Test/"``.
    """

    def __init__(self, file_path, transform=torchvision_transform):
        """Read the CSV at ``file_path`` and remember the transform to apply."""
        table = pd.read_csv(file_path)
        self.transform = transform
        self.path = table.iloc[:, 0].values
        self.y = table.iloc[:, 1].values
        self.imgs = list(self.path)

    def __len__(self):
        """Return the number of listed images."""
        return len(self.imgs)

    def __getitem__(self, index):
        """Return ``(image, y)`` for ``index``; the image is converted to RGB."""
        img_path = dir + "ChestCT_Test/" + self.imgs[index]
        image = Image.open(img_path).convert("RGB")
        label = self.y[index]
        # A falsy transform (e.g. None) means the PIL image is returned as-is.
        if not self.transform:
            return image, label
        return self.transform(image), label
    
# Deterministic preprocessing for evaluation. It mirrors the 288x288 resize
# applied at inference time, so validation accuracy is measured on the same
# kind of input the final model will see instead of on randomly augmented
# images.
eval_transform = transforms.Compose([
    transforms.Resize((288, 288)),
    transforms.ToTensor(),
])

# 80/20 split of the labelled data into training and validation subsets.
dataset = TrainDataset('/content/datasets/ChestCT/train.csv')
dataset_size = len(dataset)
train_size = int(dataset_size * 0.8)
validation_size = dataset_size - train_size

train_dataset, val_split = random_split(dataset, [train_size, validation_size])

# Re-wrap the validation indices around a second view of the same CSV that
# uses the deterministic transform; the training subset keeps the augmenting
# default transform.
val_dataset = torch.utils.data.Subset(
    TrainDataset('/content/datasets/ChestCT/train.csv', transform=eval_transform),
    val_split.indices,
)

test_dataset = TestDataset('/content/datasets/ChestCT/test.csv', transform=eval_transform)

print(f"Training Data Size : {len(train_dataset)}")
print(f"Validation Data Size : {len(val_dataset)}")
print(f"Testing Data Size : {len(test_dataset)}")

train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True, drop_last=True)
# Evaluate on every validation sample: no shuffling and no dropped partial
# batch. With drop_last=True a validation split smaller than one batch would
# yield no batches at all.
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=False, drop_last=False)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

class LabelSmoothLoss(nn.Module):
    """Cross-entropy against label-smoothed targets.

    The target class receives weight ``1 - smoothing``; every other class
    receives ``smoothing / (num_classes - 1)``. The loss is the mean over
    the batch of the weighted negative log-probabilities.
    """

    def __init__(self, smoothing=0.0):
        super().__init__()
        self.smoothing = smoothing

    def forward(self, input, target):
        """Return the scalar loss for logits ``input`` and class indices ``target``."""
        num_classes = input.size(-1)
        # Start with the off-target weight everywhere ...
        soft_targets = torch.ones_like(input).mul(self.smoothing).div(num_classes - 1.)
        # ... then overwrite the target position of each row.
        soft_targets.scatter_(-1, target.unsqueeze(-1), (1. - self.smoothing))
        log_prob = F.log_softmax(input, dim=-1)
        per_sample = (-soft_targets * log_prob).sum(dim=-1)
        return per_sample.mean()

# Optimisation hyper-parameters.
lr = 1e-4
num_epochs = 30

# Adam over all model parameters, trained against the label-smoothing loss
# with smoothing 0.1.
optimizer = optim.Adam(model.parameters(), lr=lr)
loss_function = LabelSmoothLoss(0.1).to(device)

# Step-wise learning-rate decay: scale by 0.1 at milestones 15 and 25
# (counted in calls to scheduler.step()).
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[15, 25], gamma=0.1)

# Everything the training routine receives, bundled in one dictionary.
params = dict(
    num_epochs=num_epochs,
    optimizer=optimizer,
    loss_function=loss_function,
    train_dataloader=train_dataloader,
    val_dataloader=val_dataloader,
    device=device,
)

def train(model, params):
    """Train ``model`` and checkpoint the weights with the best validation accuracy.

    Args:
        model: The network to optimise; it is expected to already live on
            ``params["device"]``.
        params: Dictionary of training settings. Required keys:
            ``loss_function``, ``train_dataloader``, ``val_dataloader`` and
            ``device``. Optional keys: ``num_epochs``, ``optimizer`` and
            ``scheduler`` (each falls back to the module-level object of the
            same name when absent), and ``checkpoint_path`` (defaults to
            ``/content/models/convnext_femto.pt``).

    The checkpoint file is written only when the validation accuracy of an
    epoch exceeds the best accuracy seen so far, so after training it holds
    the best-performing weights rather than those of the last epoch.
    """
    loss_function = params["loss_function"]
    train_dataloader = params["train_dataloader"]
    val_dataloader = params["val_dataloader"]
    device = params["device"]

    # Prefer explicitly passed settings; the conditional expressions only
    # touch the module-level names when the key is missing.
    epochs = params["num_epochs"] if "num_epochs" in params else num_epochs
    opt = params["optimizer"] if "optimizer" in params else optimizer
    lr_scheduler = params["scheduler"] if "scheduler" in params else scheduler
    checkpoint_path = params.get("checkpoint_path", "/content/models/convnext_femto.pt")

    best_acc = 0

    for epoch in range(epochs):
        start_time = time.time()

        # ---- training pass ----
        model.train()
        running_loss = 0.0
        num_batches = 0
        for inputs, labels in train_dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Clear the gradients accumulated by the previous batch.
            opt.zero_grad()

            outputs = model(inputs)
            train_loss = loss_function(outputs, labels)
            train_loss.backward()
            opt.step()

            # Accumulate as a detached tensor to avoid a host sync per batch.
            running_loss = running_loss + train_loss.detach()
            num_batches += 1
        if lr_scheduler is not None:
            lr_scheduler.step()
        mean_train_loss = float(running_loss) / num_batches if num_batches else float("nan")

        # ---- validation pass ----
        model.eval()
        total = 0
        correct = 0
        val_loss_sum = 0.0
        with torch.inference_mode():
            for inputs, labels in val_dataloader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                predicted = outputs.argmax(dim=1)

                batch_size = labels.size(0)
                total += batch_size
                correct += (predicted == labels).sum().item()
                # Weight by batch size so the mean is per sample.
                val_loss_sum += loss_function(outputs, labels).item() * batch_size

        # Guard against an empty validation loader.
        current_acc = 100 * correct / total if total else 0.0
        mean_val_loss = val_loss_sum / total if total else float("nan")

        # Save only on improvement; an unconditional save here would
        # overwrite the best checkpoint with the latest epoch's weights.
        if current_acc > best_acc:
            best_acc = current_acc
            torch.save(model.state_dict(), checkpoint_path)
            print(best_acc, " ‧₊˚.⋆·ฺ.∗̥✩⁺˚ ੈ‧˚૮꒰˵• ﻌ •˵꒱აੈ✩‧₊˚ੈ*:ﾟ*｡.⋆·ฺᐝ.∗̥⁺˚ Let's save the BEST MODEL!")

        # Report the results of this epoch.
        print('Epoch: %d/%d, Train loss: %.6f, Val loss: %.6f, Accuracy: %.2f' %(epoch+1, epochs, mean_train_loss, mean_val_loss, current_acc))
        print("Time: {:.4f}sec".format((time.time() - start_time)))
      
# Start training with the model and the settings dictionary built above.
train(model, params)

In [None]:
###########################################################################
################################ Inference ################################
###########################################################################

# Rebuild the architecture used for training and restore the saved weights.

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # inference environment

# pretrained=False: the checkpoint loaded below replaces every weight, so
# fetching the pretrained weights first would be wasted work.
model2use = convnext.convnext_femto(pretrained=False)
FC_in_features = model2use.head.fc.in_features
model2use.head.fc = nn.Linear(in_features=FC_in_features, out_features=5)

# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime.
model2use.load_state_dict(torch.load("/content/models/convnext_femto.pt", map_location=device))
model2use.to(device)
# Put the model in evaluation mode once, before any prediction is made.
model2use.eval()


def inference(img_path):
    """Return the raw outputs of ``model2use`` for the image at ``img_path``.

    The image is opened, converted to RGB, passed through
    ``torchvision_transform2``, moved to ``device`` and fed to the model as
    a batch of one. Evaluation mode and gradient handling are left to the
    caller.
    """
    image = Image.open(img_path).convert("RGB")
    tensor = torchvision_transform2(image).to(device)
    # Add the leading batch dimension the model expects.
    batch = tensor.unsqueeze(0)
    return model2use(batch)

# Preprocessing applied to each image at inference time: resize to 288x288
# and convert to a tensor (no random augmentation).
inference_steps = [
    transforms.Resize(size=(288, 288)),
    transforms.ToTensor(),
]
torchvision_transform2 = transforms.Compose(inference_steps)

# Predict a class for every file in the test folder and write the results
# to the submission CSV, one "filename,result" row per image.
test_list = sorted(os.listdir('/content/datasets/ChestCT/ChestCT_Test'))

# Evaluation mode only needs to be set once, not once per image.
model2use.eval()

i = 0
# The context manager guarantees the file is flushed and closed even if a
# prediction fails part-way; newline='' is what the csv module expects for
# files it writes.
with open('/content/submissions/' + 'convnext_femto.csv', 'w', newline='', encoding='utf-8') as submission:
    wr = csv.writer(submission)
    wr.writerow(["filename", "result"])

    with torch.no_grad():
        for i, img_path in enumerate(test_list, start=1):
            print(i)
            output = inference(os.path.join('/content/datasets/ChestCT/ChestCT_Test/', img_path))
            # Index of the largest output is the predicted class.
            predict = int(output.argmax())
            print(predict)

            wr.writerow([img_path, predict])