# 사전작업

In [1]:
from google.colab import drive
drive.mount('https://drive.google.com/drive/folders/1v18ESRUa2LzehS6LmPLpkx7jCWZ2T36o?usp=drive_link')

ValueError: Mountpoint must be in a directory that exists

In [None]:
%cd '/content/drive/MyDrive'

/content/drive/MyDrive


# DenseNet121 모델을 사용하여 학습

## 환경설정 및 라이브러리 임포트

In [None]:
pip install tensorflow scikit-learn matplotlib



In [None]:
# 필수 라이브러리 임포트 및 재현성 설정
import os
import random
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import DenseNet121
from tensorflow.keras.applications.densenet import preprocess_input
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc
import matplotlib.pyplot as plt

# 재현성(가능한 범위) 설정
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# 경로/하이퍼파라미터 설정
DATA_DIR = "/content/drive/MyDrive/image/"
IMG_SIZE = (224, 224)
BATCH_SIZE = 16

# 전체 데이터셋을 훈련(80%), 검증(10%), 테스트(10%)로 분할
VAL_SPLIT = 0.1
TEST_SPLIT = 0.1

EPOCHS = 30
LEARNING_RATE = 1e-4
MODEL_DIR = "checkpoints"
os.makedirs(MODEL_DIR, exist_ok=True)
BEST_MODEL_PATH = os.path.join(MODEL_DIR, "best_densenet121.keras")

## 데이터 로딩 및 분할

In [None]:
# 모든 데이터셋을 한 번에 로드
all_ds = tf.keras.utils.image_dataset_from_directory(
    DATA_DIR,
    labels="inferred",
    label_mode="binary",
    color_mode="rgb",
    image_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=True,
    seed=SEED,
)

# 데이터셋의 전체 배치를 계산
ds_size = tf.data.experimental.cardinality(all_ds).numpy()
train_size = int(0.8 * ds_size)
val_size = int(0.1 * ds_size)
# 테스트 세트 크기는 나머지로 설정
test_size = ds_size - train_size - val_size

# take()와 skip()을 사용하여 훈련, 검증, 테스트 세트로 분할
train_ds = all_ds.take(train_size)
val_ds = all_ds.skip(train_size).take(val_size)
test_ds = all_ds.skip(train_size).skip(val_size)

# 클래스 이름 확인
class_names = all_ds.class_names
print("Class names (label order):", class_names)
print(f"Number of training batches: {tf.data.experimental.cardinality(train_ds).numpy()}")
print(f"Number of validation batches: {tf.data.experimental.cardinality(val_ds).numpy()}")
print(f"Number of test batches: {tf.data.experimental.cardinality(test_ds).numpy()}")

# 성능 최적화를 위한 캐싱/프리페치
AUTOTUNE = tf.data.AUTOTUNE
train_ds = train_ds.cache().prefetch(AUTOTUNE)
val_ds = val_ds.cache().prefetch(AUTOTUNE)
test_ds = test_ds.cache().prefetch(AUTOTUNE)

## 모델 아키텍처 구성

In [None]:
# 데이터 증강 + 전처리 파이프라인
data_augmentation = keras.Sequential(
    [
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.05),
        layers.RandomZoom(0.05),
        layers.RandomContrast(0.05),
    ],
    name="data_augmentation"
)

preprocess_layer = layers.Lambda(preprocess_input, name="densenet_preprocess")

# 모델 구성
base_model = DenseNet121(
    include_top=False,
    weights="imagenet",
    input_shape=IMG_SIZE + (3,)
)
base_model.trainable = False

inputs = keras.Input(shape=IMG_SIZE + (3,), name="input_image")
x = data_augmentation(inputs)
x = preprocess_layer(x)
x = base_model(x, training=False)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dropout(0.3)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)

model = keras.Model(inputs, outputs, name="DenseNet121_COVID_Classifier")

# 컴파일 및 클래스 불균형 대응
loss_fn = keras.losses.BinaryCrossentropy()
optimizer = keras.optimizers.Adam(learning_rate=LEARNING_RATE)
metrics = [
    keras.metrics.BinaryAccuracy(name="acc"),
    keras.metrics.AUC(name="auc"),
    keras.metrics.Precision(name="precision"),
    keras.metrics.Recall(name="recall"),
]
model.compile(optimizer=optimizer, loss=loss_fn, metrics=metrics)
model.summary()

# 클래스별 샘플 수 계산 및 가중치 부여
def count_by_class(tf_dataset):
    counts = {0:0, 1:0}
    for xb, yb in tf_dataset.unbatch():
        label = int(yb.numpy())
        counts[label] += 1
    return counts

train_counts = count_by_class(train_ds)
print("Train counts:", train_counts)
total = train_counts[0] + train_counts[1]
class_weight = {
    0: total / (2.0 * train_counts[0]) if train_counts[0] > 0 else 1.0,
    1: total / (2.0 * train_counts[1]) if train_counts[1] > 0 else 1.0,
}
print("Class weight:", class_weight)

## 1단계 학습

In [None]:
# 콜백 설정
callbacks = [
    keras.callbacks.EarlyStopping(
        monitor="val_auc",
        patience=5,
        mode="max",
        restore_best_weights=True
    ),
    keras.callbacks.ReduceLROnPlateau(
        monitor="val_auc",
        factor=0.5,
        patience=2,
        mode="max",
        min_lr=1e-6,
        verbose=0
    ),
    keras.callbacks.ModelCheckpoint(
        BEST_MODEL_PATH,
        monitor="val_auc",
        mode="max",
        save_best_only=True,
        verbose=1
    )
]

# 1단계 학습
history_1 = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    class_weight=class_weight,
    callbacks=callbacks,
    verbose=1
)

## 2단계 학습

In [None]:
# 2단계 파인튜닝
base_model.trainable = True
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=LEARNING_RATE * 0.1),
    loss=loss_fn,
    metrics=metrics
)

history_2 = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    class_weight=class_weight,
    callbacks=callbacks,
    verbose=1
)

## 최종 평가 및 시각화

In [None]:
# 최고의 가중치 로드
model = keras.models.load_model(
    BEST_MODEL_PATH,
    custom_objects={'preprocess_input': preprocess_input}
)

# 테스트 세트 예측 및 평가
print("\n" + "="*50)
print("Evaluating on the Test Dataset")
print("="*50 + "\n")

y_true = []
y_prob = []

for xb, yb in test_ds:
    prob = model.predict(xb, verbose=0).ravel()
    y_prob.append(prob)
    y_true.append(yb.numpy().ravel())

y_true = np.concatenate(y_true).astype(int)
y_prob = np.concatenate(y_prob)
y_pred = (y_prob >= 0.5).astype(int)

# 최종 평가 지표 출력
cm = confusion_matrix(y_true, y_pred)
print("Confusion Matrix:\n", cm)

plt.figure(figsize=(5,4))
plt.imshow(cm, interpolation='nearest')
plt.title("Confusion Matrix (Test Set)")
plt.xlabel("Predicted label")
plt.ylabel("True label")
plt.colorbar()
tick_marks = np.arange(2)
plt.xticks(tick_marks, class_names, rotation=45)
plt.yticks(tick_marks, class_names)
thresh = cm.max() / 2.
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        plt.text(j, i, format(cm[i, j], 'd'),
                 ha="center", va="center",
                 color="white" if cm[i, j] > thresh else "black")
plt.tight_layout()
plt.show()

print("\nClassification Report (Test Set)")
print(classification_report(y_true, y_pred, target_names=class_names, digits=4))

fpr, tpr, thresholds = roc_curve(y_true, y_prob)
roc_auc = auc(fpr, tpr)
print(f"ROC AUC (Test Set): {roc_auc:.4f}")

plt.figure(figsize=(5,4))
plt.plot(fpr, tpr, label=f"ROC curve (AUC = {roc_auc:.4f})")
plt.plot([0,1], [0,1], linestyle="--")
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("Receiver Operating Characteristic (Test Set)")
plt.legend(loc="lower right")
plt.tight_layout()
plt.show()

# 3. streamlit

In [None]:
pip install streamlit

Collecting streamlit
  Downloading streamlit-1.49.0-py3-none-any.whl.metadata (9.5 kB)
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.49.0-py3-none-any.whl (10.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.0/10.0 MB[0m [31m72.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pydeck-0.9.1-py2.py3-none-any.whl (6.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m92.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pydeck, streamlit
Successfully installed pydeck-0.9.1 streamlit-1.49.0


In [None]:
import streamlit as st
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.applications.densenet import preprocess_input
import numpy as np
from PIL import Image
import cv2

# 설정 및 모델 로드

MODEL_PATH = "checkpoints/best_densenet121.keras"
LAST_CONV_LAYER_NAME = "relu"

st.set_page_config(page_title="COVID-19 X-ray Classifier", layout="wide")

# 모델 로딩
@st.cache_resource
def load_keras_model():
    try:
        model = keras.models.load_model(
            MODEL_PATH,
            custom_objects={'preprocess_input': preprocess_input}
        )
        return model
    except Exception as e:
        st.error(f"모델 로딩 중 오류가 발생했습니다: {e}")
        st.error(f"'{MODEL_PATH}' 경로에 모델 파일이 있는지 확인해주세요.")
        return None

model = load_keras_model()

# Grad-CAM 함수 구현

def make_gradcam_heatmap(img_array, model, last_conv_layer_name):
    if model is None:
        return None

    base_model = model.get_layer('densenet121')
    last_conv_layer = base_model.get_layer(last_conv_layer_name)

    last_conv_layer_model = keras.Model(base_model.inputs, last_conv_layer.output)

    # 전체 모델의 최종 예측 출력을 얻기 위한 모델을 정의
    classifier_input = keras.Input(shape=last_conv_layer.output.shape[1:])
    x = classifier_input

    for layer in model.layers:
        if layer.name not in ['input_image', 'data_augmentation', 'densenet_preprocess', 'densenet121']:
            x = layer(x)
    classifier_model = keras.Model(classifier_input, x)

    # GradientTape를 사용하여 그래디언트를 계산
    with tf.GradientTape() as tape:

        last_conv_layer_output = last_conv_layer_model(img_array)
        tape.watch(last_conv_layer_output)

        preds = classifier_model(last_conv_layer_output)
        class_output = preds[0]

    grads = tape.gradient(class_output, last_conv_layer_output)

    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    heatmap = tf.maximum(heatmap, 0) / (tf.math.reduce_max(heatmap) + 1e-8)
    return heatmap.numpy()


def superimpose_gradcam(img, heatmap, alpha=0.4):
    if heatmap is None:
        return img

    # 히트맵을 원본 이미지와 같은 크기로 확대
    heatmap = cv2.resize(heatmap, (img.shape[1], img.shape[0]))

    # 히트맵을 0-255 범위의 8비트 이미지로 변환
    heatmap = np.uint8(255 * heatmap)

    # 'jet' 컬러맵 적용
    jet = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
    jet = cv2.cvtColor(jet, cv2.COLOR_BGR2RGB)

    # 원본 이미지와 히트맵을 겹침
    superimposed_img = jet * alpha + img * (1 - alpha)
    superimposed_img = np.clip(superimposed_img, 0, 255).astype(np.uint8)

    return superimposed_img

# Streamlit UI 구성

st.title("흉부 X-ray COVID-19 분류 및 Grad-CAM 시각화 pneumonia-detection-using-x-ray-images")
st.write("DenseNet121 기반의 딥러닝 모델을 사용하여 COVID-19 감염 여부를 예측하고, AI가 어느 부분을 보고 판단했는지 히트맵으로 보여줍니다.")

if model is not None:
    uploaded_file = st.file_uploader("흉부 X-ray 이미지를 업로드하세요 (JPG, PNG)", type=["jpg", "png", "jpeg"])

    if uploaded_file is not None:
        pil_img = Image.open(uploaded_file).convert('RGB')
        img_array = np.array(pil_img.resize((224, 224)))

        processed_img_for_pred = np.expand_dims(img_array.copy(), axis=0)

        processed_img_for_gradcam = preprocess_input(processed_img_for_pred.copy())


        if st.button("예측 실행"):
            with st.spinner('모델이 이미지를 분석 중입니다...'):
                prediction = model.predict(processed_img_for_pred)[0][0]

                if prediction >= 0.5:
                    class_label = "Negative"
                    probability = prediction * 100
                else:
                    class_label = "Positive"
                    probability = (1 - prediction) * 100

                result_text = f"**{class_label}**일 확률이 **{probability:.2f}%** 입니다."
                # ---------------------------------------------------------------------

                # Grad-CAM 생성
                heatmap = make_gradcam_heatmap(processed_img_for_gradcam, model, LAST_CONV_LAYER_NAME)

                # 원본 이미지 위에 히트맵 겹치기
                superimposed_img = superimpose_gradcam(img_array, heatmap)

                # 결과 출력
                st.subheader("분석 결과")
                if class_label == "Positive":
                    st.error(result_text)
                else:
                    st.success(result_text)

                col1, col2 = st.columns(2)
                with col1:
                    st.image(pil_img, caption="원본 이미지", use_column_width=True)
                with col2:
                    st.image(superimposed_img, caption="Grad-CAM 분석 결과", use_column_width=True)

                st.info("""
                **Grad-CAM 해석:**
                - **붉은색 영역**은 모델이 'Positive'라고 판단하는 데 가장 큰 영향을 미친 부분입니다.
                - **푸른색 영역**은 판단에 거의 영향을 미치지 않은 부분입니다.
                - 이 시각화는 모델의 판단을 해석하는 데 도움을 주지만, 100% 정확한 의학적 진단을 의미하지는 않습니다.
                """)
else:
    st.warning("모델을 불러올 수 없습니다. 관리자에게 문의하세요.")

Writing app.py
