In [17]:
!pip install pandas

Collecting pandas
  Downloading pandas-2.2.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (19 kB)
Collecting pytz>=2020.1 (from pandas)
  Downloading pytz-2024.1-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Downloading tzdata-2024.1-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pandas-2.2.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.0/13.0 MB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0mm
[?25hDownloading pytz-2024.1-py2.py3-none-any.whl (505 kB)
Downloading tzdata-2024.1-py2.py3-none-any.whl (345 kB)
Installing collected packages: pytz, tzdata, pandas
Successfully installed pandas-2.2.2 pytz-2024.1 tzdata-2024.1


In [39]:
!pip install seaborn

Collecting seaborn
  Using cached seaborn-0.13.2-py3-none-any.whl.metadata (5.4 kB)
Using cached seaborn-0.13.2-py3-none-any.whl (294 kB)
Installing collected packages: seaborn
Successfully installed seaborn-0.13.2


In [8]:
!pip install albumentations

Collecting albumentations
  Downloading albumentations-1.4.14-py3-none-any.whl.metadata (38 kB)
Collecting scikit-image>=0.21.0 (from albumentations)
  Downloading scikit_image-0.24.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (14 kB)
Collecting pydantic>=2.7.0 (from albumentations)
  Using cached pydantic-2.8.2-py3-none-any.whl.metadata (125 kB)
Collecting albucore>=0.0.13 (from albumentations)
  Downloading albucore-0.0.14-py3-none-any.whl.metadata (3.1 kB)
Collecting eval-type-backport (from albumentations)
  Downloading eval_type_backport-0.2.0-py3-none-any.whl.metadata (2.2 kB)
Collecting opencv-python-headless>=4.9.0.80 (from albumentations)
  Downloading opencv_python_headless-4.10.0.84-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (20 kB)
Collecting annotated-types>=0.4.0 (from pydantic>=2.7.0->albumentations)
  Using cached annotated_types-0.7.0-py3-none-any.whl.metadata (15 kB)
Collecting pydantic-core==2.20.1 (from pydantic>=2.7.0

In [95]:
!pip install compose



In [157]:
import torch
import torch.nn as nn
import torch.optim as optim
import albumentations as A
from torchvision.transforms import ToTensor, Compose, Grayscale
from torch.utils.data import DataLoader, Dataset, random_split
from PIL import Image
import numpy as np
from torchvision.io import read_image
import pandas as pd
import glob
import os

In [158]:
# 데이터 경로 설정
data_dir = "/home/downtown/new_folder/deep-learning/module/module-1/chest_xray"
train_dir = os.path.join(data_dir, "train")
test_dir = os.path.join(data_dir, "test")

# 데이터프레임 생성 함수 정의
def create_dataframe(data_dir):
    image_paths = glob.glob(f"{data_dir}/*/*")
    data = {'image_path': [], 'label': []}
    for path in image_paths:
        if 'NORMAL' in path:
            data['image_path'].append(path)
            data['label'].append(0)  # NORMAL -> 0
        elif 'PNEUMONIA' in path:
            data['image_path'].append(path)
            data['label'].append(1)  # PNEUMONIA -> 1
    return pd.DataFrame(data)

# Train과 Test 데이터프레임 생성
train_df = create_dataframe(train_dir)
test_df = create_dataframe(test_dir)

In [159]:
# 이미지 파일 경로 확인
print("Train 이미지 파일 수:", len(glob.glob(f"{train_dir}/*/*")))
print("Test 이미지 파일 수:", len(glob.glob(f"{test_dir}/*/*")))

Train 이미지 파일 수: 5216
Test 이미지 파일 수: 640


In [160]:
# Dataset 클래스 정의
class ImageDataset(Dataset):
    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_path = self.df.iloc[idx, 0]
        image = Image.open(img_path).convert("RGB")  # 이미지를 RGB로 로드
        label = self.df.iloc[idx, 1]

        if self.transform:
            image = np.array(image)  # PIL 이미지를 NumPy 배열로 변환
            augmented = self.transform(image=image)  # 변환 적용
            image = augmented['image']
        
        # 추가 변환: 명시적으로 그레이스케일로 변환
        if image.shape[2] == 3:  # 만약 3채널이라면
            image = Image.fromarray((image * 255).astype(np.uint8)).convert('L')  # float32에서 uint8로 변환 후 그레이스케일로 변환
            image = np.array(image)  # 다시 NumPy 배열로 변환

        # NumPy 배열을 텐서로 변환
        image = torch.tensor(image, dtype=torch.float32).unsqueeze(0)  # (H, W) -> (1, H, W)로 변경

        return image, label

In [161]:
# Albumentations 데이터 증강 정의
train_transform = A.Compose([
    A.Resize(256, 256),
    # A.ToGray(always_apply=True, p=1.0),  # 이미지를 그레이스케일로 변환
    A.CLAHE(always_apply=False, p=1.0, clip_limit=(1, 18), tile_grid_size=(29, 1)),
    A.Blur(always_apply=False, p=1.0, blur_limit=(3, 7)),
    A.Downscale(always_apply=False, p=1.0, scale_min=0.1, scale_max=0.8),
    A.Normalize(mean=(0.5,), std=(0.5,))  # 정규화
])

# 검증 및 테스트 변환 (데이터 증강 없음)
val_test_transform = A.Compose([
    A.Resize(256, 256),
    # A.ToGray(always_apply=True, p=1.0),  # 이미지를 그레이스케일로 변환
    A.Normalize(mean=(0.5,), std=(0.5,))  # 정규화
])

# 학습 데이터셋 생성
full_train_dataset = ImageDataset(train_df, transform=train_transform)

# 학습 데이터셋과 검증 데이터셋으로 나누기 (80% train, 20% validation)
train_size = int(0.8 * len(full_train_dataset))
val_size = len(full_train_dataset) - train_size
train_dataset, val_dataset = random_split(full_train_dataset, [train_size, val_size])

# 검증 데이터셋 변환 적용
val_dataset.dataset.transform = val_test_transform

# 테스트 데이터셋 생성
test_dataset = ImageDataset(test_df, transform=val_test_transform)

# 데이터로더 생성
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [162]:
# 이진 분류 모델 정의
class BinaryClassificationModel(nn.Module):
    def __init__(self):
        super(BinaryClassificationModel, self).__init__()
        self.layer_1 = nn.Linear(256 * 256, 128)  # 입력 크기 256*256에 맞게 설정
        self.bn1 = nn.BatchNorm1d(128)
        self.layer_2 = nn.Linear(128, 64)
        self.bn2 = nn.BatchNorm1d(64)
        self.layer_3 = nn.Linear(64, 1)

    def forward(self, x):
        x = x.view(x.size(0), -1)  # 입력을 평탄화
        x = self.layer_1(x)
        x = self.bn1(x)
        x = torch.relu(x)
        x = self.layer_2(x)
        x = self.bn2(x)
        x = torch.relu(x)
        z = self.layer_3(x)
        return z

# 모델 인스턴스 생성
model = BinaryClassificationModel()

# GPU 사용 설정 "cuda" if torch.cuda.is_available() else 
device = torch.device("cpu")
model.to(device)

# 손실 함수 및 옵티마이저 설정
loss_function = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# 모델 학습 루프
num_epochs = 40
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    correct = 0
    total = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.float().unsqueeze(1).to(device)

        optimizer.zero_grad()  # 옵티마이저 초기화
        outputs = model(images)  # 모델 예측
        loss = loss_function(outputs, labels)  # 손실 계산
        loss.backward()  # 역전파
        optimizer.step()  # 가중치 업데이트
        epoch_loss += loss.item()

        # 정확도 계산
        predicted = (torch.sigmoid(outputs) > 0.5).float()  # 로짓을 확률로 변환하고 이진 클래스로 변환
        total += labels.size(0)  # 총 레이블 수
        correct += (predicted == labels).sum().item()  # 정확한 예측의 개수

    train_accuracy = correct / total  # 학습 정확도 계산

    # 검증 단계
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.float().unsqueeze(1).to(device)
            outputs = model(images)
            loss = loss_function(outputs, labels)
            val_loss += loss.item()

            # 정확도 계산
            predicted = (torch.sigmoid(outputs) > 0.5).float()
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    val_accuracy = val_correct / val_total  # 검증 정확도 계산

    print(f"Epoch {epoch+1}, Train Loss: {epoch_loss / len(train_loader):.4f}, "
          f"Train Accuracy: {train_accuracy:.4f}, "
          f"Validation Loss: {val_loss / len(val_loader):.4f}, "
          f"Validation Accuracy: {val_accuracy:.4f}")


Epoch 1, Train Loss: 0.2287, Train Accuracy: 0.9101, Validation Loss: 0.3165, Validation Accuracy: 0.8707
Epoch 2, Train Loss: 0.1141, Train Accuracy: 0.9602, Validation Loss: 0.1886, Validation Accuracy: 0.9330
Epoch 3, Train Loss: 0.0553, Train Accuracy: 0.9801, Validation Loss: 0.7119, Validation Accuracy: 0.8372
Epoch 4, Train Loss: 0.0349, Train Accuracy: 0.9892, Validation Loss: 0.2875, Validation Accuracy: 0.9291
Epoch 5, Train Loss: 0.0462, Train Accuracy: 0.9849, Validation Loss: 0.3026, Validation Accuracy: 0.9234
Epoch 6, Train Loss: 0.0355, Train Accuracy: 0.9885, Validation Loss: 0.2920, Validation Accuracy: 0.9224
Epoch 7, Train Loss: 0.0269, Train Accuracy: 0.9897, Validation Loss: 0.5434, Validation Accuracy: 0.8937
Epoch 8, Train Loss: 0.0141, Train Accuracy: 0.9962, Validation Loss: 0.3101, Validation Accuracy: 0.9262
Epoch 9, Train Loss: 0.0208, Train Accuracy: 0.9911, Validation Loss: 0.6370, Validation Accuracy: 0.8841
Epoch 10, Train Loss: 0.0221, Train Accuracy: 

In [163]:
# 학습만 gpu, 계산은 cpu

In [166]:
# 모델 평가 및 예측
model.eval()
test_loss = 0
correct = 0
total = 0
all_labels = []
all_predictions = []

with torch.no_grad():
    for images, labels in test_loader:
        # GPU로 데이터 이동
        images, labels = images.to(device), labels.float().unsqueeze(1).to(device)
        
        outputs = model(images)
        loss = loss_function(outputs, labels)
        test_loss += loss.item()
        
        # 0.5 기준으로 이진 분류
        predicted = (torch.sigmoid(outputs) > 0.5).float()  
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy().flatten())
        
        # 정확도 계산
        total += labels.size(0)  # 총 레이블 수
        correct += (predicted == labels).sum().item()  # 정확한 예측의 개수

# 최종 테스트 손실 및 정확도 출력
test_accuracy = correct / total  # 테스트 정확도 계산
print(f"Test Loss: {test_loss / len(test_loader):.4f}, Test Accuracy: {test_accuracy:.4f}")

Test Loss: 2.4770, Test Accuracy: 0.7734


In [167]:
# 분류 레포트 출력

from sklearn.metrics import classification_report

class_labels = ['NORMAL', 'PNEUMONIA']
report = classification_report(all_labels, all_predictions, target_names=class_labels)
print(report)

              precision    recall  f1-score   support

      NORMAL       0.91      0.45      0.60       242
   PNEUMONIA       0.74      0.97      0.84       398

    accuracy                           0.77       640
   macro avg       0.83      0.71      0.72       640
weighted avg       0.81      0.77      0.75       640

