<a href="https://colab.research.google.com/github/merucode/DL/blob/81-colab-keggle_image/03-02_%5BImage-Classification-TL-CNN%5D_Chest-Xray-Pneumonia(improvement).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imformation

* Title : [Chest X-Ray Penumonia](https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia)
* Type : Image Binary classification
* Evaluation : Accuracy, Recall, F1
* Model : CNN(Transfer learning)
* Python version: 3.10.6
* Basic library version
  * torch(torch==2.0.1+cu118)
  * torchvision(torchvision==0.15.2+cu118)
  * sklearn(scikit-learn==1.2.2)
  * cv2(opencv-python==4.7.0.72)
  * numpy(numpy==1.22.4)
  * pandas(pandas==1.5.3)
  * matplotlib(matplotlib==3.7.1)
  * zipfile, random, math, shutil, os.
* Addtional Library version
  * transformers(transformers==4.31.0)
  * efficientnet_python(efficientnet-python==0.7.1): 사전 학습 모델
* Considering Library version
  * albumentations(albumentations==1.2.1): 이미지 변환기
* Improvement
  * (Learning) Optimizer, Scheduler
  * Ensemble

# STEP 0. Version check and Install Dependency

Step 0-1. Install Dependency

In [None]:
!pip install transformers
!pip install efficientnet-pytorch

Step 0-2. Version Check

In [None]:
import sys
import torch
print(f"Python version:{sys.version}")                  # python
print("Torch version:{}".format(torch.__version__))     # torch
print("cuda version: {}".format(torch.version.cuda))    # cuda
print("cudnn version:{}".format(torch.backends.cudnn.version()))    # cudnn

In [None]:
!pip list

Step 0-3. Download Data

In [None]:
!export KAGGLE_USERNAME=*** && export KAGGLE_KEY=*** && kaggle datasets download -d paultimothymooney/chest-xray-pneumonia

In [None]:
from zipfile import ZipFile

data_path = '/content/'

with ZipFile(data_path + 'chest-xray-pneumonia.zip') as zipper:
  zipper.extractall()

# STEP 1. Check Data

Step 1-1. Check data

In [None]:
import pandas as pd

# 데이터 경로
data_path = '/content/chest_xray/'

# 훈련, 검증, 테스트 데이터 경로 설정
train_path = data_path + 'train/'
valid_path = data_path + 'val/'
test_path = data_path + 'test/'

In [None]:
from glob import glob

print(f"훈련 데이터 개수: {len(glob(train_path + '*/*'))}")
print(f"검증 데이터 개수: {len(glob(valid_path + '*/*'))}")
print(f"테스트 데이터 개수: {len(glob(test_path + '*/*'))}")

In [None]:
all_normal_imgs = []    # 모든 정상 이미지를 담을 리스트 초기화
all_pneumonia_imgs = [] # 모든 폐렴 이미지를 담을 리스트 초기화

for cat in ['train/', 'val/', 'test/']:
  data_cat_path = data_path + cat
  # 정상, 폐렴 이미지 경로
  normal_imgs = glob(data_cat_path + 'NORMAL/*')
  pneumonia_imgs = glob(data_cat_path + 'PNEUMONIA/*')
  # 정상, 폐렴 이미지 경로를 리스트에 추가
  all_normal_imgs.extend(normal_imgs)
  all_pneumonia_imgs.extend(pneumonia_imgs)

print(f"정상 흉부 이미지 개수: {len(all_normal_imgs)}")
print(f"폐렴 흉부 이미지 개수: {len(all_pneumonia_imgs)}")

Step 1-2. Data Visualize

In [None]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline

mpl.rc('font', size=15)
plt.figure(figsize=(7, 7))

label = ['Normal', 'Pneumonia']  # 타깃값 레이블
# 타깃값 분포 파이 그래프
plt.pie([len(all_normal_imgs), len(all_pneumonia_imgs)],
        labels = label,
        autopct='%.1f%%')

Step 1-3. Data Image Visualize

In [None]:
import matplotlib.gridspec as gridspec
import cv2 # OpenCV 라이브러리

def show_images(img_paths, rows=2, cols=3):
  assert len(img_paths) <= rows * cols # 이미지가 행/열 개수보다 많으면 오류 발생

  plt.figure(figsize=(15, 8)) # 전체 Figure 크기 설정
  grid = gridspec.GridSpec(rows, cols) # 서브플롯 배치

  # 이미지 출력
  for idx, img_path in enumerate(img_paths):
    image = cv2.imread(img_path)                    # 이미지 파일 읽기
    ax = plt.subplot(grid[idx])
    ax.imshow(image)                                # 이미지 출력

In [None]:
# 각 타깃값별 image_paths(마지막 6개)
num_of_imgs = 6
normal_img_paths = all_normal_imgs[-num_of_imgs:]
pneumonia_img_paths = all_pneumonia_imgs[-num_of_imgs:]

In [None]:
show_images(normal_img_paths)

In [None]:
show_images(pneumonia_img_paths)

# STEP 2. Setting for Modeling

Step 2-1. Seed

In [None]:
import torch
import random
import numpy as np
import os

# 시드값 고정
seed = 50
os.environ['PYTHONHASHSEED'] = str(seed)
random.seed(seed)                 # 파이썬 난수 생석이 시드 고정
np.random.seed(seed)              # 넘파이 난수 생성기 시드 고정
torch.manual_seed(seed)           # 파이토치 난수 생성기 시드 고정(CPU 사용시)
torch.cuda.manual_seed(seed)      # 파이토치 난수 생성기 시드 고정(GPU 사용시)
torch.cuda.manual_seed_all(seed)  # 파이토치 난수 생성기 시드 고정(멀티 GPU 사용 시)
torch.backends.cudnn.deterministic = True # 확정적 연산 사용
torch.backends.cudnn.benchmark = False    # 벤치마크 기능 해제
torch.backends.cudnn.enabled = False      # cudnn 사용 해제

Step 2-2.GPU 장비 설정

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

device

# STEP 3. Dataset

Step 3-1. Load Data

In [None]:
import pandas as pd

# 데이터 경로
data_path = '/content/chest_xray/'

# 훈련, 검증, 테스트 데이터 경로 설정
train_path = data_path + 'train/'
valid_path = data_path + 'val/'
test_path = data_path + 'test/'

Step 3-2. Dataset

In [None]:
from torchvision import transforms as T

# 훈련 데이터용 변환기
transform_train = T.Compose([
    T.CenterCrop(180),            # 중앙 이미지 확대
    T.RandomHorizontalFlip(0.5),  # 좌우 대칭
    T.RandomVerticalFlip(0.2),    # 상하 대칭
    T.RandomRotation(20),         # 이미지 회전
    T.ToTensor(),                 # 텐서 객체로 변환
    T.Normalize((0.485, 0.456, 0.406),
                (0.229, 0.224, 0.225))])  # 정규화

# 테스트 데이터용 변환기
transform_test = T.Compose([
    T.Resize((250, 250)),
    T.CenterCrop(180),
    T.ToTensor(),
    T.Normalize((0.485, 0.456, 0.406),
                (0.229, .224, 0.225))])

In [None]:
from torchvision.datasets import ImageFolder

# 훈련 데이터셋
dataset_train = ImageFolder(root=train_path, transform=transform_train)
# 검증 데이터셋
dataset_valid = ImageFolder(root=valid_path, transform=transform_test)

Step 3-3. Dataloader(multiprocess)

In [None]:
# 멀티프로세서 사용 시 데이터 로더 시드 고정
def seed_worker(worker_id):
  worker_seed = torch.initial_seed() % 2**32
  np.random.seed(worker_seed)
  random.seed(worker_seed)

g = torch.Generator()
g.manual_seed(0)

In [None]:
from torch.utils.data import DataLoader # 데이터 로더 클래스

batch_size = 8

loader_train = DataLoader(dataset=dataset_train, batch_size=batch_size,
                          shuffle=True, worker_init_fn=seed_worker,
                          generator=g, num_workers=2)
loader_valid = DataLoader(dataset=dataset_valid, batch_size=batch_size,
                          shuffle=False, worker_init_fn=seed_worker,
                          generator=g, num_workers=2)

# STEP 4. Module
### ★Improvement: Ensemble

Step 4-1. Pretrained Model Load + ★Improvement: Ensemble

In [None]:
models_list = [] # ★ 모델 저장용 리스트

In [None]:
from efficientnet_pytorch import EfficientNet # EfficientNet 모듈

# ★
# 사전 훈련된 'efficientnet' 모델 불러오기
efficientnet_b1 = EfficientNet.from_pretrained('efficientnet-b1', num_classes=2)  # num_classes : 최종 출력 갯수
efficientnet_b2 = EfficientNet.from_pretrained('efficientnet-b2', num_classes=2)
efficientnet_b3 = EfficientNet.from_pretrained('efficientnet-b3', num_classes=2)

# ★
# 장비 할당
device = "cuda" if torch.cuda.is_available() else "cpu"
efficientnet_b1 = efficientnet_b1.to(device)
efficientnet_b2 = efficientnet_b2.to(device)
efficientnet_b3 = efficientnet_b3.to(device)

# ★
# 리스트에 모델 저장
models_list.append(efficientnet_b1)
models_list.append(efficientnet_b2)
models_list.append(efficientnet_b3)

In [None]:
for idx, model in enumerate(models_list):
  num_parmas = sum(param.numel() for param in model.parameters())
  print(f"모델{idx+1} 파라미터 갯수: {num_parmas}")

# STEP 5. Learning and Validation
### ★Improvement: Optimizer, Scheduler, Ensemble

Step 5-1. Setting + ★Improvement: Optimizer, Scheduler, Ensemble

In [None]:
import tqdm
import torch.nn as nn
from torch.optim.adamw import AdamW

#device = "cuda" if torch.cuda.is_available() else "cpu"
#model = model.to(device)
#model = nn.DataParallel(model)       # 병렬 GPU 사용

# 손실 함수
criterion = nn.CrossEntropyLoss()
# ★ 옵티마이저
optimizer1 = AdamW(models_list[0].parameters(), lr=0.0006, weight_decay=0.001)
optimizer2 = AdamW(models_list[1].parameters(), lr=0.0006, weight_decay=0.001)
optimizer3 = AdamW(models_list[2].parameters(), lr=0.0006, weight_decay=0.001)

# ★ Scheduler
epochs = 5
from transformers import get_cosine_schedule_with_warmup
scheduler1 = get_cosine_schedule_with_warmup(optimizer1, num_warmup_steps=len(loader_train)*3,
                                             num_training_steps=len(loader_train)*epochs)
scheduler2 = get_cosine_schedule_with_warmup(optimizer2, num_warmup_steps=len(loader_train)*3,
                                             num_training_steps=len(loader_train)*epochs)
scheduler3 = get_cosine_schedule_with_warmup(optimizer3, num_warmup_steps=len(loader_train)*3,
                                             num_training_steps=len(loader_train)*epochs)

Step 5-2. Learning Function

In [None]:
from sklearn.metrics import accuracy_score, recall_score, f1_score
from tqdm.notebook import tqdm

def train(model, loader_train, loader_valid, criterion, optimizer,
          scheduler=None, epochs=10, save_file='model_state_dic.pth'):
  valid_loss_min = np.inf # 최소 손실값 초기화(검증데이터용)

  # 총 에폭만큼 반복
  for epoch in range(epochs):
    print(f"에폭 [{epoch + 1}/{epochs}] \n-------------------------")

    # == [훈련] ================================
    model.train()         # 모델을 훈련 상태로 설정
    epoch_train_loss = 0  # 에폭별 손실값 초기화(훈련 데이터용)

    # 반복 횟수 만큼 반복
    for images, labels in tqdm(loader_train):
      # 이미지, 레이블 데이터 미니배치를 장비 할당
      images = images.to(device)
      labels = labels.to(device)

      optimizer.zero_grad()       # 기출기 초기화
      outputs = model(images) # 순전파
      loss = criterion(outputs, labels)   # 손실값 계산(훈련 데이터용)
      epoch_train_loss += loss.item()     # 현재 배치에서의 손실 추가
      loss.backward()         # 역전파
      optimizer.step()            # 가중치 갱신

      if scheduler != None:   # 스케줄러 학습률 갱신
        scheduler.step()


    print(f"\t훈련 데이터 손실값: {epoch_train_loss/len(loader_train):.4f}")


    # == [검증] ================================
    model.eval()    # 모델을 평가 상태로 설정
    epoch_valid_loss = 0  # 에폭별 손실값 초기화(검증데이터용)
    preds_list = [] # 에측값 저장용 리스트
    true_list = []  # 실제값 저장용 리스트

    with torch.no_grad(): # 기울기 계산 비활성화
      # 미니 배치 단위로 검증
      for images, labels in loader_valid:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)           # 순전파
        loss = criterion(outputs, labels) # 손실값 계산(검증 데이터용)
        epoch_valid_loss += loss.item()   # 현재 배치에서의 손실 추가

        # 예측값 및 실제값
        preds = torch.max(outputs.cpu(), dim=1)[1].numpy()  # torch.max[1]은 최대값의 열(0 or 1)을 반환
        true = labels.cpu().numpy()

        preds_list.extend(preds)
        true_list.extend(true)

    # 현재 에폭의 검증 완료
    print(f"\t검증 데이터 손실값: {epoch_valid_loss/len(loader_valid):.4f}")

    # 평가지표 계산(정확도, 재현율, F1 점수)
    val_accuracy = accuracy_score(true_list, preds_list)
    val_recall = recall_score(true_list, preds_list)
    val_f1_score = f1_score(true_list, preds_list)
    print(f"\t정확도: {val_accuracy:.4f} / 재현율: {val_recall:.4f} / F1 점수: {val_f1_score:.4f}")

    # == [최적 모델 가중치 찾기] ===============
    # 현 에폭에서의 손실값이 최소 손실값 이하면 모델 가중치 저장
    if epoch_valid_loss <= valid_loss_min:
      print(f"\t### 검증 데이터 손실값 감소 ({valid_loss_min:.4f} --> {epoch_valid_loss:.4f}). 모델 저장")

      # 모델 가중치를 파일로 저장
      torch.save(model.state_dict(), save_file)
      valid_loss_min = epoch_valid_loss # 최소 손실값 갱신

  return torch.load(save_file)  # 최적 모델 가중치 반환

Step 5-3. Learning ★Improvement: Ensemble

In [None]:
# 첫 번째 모델 훈련
model_state_dict = train(model=models_list[0],
                         loader_train=loader_train,
                         loader_valid=loader_valid,
                         criterion=criterion,
                         optimizer=optimizer1,
                         scheduler=scheduler1,
                         epochs=epochs)
# 첫 번째 모델에 최적 가중치 적용
models_list[0].load_state_dict(model_state_dict)

In [None]:
# 두 번째 모델 훈련
model_state_dict = train(model=models_list[1],
                         loader_train=loader_train,
                         loader_valid=loader_valid,
                         criterion=criterion,
                         optimizer=optimizer2,
                         scheduler=scheduler2,
                         epochs=epochs)
# 두 번째 모델에 최적 가중치 적용
models_list[1].load_state_dict(model_state_dict)

In [None]:
# 세 번째 모델 훈련
model_state_dict = train(model=models_list[2],
                         loader_train=loader_train,
                         loader_valid=loader_valid,
                         criterion=criterion,
                         optimizer=optimizer3,
                         scheduler=scheduler3,
                         epochs=epochs)
# 세 번째 모델에 최적 가중치 적용
models_list[2].load_state_dict(model_state_dict)

# STEP 6. Evaluation and Submission
### ★Improvement: Ensemble

Step 6-1. Setting

In [None]:
dataset_test = ImageFolder(root=test_path, transform=transform_test)

loader_test = DataLoader(dataset=dataset_test, batch_size=batch_size,
                         shuffle=False, worker_init_fn=seed_worker,
                         generator=g, num_workers=2)

Step 6-2. Predict Function

In [None]:
def predict(model, loader_test, return_true=False):
  model.eval()    # 모델을 평가 상태로 설정
  preds_list = [] # 예측값 저장용 리스트 초기화
  true_list = []  # 실제값 저장용 리스트 초기화

  with torch.no_grad(): # 기울기 계산 비활성화
    for images, labels in loader_test:
      images = images.to(device)
      labels = labels.to(device)

      outputs = model(images)

      preds = torch.max(outputs.cpu(), dim=1)[1].numpy()  # 예측값
      true = labels.cpu().numpy()                         # 실제값

      preds_list.extend(preds)
      true_list.extend(true)

    if return_true:
      return true_list, preds_list
    else:
      return preds_list

Step 7-3. Evaluation

In [None]:
preds_lists = []
true_list, preds_lists[0] = predict(model=models_list[0],
                                loader_test=loader_test,
                                return_true=True)

preds_lists[1] = predict(model=models_list[1],
                      loader_test=loader_test)

preds_lists[2] = predict(model=models_list[2],
                      loader_test=loader_test)

In [None]:
for idx, preds_list in enumerate(preds_lists):
  print("#"*5, f"모델 efficientnet-b{idx+1} 예측 결과 평가 점수", "#*5")
  print(f"정확도: {accuracy_score(true_list, preds_list):.4f}")
  print(f"재현율: {recall_score(true_list, preds_list):.4f}")
  print(f"F1 점수: {f1_score(true_list, preds_list):.4f}")

Step 7-4. ★Ensemble

In [None]:
ensemble_preds = []

for i in range(len(preds_lists[0])):
  pred_element = np.round((preds_lists[0][i] + preds_lists[1][i] + preds_lists[2][i])/3) # 예측값 더하고 3으로 나눈뒤 반올림(과반수)
  ensemble_preds.append(pred_element)

In [None]:
print("#"*5, f"앙상블 예측 결과 평가 점수", "#*5")
print(f"정확도: {accuracy_score(true_list, ensemble_preds):.4f}")
print(f"재현율: {recall_score(true_list, ensemble_preds):.4f}")
print(f"F1 점수: {f1_score(true_list, ensemble_preds):.4f}")

Step 7-5. Submission

In [None]:
# submission[['healthy', 'multiple_diseases', 'rust', 'scab']] = preds   # submission df 결과값 재설정
# submission.to_csv('submission.csv', index=False)                       # 제출 파일 생성

# # 이미지 테스트 데이터 삭제
# import shutil

# shutil.rmtree('./train')
# shutil.rmtree('./test')