In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import load_img, img_to_array

def load_data(data_dir, img_size=(128, 128)):
    images = []
    masks = []

    for img_file in os.listdir(data_dir):
        if '_mask' not in img_file:
            img_path = os.path.join(data_dir, img_file)
            mask_path = os.path.join(data_dir, img_file.replace('.jpg', '_mask.jpg'))
    
            img = load_img(img_path, target_size=img_size)
            mask = load_img(mask_path, target_size=img_size, color_mode='grayscale')
    
            img = img_to_array(img) / 255.0
            mask = img_to_array(mask) / 255.0
    
            images.append(img)
            masks.append(mask)

    return np.array(images), np.array(masks)

train_dir = '/kaggle/input/unet12342/split_datasets/train'
val_dir = '/kaggle/input/unet12342/split_datasets/val'
test_dir = '/kaggle/input/unet12342/split_datasets/test'

X_train, y_train = load_data(train_dir)
X_val, y_val = load_data(val_dir)
X_test, y_test = load_data(test_dir)

# EfficientNet의 기본 블록 구현
def conv_block(x, filters, kernel_size=3, strides=1, padding='same'):
    x = layers.Conv2D(filters, kernel_size, strides=strides, padding=padding)(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    return x

def mb_conv_block(x, filters, expansion_factor, strides):
    # 확장 레이어
    in_channels = x.shape[-1]
    x = conv_block(x, in_channels * expansion_factor, kernel_size=1)
    
    # Depthwise Convolution
    x = layers.DepthwiseConv2D(kernel_size=3, strides=strides, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    
    # Pointwise Convolution
    x = conv_block(x, filters, kernel_size=1)
    return x

def build_efficientnet_unet(input_shape=(128, 128, 3)):
    inputs = layers.Input(shape=input_shape)

    # Encoder
    x = conv_block(inputs, 32)
    x = mb_conv_block(x, filters=16, expansion_factor=1, strides=1)  # Block 1
    skip1 = x

    x = mb_conv_block(x, filters=24, expansion_factor=6, strides=2)  # Block 2
    skip2 = x

    x = mb_conv_block(x, filters=40, expansion_factor=6, strides=2)  # Block 3
    skip3 = x

    x = mb_conv_block(x, filters=80, expansion_factor=6, strides=2)  # Block 4
    skip4 = x

    x = mb_conv_block(x, filters=112, expansion_factor=6, strides=1)  # Block 5
    x = mb_conv_block(x, filters=192, expansion_factor=6, strides=2)  # Block 6

    # Bottleneck
    x = mb_conv_block(x, filters=320, expansion_factor=6, strides=1)  # Block 7

    # Decoder
    x = layers.Conv2DTranspose(192, (3, 3), strides=(2, 2), padding='same')(x)
    x = layers.concatenate([x, skip4])
    x = conv_block(x, 192)

    x = layers.Conv2DTranspose(112, (3, 3), strides=(2, 2), padding='same')(x)
    x = layers.concatenate([x, skip3])
    x = conv_block(x, 112)

    x = layers.Conv2DTranspose(80, (3, 3), strides=(2, 2), padding='same')(x)
    x = layers.concatenate([x, skip2])
    x = conv_block(x, 80)

    x = layers.Conv2DTranspose(32, (3, 3), strides=(2, 2), padding='same')(x)
    x = layers.concatenate([x, skip1])
    x = conv_block(x, 32)

    outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(x)

    model = models.Model(inputs=inputs, outputs=outputs)
    return model

model = build_efficientnet_unet()
model.summary()

def dice_coef(y_true, y_pred, smooth=1e-6):
    y_pred = tf.cast(y_pred > 0.5, dtype=tf.float32)
    intersection = tf.reduce_sum(y_true * y_pred)
    return (2.0 * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)

def dice_loss(y_true, y_pred):
    return 1 - dice_coef(y_true, y_pred)

def combined_loss(y_true, y_pred, a=0.95):
    logit_loss_value = tf.keras.losses.binary_crossentropy(y_true, y_pred, from_logits=False)
    dice_loss_value = dice_loss(y_true, y_pred)
    return a * dice_loss_value + (1 - a) * logit_loss_value

model.compile(optimizer='adam', loss=combined_loss, metrics=[dice_coef])

reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_dice_coef',
    factor=0.9,
    patience=3,
    min_lr=1e-6,
    mode='max',
    verbose=1
)

model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    'best_efficientnet_unet_model.keras',
    monitor='val_dice_coef',
    save_best_only=True,
    mode='max',
    verbose=1
)

history = model.fit(X_train, y_train,
                    batch_size=16,
                    epochs=50,
                    validation_data=(X_val, y_val),
                    callbacks=[reduce_lr, model_checkpoint]
                   )

model.save('efficientnet_unet_model.keras')


In [None]:
best_model = tf.keras.models.load_model('best_unet_model.keras', 
                                        custom_objects={'combined_loss': combined_loss,
                                                        'dice_coef': dice_coef})

test_loss, test_dice = best_model.evaluate(X_test, y_test)

plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper right')
plt.subplot(1, 2, 2)
plt.plot(history.history['dice_coef'])
plt.plot(history.history['val_dice_coef'])
plt.title('Model dice_coef')
plt.ylabel('dice_coef')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='lower right')
plt.tight_layout()
plt.show()

def visualize_results(model, X, y, num_samples=3):
    predictions = model.predict(X[:num_samples])
    predictions = (predictions > 0.5).astype(np.float32)
    plt.figure(figsize=(4*num_samples, 12))
    for i in range(num_samples):
        plt.subplot(3, num_samples, i + 1)
        plt.imshow(X[i])
        plt.title('Original Image')
        plt.axis('off')
        plt.subplot(3, num_samples, i + 1 + num_samples)
        plt.imshow(y[i, :, :, 0], cmap='gray')
        plt.title(f'True Mask\nDice: {dice_coef(y[i:i+1], predictions[i:i+1]).numpy():.4f}')
        plt.axis('off')
        plt.subplot(3, num_samples, i + 1 + 2*num_samples)
        plt.imshow(predictions[i, :, :, 0], cmap='gray')
        plt.title('Predicted Mask')
        plt.axis('off')
    plt.tight_layout()
    plt.show()

visualize_results(model, X_test, y_test)

def measure_model_size_and_time(model, sample_input_shape):
    # 모델 사이즈 측정
    model_size = model.count_params()
    model_size_bytes = model_size * 4  # 각 파라미터가 4바이트(32비트)라고 가정

    # 계산 시간 측정
    sample_input = np.random.rand(*sample_input_shape).astype(np.float32)

    start_time = time.time()
    model.predict(sample_input)
    end_time = time.time()

    inference_time = end_time - start_time

    print(f"모델의 파라미터 수: {model_size} (약 {model_size_bytes / (1024 ** 2):.2f} MB)")
    print(f"예측 시간: {inference_time:.4f} 초")

# 사용 예시
measure_model_size_and_time(model, (1, 128, 128, 3))