In [None]:
# Mount Google Drive at /gdrive so the dataset and model paths used below resolve.
# force_remount=True asks Colab to mount again even if a mount is already present.
from google.colab import drive
drive.mount('/gdrive', force_remount=True)

In [None]:
import tensorflow as tf
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import RandomOverSampler
from sklearn.utils.class_weight import compute_class_weight
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
from IPython.display import display, Javascript

In [None]:
# Helper that tries to keep the Colab session connected
def keep_alive():
    """Inject a browser-side timer that clicks the Colab connect button.

    The JavaScript below registers ``ClickConnect`` with ``setInterval`` so it
    runs every 60000 ms; each run logs a message and clicks the element matched
    by the ``colab-connect-button`` selector. Every call to this function
    registers one more timer.
    """
    reconnect_script = '''
        function ClickConnect(){
            console.log("클릭 연결 버튼");
            document.querySelector("colab-connect-button").click()
        }
        setInterval(ClickConnect, 60000)
    '''
    display(Javascript(reconnect_script))

In [None]:
# Load the annotation CSV and decode its literal-encoded columns
def load_and_preprocess_data(csv_path='/gdrive/MyDrive/Final project/1_Red/3_데이터수집_저장/0_데이터수집폴더/피부 원본 데이터/final.csv'):
    """Read the annotation table and turn its stringified columns into objects.

    Args:
        csv_path: Location of the CSV file. Defaults to the project's
            ``final.csv`` on the mounted Drive, so existing zero-argument
            calls keep working.

    Returns:
        pandas.DataFrame in which every string cell of the ``annotations``,
        ``equipment`` and ``bbox`` columns has been evaluated as a Python
        expression. Non-string cells (e.g. NaN) are left untouched.

    Raises:
        KeyError: if one of the three columns is missing from the file.
    """
    df = pd.read_csv(csv_path)
    for col in ['annotations', 'equipment', 'bbox']:
        # NOTE(review): eval() executes whatever expression the CSV cell holds.
        # If the file can ever come from an untrusted source, switch to
        # ast.literal_eval after confirming every cell is a plain literal.
        df[col] = df[col].apply(lambda x: eval(x) if isinstance(x, str) else x)
    return df

# Read one image file and prepare it as model input
def load_and_preprocess_image(image_path):
    """Load the file at ``image_path`` and return it preprocessed for EfficientNet.

    Steps, in order: read the raw file contents, decode them as a 3-channel
    JPEG, resize to 224x224, then apply
    ``tf.keras.applications.efficientnet.preprocess_input``.
    """
    raw_bytes = tf.io.read_file(image_path)
    rgb = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(rgb, [224, 224])
    return tf.keras.applications.efficientnet.preprocess_input(resized)

In [None]:
# Build a tf.data pipeline from image paths and labels
def create_data_generator(X, y, batch_size=32, is_training=True):
    """Create a batched, prefetched ``tf.data.Dataset`` of (image, label) pairs.

    Args:
        X: pandas Series of image file paths (accessed positionally via ``.iloc``).
        y: pandas Series of scalar labels aligned positionally with ``X``.
        batch_size: Number of samples per batch.
        is_training: When True the samples are visited in a fresh random order
            every time the dataset is iterated; when False the original order
            is kept.

    Returns:
        A dataset yielding ``(224, 224, 3)`` float32 images and scalar float32
        labels, batched and prefetched. Samples whose file does not exist or
        whose label is None/NaN are skipped with a printed message.
    """
    def gen():
        # Shuffle the sample *indices* rather than the decoded images. The
        # previous dataset.shuffle(buffer_size=len(X)) had to decode and hold
        # the whole dataset in memory before emitting the first batch. The
        # generator is re-invoked on every pass over the dataset, so each
        # epoch still gets a new permutation.
        order = np.random.permutation(len(X)) if is_training else range(len(X))
        for i in order:
            img_path = X.iloc[i]
            label = y.iloc[i]
            if label is None or pd.isna(label):
                # A missing label cannot satisfy the float32 scalar signature below.
                print(f"Skipping sample without label: {img_path}")
                continue
            if os.path.exists(img_path):  # only load files that are really there
                yield load_and_preprocess_image(img_path), label
            else:
                print(f"Skipping missing file: {img_path}")  # report the missing file

    dataset = tf.data.Dataset.from_generator(
        gen,
        output_signature=(
            tf.TensorSpec(shape=(224, 224, 3), dtype=tf.float32),
            tf.TensorSpec(shape=(), dtype=tf.float32)
        )
    )

    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [None]:
# Build the validation data pipeline
def create_val_data_generator(X, y, directory, batch_size=32):
    """Create the validation dataset by delegating to ``create_data_generator``.

    Args:
        X: Image paths, forwarded unchanged.
        y: Labels, forwarded unchanged.
        directory: Not used; kept only so existing call sites keep working.
        batch_size: Number of samples per batch.

    Returns:
        Whatever ``create_data_generator`` returns when called with
        ``is_training=False``.
    """
    # Forward by keyword: the previous positional call slid `directory` into
    # batch_size and `batch_size` into is_training, raising TypeError because
    # is_training was then supplied twice.
    return create_data_generator(X, y, batch_size=batch_size, is_training=False)

In [None]:
# Model factory
def create_model(output_dim, model_type):
    """Build an EfficientNetB0-based model with a small dense head.

    The backbone is created with ImageNet weights, no top, and a
    ``(224, 224, 3)`` input. Its output is pooled two ways (global average and
    global max), the two results are concatenated, and the result passes
    through ``Dense(256, relu)`` and ``Dropout(0.4)``.

    Args:
        output_dim: Width of the softmax layer; only used when ``model_type``
            is ``'classification'``.
        model_type: ``'classification'`` selects a softmax output; any other
            value selects a single linear output unit.

    Returns:
        An uncompiled ``tf.keras.Model``.
    """
    layers = tf.keras.layers

    backbone = tf.keras.applications.EfficientNetB0(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    feature_map = backbone.output

    avg_pooled = layers.GlobalAveragePooling2D()(feature_map)
    max_pooled = layers.GlobalMaxPooling2D()(feature_map)
    hidden = layers.Concatenate()([avg_pooled, max_pooled])
    hidden = layers.Dense(256, activation='relu')(hidden)
    hidden = layers.Dropout(0.4)(hidden)

    is_classifier = model_type == 'classification'
    head = layers.Dense(output_dim, activation='softmax') if is_classifier else layers.Dense(1)

    return tf.keras.Model(inputs=backbone.input, outputs=head(hidden))

In [None]:
# Plot and save the training curves
def plot_performance(history, metric_name, facepart, feature, model_type):
    """Save a three-panel figure (loss, metric, AUC) for one training run.

    Args:
        history: Object whose ``history`` attribute is a dict of per-epoch
            lists, containing at least ``loss``, ``val_loss``, the metric and
            its ``val_`` counterpart.
        metric_name: Key of the metric to plot, e.g. ``'accuracy'`` or
            ``'mae'``. A one-element list/tuple such as ``['accuracy']`` is
            accepted as well and its first item is used.
        facepart: Facepart id, used in the output file name.
        feature: Feature name, used in the titles and the output file name.
        model_type: Model type string, used in the output file name.

    The AUC panel is drawn only from the ``auc`` / ``val_auc`` series that are
    actually present in the history; when neither exists the panel shows a
    short note instead of raising KeyError.
    """
    # Call sites in this notebook pass the metric wrapped in a list; unwrap it
    # so the dict lookups and .upper() below work for both forms.
    if isinstance(metric_name, (list, tuple)):
        metric_name = metric_name[0]
    metric_label = metric_name.upper()
    curves = history.history

    fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(20, 5))

    # Loss plot
    ax1.plot(curves['loss'], label='Train Loss')
    ax1.plot(curves['val_loss'], label='Validation Loss')
    ax1.set_title(f'{feature} Loss')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.legend()

    # Metric plot
    ax2.plot(curves[metric_name], label=f'Train {metric_label}')
    ax2.plot(curves[f'val_{metric_name}'], label=f'Validation {metric_label}')
    ax2.set_title(f'{feature} {metric_label}')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel(metric_label)
    ax2.legend()

    # ROC AUC plot -- only if the model was compiled with an AUC metric
    auc_series = [(key, label)
                  for key, label in (('auc', 'Train AUC'), ('val_auc', 'Validation AUC'))
                  if key in curves]
    for key, label in auc_series:
        ax3.plot(curves[key], label=label)
    ax3.set_title(f'{feature} ROC AUC')
    ax3.set_xlabel('Epoch')
    ax3.set_ylabel('AUC')
    if auc_series:
        ax3.legend()
    else:
        ax3.text(0.5, 0.5, 'AUC was not recorded for this run',
                 ha='center', va='center', transform=ax3.transAxes)

    plt.tight_layout()
    plt.savefig(f'/gdrive/MyDrive/Final project/1_Red/5_분석모델링/피부진단/model/facepart_{facepart}_{feature}_{model_type}_performance.png')
    plt.close(fig)  # release the figure; this function is called once per trained model

In [None]:
# Compile, fit and save one model
def train_model(model, train_data, val_data, facepart, feature, model_type, epochs=50, batch_size=32):
    """Compile ``model``, train it with checkpointing/LR scheduling/early stopping, and save it.

    Args:
        model: Keras model to train (compiled here).
        train_data: Training dataset passed to ``model.fit``.
        val_data: Validation dataset passed to ``model.fit``.
        facepart: Facepart id, used in output file names.
        feature: Feature name, used in output file names.
        model_type: ``'regression'`` selects MSE loss with an MAE metric; any
            other value selects sparse categorical cross-entropy with accuracy.
        epochs: Maximum number of epochs.
        batch_size: Not used by this function; kept for interface compatibility.

    Returns:
        The history object returned by ``model.fit``.
    """
    model_dir = '/gdrive/MyDrive/Final project/1_Red/5_분석모델링/피부진단/model'
    run_name = f'facepart_{facepart}_{feature}_{model_type}'

    initial_lr = 1e-4
    optimizer = tf.keras.optimizers.Adam(learning_rate=initial_lr)

    if model_type == 'regression':
        loss = 'mean_squared_error'
        metrics = ['mae']
    else:  # classification
        loss = 'sparse_categorical_crossentropy'
        metrics = ['accuracy']

    model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

    # save_freq must be 'epoch' here. An integer save_freq counts *batches*,
    # and val_loss is not part of the logs at the end of a batch, so
    # save_best_only had nothing to compare when this was save_freq=10.
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=f'{model_dir}/{run_name}_checkpoint_{{epoch:02d}}.keras',
        save_best_only=True,
        save_weights_only=False,
        monitor='val_loss',
        mode='min',
        save_freq='epoch')

    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6)

    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

    history = model.fit(
        train_data,
        validation_data=val_data,
        epochs=epochs,
        verbose=1,
        callbacks=[checkpoint_callback, reduce_lr, early_stopping]
    )

    model.save(f'{model_dir}/{run_name}_final_model.keras')
    return history

In [None]:
# Resolve the image file path for one data row
def get_image_path(row, feature):
    """Return the path of the image that belongs to ``row`` for ``feature``.

    Reads ``facepart``, ``filename``, ``annotations[feature]`` and
    ``source_type`` from ``row`` (all four are looked up up front, so a missing
    key raises KeyError even for facepart 0). The file name is the part of
    ``filename`` before its first dot, followed by ``_<facepart>.jpg``.

    Facepart 0 resolves inside the original-data folder; any other facepart
    resolves inside
    ``classified_cropped/<facepart name>/<feature>/<source_type>/<class value>``.
    """
    part_id = row['facepart']
    stem = row['filename'].split('.')[0]
    label_dir = row['annotations'][feature]
    split_dir = row['source_type']
    image_name = f"{stem}_{part_id}.jpg"

    # Facepart 0: images live directly in the original-data folder.
    if part_id == 0:
        root = '/gdrive/MyDrive/Final project/1_Red/3_데이터수집_저장/0_데이터수집폴더/피부 데이터/Training/01.원천데이터'
        return str(os.path.join(root, image_name))

    # Other faceparts: index into the name table (slot 0 is a placeholder).
    part_dirs = ['', 'forehead', 'glabellus', 'l_perocular', 'r_perocular', 'l_cheek', 'r_cheek', 'lip', 'chin']
    root = f'/gdrive/MyDrive/Final project/1_Red/4_데이터탐색_전처리/facepart별 피부 이미지/classified_cropped/{part_dirs[part_id]}/{feature}/{split_dir}/{label_dir}'
    return str(os.path.join(root, image_name))

# Bounding-box validity check
def valid_bbox(bbox):
    """Return True only for a list of exactly four non-negative ``int`` values.

    Anything else -- ``None``, a non-list, a list of another length, or a list
    containing strings (such as the ``'None'`` placeholders), floats or
    negative numbers -- yields False.
    """
    if not isinstance(bbox, list) or len(bbox) != 4:
        return False
    # String placeholders like 'None' fail the isinstance test here.
    return all(isinstance(coord, int) and coord >= 0 for coord in bbox)

In [None]:
# Per-facepart model training
def _split_paths_and_labels(facepart_df, source_type, label_column, feature):
    """Return (image paths, labels) for one source_type, dropping rows without a label."""
    split_df = facepart_df[facepart_df['source_type'] == source_type]
    labels = split_df[label_column].apply(lambda x: x.get(feature, None))
    has_label = labels.notna()
    split_df, labels = split_df[has_label], labels[has_label]
    paths = split_df.apply(lambda row: get_image_path(row, feature), axis=1)
    return paths, labels


def _train_feature_model(facepart_df, facepart, feature, label_column, model_type, output_dim, metric_name):
    """Build the train/val pipelines for one feature, then train, save and plot its model."""
    X_train, y_train = _split_paths_and_labels(facepart_df, 'train', label_column, feature)
    X_val, y_val = _split_paths_and_labels(facepart_df, 'val', label_column, feature)

    if len(y_train) == 0 or len(y_val) == 0:
        print(f"Skipping {model_type} training for facepart {facepart}, feature {feature}: "
              f"no labelled train/val samples")
        return

    train_generator = create_data_generator(X_train, y_train)
    # Validation data must not be shuffled; the default is_training=True would do so.
    val_generator = create_data_generator(X_val, y_val, is_training=False)

    model = create_model(output_dim, model_type)
    history = train_model(model, train_generator, val_generator, facepart, feature, model_type)
    plot_performance(history, metric_name, facepart, feature, model_type)


def train_facepart_models(facepart, train_classification=True, train_regression=True):
    """Train the classification and/or regression models for one facepart.

    Reads the module-level ``df`` (set by the notebook's main cell), keeps the
    rows of ``facepart`` whose ``bbox`` passes ``valid_bbox``, and then:

    * classification: for every key of the first row's ``annotations`` dict
      whose labels take more than one distinct value, trains a classifier
      with that many output classes;
    * regression: for every name in the fixed regression feature list that is
      present in the first row's ``equipment`` dict and has at least one
      non-null value, trains a single-output regressor.

    Rows whose label for the current feature is missing are left out of the
    train/val sets. Returns None.

    Args:
        facepart: Facepart id to process.
        train_classification: Whether to train the classification models.
        train_regression: Whether to train the regression models.
    """
    print(f"Processing facepart {facepart}")

    facepart_df = df[df['facepart'] == facepart]
    # astype(bool) keeps the mask boolean even when there are no rows at all.
    facepart_df = facepart_df[facepart_df['bbox'].apply(valid_bbox).astype(bool)]

    if facepart_df.empty:
        # The .iloc[0] lookups below need at least one row.
        print(f"No rows with a valid bbox for facepart {facepart}; nothing to train")
        return

    if train_classification:
        for feature in facepart_df['annotations'].iloc[0].keys():
            y = facepart_df['annotations'].apply(lambda x: x.get(feature, None))

            if y.nunique() > 1:
                print(f"Starting classification training for facepart {facepart}, feature {feature}")
                _train_feature_model(facepart_df, facepart, feature,
                                     'annotations', 'classification', y.nunique(), 'accuracy')

    if train_regression:
        regression_features = ['forehead_moisture', 'r_cheek_moisture', 'l_cheek_moisture', 'chin_moisture',
                               'chin_elasticity_R2', 'r_cheek_elasticity_R2', 'l_cheek_elasticity_R2',
                               'forehead_elasticity_R2', 'pigmentation_count', 'r_cheek_pore', 'l_cheek_pore']

        for feature in regression_features:
            if feature in facepart_df['equipment'].iloc[0]:
                y = facepart_df['equipment'].apply(lambda x: x.get(feature, None))

                if not y.isnull().all():
                    print(f"Starting regression training for facepart {facepart}, feature {feature}")
                    # NOTE(review): get_image_path() looks the feature up in
                    # row['annotations'] and uses it as a directory name; confirm
                    # the equipment feature names exist there, otherwise this
                    # path resolution raises KeyError.
                    _train_feature_model(facepart_df, facepart, feature,
                                         'equipment', 'regression', 1, 'mae')

In [None]:
if __name__ == "__main__":
    keep_alive()
    # `df` has to stay a module-level name: train_facepart_models() reads it as a global.
    df = load_and_preprocess_data()

    # Menu choice -> facepart ids to process.
    facepart_choices = {'1': [1, 2], '2': [3, 4, 5, 6], '3': [7, 8]}
    user_input = input("처리할 facepart 범위를 선택하세요 (1: 1-2, 2: 3-6, 3: 7-8): ").strip()
    facepart_range = facepart_choices.get(user_input)

    if facepart_range is None:
        # Report and fall through instead of calling exit(): in an IPython
        # kernel exit() requests a kernel shutdown, which would throw away the
        # Drive mount and the loaded DataFrame.
        print("잘못된 입력입니다.")
    else:
        train_class = input("분류 모델을 학습하시겠습니까? (y/n): ").strip().lower() == 'y'
        train_reg = input("회귀 모델을 학습하시겠습니까? (y/n): ").strip().lower() == 'y'
        for facepart in facepart_range:
            train_facepart_models(facepart, train_classification=train_class, train_regression=train_reg)