# 라이브러리 로드

In [None]:
import os, re
import random, math
import shutil
import warnings 
warnings.filterwarnings(action='ignore')

from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

import numpy as np
from PIL import Image

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

In [None]:
# Run on the first CUDA GPU when PyTorch can see one, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# 하이퍼 파라미터

In [None]:
# Side length in pixels: ImageDataset resizes every image to IMG_SIZE x IMG_SIZE.
# NOTE: the Pneumonia model sizes its first fully connected layer for 300x300
# inputs, so change the two together.
IMG_SIZE = 300

# 데이터 준비

In [None]:
# Collect image paths per class from both the train and val folders; the two
# folders are pooled here and re-split afterwards.
# os.listdir returns entries in arbitrary order, so every listing is sorted:
# otherwise the seeded split would differ between machines/filesystems.
normal, disease = [], []

normal += [f'chest_xray/data/train/NORMAL/{f}' for f in sorted(os.listdir('chest_xray/data/train/NORMAL'))]
normal += [f'chest_xray/data/val/NORMAL/{f}' for f in sorted(os.listdir('chest_xray/data/val/NORMAL'))]
disease += [f'chest_xray/data/train/PNEUMONIA/{f}' for f in sorted(os.listdir('chest_xray/data/train/PNEUMONIA'))]
disease += [f'chest_xray/data/val/PNEUMONIA/{f}' for f in sorted(os.listdir('chest_xray/data/val/PNEUMONIA'))]

In [None]:
# Pool both classes into one sample list (label 0 = normal, 1 = disease) and
# hold out 10% of it for validation with a fixed seed.
x = [*normal, *disease]
y = [0 for _ in normal] + [1 for _ in disease]
train_x, val_x, train_y, val_y = train_test_split(
    x,
    y,
    test_size=0.1,
    random_state=32,
)

In [None]:
# Build the held-out test list: all NORMAL paths (label 0) first, followed by
# all PNEUMONIA paths (label 1).
_test_normal = [f'chest_xray/data/test/NORMAL/{f}' for f in os.listdir('chest_xray/data/test/NORMAL')]
_test_disease = [f'chest_xray/data/test/PNEUMONIA/{f}' for f in os.listdir('chest_xray/data/test/PNEUMONIA')]
test_x = _test_normal + _test_disease
test_y = [0] * len(_test_normal) + [1] * len(_test_disease)

# 데이터셋 준비


In [None]:
class ImageDataset(Dataset):
    """Dataset that lazily loads grayscale images from file paths.

    Each item is an ``(image, label)`` pair. The image is read as 8-bit
    grayscale, scaled to ``[0, 1]``, converted to a tensor and resized to
    ``IMG_SIZE x IMG_SIZE``.

    Args:
        files: Sequence of image file paths.
        labels: Sequence of labels aligned index-by-index with ``files``.
        typ: Tag for the split (e.g. ``'train'``, ``'val'``, ``'test'``). It
            is only stored on the instance; this class does not act on it.

    Raises:
        ValueError: If ``files`` and ``labels`` have different lengths.
    """

    def __init__(self, files, labels, typ='train'):
        # Fail early instead of silently ignoring extra files or raising an
        # IndexError in the middle of an epoch.
        if len(files) != len(labels):
            raise ValueError(
                f'files and labels must have the same length, '
                f'got {len(files)} and {len(labels)}'
            )
        self.files = files
        self.labels = labels
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Resize((IMG_SIZE, IMG_SIZE))
        ])
        self.typ = typ

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Load, scale and resize the image at ``idx``; return it with its label."""
        path, label = self.files[idx], self.labels[idx]
        # Image.open is lazy and keeps the file open; the context manager
        # releases the handle as soon as the pixels have been converted.
        with Image.open(path) as raw:
            gray = raw.convert('L')
        # Scale 8-bit pixel values to [0, 1] before the tensor conversion.
        image = np.array(gray) / 255.
        image = self.transform(image)
        return image, label

In [None]:
# Only the training loader shuffles. Validation and test batches keep a fixed
# order so evaluation runs see the same batches every time.
train_set = ImageDataset(train_x, train_y, 'train')
train_dataloader = DataLoader(train_set, batch_size=32, shuffle=True)

val_set = ImageDataset(val_x, val_y, 'val')
val_dataloader = DataLoader(val_set, batch_size=32, shuffle=False)

test_set = ImageDataset(test_x, test_y, 'test')
test_dataloader = DataLoader(test_set, batch_size=32, shuffle=False)

In [None]:
# Sanity check: pull one training batch, report its shapes and display the
# first image together with its label.
train_features, train_labels = next(iter(train_dataloader))
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")

# squeeze() drops the size-1 dimensions so imshow receives a 2-D array.
img, label = train_features[0].squeeze(), train_labels[0]
plt.imshow(img, cmap="gray")
plt.show()
print(f"Label: {label}")

## 모델 정의

In [None]:
class Pneumonia(nn.Module):
    """CNN binary classifier for single-channel square images.

    Six convolution blocks (conv -> ReLU -> batch norm -> 2x2 max pool) feed
    three fully connected layers; a final sigmoid yields one value in
    ``[0, 1]`` per sample.

    Args:
        img_size: Height/width of the square input images. The default of 300
            gives the 8192 flattened features the network was written for.

    Raises:
        ValueError: If ``img_size`` is too small to survive the six poolings.
    """

    def __init__(self, img_size=300):
        super().__init__()

        # Every block halves H and W (rounding down), so six blocks shrink
        # each side by a factor of 2**6.
        side = img_size // 2 ** 6
        if side < 1:
            raise ValueError(f'img_size must be at least 64, got {img_size}')

        self.block0 = self.make_block(1, 32, 3)
        self.block1 = self.make_block(32, 64, 3)
        self.block2 = self.make_block(64, 128, 3)
        self.block3 = self.make_block(128, 256)
        self.block4 = self.make_block(256, 512)
        self.block5 = self.make_block(512, 512)
        # 512 channels * side * side spatial positions (8192 for img_size=300).
        self.fc1 = nn.Linear(512 * side * side, 4096)
        self.fc2 = nn.Linear(4096, 1024)
        self.fc3 = nn.Linear(1024, 1)
        self.sigmoid = nn.Sigmoid()

    def make_block(self, in_c, out_c, kernel_size=2, stride=1):
        """Build one conv -> ReLU -> batch norm -> 2x2 max pool stage.

        Args:
            in_c: Number of input channels.
            out_c: Number of output channels.
            kernel_size: Convolution kernel size.
            stride: Convolution stride.

        Returns:
            An ``nn.Sequential`` holding the four layers.
        """
        out = nn.Sequential(
            nn.Conv2d(in_c, out_c, kernel_size, stride, padding='same'),
            nn.ReLU(),
            nn.BatchNorm2d(out_c),
            nn.MaxPool2d(2, 2)
        )
        return out

    def forward(self, x):
        """Return a ``(batch, 1)`` tensor of sigmoid outputs for ``x``.

        Args:
            x: Input batch of shape ``(batch, 1, img_size, img_size)``.

        Raises:
            ValueError: If ``x`` is not 4-dimensional.
        """
        if x.dim() != 4:
            raise ValueError(
                f'expected a 4D (batch, channel, height, width) input, got {x.dim()}D'
            )
        x = self.block0(x)
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)
        x = torch.flatten(x, 1)
        # F.dropout defaults to training=True; tie it to the module's mode so
        # dropout is switched off by model.eval().
        x = F.dropout(F.relu(self.fc1(x)), 0.2, training=self.training)
        x = F.dropout(F.relu(self.fc2(x)), 0.2, training=self.training)
        out = self.sigmoid(self.fc3(x))
        return out

# Build the network and move its parameters onto the selected device.
model = Pneumonia().to(device)

## Loss, Optimizer, Metric 정의

In [None]:
# Mean-reduced binary cross-entropy applied to the model's sigmoid outputs,
# optimised with Adam.
criterion = nn.BCELoss(
    reduction='mean',
)
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

def evaluate(data_loader, name='', net=None, run_device=None):
    """Print and return the classification accuracy (in percent) over a loader.

    The network is switched to eval mode for the duration of the call and its
    previous train/eval mode is restored afterwards.

    Args:
        data_loader: Iterable yielding ``(inputs, labels)`` batches.
        name: Name of the data split; used only in the printed message.
        net: Model to evaluate. Defaults to the module-level ``model``.
        run_device: Device the batches are moved to. Defaults to the
            module-level ``device``.

    Returns:
        Accuracy as a percentage rounded to two decimals, or ``nan`` when the
        loader yields no samples.
    """
    net = model if net is None else net
    run_device = device if run_device is None else run_device

    correct = 0
    total = 0

    # Eval mode makes batch norm use its running statistics instead of
    # updating them from evaluation batches.
    was_training = net.training
    net.eval()
    try:
        with torch.no_grad():
            for inputs, labels in data_loader:
                inputs = inputs.to(run_device).float()
                # Flatten so (B, 1) labels cannot broadcast against (B,) predictions.
                labels = labels.to(run_device).float().flatten()
                # Outputs are probabilities; rounding thresholds them at 0.5.
                predicted = torch.round(net(inputs)).flatten()
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        net.train(was_training)

    # An empty loader has no defined accuracy; report nan rather than crash.
    acc = round((100 * correct / total), 2) if total else float('nan')
    print(f'Accuracy of {name} images: {acc} %')
    return acc

## 모델 훈련

In [None]:
# Number of batches per epoch (despite the name); it is loop-invariant, so it
# is computed once instead of on every batch.
dataset_size = len(train_dataloader)

for epoch in range(5):
    # Make sure the model is in training mode at the start of every epoch,
    # whatever mode the previous evaluation left it in.
    model.train()
    running_loss = 0.0
    print(f'{epoch+1}번째 epoch')
    for i, (inputs, labels) in enumerate(train_dataloader):
        inputs = inputs.float()
        # BCELoss needs targets shaped like the (batch, 1) model output.
        labels = labels.unsqueeze(1).float()
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()

        outputs = model(inputs)

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # Report the mean loss of the last 10 batches, then reset the window.
        if i % 10 == 9:
            print(f'batch {i+1} / {dataset_size} - loss: {running_loss / 10}')
            running_loss = 0.0
    print()
    evaluate(val_dataloader, 'val')

print('Finished Training')

## Test set 결과 확인

In [None]:
# Report accuracy on the held-out test split.
evaluate(test_dataloader, name='test')