kaggle dataset: (https://www.kaggle.com/datasets/pranavraikokte/covid19-image-dataset/data)

In [94]:
import os
import numpy as np
import cv2

import torch
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader

import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F

import matplotlib.pyplot as plt
import random

from sklearn.metrics import f1_score

In [95]:
CFG = {
    'IMG_SIZE':256,
    'EPOCHS':50,
    'LEARNING_RATE':0.0001,
    'BATCH_SIZE':16,
    'SEED':42
}

In [96]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

In [97]:
class COVID_Dataset(Dataset):
    def __init__(self, transform, mode):
        self.transform = transform
        self.image_folder = []
        self.labels = []
        categories = ['Covid', 'Normal', 'Viral Pneumonia']
        
        for i, category in enumerate(categories):
            image_files = sorted([mode+'/'+category+'/'+f for f in os.listdir(mode+'/'+category) if "mask" not in f.lower()])
            self.image_folder.extend(image_files)
            self.labels.extend([i]*len(image_files))
        
    def __len__(self):
        return len(self.image_folder)
    
    def __getitem__(self, idx):
        img_path = self.image_folder[idx]
        label = self.labels[idx]
        
        img = cv2.imread(img_path)
        img = self.transform(img)
        
        return (img, label)
        

In [98]:
train_transforms = transforms.Compose([transforms.ToTensor(),
                                       transforms.Resize((CFG['IMG_SIZE'],CFG['IMG_SIZE']))])
test_transforms = transforms.Compose([transforms.ToTensor(),
                                       transforms.Resize((CFG['IMG_SIZE'],CFG['IMG_SIZE']))])
train_set = COVID_Dataset(transform=train_transforms, mode='train')
train_loader = DataLoader(train_set, batch_size=CFG['BATCH_SIZE'], shuffle=True)
test_set = COVID_Dataset(transform=test_transforms, mode='test')
test_loader = DataLoader(test_set, batch_size=CFG['BATCH_SIZE'], shuffle=False)

In [99]:
class Trastion_module(nn.Module):
    def __init__(self, in_ch):
        super(Trastion_module, self).__init__()
        # Composition Function을 적용하여 BN -> AF -> Conv순
        self.tr_layer = nn.Sequential(
            nn.BatchNorm2d(in_ch),
            nn.ReLU(),
            nn.Conv2d(in_ch, in_ch//2, kernel_size=1, stride=1, bias=False)
        )
        # Feature Map 크기 감소
        self.ave_pool = nn.AvgPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        x = self.tr_layer(x)
        x = self.ave_pool(x)
        return x
    
class Botteleneck(nn.Module):
    # Dense block의 레이어들이 출력하는 FeatureMap 크기: K = Growh_rate
    def __init__(self, in_ch, growth_rate): 
        super(Botteleneck, self).__init__()

        self.conv1x1 = nn.Sequential(
            nn.BatchNorm2d(in_ch),
            nn.ReLU(),
            nn.Conv2d(in_ch, out_channels=4*growth_rate, kernel_size=1, bias=False)
        )

        self.conv3x3 = nn.Sequential(
            nn.BatchNorm2d(4*growth_rate),
            nn.ReLU(),
            nn.Conv2d(in_channels=4*growth_rate, out_channels=growth_rate, 
                      kernel_size=3, padding=1, bias=False)
        )

    def forward(self, x):
        identy = x
        x = self.conv1x1(x)
        x = self.conv3x3(x)

        x = torch.cat([identy, x], dim=1)
        return x
    
class DenseBlock(nn.Module):
    def __init__(self, num_block, in_ch, growth_rate, last_stage=False):
        super(DenseBlock, self).__init__()
        self.dense_ch = in_ch

        self.layers = nn.ModuleList()

        for _ in range(num_block):
            layer = Botteleneck(self.dense_ch, growth_rate)

            self.layers.append(layer)
            self.dense_ch += growth_rate

        if last_stage:
            self.layers.append(nn.BatchNorm2d(self.dense_ch))
            self.layers.append(nn.ReLU())
        
        else:
            self.layers.append(Trastion_module(self.dense_ch))
            assert self.dense_ch % 2 == 0,
            self.dense_ch //= 2


    def forward(self, x):
        for layer in self.layers:
            x = layer(x)

        return x

In [100]:
class DenseNet121(nn.Module):
    def __init__(self, block_list, growth_rate, n_classes=1000):
        super(DenseNet121, self).__init__()
        # self.densenet121 = models.densenet121(pretrained=True)
        # self.densenet121.classifier = nn.Linear(1024, 3)

        self.growth_rate = growth_rate

        self.stem = nn.Sequential( 
            nn.Conv2d(in_channels=3, out_channels=2*self.growth_rate,
                      kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(2*self.growth_rate),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        self.dense_ch = 2*self.growth_rate

        dense_blocks = []
        for i, num_block in enumerate(block_list):
            last_stage = (i == len(block_list) - 1)
            dense_blocks.append(DenseBlock(num_block, self.dense_ch, 
                                                    self.growth_rate, 
                                                    last_stage=last_stage))
            self.dense_ch = dense_blocks[-1].dense_ch

        self.dense_blocks = nn.Sequential(*dense_blocks)

        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(self.dense_ch, n_classes)
        )
        
    def forward(self, x):
        # x = self.densenet121(x)
        x = self.stem(x)
        x = self.dense_blocks(x)
        x = self.classifier(x)
        return x

In [101]:
use_cuda = torch.cuda.is_available()
device = torch.device('cuda' if use_cuda else 'cpu')

model = DenseNet121(block_list=[6, 12, 24, 16], growth_rate=32, n_classes=3).to(device)

criterion = nn.CrossEntropyLoss()
lr = 0.0001
optimizer = optim.Adam(model.parameters(), lr=lr)

In [102]:
for epoch in range(CFG['EPOCHS']):
    for i, (imgs, labels) in enumerate(train_loader):
        imgs = imgs.float().to(device)
        labels = labels.to(device)

        model.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        # print(outputs.argmax(1))
        # print(labels)
        
        loss.backward()
        optimizer.step()

    if epoch%1==0:
        model.eval()
        val_loss = []
        preds, true_labels = [], []
        with torch.no_grad():
            for imgs, labels in test_loader:
                imgs = imgs.float().to(device)
                labels = labels.to(device)
                
                pred = model(imgs)
                
                loss = criterion(pred, labels)
                
                preds += pred.argmax(1).detach().cpu().numpy().tolist()
                true_labels += labels.detach().cpu().numpy().tolist()
                
                val_loss.append(loss.item())
            
            _val_loss = np.mean(val_loss)
            _val_score = f1_score(true_labels, preds, average='macro')
        
        print(f'Epoch [{epoch}], Train Loss: {loss.item():.4f}, Val Loss: {_val_loss:.5f}, Val Macro F1: {_val_score:.5f}')


Epoch [0], Train Loss: 1.9267, Val Loss: 1.32337, Val Macro F1: 0.18841
Epoch [1], Train Loss: 1.1848, Val Loss: 1.12428, Val Macro F1: 0.18841
Epoch [2], Train Loss: 1.3356, Val Loss: 1.13511, Val Macro F1: 0.18841
Epoch [3], Train Loss: 1.4236, Val Loss: 1.16623, Val Macro F1: 0.18841
Epoch [4], Train Loss: 1.3569, Val Loss: 1.13443, Val Macro F1: 0.18841
Epoch [5], Train Loss: 1.2202, Val Loss: 1.09524, Val Macro F1: 0.18841
Epoch [6], Train Loss: 1.4316, Val Loss: 1.13269, Val Macro F1: 0.18841
Epoch [7], Train Loss: 1.1848, Val Loss: 1.04774, Val Macro F1: 0.34459
Epoch [8], Train Loss: 1.5493, Val Loss: 1.12690, Val Macro F1: 0.18841
Epoch [9], Train Loss: 1.0594, Val Loss: 0.90672, Val Macro F1: 0.52849
Epoch [10], Train Loss: 1.0922, Val Loss: 0.85435, Val Macro F1: 0.46881
Epoch [11], Train Loss: 1.4911, Val Loss: 0.84082, Val Macro F1: 0.56566
Epoch [12], Train Loss: 0.9155, Val Loss: 0.69016, Val Macro F1: 0.85919
Epoch [13], Train Loss: 1.0677, Val Loss: 0.64358, Val Macro 