<a href="https://colab.research.google.com/github/udoli3/AIFFEL_quest/blob/main/KerasQuest/PneumoniaDetection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Main Quest CR8

#폐렴 진단 모델

##0. 설계

아이펠 코어 과정 CV 모듈 학습의 일환으로 아래 모델을 구성해본다.

1. 데이터 살펴보기
2. 데이터 전처리
3. 모델링, 하이퍼파라미터 튜닝
4. 결과
  * 요약
  * 회고

##1. 데이터 살펴보기

###1. 라이브러리 준비

In [None]:
import os, re
import random, math
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras.utils import plot_model
import warnings
warnings.filterwarnings(action='ignore')

###2. 필요한 변수들 생성(상수들)

In [None]:
# 데이터 로드할 때 빠르게 로드할 수 있도록하는 설정 변수
AUTOTUNE = tf.data.experimental.AUTOTUNE
# X-RAY 이미지 사이즈 변수
IMAGE_SIZE = [180, 180]

# 데이터 경로 변수
ROOT_PATH = os.path.join(os.getenv('HOME'), 'aiffel')
TRAIN_PATH = ROOT_PATH + '/chest_xray/data/train/*/*' # *은 모든 디렉토리와 파일을 의미합니다.
VAL_PATH = ROOT_PATH + '/chest_xray/data/val/*/*'
TEST_PATH = ROOT_PATH + '/chest_xray/data/test/*/*'

# 프로젝트를 진행할 때 아래 두 변수를 변경해보세요
BATCH_SIZE = 16
EPOCHS = 10

print(ROOT_PATH)

###3. 데이터 가져오기 및 구분하기

데이터 구성
* train
* val(validation)
* test

####3.1 확인하기 (훈련, 검증 세트 균형)

균형 확인

In [None]:
train_filenames = tf.io.gfile.glob(TRAIN_PATH)
test_filenames = tf.io.gfile.glob(TEST_PATH)
val_filenames = tf.io.gfile.glob(VAL_PATH)

print(len(train_filenames))
print(len(test_filenames))
print(len(val_filenames))

* val_filenames 가 지나치게 작다.

####3.2 분할하기 (훈련, 검증 세트 구분)

∴ train : val를 80:20으로 분할하기

In [None]:
# train 데이터와 validation 데이터를 모두 filenames에 담습니다
filenames = tf.io.gfile.glob(TRAIN_PATH)
filenames.extend(tf.io.gfile.glob(VAL_PATH))

# 모아진 filenames를 8:2로 나눕니다
train_size = math.floor(len(filenames)*0.8)
# 실험결과 재현 용이하도록 시드 먼저 주고 섞기(shuffle)
random.seed(8)
random.shuffle(filenames)
# 순서 정해 섞었으면 자르기
train_filenames = filenames[:train_size]
val_filenames = filenames[train_size:]

확인

In [None]:
print(len(train_filenames))
print(len(val_filenames))

* 훈련세트 이미지 수:
* 검증세트 이미지 수:

####3.3 확인하기 (라벨 균형)

데이터 얼마나 불균형인가?

각 이미지 수 확인하기 (경로 이름에 명시되어 있는 사실 활용하기)

In [None]:
print(f'Normal image path\n{filenames[0]}')
print(f'Pneumonia image path\n{filenames[2000]}')

NameError: name 'filenames' is not defined

In [None]:
COUNT_NORMAL = len([filename for filename in train_filenames if "NORMAL" in filename])
print(f"Normal images count in training set: {COUNT_NORMAL}")


COUNT_PNEUMONIA = len([filename for filename in train_filenames if "PNEUMONIA" in filename])
print(f"Pneumonia images count in training set: {COUNT_PNEUMONIA}")

* 폐렴 음성 이미지 수: 1071
* 폐럼 양성 이미지 수: 3114

#####mini-batch 활용 사전 준비

######tf.data 인스턴스 만들기

In [None]:
train_list_ds = tf.data.Dataset.from_tensor_slices(train_filenames)
val_list_ds = tf.data.Dataset.from_tensor_slices(val_filenames)

# for f in train_list_ds.take(5):
    # print(f.numpy())

######tf.data 인스턴스를 활용한 훈련세트, 검증세트 각각 이미지 개수 확인

In [None]:
TRAIN_IMG_COUNT = tf.data.experimental.cardinality(train_list_ds).numpy()
print(f"Training images count: {TRAIN_IMG_COUNT}")

VAL_IMG_COUNT = tf.data.experimental.cardinality(val_list_ds).numpy()
print(f"Validating images count: {VAL_IMG_COUNT}")

##2. 데이터 전처리

###1. 라벨링

* 이미지 라벨 생성하기
* 이미지 인덱스와 라벨 매칭시켜주기 (+이미지 크기 조정)

####라벨 데이터 만들어주기

라벨 데이터 만들어주는 함수 생성
* 파일 경로의 끝에서 두번째 부분을 확인하면 양성과 음성을 구분 가능했다.

In [None]:
def get_label(file_path):
    # path 를 구성 요소 단위로 리스트화
    parts = tf.strings.split(file_path, os.path.sep)
    # 끝에서 두 번째 부분 (즉 [-2]인덱스) == 경로에서 클래스를 나타내는 부분
    return parts[-2] == "PNEUMONIA"   # 폐렴이면 양성(True), 노말이면 음성(False)

###2. 이미지 사이즈 조정하기

* 원본 이미지는 0 ~ 255 값을 가지므로 (모델에 주입 가능하도록) 0 ~ 1 사이의 정수값을 가지도록 스케일링

함수로 처리

In [None]:
def decode_img(img):
    img = tf.image.decode_jpeg(img, channels=3) # 이미지를 uint8 tensor로 수정
    img = tf.image.convert_image_dtype(img, tf.float32) # float32 타입으로 수정
    img = tf.image.resize(img, IMAGE_SIZE) # 이미지 사이즈를 IMAGE_SIZE로 수정
    return img

###3. 이미지, 라벨 확인 (경로 입력하여 확인)

* 이미지 파일의 경로 입력 --> 이미지와 라벨 읽어오기


함수로 처리

In [None]:
def process_path(file_path):
    label = get_label(file_path) # 라벨 검출
    img = tf.io.read_file(file_path) # 이미지 읽기
    img = decode_img(img) # 이미지를 알맞은 형식으로 수정
    return img, label

####병렬 처리 적용 (훈련, 검증 세트)
* map 함수로 데이터 처리 효율 개선

In [None]:
# process_path 함수를 train_list_ds 훈련세트 데이터셋의 각 요소에 적용하되, 병렬 처리하며 스레드 개수는 최적으로 자동 결정
train_ds = train_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
# val_list_ds 검증세트에도 적용
val_ds = val_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)

처리 확인해보자

In [None]:
for image, label in train_ds.take(1):
    print("Image shape: ", image.numpy().shape)
    print("Label: ", label.numpy())

####병렬 처리 적용 (테스트세트)



In [None]:
# 테스트세트도 tf.data 인스턴스 만들기 (mini-batch 활용 준비)
test_list_ds = tf.data.Dataset.list_files(TEST_PATH)
# tf.data 인스턴스를 활용한 테스트세트 이미지 개수 확인
TEST_IMAGE_COUNT = tf.data.experimental.cardinality(test_list_ds).numpy()
# 병렬처리 적용
test_ds = test_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
test_ds = test_ds.batch(BATCH_SIZE)

print(TEST_IMAGE_COUNT)

###4. 추가 처리 (데이터 셔플링, 반복하여 증강, 배치사이즈 조정, 데이터 사전 로딩)

* 편향된 학습 방지 (무작위로 섞어서 데이터가 균일하게 사용되도록)
* 데이터셋 효과적으로 늘리기 (복사하지 않고 반복 사용)
* 배치 사이즈 조정해서 데이터 효율적으로 처리하기
  * 너무 크면 오히려 성능 저하
* 데이터 미리 로드해서 학습 속도 올리기
  * 데이터셋 특성에 따라 적용해야함

함수로 처리

In [None]:
def prepare_for_training(ds, shuffle_buffer_size=1000):
    ds = ds.shuffle(buffer_size=shuffle_buffer_size)
    ds = ds.repeat()
    ds = ds.batch(BATCH_SIZE)
    ds = ds.prefetch(buffer_size=AUTOTUNE)
    return ds

train_ds = prepare_for_training(train_ds)
val_ds = prepare_for_training(val_ds)

###5. 데이터 시각화

* 이미지 배치 입력 --> 여러장의 이미지 출력하기

In [None]:
def show_batch(image_batch, label_batch):
    plt.figure(figsize=(10,10))
    for n in range(BATCH_SIZE):
        ax = plt.subplot(4,math.ceil(BATCH_SIZE/4),n+1)
        plt.imshow(image_batch[n])
        if label_batch[n]:
            plt.title("PNEUMONIA")
        else:
            plt.title("NORMAL")
        plt.axis("off")

In [None]:
image_batch, label_batch = next(iter(train_ds))

In [None]:
show_batch(image_batch.numpy(), label_batch.numpy())

##3. 모델링, 하이퍼파라미터 튜닝

###1. CNN 모델링

레이어 구성
* 함수 처리로 모듈화

Convolution 레이어

In [None]:
def conv_block(filters):
    block = tf.keras.Sequential([
        tf.keras.layers.SeparableConv2D(filters, 3, activation='relu', padding='same'),
        tf.keras.layers.SeparableConv2D(filters, 3, activation='relu', padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.MaxPool2D()
    ])

    return block

Dense 레이어

In [None]:
def dense_block(units, dropout_rate):
    block = tf.keras.Sequential([
        tf.keras.layers.Dense(units, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(dropout_rate)
    ])

    return block

* 두 가지 일반화성능 향상 기법 적용
  * Batch Normalization : 배치 단위별 데이터 분포 정규화
  * DropOut

(동시에 사용하기로 함)

모델 전체구성 역시 함수 처리

In [None]:
def build_model():
    model = tf.keras.Sequential([
        tf.keras.Input(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3)),

        tf.keras.layers.Conv2D(16, 3, activation='relu', padding='same'),
        tf.keras.layers.Conv2D(16, 3, activation='relu', padding='same'),
        tf.keras.layers.MaxPool2D(),

        conv_block(32),
        conv_block(64),

        conv_block(128),
        tf.keras.layers.Dropout(0.2),

        conv_block(256),
        tf.keras.layers.Dropout(0.2),

        tf.keras.layers.Flatten(),
        dense_block(512, 0.7),
        dense_block(128, 0.5),
        dense_block(64, 0.3),

        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    return model

###2. 데이터 불균형imbalance 처리

Weight balancing 기법을 사용한 불균형 처리

(설명 출처 : 아이펠)
* weight balancing: training set의 각 데이터에서 loss를 계산할 때 특정 클래스의 데이터에 더 큰 loss 값을 갖도록 가중치를 부여
* Keras는 model.fit()을 호출할 때 파라미터로 넘기는 class_weight 에 이러한 클래스별 가중치를 세팅할 수 있도록 지원함

In [None]:
weight_for_0 = (1 / COUNT_NORMAL)*(TRAIN_IMG_COUNT)/2.0
weight_for_1 = (1 / COUNT_PNEUMONIA)*(TRAIN_IMG_COUNT)/2.0

class_weight = {0: weight_for_0, 1: weight_for_1}

print('Weight for NORMAL: {:.2f}'.format(weight_for_0))
print('Weight for PNEUMONIA: {:.2f}'.format(weight_for_1))

* 음성 이미지가 훨씬 적으므로 음성 이미지에 더 큰 가중치 부여

###3. 모델 훈련

메트릭:
* 정확도 (불균형이 있으므로 precision 및 recall로 더 정밀히 확인하기)
* 이진분류이므로 binary_crossentropy
* binary_crossentropy 비선형 손실함수를 사용하며 데이터가 불균형하므로, adam 옵티마이저 채택

In [None]:
with tf.device('/GPU:0'):
    model = build_model()

    METRICS = [
        'accuracy',
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall')
    ]

    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=METRICS
    )

NameError: name 'tf' is not defined

In [None]:
with tf.device('/GPU:0'):
    history = model.fit(
        train_ds,
        steps_per_epoch=TRAIN_IMG_COUNT // BATCH_SIZE,
        epochs=EPOCHS,
        validation_data=val_ds,
        validation_steps=VAL_IMG_COUNT // BATCH_SIZE,
        class_weight=class_weight,
    )

NameError: name 'tf' is not defined

###4. 훈련 결과 확인

Epochs 마다 모델의
* precision
* recall
* accuracy
* loss

가 어떻게 변했을까?

In [None]:
fig, ax = plt.subplots(1, 4, figsize=(20, 3))
ax = ax.ravel()

for i, met in enumerate(['precision', 'recall', 'accuracy', 'loss']):
    ax[i].plot(history.history[met])
    ax[i].plot(history.history['val_' + met])
    ax[i].set_title('Model {}'.format(met))
    ax[i].set_xlabel('epochs')
    ax[i].set_ylabel(met)
    ax[i].legend(['train', 'val'])

모델 평가

위와 동일한 내용을 출력한다.
* loss
* accuracy
* precision
* recall

In [None]:
loss, accuracy, precision, recall = model.evaluate(test_ds)
print(f'Loss: {loss},\nAccuracy: {accuracy},\nPrecision: {precision},\nRecall: {recall}')

##4. 성능 개선

Residual Network 구성

In [None]:
# class ResNetBlock(Layer):
#   def __init__(self, filters, strides=1):
#     super(self).__init__()

#     first_padding = 'same'
#     if strides != 1:
#       first_padding = 'valid'

#   self.conv_sequence = Sequential([
#       Conv2D(filters, 3, first_stride, padding=first_padding),
#       BatchNormalization(),
#       Activation('relu'),

#       Conv2D(filters, 3, 1, padding='same'),
#       BatchNormalization(),
#       Activation('relu')
#   ])

#   def call(self, inputs):
#     x = self.conv_sequence(inputs)

#     # skip connection
#     if x.shape == inputs.shape:
#       x += inputs

#     return x

In [None]:
# class ResNet(Model):
#   def __init__(self):
#     super(ResNet, self).__init__()

#     self.conv_1 = tf.keras.layers.Conv2D(16, 3, activation='relu', padding='same')

#     self.resnet_chains = Sequential([ResNetBlock(64), ResNetBlock(64)] +
#                                      [ResNetBlock(128, 2), ResNetBlock(128)] +
#                                      [ResNetBlock(256, 2), ResNetBlock(256)] +
#                                      [ResNetBlock(512, 2), ResNetBlock(512)])

#     self.out = Sequential(GlobalAveragePooling2D(), Dense(1, activation='sigmoid'))

#   def call(self, x):
#     x = self.conv_1(x)
#     x = self.resnet_chains(x)
#     x = self.out(x)
#     return x

In [None]:
model = ResNet()

In [None]:
model.build(input_shape=(None, 224, 224, 3))

In [None]:
plot_model(model, show_shapes=True)

학습률 재설정

####Data Augmentation

GAN을 이용한 Data Augmentation

좌우반전 적용

In [None]:
def augment(image,label):
    image = tf.image.random_flip_left_right(image)  # 랜덤하게 좌우를 반전합니다.
    return image,label

def prepare_for_training(ds, shuffle_buffer_size=1000):
    # augment 적용 부분이 배치처리 함수에 추가되었습니다.
    ds = ds.map(
            augment,       # augment 함수 적용
            num_parallel_calls=2
        )
    ds = ds.shuffle(buffer_size=shuffle_buffer_size)
    ds = ds.repeat()
    ds = ds.batch(BATCH_SIZE)
    ds = ds.prefetch(buffer_size=AUTOTUNE)

    return ds

train_ds = prepare_for_training(train_ds)
val_ds = prepare_for_training(val_ds)

###훈련 결과 확인 (재)

In [None]:
fig, ax = plt.subplots(1, 4, figsize=(20, 3))
ax = ax.ravel()

for i, met in enumerate(['precision', 'recall', 'accuracy', 'loss']):
    ax[i].plot(history.history[met])
    ax[i].plot(history.history['val_' + met])
    ax[i].set_title('Model {}'.format(met))
    ax[i].set_xlabel('epochs')
    ax[i].set_ylabel(met)
    ax[i].legend(['train', 'val'])

##5. 예측 및 평가

In [None]:
loss, acc, prec, rec = model.evaluate(test_ds)

회고

시간이 충분히 주어졌는데도 그 시간을 충분히 사용하지 못했다. 팀프로젝트에 차질 없도록 반드시 따라잡겠다.