<a href="https://colab.research.google.com/github/jinsobak/CMM_DeepLearning_Module/blob/SHY-code/autotuned.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [16]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
import optuna
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

def prepare_data(csv_file):
    """Load the CSV, split it into train/test/validation sets and standardise the features.

    The file is read with the ``cp949`` encoding. The ``품질상태`` column is the
    target; of the remaining columns only the numeric ones are used as input
    features. The rows are split 80/10/10 into train/test/validation sets with
    a fixed ``random_state`` of 42. A ``StandardScaler`` is fitted on the
    training split only and then applied to the other two splits.

    Args:
        csv_file: Location of the input CSV, passed straight to ``pandas.read_csv``.

    Returns:
        A tuple ``(X_train_scaled, X_test_scaled, X_val_scaled, Y_train, Y_test,
        Y_val, scaler, feature_columns)``. ``feature_columns`` holds the names of
        the numeric columns, in the same order as the columns of the scaled
        matrices.

    Raises:
        KeyError: If the ``품질상태`` column is not present in the file.
        ValueError: If no numeric feature column remains after dropping the target.
    """
    target_column = '품질상태'

    # Read the CSV file into a DataFrame
    all_data = pd.read_csv(csv_file, encoding='cp949')

    if target_column not in all_data.columns:
        raise KeyError(f"Target column '{target_column}' not found in {csv_file!r}")

    # Every column except the target is a candidate feature
    selected_features = all_data.drop(columns=[target_column])

    # Keep numeric columns only
    numeric_features = selected_features.select_dtypes(include=[np.number])
    if numeric_features.shape[1] == 0:
        raise ValueError(f"No numeric feature columns found in {csv_file!r}")

    # Network inputs and labels
    X = numeric_features.values
    y = all_data[target_column].values

    # 80% train; the remaining 20% is halved into test and validation
    X_train, X_test_full, Y_train, Y_test_full = train_test_split(X, y, test_size=0.2, random_state=42)
    X_test, X_val, Y_test, Y_val = train_test_split(X_test_full, Y_test_full, test_size=0.5, random_state=42)

    # Fit the scaler on the training split only so that no statistics from the
    # test/validation splits leak into training
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    X_val_scaled = scaler.transform(X_val)

    # Return the names of the columns that actually make up X (numeric only),
    # so they line up with the scaler and the scaled matrices
    return X_train_scaled, X_test_scaled, X_val_scaled, Y_train, Y_test, Y_val, scaler, numeric_features.columns


# Objective 함수: Optuna가 하이퍼파라미터 튜닝에 사용
def objective(trial):
    """Optuna objective: train one candidate network and return its validation F1 score.

    Each call samples a set of hyperparameters from ``trial``, builds a dense
    binary classifier, trains it for at most 50 epochs with early stopping and
    learning-rate reduction driven by the validation loss, and scores it.

    The score is computed on the validation split rather than the test split,
    so that the test split stays untouched by hyperparameter selection and can
    still give an unbiased estimate for the final model.

    Args:
        trial: The ``optuna`` trial used to sample hyperparameters.

    Returns:
        The F1 score of the trained model on the validation split
        (predictions thresholded at 0.5).
    """
    # Many models are created in a loop across trials; drop the Keras global
    # state left by earlier trials so memory use does not keep growing
    tf.keras.backend.clear_session()

    # Path of the CSV file
    csv_file = '/content/data_jd_hd_delete_material_no_NTC_pca_component_7.csv'

    # Preprocess the data; the test split, scaler and column names are not needed for tuning
    X_train, _, X_val, y_train, _, Y_val, _, _ = prepare_data(csv_file)

    # Hyperparameter search space
    num_layers = trial.suggest_int('num_layers', 2, 7)  # extra hidden layers added after the first one
    hidden_size = trial.suggest_int('hidden_size', 4, 256)  # units per hidden layer
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)  # Adam learning rate
    batch_size = trial.suggest_categorical('batch_size', [16, 32, 64, 128])  # mini-batch size
    activation = trial.suggest_categorical('activation', ['relu', 'tanh', 'sigmoid'])  # hidden activation

    # Wrap features and labels in TensorFlow Datasets
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(len(X_train)).batch(batch_size)
    val_dataset = tf.data.Dataset.from_tensor_slices((X_val, Y_val)).batch(batch_size)

    # Build the model. NOTE: the network has `num_layers + 1` hidden layers
    # (the input-facing layer plus `num_layers` more). This is kept as-is so
    # that tuned values map onto the same architecture when the final model
    # is rebuilt from `study.best_params`.
    model = models.Sequential()
    model.add(layers.Dense(hidden_size, activation=activation, input_shape=(X_train.shape[1],)))

    for _ in range(num_layers):
        model.add(layers.Dense(hidden_size, activation=activation))

    model.add(layers.Dense(1, activation='sigmoid'))  # single sigmoid unit for binary classification

    # Compile the model
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])

    # Early stopping and learning-rate schedule, both driven by the validation loss
    early_stopping = EarlyStopping(monitor='val_loss', min_delta=0.001, patience=10, restore_best_weights=True)
    lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)

    # Train the model
    model.fit(train_dataset,
              epochs=50,
              callbacks=[early_stopping, lr_scheduler],
              validation_data=val_dataset,
              verbose=0)

    # Predict on the validation split and threshold the probabilities at 0.5;
    # ravel() turns the (n, 1) output into a flat label vector
    y_pred_prob = model.predict(X_val, batch_size=batch_size, verbose=0)
    y_pred = (y_pred_prob.ravel() > 0.5).astype(int)

    # The study maximises this F1 score
    return f1_score(Y_val, y_pred)

# Optuna 스터디 생성 및 최적화 수행
if __name__ == "__main__":
    # Run the hyperparameter search; the study maximises the value returned by `objective`
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=50)

    # Report the winning configuration and keep it for the final model
    best_params = study.best_params
    print("Best hyperparameters: ", best_params)

    # Reload and preprocess the same CSV file the trials used
    csv_file = '/content/data_jd_hd_delete_material_no_NTC_pca_component_7.csv'
    X_train, X_test, X_val, y_train, y_test, Y_val, scaler, feature_columns = prepare_data(csv_file)

    best_units = best_params['hidden_size']
    best_activation = best_params['activation']
    best_batch_size = best_params['batch_size']

    # Final network: an input-facing dense layer, `num_layers` further hidden
    # dense layers, and a single sigmoid unit for binary classification
    model = models.Sequential(
        [layers.Dense(best_units, activation=best_activation, input_shape=(X_train.shape[1],))]
        + [layers.Dense(best_units, activation=best_activation) for _ in range(best_params['num_layers'])]
        + [layers.Dense(1, activation='sigmoid')]
    )

    # Compile with the tuned learning rate
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=best_params['learning_rate']),
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )

    # Wrap each split in a TensorFlow Dataset; only the training data is shuffled
    train_dataset = (
        tf.data.Dataset.from_tensor_slices((X_train, y_train))
        .shuffle(len(X_train))
        .batch(best_batch_size)
    )
    test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(best_batch_size)
    val_dataset = tf.data.Dataset.from_tensor_slices((X_val, Y_val)).batch(best_batch_size)

    # Early stopping and learning-rate reduction callbacks
    early_stopping = EarlyStopping(min_delta=0.001, patience=10, restore_best_weights=True)
    lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)

    # Train for at most 50 epochs, validating on the validation split
    history = model.fit(
        train_dataset,
        epochs=50,
        callbacks=[early_stopping, lr_scheduler],
        validation_data=val_dataset,
    )

    # Loss and accuracy on the test split as computed by Keras
    loss, accuracy = model.evaluate(test_dataset)
    print(f'Test Loss: {loss}, Test Accuracy: {accuracy}')

    # Class predictions: sigmoid outputs thresholded at 0.5
    y_pred_prob = model.predict(X_test)
    y_pred = (y_pred_prob > 0.5).astype(int)

    # Confusion matrix
    cm = confusion_matrix(y_test, y_pred)
    print("Confusion Matrix:")
    print(cm)

    # scikit-learn metrics on the same test predictions
    acc, precision, recall, f1 = (
        metric(y_test, y_pred)
        for metric in (accuracy_score, precision_score, recall_score, f1_score)
    )

    print(f'Accuracy: {acc}')
    print(f'Precision: {precision}')
    print(f'Recall: {recall}')
    print(f'F1 Score: {f1}')
