In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): nothing later in this notebook appears to read from the mounted drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader

from torchvision import transforms
from torchvision import models
from torchsummary import summary

import os
import sys
import cv2
import matplotlib.pyplot as plt

from ipywidgets import interact

## 데이터 전처리

### kaggle datasets Download
- 데이터 : 흉부 X-ray 데이터
- 캐글 데이터 주소: https://www.kaggle.com/datasets/pranavraikokte/covid19-image-dataset

In [None]:
# Install / upgrade the Kaggle CLI that is used below to download the dataset.
!pip install kaggle --upgrade

In [None]:
# Open Colab's upload dialog; the Kaggle API token (kaggle.json) is expected here,
# because the next cell copies kaggle.json from the working directory.
from google.colab import files
files.upload()

In [None]:
# Put the uploaded API token where the Kaggle CLI looks for it.
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
# Restrict the file permissions so that no permission warning is emitted.
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the chest X-ray dataset archive into the current working directory.
!kaggle datasets download -d pranavraikokte/covid19-image-dataset

In [None]:
!unzip -qq '/content/covid19-image-dataset.zip'
!mkdir './ckpt'

- 데이터셋 확인

In [None]:
# The dataset ships as train/test splits; it is too small to split any further,
# so the test split is used as the validation set.
train_data_dir = '/content/Covid19-dataset/train/'
test_data_dir = '/content/Covid19-dataset/test/'
# Class folder names; a sample's integer label is the index of its folder name in this list.
class_list = ['Normal', 'Covid', 'Viral Pneumonia']

def get_image_file(data_dir, state):
    """Return the relative paths of the image files in one class folder.

    Args:
        data_dir: Root directory of a dataset split.
        state: Name of the class sub-folder inside ``data_dir``.

    Returns:
        A list of ``os.path.join(state, file_name)`` entries, sorted by file
        name, for every entry whose extension is png / jpeg / jpg (matched
        case-insensitively).
    """
    image_type = ('png', 'jpeg', 'jpg')
    image_dir = os.path.join(data_dir, state)
    # os.listdir returns entries in arbitrary order; sorting keeps index-based
    # access (sliders, dataset indices) stable from run to run.
    return [
        os.path.join(state, file_name)
        for file_name in sorted(os.listdir(image_dir))
        # .lower() so that e.g. "scan.JPG" is not silently skipped
        if file_name.split('.')[-1].lower() in image_type
    ]

In [None]:
# Collect the per-class image paths of the training split and print how many each class has.
normal_images = get_image_file(train_data_dir,'Normal')
covid_images = get_image_file(train_data_dir,'Covid')
pneumonia_images = get_image_file(train_data_dir,'Viral Pneumonia')

print(len(normal_images), len(covid_images), len(pneumonia_images), )

In [None]:
def get_image_RGB(data_dir, state):
    """Load an image from disk and return it in RGB channel order.

    Args:
        data_dir: Directory that ``state`` is relative to.
        state: Relative path of the image file (callers in this notebook pass
            the entries produced by ``get_image_file``).

    Returns:
        The array read by ``cv2.imread``, converted from BGR to RGB.

    Raises:
        FileNotFoundError: If OpenCV could not read the file.
    """
    file = os.path.join(data_dir, state)
    image = cv2.imread(file)
    # cv2.imread reports failure by returning None rather than raising, which
    # would otherwise surface as an opaque error inside cvtColor.
    if image is None:
        raise FileNotFoundError(f'Could not read image: {file}')
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

# The same index is applied to all three class lists, so the slider must stop at
# the end of the shortest one; otherwise a shorter list raises IndexError.
@interact(idx=(0, min(len(normal_images), len(covid_images), len(pneumonia_images)) - 1))
def show_images(idx=0):
    """Show the idx-th training image of each class side by side.

    Args:
        idx: Position in the per-class training lists, chosen with the slider.
    """
    normal = get_image_RGB(train_data_dir, normal_images[idx])
    covid = get_image_RGB(train_data_dir, covid_images[idx])
    pneumonia = get_image_RGB(train_data_dir, pneumonia_images[idx])

    plt.figure(figsize=(15, 5))
    plt.subplot(131)
    plt.imshow(normal)
    plt.title('Normal')

    plt.subplot(132)
    plt.imshow(covid)
    plt.title('Covid')

    plt.subplot(133)
    plt.imshow(pneumonia)
    plt.title('Pneumonia')

    plt.show()

### 데이터 전처리
- 데이터셋 구축 및 로더

In [None]:
# Training hyper-parameters shared by the loaders, the optimizer and the epoch loop.
BATCH_SIZE = 4
NUM_EPOCHS = 10
LEARNING_RATE = 0.001

# Preprocessing applied to every image: convert to a tensor, resize to 224x224,
# then normalize each of the three channels with mean 0.5 and std 0.5.
transformer = transforms.Compose([
                                transforms.ToTensor(),
                                transforms.Resize((224, 224)),
                                transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

In [None]:
class MyDataset():
    """Map-style dataset over the Normal / Covid / Viral Pneumonia folders of one split.

    Attributes:
        train_data_dir: Root directory of the split (train or test).
        files_list: Relative image paths of all classes, in the order
            Normal, Covid, Viral Pneumonia; the first path component is the
            class folder name.
        transformer: Optional callable applied to each loaded image.
    """

    def __init__(self, train_data_dir, transformer=None):
        self.train_data_dir = train_data_dir
        self.files_list = [
            relative_path
            for class_name in ('Normal', 'Covid', 'Viral Pneumonia')
            for relative_path in get_image_file(train_data_dir, class_name)
        ]
        self.transformer = transformer

    def __len__(self):
        """Return the number of images over all classes."""
        return len(self.files_list)

    def __getitem__(self, idx):
        """Load the idx-th image together with its class id.

        Returns:
            ``(image, class_id)``. Without a transformer the image is the raw
            RGB array and ``class_id`` a plain int; with a transformer the
            image is the transformer's output and ``class_id`` a long tensor
            of shape (1,).

        Raises:
            FileNotFoundError: If OpenCV could not read the image file.
        """
        relative_path = self.files_list[idx]
        image_path = os.path.join(self.train_data_dir, relative_path)
        image = cv2.imread(image_path)
        # cv2.imread returns None on failure instead of raising
        if image is None:
            raise FileNotFoundError(f'Could not read image: {image_path}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # The class is the folder component that get_image_file put in front of the file name
        class_name = relative_path.split(os.sep)[0]
        class_id = class_list.index(class_name)

        if self.transformer:
            image = self.transformer(image)
            class_id = torch.tensor([class_id], dtype=torch.long)

        return image, class_id

# Sanity check: number of images found in the test split.
print('dataset length : ', len(MyDataset(test_data_dir)))

In [None]:
# No transformer is passed, so samples come back as raw RGB arrays with plain int labels.
dst = MyDataset(test_data_dir)

In [None]:
# Inspect one sample. NOTE(review): index 55 is hard-coded and raises IndexError
# if the split holds fewer than 56 images.
image, target = dst[55]

print(image.shape, target)

In [None]:
# Show the sample; target is used directly as an index into class_list for the title.
plt.figure(figsize=(5, 5))
plt.title(class_list[target])
plt.imshow(image)
plt.show()

- dataloader #1 : 비추천

In [None]:
# Datasets for both splits, each with the shared preprocessing pipeline attached.
trainset = MyDataset(train_data_dir, transformer)
testset = MyDataset(test_data_dir, transformer)

In [None]:
# Training loader: shuffled batches of BATCH_SIZE, dropping a final incomplete batch.
train_data = DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
val_data = DataLoader(testset, batch_size=1, shuffle=False, drop_last=False) # validation runs on the test split, one image at a time

- dataloader #2 : 추천

In [None]:
# Building the loaders inside a function is the recommended approach.
def DataLoaders(train_data_dir, test_data_dir):
    """Create the training and validation loaders for the two dataset splits.

    Args:
        train_data_dir: Root directory of the training split.
        test_data_dir: Root directory of the split used for validation.

    Returns:
        A dict with a shuffled 'train' loader and an unshuffled 'val' loader.
        Both use BATCH_SIZE and discard a trailing incomplete batch.
    """
    train_split = MyDataset(train_data_dir, transformer)
    val_split = MyDataset(test_data_dir, transformer)
    return {
        'train': DataLoader(train_split, batch_size=BATCH_SIZE, shuffle=True, drop_last=True),
        'val': DataLoader(val_split, batch_size=BATCH_SIZE, shuffle=False, drop_last=True),
    }

## 모델 : VGG19
- PyTorch에서 제공하는 모델 사용
- 해당 프로젝트에 맞게 모델 변경
  - class 수 : 1,000개 -> 3개
  - AdaptiveAvgPool2d-38 부분부터 모델 변경 : output=(1, 1) -> Flatten

In [None]:
model = models.vgg19(pretrained=True) # load the pretrained model

In [None]:
# Print the summary of the unmodified VGG19 for a 3x224x224 input, on the CPU.
summary(model, (3, 224, 224), batch_size=BATCH_SIZE, device='cpu')

In [None]:
# model.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
# model.classifier = nn.Sequential(
#                                 nn.Flatten(),
#                                 nn.Linear(512, 256),
#                                 nn.ReLU(),
#                                 nn.Dropout(0.1),
#                                 nn.Linear(256, 128),
#                                 nn.ReLU(),
#                                 nn.Linear(128, 3),
#                                 nn.Softmax(dim=1)
# )

In [None]:
def build_vgg19_model(device='cpu'):
    """Build a pretrained VGG19 whose head is replaced by a 3-class classifier.

    Args:
        device: Device (string or ``torch.device``) the model is moved to.
            A CUDA request falls back to the CPU when CUDA is unavailable.

    Returns:
        The model on the resolved device. Its output is the raw logits of the
        final ``Linear(64, 3)`` layer, one row per input image.
    """
    device = torch.device(device)
    if device.type == 'cuda' and not torch.cuda.is_available():
        device = torch.device('cpu')

    model = models.vgg19(pretrained=True)
    # 1. Replace the pooling stage so that the features flatten to 512 values.
    model.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
    # 2. Replace the classifier with a head sized for this project.
    # There is deliberately no final Softmax: nn.CrossEntropyLoss applies
    # log-softmax itself, so a Softmax here would be applied twice during
    # training. Argmax predictions are unaffected, and since Softmax has no
    # parameters the state_dict layout is unchanged.
    # NOTE(review): Linear(256, 128) feeds Linear(128, 64) with no activation
    # in between; left as-is to keep the checkpoint layout - confirm intent.
    model.classifier = nn.Sequential(
        nn.Flatten(),
        nn.Linear(512, 256),
        nn.ReLU(),
        nn.Linear(256, 128),
        nn.Linear(128, 64),
        nn.ReLU(),
        nn.Linear(64, 3),
    )
    # The device argument used to be parsed but never applied.
    return model.to(device)

In [None]:
# Use the GPU when CUDA is available, otherwise the CPU; the bare name echoes the choice as cell output.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
# Build on the detected device instead of a hard-coded 'cuda', and move the model
# before calling summary(): summary creates its dummy input on the requested
# device, so the weights must already be there.
model = build_vgg19_model(device=device).to(device)
summary(model, (3, 224, 224), batch_size=BATCH_SIZE, device=device.type)
model

- loss_fn/optimizer 설정

In [None]:
criterion = nn.CrossEntropyLoss(reduction='mean') # reduction='mean' averages the loss over the samples of a batch (batch size is set to 4)
# Plain SGD with momentum over all model parameters.
optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE, momentum=0.9)

- train 구간

In [None]:
def train_mode(dataloader, model, optimizer, loss_fn, device):
    """Run one epoch: a training pass followed by a validation pass.

    Args:
        dataloader: Mapping with 'train' and 'val' entries, each an iterable
            of (image, label) batches that also supports ``len()``.
        model: Network to train and evaluate.
        optimizer: Optimizer stepped once per training batch.
        loss_fn: Callable ``loss_fn(output, label)`` returning a scalar loss.
        device: Device the batches are moved to.

    Returns:
        ``(iter_loss, iter_acc)``: two dicts keyed by 'train' and 'val' that
        hold the mean per-batch loss and the mean per-batch accuracy.
    """
    iter_loss, iter_acc = {}, {}

    for phase in ['train', 'val']:
        is_train = phase == 'train'
        running_loss = 0.0
        running_acc = 0.0

        if is_train:
            model.train()
        else:
            model.eval()

        num_batches = len(dataloader[phase])
        for idx, (image, label) in enumerate(dataloader[phase]):
            image = image.to(device)
            # Drop the trailing singleton axis so labels are one class index per sample
            label = label.squeeze(-1).to(device)

            with torch.set_grad_enabled(is_train):
                output = model(image)
                # Use the loss passed in by the caller, not a module-level global
                loss = loss_fn(output, label)

                if is_train:
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

            running_loss += loss.item()
            # Accuracy comes from the outputs already computed for the loss,
            # which avoids a second forward pass per batch.
            pred = output.argmax(dim=1)
            running_acc += (pred == label).sum().item() / label.size(0)

            if is_train and idx % 10 == 0:
                print(f'{idx}/{len(dataloader["train"])} - Running Loss : {loss.item()}')

        # Guard against an empty loader (e.g. drop_last on a tiny split)
        iter_loss[phase] = running_loss / num_batches if num_batches else 0.0
        iter_acc[phase] = running_acc / num_batches if num_batches else 0.0

    return iter_loss, iter_acc

In [None]:
@torch.no_grad()
def get_accuracy(image, target, model):
    """Return the fraction of samples in a batch that the model classifies correctly.

    Args:
        image: Batch of model inputs; its first dimension is the batch size.
        target: Class index of each sample in the batch.
        model: Network whose output has one score per class along dim 1.

    Returns:
        Correct predictions divided by the actual batch size, as a float
        (0.0 for an empty batch).
    """
    batch_size = image.shape[0]
    if batch_size == 0:
        return 0.0
    output = model(image)
    pred = output.argmax(dim=1)
    # Divide by the real size of this batch, not the global BATCH_SIZE, so that
    # batches of any size (batch_size=1 loaders, a last partial batch) are right.
    return (pred == target).sum().item() / batch_size

In [None]:
# Build the loaders here: train_mode expects a dict with 'train' and 'val'
# entries, and no earlier cell assigns `dataloader`.
dataloader = DataLoaders(train_data_dir, test_data_dir)
# Make sure the checkpoint directory exists before the first save.
os.makedirs('./ckpt', exist_ok=True)

best_acc = 0.0
train_loss, train_acc = [], []
val_loss, val_acc = [], []

for epoch in range(NUM_EPOCHS):
    iter_loss, iter_acc = train_mode(dataloader, model, optimizer, criterion, device)
    train_loss.append(iter_loss['train'])
    train_acc.append(iter_acc['train'])
    val_loss.append(iter_loss['val'])
    val_acc.append(iter_acc['val'])

    print(f'{epoch}/{NUM_EPOCHS} - Train Loss : {iter_loss["train"]}, Val Loss : {iter_loss["val"]}')
    print(f'{epoch}/{NUM_EPOCHS} - Train Acc : {iter_acc["train"]}, Val Acc : {iter_acc["val"]}')

    # Save weights only when validation accuracy strictly improves, so the
    # checkpoint with the highest epoch number is the best one of this run.
    if iter_acc['val'] > best_acc:
        best_acc = iter_acc['val']
        torch.save(model.state_dict(), f'./ckpt/model_{epoch}.pth')

In [None]:
# Loss (top panel) and accuracy (bottom panel) per epoch, train vs. validation.
plt.figure(figsize=(6, 5))

panels = [
    (211, train_loss, val_loss, "loss"),
    (212, train_acc, val_acc, "accuracy"),
]
for position, train_curve, val_curve, y_label in panels:
    plt.subplot(position)
    plt.plot(train_curve, label="train")
    plt.plot(val_curve,  label="val")
    plt.xlabel("epoch")
    plt.ylabel(y_label)
    plt.grid("on")
    plt.legend()

plt.tight_layout()
plt.show()

## test

In [None]:
# Evaluation inputs: per-class image paths of the test split.
# class_list is re-declared with the same values and order as in the dataset section.
data_dir = '/content/Covid19-dataset/test'
class_list = ['Normal', 'Covid', 'Viral Pneumonia']

test_normals = get_image_file(data_dir, 'Normal')
test_covids = get_image_file(data_dir, 'Covid')
test_pneumonia = get_image_file(data_dir, 'Viral Pneumonia')

In [None]:
def preprocess_image(image):
    """Turn one image into a single-item batch ready for the model.

    Args:
        image: Image to preprocess (callers pass the RGB arrays returned by
            ``get_image_RGB``).

    Returns:
        The image after tensor conversion, resizing to 224x224 and per-channel
        normalization (mean 0.5, std 0.5), with a leading batch axis added.
    """
    pipeline = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((224, 224)),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ])

    # (C, H, W) -> (B, C, H, W): the model expects a batch dimension.
    return pipeline(image).unsqueeze(dim=0)

In [None]:
def model_predict(image, model):
    """Predict the class index of a single image.

    Args:
        image: Image to classify; it is passed through ``preprocess_image``.
        model: Network whose output has one score per class along dim 1.

    Returns:
        The index of the highest-scoring class, as a Python int.
    """
    tensor_image = preprocess_image(image)
    # Run the input on whatever device the model's weights live on.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        tensor_image = tensor_image.to(first_param.device)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        output = model(tensor_image)
    return output.argmax(dim=1).item()

In [None]:
# Training writes model_<epoch>.pth only when validation accuracy improves, so
# the file with the highest epoch number is the best model. A hard-coded
# model_9.pth exists only if the last epoch happened to be the best one.
# NOTE(review): checkpoints left over from an earlier run in the same directory
# would also be considered - clear ./ckpt between runs.
ckpt_dir = '/content/ckpt'
ckpt_names = [
    name for name in os.listdir(ckpt_dir)
    if name.startswith('model_') and name.endswith('.pth')
    and name[len('model_'):-len('.pth')].isdigit()
]
if not ckpt_names:
    raise FileNotFoundError(f'No model_<epoch>.pth checkpoint found in {ckpt_dir}')
best_ckpt_name = max(ckpt_names, key=lambda name: int(name[len('model_'):-len('.pth')]))

# map_location lets weights saved during a GPU run load on a CPU-only runtime.
ckpt = torch.load(os.path.join(ckpt_dir, best_ckpt_name), map_location='cpu')

model = build_vgg19_model(device='cpu')
model.load_state_dict(ckpt)
model.eval()

In [None]:
# The slider is limited by the smallest class so one index is valid for all three lists.
mNum = min(len(test_normals), len(test_covids), len(test_pneumonia))

@interact(index=(0, mNum-1))
def show_eval(index=0):
    """Show the index-th test image of each class with the model's prediction.

    Args:
        index: Position in the per-class test lists, chosen with the slider.
    """
    panels = [
        ('Normal', test_normals),
        ('Covid', test_covids),
        ('Pneumonia', test_pneumonia),
    ]
    # Load all three images first, then classify them, then draw.
    images = [get_image_RGB(data_dir, paths[index]) for _, paths in panels]
    predictions = [model_predict(picture, model) for picture in images]

    plt.figure(figsize=(15, 5))
    for slot, (label, _), picture, predicted in zip((131, 132, 133), panels, images, predictions):
        plt.subplot(slot)
        plt.title(f'Test : {class_list[predicted]} | Label : {label}')
        plt.imshow(picture)

    plt.show()