In [None]:
!pip install kaggle



In [None]:
! pip install -q git+https://github.com/keras-team/keras-cv

  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m950.8/950.8 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for keras-cv (pyproject.toml) ... [?25l[?25hdone


In [None]:
# mount the drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
# You can use `tensorflow`, `pytorch`, `jax` here
# KerasCore makes the notebook backend agnostic :)
os.environ["KERAS_BACKEND"] = "tensorflow"

import keras_cv
import keras_core as keras
from keras_core import layers

import numpy as np
import pandas as pd
import tensorflow as tf
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split

Config

In [None]:
class Config:
    """Notebook-wide settings shared by the data pipeline and the model builders."""

    # Seed handed to keras.utils.set_random_seed.
    SEED = 42
    # Target (height, width) used when resizing decoded images; kept as a list
    # because it is concatenated with the channel count to form the input shape.
    IMAGE_SIZE = [256, 256]
    # Number of samples per batch in the tf.data pipeline.
    BATCH_SIZE = 16
    # Number of passes over the training data.
    EPOCHS = 15
    # Label column(s) read from the dataframe for the current task.
    TARGET_COLS = ["bowel"]
    # Let tf.data choose the parallelism / prefetch depth.
    AUTOTUNE = tf.data.AUTOTUNE


config = Config()

In [None]:
keras.utils.set_random_seed(seed=config.SEED)

Dataset

In [None]:
BASE_PATH = "/content/drive/MyDrive/rsna_data"

In [None]:
# Read the label table and the per-series metadata table.
train_df = pd.read_csv(f"{BASE_PATH}/train.csv")
series_meta_df = pd.read_csv(f"{BASE_PATH}/train_series_meta.csv")

# Join train.csv with train_series_meta.csv on patient_id.
dataframe = pd.merge(train_df, series_meta_df, on="patient_id")

# Build the image directory path for every row as
# <root>/<patient_id>/<series_id>/img_256x256_d1_original.
# Parentheses replace the backslash continuations; the original ended with a
# dangling backslash that was only valid because the following line was blank.
dataframe["image_path"] = (
    "/content/drive/MyDrive/png_jjw"
    + "/" + dataframe.patient_id.astype(str)
    + "/" + dataframe.series_id.astype(str)
    + "/" + "img_256x256_d1_original"
)

# Preview the first rows (head() shows five by default).
dataframe.head()

In [None]:
#dataframe = dataframe[dataframe['any_injury'] == 1]
print(dataframe.shape)

In [None]:
def bowel_assign_value(row):
    """Encode the bowel label of one row.

    Returns 0 when ``row['bowel_healthy']`` equals 1, otherwise 1.
    """
    return 0 if row['bowel_healthy'] == 1 else 1

def extra_assign_value(row):
    """Encode the extravasation label of one row.

    Returns 0 when ``row['extravasation_healthy']`` equals 1, otherwise 1.
    """
    is_healthy = (row['extravasation_healthy'] == 1)
    return int(not is_healthy)

def kid_assign_value(row):
    """Encode the kidney label of one row as an integer grade.

    The flags are checked in order healthy -> low -> high and the first one
    equal to 1 decides the result (0, 1 or 2). Returns None when no flag is set.
    """
    for grade, column in enumerate(('kidney_healthy', 'kidney_low', 'kidney_high')):
        if row[column] == 1:
            return grade
    return None
def liver_assign_value(row):
    """Encode the liver label of one row as an integer grade.

    Checks ``liver_healthy``, ``liver_low`` and ``liver_high`` in that order and
    returns 0, 1 or 2 for the first flag equal to 1; None when none is set.
    """
    grade_by_column = {'liver_healthy': 0, 'liver_low': 1, 'liver_high': 2}
    # The generator is lazy, so later columns are only read when earlier ones miss.
    matches = (grade for column, grade in grade_by_column.items() if row[column] == 1)
    return next(matches, None)

def spleen_assign_value(row):
    """Encode the spleen label of one row as an integer grade.

    Returns the position (0, 1, 2) of the first of ``spleen_healthy``,
    ``spleen_low``, ``spleen_high`` whose value equals 1, or None if none does.
    """
    ordered_flags = ['spleen_healthy', 'spleen_low', 'spleen_high']
    for column in ordered_flags:
        if row[column] == 1:
            return ordered_flags.index(column)
    return None

# Add one integer-coded label column per organ by applying the matching
# row encoder to every row of the merged dataframe.
for column, encoder in (
    ('bowel', bowel_assign_value),
    ('extravasation', extra_assign_value),
    ('kidney', kid_assign_value),
    ('liver', liver_assign_value),
    ('spleen', spleen_assign_value),
):
    dataframe[column] = dataframe.apply(encoder, axis=1)

In [None]:
dataframe[["patient_id", "bowel", "extravasation", "kidney", "liver", "spleen", "image_path"]]

In [None]:
# Balance the bowel classes: keep as many rows of each class as the rarer
# class has, sampled with a fixed random_state.
bowel_labels = dataframe['bowel']
negative = dataframe.loc[bowel_labels == 0]
positive = dataframe.loc[bowel_labels == 1]
num_samples = min(len(negative), len(positive))
negative_samples, positive_samples = (
    subset.sample(n=num_samples, random_state=42) for subset in (negative, positive)
)
bowel_dataframe = pd.concat([negative_samples, positive_samples], axis=0)

In [None]:
# Balance the extravasation classes by undersampling both to the size of the
# smaller one, using a fixed random_state.
extra_labels = dataframe['extravasation']
negative = dataframe.loc[extra_labels == 0]
positive = dataframe.loc[extra_labels == 1]
num_samples = min(len(negative), len(positive))
negative_samples, positive_samples = (
    subset.sample(n=num_samples, random_state=42) for subset in (negative, positive)
)
extra_dataframe = pd.concat([negative_samples, positive_samples], axis=0)

In [None]:
# Balance the three kidney grades (0, 1, 2) by undersampling each to the size
# of the rarest grade, using a fixed random_state.
kidney_labels = dataframe['kidney']
negative = dataframe.loc[kidney_labels == 0]
positive1 = dataframe.loc[kidney_labels == 1]
positive2 = dataframe.loc[kidney_labels == 2]
num_samples = min(len(negative), len(positive1), len(positive2))
negative_samples, positive1_samples, positive2_samples = (
    subset.sample(n=num_samples, random_state=42)
    for subset in (negative, positive1, positive2)
)
kidney_dataframe = pd.concat(
    [negative_samples, positive1_samples, positive2_samples], axis=0
)

In [None]:
# Balance the three spleen grades (0, 1, 2) by undersampling each to the size
# of the rarest grade, using a fixed random_state.
spleen_labels = dataframe['spleen']
negative = dataframe.loc[spleen_labels == 0]
positive1 = dataframe.loc[spleen_labels == 1]
positive2 = dataframe.loc[spleen_labels == 2]
num_samples = min(len(negative), len(positive1), len(positive2))
negative_samples, positive1_samples, positive2_samples = (
    subset.sample(n=num_samples, random_state=42)
    for subset in (negative, positive1, positive2)
)
spleen_dataframe = pd.concat(
    [negative_samples, positive1_samples, positive2_samples], axis=0
)

In [None]:
spleen_dataframe.shape

In [None]:
# Helpers and driver code for the per-organ train/validation split.
def split_group(group, test_size=0.2, stratify_col="bowel"):
    """Split one group of rows into a (train, validation) pair.

    Args:
        group: DataFrame holding the rows of a single group.
        test_size: Fraction of rows that should land in the validation part.
        stratify_col: Column passed to ``train_test_split`` as ``stratify``.
            Defaults to "bowel", which is what the original code hard-coded.

    Returns:
        A two-element sequence ``(train_rows, val_rows)``.
    """
    if len(group) == 1:
        # A single row cannot be split, so it is assigned at random. It goes to
        # validation with probability `test_size`; the original expression had
        # the two outcomes swapped and sent it to validation 1 - test_size of
        # the time.
        if np.random.rand() < test_size:
            return pd.DataFrame(), group
        return group, pd.DataFrame()
    return train_test_split(
        group, stratify=group[stratify_col], test_size=test_size, random_state=42
    )


def _balanced_subset(frame, label_col, seed=42):
    """Undersample every class of `label_col` to the size of the rarest class."""
    per_class = [rows for _, rows in frame.groupby(label_col)]
    if not per_class:
        return frame.iloc[0:0]
    n_per_class = min(len(rows) for rows in per_class)
    return pd.concat(
        [rows.sample(n=n_per_class, random_state=seed) for rows in per_class], axis=0
    )


def _split_by_label(frame, label_col, test_size=0.2):
    """Split `frame` class by class on `label_col` and return (train, val) frames."""
    train_parts, val_parts = [], []
    for _, group in frame.groupby(label_col):
        train_rows, val_rows = split_group(
            group, test_size=test_size, stratify_col=label_col
        )
        train_parts.append(train_rows)
        val_parts.append(val_rows)

    def _combine(parts):
        # Concatenate once instead of growing a frame inside the loop, and skip
        # the empty placeholders produced for single-row groups.
        non_empty = [part for part in parts if len(part) > 0]
        return pd.concat(non_empty, ignore_index=True) if non_empty else pd.DataFrame()

    return _combine(train_parts), _combine(val_parts)


# No earlier cell builds a balanced liver frame, so the original liver loop
# failed with a NameError. Build it here with the same undersampling recipe the
# kidney and spleen cells use.
liver_dataframe = _balanced_subset(dataframe, "liver")

# Split every organ on its own label column. The original grouped and
# stratified all five frames by config.TARGET_COLS (the bowel label).
bowel_train_data, bowel_val_data = _split_by_label(bowel_dataframe, "bowel")
extra_train_data, extra_val_data = _split_by_label(extra_dataframe, "extravasation")
liver_train_data, liver_val_data = _split_by_label(liver_dataframe, "liver")
kidney_train_data, kidney_val_data = _split_by_label(kidney_dataframe, "kidney")
spleen_train_data, spleen_val_data = _split_by_label(spleen_dataframe, "spleen")

In [None]:
bowel_train_data.shape, bowel_val_data.shape

In [None]:
# print(train_data['bowel_injury'].value_counts())
# print(train_data['extravasation_injury'].value_counts())
# print(train_data['liver_high'].value_counts())
# print(train_data['liver_low'].value_counts())
# print(train_data['kidney_high'].value_counts())
# print(train_data['kidney_low'].value_counts())
# print(train_data['spleen_high'].value_counts())
# print(train_data['spleen_low'].value_counts())

In [None]:
# print(val_data['bowel_injury'].value_counts())
# print(val_data['extravasation_injury'].value_counts())
# print(val_data['liver_high'].value_counts())
# print(val_data['liver_low'].value_counts())
# print(val_data['kidney_high'].value_counts())
# print(val_data['kidney_low'].value_counts())
# print(val_data['spleen_high'].value_counts())
# print(val_data['spleen_low'].value_counts())

In [None]:
def decode_image_and_label(image_path, label):
    """Load three PNG slices from a directory and return a float image and label.

    Reads ``image_001.png``, ``image_002.png`` and ``image_003.png`` under
    `image_path`, decodes each as a single-channel uint8 image, concatenates
    them along axis 2, resizes the result to ``config.IMAGE_SIZE`` with
    bilinear interpolation and divides by 255. The label is cast to float32.

    Returns:
        Tuple ``(image, label)``.
    """
    slice_suffixes = ('/image_001.png', '/image_002.png', '/image_003.png')
    planes = [
        tf.io.decode_png(tf.io.read_file(image_path + suffix), channels=1, dtype=tf.uint8)
        for suffix in slice_suffixes
    ]
    stacked = tf.concat(planes, axis=2)

    resized = tf.image.resize(stacked, config.IMAGE_SIZE, method="bilinear")
    scaled = tf.cast(resized, tf.float32) / 255.0

    return (scaled, tf.cast(label, tf.float32))

In [None]:
# Create the flip and rotation layers once, outside the custom layer.
# The stable `tf.keras.layers` names are used instead of the deprecated
# `tf.keras.layers.experimental.preprocessing` aliases.
random_flip_layer = tf.keras.layers.RandomFlip("horizontal")
random_rotation_layer = tf.keras.layers.RandomRotation(0.2)


class CustomAugmenter(tf.keras.layers.Layer):
    """Applies random flip, random rotation and random cutout when training.

    Args:
        cutout_params: Keyword arguments forwarded to
            ``keras_cv.layers.RandomCutout``.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, cutout_params, **kwargs):
        super().__init__(**kwargs)
        self.cutout_layer = keras_cv.layers.Augmenter(
            [keras_cv.layers.RandomCutout(**cutout_params)]
        )

    def call(self, inputs, training=None):
        """Return augmented `inputs` when `training` is truthy, else `inputs` unchanged."""
        if training:
            inputs = random_flip_layer(inputs)
            inputs = random_rotation_layer(inputs)
            inputs = self.cutout_layer(inputs)
        return inputs


# One shared augmenter instance, so apply_augmentation does not construct new
# layers every time it is called.
_default_augmenter = CustomAugmenter(
    cutout_params={"height_factor": 0.2, "width_factor": 0.2}
)


def apply_augmentation(images, labels):
    """Augment a batch of images and pass the labels through untouched.

    Returns:
        Tuple ``(augmented_images, labels)``.
    """
    augmented_images = _default_augmenter(images, training=True)

    return (augmented_images, labels)

In [None]:
def build_dataset(image_paths, labels, shuffle=True, augment=True):
    """Build a batched tf.data pipeline from image directories and labels.

    Args:
        image_paths: Sequence of image directory paths, one per sample.
        labels: Array of labels aligned with `image_paths`.
        shuffle: When True (default, the original behaviour) samples are
            shuffled with a buffer of ``config.BATCH_SIZE * 10``.
        augment: When True (default, the original behaviour) each batch is
            passed through ``apply_augmentation``. Pass False for
            validation or test data.

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, labels)`` batches.
    """
    ds = tf.data.Dataset.from_tensor_slices((image_paths, labels))
    if shuffle:
        # Shuffle the (path, label) pairs before decoding so the shuffle buffer
        # holds short strings rather than decoded images.
        ds = ds.shuffle(config.BATCH_SIZE * 10)
    ds = ds.map(decode_image_and_label, num_parallel_calls=config.AUTOTUNE)
    ds = ds.batch(config.BATCH_SIZE)
    if augment:
        # Augmentation is applied per batch.
        ds = ds.map(apply_augmentation, num_parallel_calls=config.AUTOTUNE)
    return ds.prefetch(config.AUTOTUNE)

In [None]:
# Collect the training image directories and their label column(s), then print
# the number of paths and the label array shape as a quick consistency check.
paths = bowel_train_data.image_path.tolist()
labels = bowel_train_data[config.TARGET_COLS].values
print(len(paths))
print(labels.shape)

In [None]:
# Rebuild the path and label arrays for the bowel training split.
paths = bowel_train_data.image_path.tolist()
labels = bowel_train_data[config.TARGET_COLS].values

# Build the pipeline and pull a single batch from it as a smoke test.
ds = build_dataset(image_paths=paths, labels=labels)
# NOTE: `labels` is rebound here from the full label array to the labels of one batch.
images, labels = next(iter(ds))
# Show the image batch shape and the shape of each label in the batch.
images.shape, [label.shape for label in labels]

In [None]:
# Display a 2x2 gallery of images from the sampled batch. value_range is (0, 1)
# because decode_image_and_label divides pixel values by 255.
keras_cv.visualization.plot_image_gallery(
    images=images,
    value_range=(0, 1),
    rows=2,
    cols=2,
)

Model

In [None]:
import tensorflow as tf
from sklearn.metrics import confusion_matrix

# Custom metric to calculate sensitivity
def sensitivity(y_true, y_pred):
    """Fraction of entries with ``y_true == 1`` whose rounded prediction is 1.

    Keras epsilon is added to the denominator so that input without any
    positive entries yields 0 instead of a division by zero.
    """
    is_positive = tf.equal(y_true, 1)
    predicted_positive = tf.equal(tf.round(y_pred), 1)
    hits = tf.cast(tf.logical_and(is_positive, predicted_positive), dtype=tf.float32)
    true_positives = tf.reduce_sum(hits)
    actual_positives = tf.reduce_sum(tf.cast(is_positive, dtype=tf.float32))
    return true_positives / (actual_positives + tf.keras.backend.epsilon())

# Custom metric to calculate specificity
def specificity(y_true, y_pred):
    """Fraction of entries with ``y_true == 0`` whose rounded prediction is 0.

    Keras epsilon is added to the denominator so that input without any
    negative entries yields 0 instead of a division by zero.
    """
    is_negative = tf.equal(y_true, 0)
    predicted_negative = tf.equal(tf.round(y_pred), 0)
    correct_rejections = tf.cast(
        tf.logical_and(is_negative, predicted_negative), dtype=tf.float32
    )
    true_negatives = tf.reduce_sum(correct_rejections)
    actual_negatives = tf.reduce_sum(tf.cast(is_negative, dtype=tf.float32))
    return true_negatives / (actual_negatives + tf.keras.backend.epsilon())

EfficientNet

In [None]:
def _build_classification_model(
    warmup_steps, decay_steps, head_name, num_outputs, activation, loss, metrics
):
    """Assemble and compile an EfficientNetV2-B3 classifier with one dense head.

    Shared implementation behind the binary and three-class builders; only the
    head width, head activation, loss and metrics differ between them.
    """
    # Define Input
    inputs = keras.Input(shape=config.IMAGE_SIZE + [3,], batch_size=config.BATCH_SIZE)

    # Define Backbone. Rescaling is switched off at construction time:
    # assigning `backbone.include_rescaling = False` afterwards does not alter
    # the already-built layer graph, and decode_image_and_label has already
    # scaled the pixels to [0, 1].
    backbone = keras_cv.models.EfficientNetV2Backbone.from_preset(
        "efficientnetv2_b3", include_rescaling=False
    )
    x = backbone(inputs)

    # Global average pooling over the backbone's activation maps
    x = keras.layers.GlobalAveragePooling2D()(x)

    # 'Neck' in front of the classification head
    x_head = keras.layers.Dense(32, activation='silu')(x)

    # Classification head
    output = keras.layers.Dense(num_outputs, name=head_name, activation=activation)(x_head)

    # Create model
    print(f"[INFO] Building the {head_name} model...")
    model = keras.Model(inputs=inputs, outputs=output)

    # Warm up from 1e-4 to 1e-3 over `warmup_steps`, then cosine-decay to 0
    # over `decay_steps`.
    cosine_decay = keras.optimizers.schedules.CosineDecay(
        initial_learning_rate=1e-4,
        decay_steps=decay_steps,
        alpha=0.0,
        warmup_target=1e-3,
        warmup_steps=warmup_steps,
    )

    # Compile the model
    optimizer = keras.optimizers.Adam(learning_rate=cosine_decay)

    print(f"[INFO] Compiling the {head_name} model...")
    model.compile(
        optimizer=optimizer,
        loss=loss,
        metrics=metrics
    )

    return model


def build_binary_classification_model(warmup_steps, decay_steps, head_name):
    """Build and compile a binary classifier with a single sigmoid output.

    Args:
        warmup_steps: Number of learning-rate warmup steps.
        decay_steps: Number of cosine-decay steps after warmup.
        head_name: Name given to the output layer.

    Returns:
        A compiled Keras model trained with binary cross-entropy and reporting
        accuracy, sensitivity and specificity.
    """
    metrics = [
        "accuracy",
        # Explicit names so the training log shows "sensitivity"/"specificity"
        # instead of the anonymous "mean_metric_wrapper" entries.
        keras.metrics.MeanMetricWrapper(sensitivity, name="sensitivity"),
        keras.metrics.MeanMetricWrapper(specificity, name="specificity"),
    ]
    return _build_classification_model(
        warmup_steps,
        decay_steps,
        head_name,
        num_outputs=1,
        activation='sigmoid',
        loss=keras.losses.BinaryCrossentropy(),
        metrics=metrics,
    )


def build_tertiary_classification_model(warmup_steps, decay_steps, head_name):
    """Build and compile a three-class classifier with a softmax output.

    Args:
        warmup_steps: Number of learning-rate warmup steps.
        decay_steps: Number of cosine-decay steps after warmup.
        head_name: Name given to the output layer.

    Returns:
        A compiled Keras model reporting accuracy.
    """
    # The kidney/liver/spleen label columns built in this notebook hold integer
    # class codes (0, 1, 2), not one-hot vectors, so the sparse loss is used.
    return _build_classification_model(
        warmup_steps,
        decay_steps,
        head_name,
        num_outputs=3,
        activation='softmax',
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=["accuracy"],
    )

Train

In [None]:
# Collect image directories and float32 labels for the bowel task.
print("[INFO] Building the dataset...")
train_paths = bowel_train_data.image_path.values
train_labels = bowel_train_data[config.TARGET_COLS].values.astype(np.float32)
valid_paths = bowel_val_data.image_path.values
valid_labels = bowel_val_data[config.TARGET_COLS].values.astype(np.float32)

# Training pipeline: decode, shuffle, batch and augment.
train_ds = build_dataset(image_paths=train_paths, labels=train_labels)

# Validation pipeline: decode and batch only. It is assembled directly so that
# validation images are neither shuffled nor randomly augmented, which would
# make the validation metrics noisy and not comparable between epochs.
val_ds = (
    tf.data.Dataset.from_tensor_slices((valid_paths, valid_labels))
    .map(decode_image_and_label, num_parallel_calls=config.AUTOTUNE)
    .batch(config.BATCH_SIZE)
    .prefetch(config.AUTOTUNE)
)

# The learning-rate schedule advances once per optimizer step, i.e. once per
# batch. train_ds is already batched, so its cardinality is the number of steps
# per epoch. The original also multiplied by BATCH_SIZE, which stretched the
# warmup phase beyond the total number of steps actually run.
steps_per_epoch = int(train_ds.cardinality().numpy())
total_train_steps = steps_per_epoch * config.EPOCHS
warmup_steps = int(total_train_steps * 0.10)
decay_steps = total_train_steps - warmup_steps

print(f"{total_train_steps=}")
print(f"{warmup_steps=}")
print(f"{decay_steps=}")

In [None]:
# Directory where the trained models are saved; created up front so that
# model.save does not fail on a missing folder.
save_dir = BASE_PATH + "/checkpoint/"
os.makedirs(save_dir, exist_ok=True)

# List of model names
model_names = ["bowel"]

# One figure with two panels: training accuracy (left), validation accuracy (right).
fig, axes = plt.subplots(1, 2, figsize=(15, 5))

for name in model_names:
    # Binary heads for bowel/extravasation, three-class heads for everything else.
    if name in ["bowel", "extra"]:
        model = build_binary_classification_model(warmup_steps, decay_steps, name)
    else:
        model = build_tertiary_classification_model(warmup_steps, decay_steps, name)

    # Train the model.
    # NOTE: train_ds / val_ds are whatever the previous cell built (the bowel
    # data); they are not switched per model name.
    history = model.fit(train_ds, epochs=config.EPOCHS, validation_data=val_ds)

    model_filename = f"EfficinetnetB3_{name}.keras"
    model_path = os.path.join(save_dir, model_filename)
    model.save(model_path)

    # Add this model's accuracy curves to the shared figure.
    axes[0].plot(history.history['accuracy'], label='Training ' + name)
    axes[1].plot(history.history['val_accuracy'], label='Validation ' + name)

# Label and render the figure once, after every model has added its curves.
# Calling plt.show() inside the loop would render the figure after the first
# model and leave later curves undisplayed.
axes[0].set_title("Training Accuracy")
axes[1].set_title("Validation Accuracy")
axes[0].set_xlabel('Epoch')
axes[1].set_xlabel('Epoch')
axes[0].set_ylabel('Accuracy')
axes[1].set_ylabel('Accuracy')
axes[0].legend()
axes[1].legend()

plt.tight_layout()
plt.show()

[INFO] Building the bowel model...
[INFO] Compiling the bowel model...
Epoch 1/15
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1897s[0m 127s/step - accuracy: 0.4070 - loss: 0.7203 - mean_metric_wrapper: 0.4154 - mean_metric_wrapper_1: 0.4005 - val_accuracy: 0.5000 - val_loss: 0.7229 - val_mean_metric_wrapper: 0.0000e+00 - val_mean_metric_wrapper_1: 1.0000
Epoch 2/15
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m767s[0m 69s/step - accuracy: 0.5135 - loss: 0.7205 - mean_metric_wrapper: 0.5106 - mean_metric_wrapper_1: 0.5707 - val_accuracy: 0.5000 - val_loss: 0.6931 - val_mean_metric_wrapper: 1.0000 - val_mean_metric_wrapper_1: 0.0000e+00
Epoch 3/15
[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m763s[0m 66s/step - accuracy: 0.5926 - loss: 0.6990 - mean_metric_wrapper: 0.6845 - mean_metric_wrapper_1: 0.5349 - val_accuracy: 0.5000 - val_loss: 0.6929 - val_mean_metric_wrapper: 0.0000e+00 - val_mean_metric_wrapper_1: 1.0000
Epoch 4/15
[1m11/11[0m [32m━━━