#### 1. 기본 설정

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import font_manager, rc
import platform
import seaborn as sns
from tensorflow.keras import backend as K
import math
import warnings
import scipy.stats as stats  # for Q-Q plots
from mpl_toolkits.axes_grid1 import make_axes_locatable
import matplotlib.ticker as ticker
from matplotlib.ticker import MultipleLocator
import shap

# 경고 무시
warnings.filterwarnings("ignore")

# 데이터 전처리 알고리즘(비지도 학습)
# 문자데이터를 숫자로 변환해주는 알고리즘
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
# 각 데이터의 범위를 비슷한 수준으로 조정하는 알고리즘(표준화)
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, GridSearchCV

# 교차검증
# 평가 지표를 하나만 사용하는 경우
from sklearn.model_selection import cross_val_score
# 평가 지표를 다수를 사용하는 경우
from sklearn.model_selection import cross_validate
# 교차 검증 방식
from sklearn.model_selection import KFold
from sklearn.model_selection import StratifiedKFold

# 객체 저장
import pickle

# 회귀용 알고리즘
from sklearn.neighbors import KNeighborsRegressor
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.svm import SVR
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor, AdaBoostRegressor, GradientBoostingRegressor, ExtraTreesRegressor
from lightgbm import LGBMRegressor
from xgboost import XGBRegressor

# 회귀용 평가 함수
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_percentage_error
# mse : 실제값에서 이론값을 빼고 제곱한 값을 평균한 값 (작을수록 좋음)

# 분류용 머신러닝 알고리즘
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, GradientBoostingClassifier, VotingClassifier
from lightgbm import LGBMClassifier
from xgboost import XGBClassifier

# 분류용 평가 함수
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

# 차원축소
from sklearn.decomposition import PCA

# 정확도를 확인하는 함수
from sklearn.metrics import accuracy_score
# MSE를 확인하는 함수
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from tensorflow.keras.callbacks import Callback

# 딥러닝 관련
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LeakyReLU, Dense, Activation, BatchNormalization, Dropout
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.optimizers import Adam, RMSprop, SGD, Adagrad, Adamax
from tensorflow.keras import regularizers
from tensorflow.keras.initializers import HeNormal

# 원핫인코딩
from tensorflow.keras.utils import to_categorical

from scikeras.wrappers import KerasRegressor
from scipy.optimize import minimize

In [2]:
# 데이터프레임의 출력 행과 열을 모두 보이도록 설정
pd.set_option('display.max_rows', None)  # 출력할 최대 행 개수를 None으로 설정
pd.set_option('display.max_columns', None)  # 출력할 최대 열 개수를 None으로 설정
pd.set_option('display.expand_frame_repr', False)  # 열이 화면을 넘어서지 않도록 설정

In [3]:
# 그래프 설정

# 한글 폰트 설정
if platform.system() == 'Windows':
    font_name = font_manager.FontProperties(fname='c:/Windows/Fonts/malgun.ttf').get_name()
elif platform.system() == 'Darwin':  # Mac OS
    font_name = 'AppleGothic'
else:
    font_name = 'NanumGothic'

rc('font', family=font_name)

# 폰트 설정 시 마이너스 기호가 깨짐 -> 방지하는 코드
plt.rcParams['axes.unicode_minus'] = False

In [4]:
# palette 색상 설정 (기본 - 'tab10')
palette = ['#4472C4', '#44A3C0', '#43BDA7', '#44B874', '#46B246', '#70AD47']   # 그라데이션
palette2 = ['#7944E3', '#E9612C', '#4B87F7', '#EA346F', '#24C69A', '#F6C044']  # 대비

In [5]:
pd.set_option('display.max_columns', None) ## 모든 열을 출력
#pd.set_option('display.max_rows', None) 

#### 2. 데이터 불러오기 (25℃)

In [None]:
data = pd.read_csv('../Taurus_240820_2.csv')
data

In [None]:
data.info()

In [None]:
data.isna().sum()

In [None]:
data.describe()

In [None]:
# 변수 그룹화
group_RM = ['raw_material', 'RM_PSA_D50']
group_PS = ['PS_Temp', 'PS_Ratio']
group_etching = ['p-Si_pore_volume', 'p-Si_pore_size', 'p-Si_domain_size', 'p-Si_Oxygen']
group_coating = ['C_condition', 'Carbon', 'c-Oxygen', 'c-Si_domain_size', 'c-Surface_area']
group_cell = ['Li_capa_25', 'Deli_capa_1V_25', 'FCE_1V_25', 'Li_capa_45', 'Deli_capa_1V_45', 'FCE_1V_45']

# 그룹 설정
groups = [group_RM, group_PS, group_etching, group_coating, group_cell]
row_titles = ['원료', '상분리', '에칭', '코팅', '셀 결과']

# x축 이름 설정 딕셔너리
xlabel_dict = {
    'raw_material': '원료',
    'RM_PSA_D50': '원료 PSA D50',
    'PS_Temp': '상분리 온도',
    'PS_Ratio': '상분리 Ratio',
    'p-Si_pore_volume': 'p-Si Pore Volume',
    'p-Si_pore_size': 'p-Si Pore Size',
    'p-Si_domain_size': 'p-Si Domain Size',
    'p-Si_Oxygen': 'p-Si Oxygen',
    'C_condition': '코팅조건',
    'Carbon': 'Carbon',
    'c-Oxygen': 'c-Oxygen',
    'c-Si_domain_size': 'c-Si Domain Size',
    'c-Surface_area': 'c-Surface Area',
    'FCETemp' : 'Temperature',
    'LiCap' : 'Lithiation Capacity',
    'DeliCap': 'Delithiation Capacity',
    'FCE': 'FCE'
}

In [None]:
df = data.drop(['Sample ID', 'raw_material'], axis = 1)

mean_values = df.mean().round(2)
std_values = df.std().round(2)
min_values = df.min().round(2)
max_values = df.max().round(2)

summary = pd.DataFrame({
    '평균': mean_values,
    '표준편차': std_values,
    '최소값': min_values,
    '최대값': max_values
})

# xlabel_dict을 사용하여 인덱스 이름 변경
summary.index = summary.index.map(xlabel_dict)

display(summary)

In [None]:
column_list = data.columns.tolist()
print(column_list)

In [None]:
pre_data = data.drop(['Sample ID'], axis = 1)
pre_data

In [None]:
pre_corr = pre_data.drop(columns = ['raw_material']).corr().round(3)
pre_corr

In [None]:
mask = np.zeros_like(pre_corr, dtype=bool)
mask[np.triu_indices_from(mask)] = True

plt.figure(figsize = (15, 15))
sns.heatmap(pre_corr, linewidths = .3, annot = True, 
            fmt=".2f", cmap = 'coolwarm', mask = mask,
            vmin = -1, vmax = 1, square = True, annot_kws={"size": 10})

In [None]:
encoder = OneHotEncoder(sparse_output = False)

# 범주형 변수만 선택하여 인코딩 수행
# 여기서 2차원 형태로 데이터를 전달하기 위해 데이터프레임으로 선택
encoded_data = encoder.fit_transform(pre_data[['raw_material']])
# 결과를 pandas 데이터프레임으로 변환
encoded_df = pd.DataFrame(encoded_data, columns = encoder.get_feature_names_out(['raw_material']))

# 원래 데이터프레임과 결합
data_final = pd.concat([pre_data.drop(columns=['raw_material']), encoded_df], axis=1)
data_final

#### 3. 데이터 분할

In [None]:
# 입력과 결과로 나눔
X = data_final.drop(['LiCap', 'DeliCap', 'FCE'], axis = 1)
y = data_final[['DeliCap', 'FCE']]

display(X)
display(y)

In [None]:
print(X.shape)
print(y.shape)

In [None]:
feature_names = X.columns.values
feature_names

In [None]:
# 표준화
scaler1 = StandardScaler()
X = scaler1.fit_transform(X)
X

In [None]:
y = y.to_numpy()
y

#### 4. 머신러닝

##### 4.1 학습

In [None]:
# 1. 데이터셋을 Train+Validation과 Test로 분할
X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.2, random_state=1)
X_train_val.shape, X_test.shape, y_train_val.shape, y_test.shape

In [None]:
# 2. 하이퍼파라미터 튜닝 범위 설정
param_grids = {
    'DecisionTreeRegressor': {
        'max_depth': [None, 5, 10, 15, 20, 30],
        'min_samples_split': [2, 5, 10, 20],
        'min_samples_leaf': [1, 2, 4, 8]
    },
    'RandomForestRegressor': {
        'n_estimators': [50, 100, 200, 500],
        'max_depth': [None, 5, 10, 20],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4],
        'bootstrap': [True, False]
    },
    'ExtraTreesRegressor': {
        'n_estimators': [100, 200, 500],
        'max_depth': [None, 10, 20],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4],
        'bootstrap': [True, False]
    },    
    'Ridge' : {
        'alpha' : [0.1, 1.0, 10.0, 100.0]
    },
    'Lasso' : {
        'alpha' : [0.1, 1.0, 10.0, 100.0]
    },
    'ElasticNet' : {
        'alpha': [0.1, 1.0, 10.0],
        'l1_ratio': [0.2, 0.5, 0.8]        
    },
    'XGBRegressor': {
        'n_estimators': [50, 100, 200, 500],
        'learning_rate': [0.01, 0.1, 0.05],
        'max_depth': [3, 5, 7],
        'subsample': [0.8, 1.0]
    }
}

In [None]:
# 3. 모델 목록
models = {
    'DecisionTreeRegressor': DecisionTreeRegressor(random_state=1),
    'RandomForestRegressor': RandomForestRegressor(random_state=1),
    'ExtraTreesRegressor' : ExtraTreesRegressor(random_state=1),
    'Ridge' : Ridge(),
    'Lasso' : Lasso(),
    'ElasticNet' : ElasticNet(),
    'XGBRegressor' : XGBRegressor(random_state=1)
}

In [None]:
# 4. KFold 설정
kf = KFold(n_splits=5, shuffle=True, random_state=1)

results = {name: {'MAE': [], 'MAPE': [], 'MSE': [], 'RMSE': [], 'R2': []} for name in models.keys()}
results_comparison = {name: {'Train': [], 'Validation': []} for name in models.keys()}

In [None]:
num_targets = y_train_val.shape[1]  # 다중 타겟의 개수
results = {
    name: {metric: [[] for _ in range(num_targets)] for metric in ['MAE', 'MAPE', 'MSE', 'RMSE', 'R2']}
    for name in models.keys()
}

for name, model in models.items():
    print(f"\n{name} 모델 하이퍼파라미터 튜닝 중...")

    grid_search = GridSearchCV(estimator=model, param_grid=param_grids[name], cv=kf, scoring='neg_mean_squared_error')
    grid_search.fit(X_train_val, y_train_val)
    
    print(f"최적의 하이퍼파라미터: {grid_search.best_params_}")
    print(f"최소 MSE: {-grid_search.best_score_:.3f}")

    for i, (train_idx, val_idx) in enumerate(kf.split(X_train_val)):
        X_train, X_val = X_train_val[train_idx], X_train_val[val_idx]
        y_train, y_val = y_train_val[train_idx], y_train_val[val_idx]

        model = grid_search.best_estimator_
        model.fit(X_train, y_train)

        # Training 예측 및 결과 저장
        y_train_pred = model.predict(X_train)
        results_comparison[name]['Train'].append((y_train, y_train_pred))
        
        # Validation 예측 및 결과 저장
        y_val_pred = model.predict(X_val)
        results_comparison[name]['Validation'].append((y_val, y_val_pred))

        # 다중 타겟 성능 기록
        for target_idx in range(y_val.shape[1]):
            mae = mean_absolute_error(y_val[:, target_idx], y_val_pred[:, target_idx])
            mape = mean_absolute_percentage_error(y_val[:, target_idx], y_val_pred[:, target_idx])
            mse = mean_squared_error(y_val[:, target_idx], y_val_pred[:, target_idx])
            rmse = mean_squared_error(y_val[:, target_idx], y_val_pred[:, target_idx], squared=False)
            r2 = r2_score(y_val[:, target_idx], y_val_pred[:, target_idx])

            results[name]['MAE'][target_idx].append(mae)
            results[name]['MAPE'][target_idx].append(mape)
            results[name]['MSE'][target_idx].append(mse)
            results[name]['RMSE'][target_idx].append(rmse)
            results[name]['R2'][target_idx].append(r2)

    # 폴드별로 타겟 결과 출력
    print(f"\n{name} Validation Results")
    for target_idx in range(y_train_val.shape[1]):
        print(f"\nTarget {target_idx}")
        for fold_idx, (mae, mape, mse, rmse, r2) in enumerate(zip(
                results[name]['MAE'][target_idx],
                results[name]['MAPE'][target_idx],
                results[name]['MSE'][target_idx],
                results[name]['RMSE'][target_idx],
                results[name]['R2'][target_idx])):
            print(f"Fold {fold_idx + 1}: MAE: {mae:.3f}, MAPE: {mape:.3f}, MSE: {mse:.3f}, "
                  f"RMSE: {rmse:.3f}, R2: {r2:.3f}")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# 다중 타겟 이름
target_names = ['Delithiation Capacity', 'FCE']

# K-Fold 및 Test 결과 그래프 그리기 함수
def plot_results_by_target(results, target_names):
    metrics = ['MAE', 'MAPE', 'MSE', 'RMSE', 'R2']
    
    for target_idx, target_name in enumerate(target_names):
        fig, axes = plt.subplots(1, len(metrics), figsize=(20, 4), sharey=False)
        fig.suptitle(f'K-Fold 성능 지표 - {target_name}', fontsize=16)
        
        for metric_idx, metric in enumerate(metrics):
            ax = axes[metric_idx]
            ax.set_title(metric)
            ax.set_xlabel('Fold Number')
            ax.set_ylabel(metric)

            all_scores_across_models = []  # 모든 모델의 점수를 수집
            for model_name, metrics_values in results.items():
                # K-Fold 점수 추출
                fold_scores = metrics_values[metric][target_idx]
                folds = range(1, len(fold_scores) + 1)

                # 그래프 그리기
                label = model_name if metric_idx == 0 else None
                ax.plot(folds, fold_scores, marker='o', label=label)
                
                all_scores_across_models.extend(fold_scores)

            # 축 범위 설정: 최소값과 최대값을 기준으로 10% 여유 추가
            min_val, max_val = min(all_scores_across_models), max(all_scores_across_models)
            if min_val == max_val:
                min_val, max_val = min_val - 1, max_val + 1  # 범위가 너무 좁은 경우
            margin = 0.1 * (max_val - min_val)
            ax.set_ylim(min_val - margin, max_val + margin)

            ax.grid()
        
        # 범례를 그래프 오른쪽 외부에 위치
        fig.legend(loc='center right', bbox_to_anchor=(1.15, 0.5))
        plt.tight_layout(rect=[0, 0.03, 1, 0.95])
        plt.show()

# 함수 호출
plot_results_by_target(results, target_names)

In [None]:
# 다중 타겟 성능 결과를 정리하는 데이터프레임 생성
df_results = pd.DataFrame()

# 알고리즘별로 데이터프레임에 결과 추가
for algorithm, metrics in results.items():
    for target_idx, target_name in enumerate(target_names):  # 다중 타겟 처리
        target_metrics = {metric: metrics[metric][target_idx] for metric in metrics.keys()}
        metric_df = pd.DataFrame(target_metrics)
        metric_df.columns = pd.MultiIndex.from_product([[algorithm], [target_name], metric_df.columns])
        df_results = pd.concat([df_results, metric_df], axis=1)

# 폴드 인덱스 추가
df_results.index = [f'Fold {i}' for i in range(1, len(df_results) + 1)]

# 평균 및 표준 편차 계산
means = df_results.mean(axis=0)
stds = df_results.std(axis=0)

# 평균 및 표준 편차를 데이터프레임에 추가
df_results.loc['Mean'] = means
df_results.loc['Standard deviation'] = stds

# 데이터 형식 포맷팅 함수
def format_value(value):
    if isinstance(value, str):  # 문자열은 그대로 반환
        return value
    if pd.isna(value):  # NaN은 그대로 반환
        return value
    if value < 0.1:
        return f"{value:.3f}"
    elif value < 1:
        return f"{value:.2f}"
    elif value < 100:
        return f"{value:.1f}"
    else:
        return f"{int(value)}"

target_1_results = df_results.loc[:, df_results.columns.get_level_values(1) == "Delithiation Capacity"]
target_1_results = target_1_results.applymap(format_value)
target_1_results_transposed = target_1_results.transpose()


target_2_results = df_results.loc[:, df_results.columns.get_level_values(1) == "FCE"]
target_2_results = target_2_results.applymap(format_value)
target_2_results_transposed = target_2_results.transpose()

display(target_1_results_transposed)
display(target_2_results_transposed)

In [None]:
# 실제값과 예측값을 비교하는 그래프를 그리는 함수
def plot_comparison_grid(results_comparison, target_names):
    dataset_types = ['Train', 'Validation']
    
    for model_name, datasets in results_comparison.items():
        fig, axes = plt.subplots(2, 2, figsize=(8, 8), sharex=False, sharey=False)
        fig.suptitle(f'Actual vs Predicted Values for {model_name}', fontsize=16)
        
        # 각 타겟에 대한 x축, y축의 공통 범위 계산
        target_limits = []
        for row, target_name in enumerate(target_names):
            min_vals, max_vals = [], []
            
            # 각 데이터셋에서 최소값과 최대값 찾기
            for dataset_type in dataset_types:
                for y_true, y_pred in datasets[dataset_type]:
                    min_vals.append(min(y_true[:, row].min(), y_pred[:, row].min()))
                    max_vals.append(max(y_true[:, row].max(), y_pred[:, row].max()))

            # 여유 범위 추가 (5%)
            overall_min = min(min_vals)
            overall_max = max(max_vals)
            range_margin = (overall_max - overall_min) * 0.05
            target_limits.append((overall_min - range_margin, overall_max + range_margin))
        
        # 그래프 그리기
        for row, target_name in enumerate(target_names):
            for col, dataset_type in enumerate(dataset_types):
                ax = axes[row, col]
                ax.set_title(f'{target_name} - {dataset_type}')
                
                # R² 점수 계산 및 그래프에 표시
                r2_scores = []
                color = palette[row]
                for fold_idx, (y_true, y_pred) in enumerate(datasets[dataset_type]):
                    r2 = r2_score(y_true[:, row], y_pred[:, row])
                    r2_scores.append(r2)
                    ax.scatter(y_true[:, row], y_pred[:, row], alpha=0.5, label=f'Fold {fold_idx+1}' if fold_idx == 0 else "")
                r2 = np.mean(r2_scores)  # 폴드별 R² 점수의 평균

                # R² 점수 텍스트로 표시
                ax.text(0.05, 0.9, f'$R^2$: {r2:.2f}', transform=ax.transAxes, fontsize=12, verticalalignment='top')
                min_limit, max_limit = target_limits[row]
                ax.set_xlim(min_limit, max_limit)
                ax.set_ylim(min_limit, max_limit)
                ax.plot([min_limit, max_limit], [min_limit, max_limit], 'b--', lw=2)
                ax.set_xlabel('True Values')
                ax.set_ylabel('Predicted Values')
        
        plt.tight_layout(rect=[0, 0.03, 1, 0.95])  # 제목 공간을 위해 여백 조정
        plt.show()

plot_comparison_grid(results_comparison, target_names)

##### 4.2 성능 확인

In [None]:
# 성능 지표별 Boxplot 그리기 함수
def plot_boxplot_by_metrics_and_target(results, target_names):
    metrics = ['MAE', 'MAPE', 'MSE', 'RMSE', 'R2']

    for target_idx, target_name in enumerate(target_names):
        fig, axes = plt.subplots(1, len(metrics), figsize=(20, 5), sharey=False)
        fig.suptitle(f'성능 지표별 K-Fold Boxplot - {target_name}', fontsize=16)

        for metric_idx, metric in enumerate(metrics):
            ax = axes[metric_idx]
            ax.set_title(metric)
            ax.set_ylabel(metric)
            
            data = []
            labels = []
            for model_name, metrics_values in results.items():
                # 다중 타겟에 맞게 데이터 추출
                fold_scores = metrics_values[metric][target_idx]
                data.append(fold_scores)
                labels.append(model_name)

            # Boxplot 그리기
            box = ax.boxplot(data, labels=labels, patch_artist=True)
            for patch, color in zip(box['boxes'], plt.cm.tab10.colors[:len(labels)]):
                patch.set_facecolor(color)
                            
            # y축 범위 설정
            all_scores = [score for fold_scores in data for score in fold_scores]
            if all_scores:  # 데이터가 있을 경우에만 계산
                min_val, max_val = min(all_scores), max(all_scores)
                margin = 0.1 * (max_val - min_val)
                ax.set_ylim(min_val - margin, max_val + margin)

            ax.set_xticklabels(labels, rotation=45, ha='right')

        plt.tight_layout(rect=[0, 0.03, 1, 0.95])
        plt.show()

# 함수 호출
plot_boxplot_by_metrics_and_target(results, target_names)

In [None]:
# Ridge, ElasticNet에 대한 최적 모델 및 파라미터 가져오기
selected_models = ['Ridge', 'ElasticNet']
best_estimators = {}

# 각 모델에서 최적 하이퍼파라미터를 적용한 모델 준비
for model_name in selected_models:
    grid_search = GridSearchCV(
        estimator=models[model_name],
        param_grid=param_grids[model_name],
        cv=kf,
        scoring='neg_mean_squared_error'
    )
    grid_search.fit(X_train_val, y_train_val)
    best_estimators[model_name] = grid_search.best_estimator_
    print(f"{model_name} - 최적의 하이퍼파라미터: {grid_search.best_params_}")

test_results = {}

for model_name, model in best_estimators.items():
    print(f"\n{model_name} 모델 Test 성능 평가 중...")
    
    # Train+Validation 데이터로 재훈련
    model.fit(X_train_val, y_train_val)
    
    # Test 데이터 예측
    y_test_pred = model.predict(X_test)
    
    # 성능 지표 계산
    mae = mean_absolute_error(y_test, y_test_pred)
    mape = mean_absolute_percentage_error(y_test, y_test_pred)
    mse = mean_squared_error(y_test, y_test_pred)
    rmse = mean_squared_error(y_test, y_test_pred, squared=False)
    r2 = r2_score(y_test, y_test_pred)
    
    # 결과 저장
    test_results[model_name] = {
        'MAE': mae,
        'MAPE': mape,
        'MSE': mse,
        'RMSE': rmse,
        'R2': r2
    }
    
    print(f"Test 결과 - MAE: {mae:.3f}, MAPE: {mape:.3f}, MSE: {mse:.3f}, RMSE: {rmse:.3f}, R2: {r2:.3f}")

# Test 결과 출력
print("\nTest Results:")

df_test_results = pd.DataFrame(test_results).T
df_test_results = df_test_results[['MAE', 'MAPE', 'MSE', 'RMSE', 'R2']]
df_test_results = df_test_results.applymap(lambda x: f"{x:.3f}" if isinstance(x, (int, float)) else x)
df_test_results

In [None]:
def plot_actual_vs_predicted_by_target(models, X_train_val, y_train_val, X_test, y_test, target_names):
    for model_name, model in models.items():
        # Train+Validation 데이터에 대한 예측값
        y_train_val_pred = model.predict(X_train_val)

        # Test 데이터에 대한 예측값
        y_test_pred = model.predict(X_test)

        # 그래프 설정
        fig, axes = plt.subplots(2, 2, figsize=(8, 6))
        fig.suptitle(f"Actual vs Predicted - {model_name}", fontsize=16)

        datasets = [
            ('Train+Validation', y_train_val, y_train_val_pred, palette[0]),
            ('Test', y_test, y_test_pred, palette[3])
        ]

        for target_idx, target_name in enumerate(target_names):
            for col, (dataset_name, y_true, y_pred, color) in enumerate(datasets):
                ax = axes[target_idx, col]
                ax.scatter(
                    y_true[:, target_idx], 
                    y_pred[:, target_idx], 
                    alpha=0.5, 
                    color=color, 
                    label=f'{dataset_name}'
                )
                ax.set_title(f'{target_name} - {dataset_name} Data')
                ax.set_xlabel('True Values')
                ax.set_ylabel('Predicted Values')
                ax.legend()

                min_val = min(y_true[:, target_idx].min(), y_pred[:, target_idx].min())
                max_val = max(y_true[:, target_idx].max(), y_pred[:, target_idx].max())
                ax.plot([min_val, max_val], [min_val, max_val], 'r--', lw=2)

        plt.tight_layout(rect=[0, 0.03, 1, 0.95])  # 제목 간격 조정
        plt.show()

# Ridge, Lasso, ElasticNet 모델에 대해 타겟별 그래프 생성
plot_actual_vs_predicted_by_target(
    models=best_estimators,  # 최적 모델들
    X_train_val=X_train_val,
    y_train_val=y_train_val,
    X_test=X_test,
    y_test=y_test,
    target_names=['Lithiation Capacity', 'Delithiation Capacity']
)

In [None]:
sorted_models = sorted(results.items(), key=lambda x: np.mean(x[1]['R2']), reverse=True)

# 2. 각 모델의 feature importance 추출 및 subplot 시각화
for model_name in selected_models:
    fig, axes = plt.subplots(1, 2, figsize=(10, 5), sharey=False)
    fig.suptitle(f"Feature Importance - {model_name}", fontsize=16)
    
    for target_idx, target_name in enumerate(target_names):
        model = models[model_name]  # 선택한 모델 객체 가져오기
        model.fit(X_train_val, y_train_val[:, target_idx])  # 개별 타겟에 대해 학습 수행

        # Feature Importance가 존재하는 모델인지 확인 후 시각화
        if hasattr(model, 'feature_importances_'):
            feature_importances = model.feature_importances_
        elif hasattr(model, 'coef_'):
            feature_importances = np.abs(model.coef_)
        else:
            print(f"{model_name}는 Feature Importance를 제공하지 않음")
            continue

        # 각 subplot에 feature importance 시각화
        ax = axes[target_idx]
        ax.barh(range(len(feature_importances)), feature_importances, align='center')
        ax.set_title(f'{target_name}')
        ax.set_xlabel('Importance')
        ax.set_yticks(range(len(feature_importances)))
        ax.set_yticklabels(feature_names)
        ax.invert_yaxis()  # 중요도가 높은 순으로 정렬
    
    plt.tight_layout(rect=[0, 0.03, 1, 0.95])  # 제목 여백 조정
    plt.show()

##### 4.3 결과 저장

In [None]:
save_model = models['Ridge']
best_ridge_model = GridSearchCV(estimator=save_model, param_grid=param_grids['Ridge'], cv=kf, scoring='neg_mean_squared_error').fit(X_train_val, y_train_val).best_estimator_

In [None]:
with open('best_ridge_model_241122.pkl', 'wb') as file:
    pickle.dump(best_ridge_model, file)
    pickle.dump(encoder, file)
    pickle.dump(scaler1, file)
print(".pkl 형식으로 성공적으로 저장되었습니다.")

##### 4.4 모델 복원

In [None]:
with open('best_ridge_model_241122.pkl', 'rb') as file:
    model = pickle.load(file)
    encoder = pickle.load(file)
    scaler = pickle.load(file)

##### 4.5 예측

In [None]:
new_data = pd.DataFrame({
    'raw_material': ['S6'],  # 범주형 변수
    'RM_PSA_D50': [5],           # 나머지 수치형 피처
    'PS_Temp': [845],
    'PS_Ratio' : [0.85],
    'p-Si_pore_volume' : [0.81],
    'p-Si_pore_size' : [3.11],
    'p-Si_domain_size' : [2],
    'p-Si_Oxygen' : [1],
    'C_condition' : [200],
    'Carbon' : [41.3],
    'c-Oxygen' : [1.4],
    'c-Si_domain_size' : [2.1],
    'c-Surface_area' : [19.8],
    'FCETemp' : [25]
})

encoded_new_data = encoder.transform(new_data[['raw_material']])
encoded_df = pd.DataFrame(encoded_new_data, columns=encoder.get_feature_names_out(['raw_material']))
processed_new_data = pd.concat([new_data.drop(columns=['raw_material']), encoded_df], axis=1)
scaled_new_data = scaler.transform(processed_new_data)
predicted_y = model.predict(scaled_new_data)

print("예측 결과:", predicted_y)

##### 4.6 최적의 조건 도출

In [23]:
# 저장된 모델 및 관련 객체 로드
with open('best_ridge_model_241122.pkl', 'rb') as file:
    model = pickle.load(file)  # Ridge 모델
    encoder = pickle.load(file)  # Encoder (OneHotEncoder)
    scaler = pickle.load(file)  # Scaler (StandardScaler)

# 특정 y 값을 출력하기 위한 X값 찾기
target_y = np.array([1800, 89])  # 목표 y 값 (사용자가 원하는 출력값)

# 입력 데이터 예시
initial_data = pd.DataFrame({
    'raw_material': ['S6'],  # 초기 범주형 값
    'RM_PSA_D50': [5],
    'PS_Temp': [845],
    'PS_Ratio': [0.85],
    'p-Si_pore_volume': [0.81],
    'p-Si_pore_size': [3.11],
    'p-Si_domain_size': [2],
    'p-Si_Oxygen': [1],
    'C_condition': [200],
    'Carbon': [41.3],
    'c-Oxygen': [1.4],
    'c-Si_domain_size': [2.1],
    'c-Surface_area': [19.8],
    'FCETemp': [25]
})

# 초기 추정값을 위한 전처리 과정
encoded_initial_data = encoder.transform(initial_data[['raw_material']])
encoded_df = pd.DataFrame(encoded_initial_data, columns=encoder.get_feature_names_out(['raw_material']))
processed_initial_data = pd.concat([initial_data.drop(columns=['raw_material']), encoded_df], axis=1)
scaled_initial_data = scaler.transform(processed_initial_data)
initial_guess = scaled_initial_data[0]  # 초기 추정값 설정

# FCETemp의 인덱스를 찾아서 해당 값을 고정
fce_temp_index = processed_initial_data.columns.get_loc('FCETemp')
fixed_value = scaled_initial_data[0, fce_temp_index]  # FCETemp의 고정된 스케일링 값

# FCETemp를 제외한 나머지 변수들만 추정값 설정
variable_indices = [i for i in range(initial_guess.shape[0]) if i != fce_temp_index]
initial_guess_variables = initial_guess[variable_indices]

# 최적화 목표 함수 정의
def objective_function(X):
    # X를 전체 변수로 확장하여 FCETemp 값 고정
    full_X = np.insert(X, fce_temp_index, fixed_value)
    full_X_reshaped = full_X.reshape(1, -1)
    try:
        y_pred = model.predict(full_X_reshaped)  # 예측값
    except ValueError as e:
        print("예측 중 오류 발생: ", e)
        return np.inf  # 오류 발생 시 큰 값을 반환하여 해당 경로를 피하도록 함
    # np.linalg.norm : 두 값 간의 유클리드 거리(오차)를 계산하여, 최적화 알고리즘이 이 값을 최소화하도록 함
    return np.linalg.norm(y_pred - target_y)  # 예측값과 목표 y의 차이를 최소화

# 변수의 경계 설정 (양수 범위로 설정, FCETemp를 제외한 나머지 변수들에 대한 경계 설정)
bounds = [(0, 2) for _ in range(len(variable_indices))]

# 최적화 실행 (최적화 방법을 'L-BFGS-B'로 변경하고 경계를 설정)
tol = 1e-5  # 허용 오차 설정
result = minimize(objective_function, initial_guess_variables, method='L-BFGS-B', bounds=bounds, options={'gtol': tol, 'disp': True, 'maxiter': 1000})

# 최적화된 결과 출력
if result.success:
    # 최적화된 값을 전체 변수로 확장하여 FCETemp 포함
    optimized_variables = result.x
    optimized_full = np.insert(optimized_variables, fce_temp_index, fixed_value)
    optimized_scaled = optimized_full.reshape(1, -1)
    optimized_unscaled = scaler.inverse_transform(optimized_scaled)
    optimized_df = pd.DataFrame(optimized_unscaled, columns=processed_initial_data.columns)
    
    # Encoding 된 항목 복원 (예: 'raw_material')
    encoded_columns = encoder.get_feature_names_out(['raw_material'])
    encoded_values = optimized_df[encoded_columns].values
    decoded_value = encoder.inverse_transform(encoded_values)[0]  # 원래 범주형 값 복원
    optimized_df = optimized_df.drop(columns=encoded_columns)  # 인코딩된 열 제거
    optimized_df['raw_material'] = decoded_value  # 원래 범주형 값 추가
    
    print("최적화 성공! 목표 y 값에 가장 가까운 X (스케일링 전): ")
    display(optimized_df)
    # print("해당 x 인자로 예측한 값: ", model.predict(optimized_scaled))
    print("해당 x 인자로 예측한 Delithiation capacity: ", model.predict(optimized_scaled)[0][0].round(1), "mAh/g")
    print("해당 x 인자로 예측한 FCE: ", model.predict(optimized_scaled)[0][1].round(1), "%")
else:
    print("최적화 실패: ", result.message)
    print("최적화 시도 중의 마지막 X 값: ", result.x)

최적화 성공! 목표 y 값에 가장 가까운 X (스케일링 전): 


Unnamed: 0,RM_PSA_D50,PS_Temp,PS_Ratio,p-Si_pore_volume,p-Si_pore_size,p-Si_domain_size,p-Si_Oxygen,C_condition,Carbon,c-Oxygen,c-Si_domain_size,c-Surface_area,FCETemp,raw_material
0,9.358585,848.168329,0.950752,0.875013,3.216672,2.128926,1.062308,421.527718,41.828418,1.384615,2.416963,6.747692,25.0,DS8


해당 x 인자로 예측한 Delithiation capacity:  1800.0 mAh/g
해당 x 인자로 예측한 FCE:  85.7 %


#### 5. 딥러닝

In [None]:
# 80%: train + validation, 20%: test
X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.2)

# train과 validation으로 다시 분할 (75%: train, 25%: validation)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.25)

# 분할된 데이터의 크기 확인
print(f"Train set: {X_train.shape, y_train.shape}, Validation set: {X_val.shape, y_val.shape}, Test set: {X_test.shape, y_test.shape}")

In [None]:
# 데이터 정규화 (표준화)
scaler_X = StandardScaler()
X_train = scaler_X.fit_transform(X_train)   # Train 데이터에 대해 fitting하고 변환
X_val = scaler_X.transform(X_val)   # Validation과 Test 데이터는 fitting된 scaler로 변환만 수행
X_test = scaler_X.transform(X_test)

# y에 대한 표준화
scaler_y = StandardScaler()
y_train = scaler_y.fit_transform(y_train)
y_val = scaler_y.transform(y_val)
y_test = scaler_y.transform(y_test)

In [None]:
# 학습 모델 설계를 위한 설정 값

input_dim = len(X_train[0])
output_dim = 2

# 오차를 구하는 함수
# 회귀
loss_function = 'mean_squared_error'
# 2진 분류
# loss_function = 'binary_crossentropy'
# 다중 분류
# loss_function = 'categorical_crossentropy'

In [None]:
# 신경망 설계 함수
def add_dense_block(model, units, alpha=0.01):
    model.add(Dense(units))
    model.add(LeakyReLU(alpha=alpha))
    model.add(BatchNormalization())

In [None]:
# 신경망을 설계한다.

# 강사님의 신경망 설계 방법!
# 일반적인 방법은 없지만 강사님이 사용하는 방법임
# 국제적으로 인정받은 방식은 아니지만 프로젝트 실행하면서 이렇게 하니 결과가 좋았던 적이 많았음
# 첫번째 은닉층의 노드의 개수를 입력 데이터 컬럼의 개수의 1.2배로 설정
# 두번째부터는 이전 은닉층의 노드의 개수 80% 수준으로 설정
# 출력층의 노드의 개수보다 적어지지 않도록 설정
# 예) 컬럼 10개면 첫번째 은닉층 노드 : 12개 -> 두번째는 12x0.8개 = 9.6개 (10개) -> 세번째는 10x0.8개 = 8개 -> ... 출력층 1개일 때까지


# 신경망의 구조를 관리하는 객체
# 여기에 추가한 레이어 순서대로 데이터가 통과한다.
model = Sequential()

model.add(Dense(20, input_dim = input_dim, activation = 'relu'))
model.add(BatchNormalization())

add_dense_block(model, 16)
add_dense_block(model, 12)
add_dense_block(model, 10)
add_dense_block(model, 8)
add_dense_block(model, 6)
          
# 출력층
# 출력층은 최종 결과를 예측하는 계산하는 부분
# 노드의 개수는 예측하고자 하는 데이터의 가지수로 설정해줌
# 선형 회귀 레이어
model.add(Dense(output_dim))
# 회귀는 출력층의 활성화 함수를 두지 않음

In [None]:
# 사용할 경사 하강법
optimzier = Adam(learning_rate = 0.001)

# 학습모델 컴파일
model.compile(loss = loss_function, optimizer = optimzier) # metrics는 회귀일때는 설정 안하고 분류일 때 accuracy 설정
model.summary()

In [None]:
# 자동 저장 callback
path = '241122.keras'
callback1 = ModelCheckpoint(filepath=path, monitor='val_loss', save_best_only=True)

In [None]:
class OriginalScaleLoss(Callback):
    def __init__(self, X_train, y_train_scaled, X_val, y_val_scaled, scaler_y):
        super().__init__()
        self.X_train = X_train
        self.y_train_scaled = y_train_scaled
        self.X_val = X_val
        self.y_val_scaled = y_val_scaled
        self.scaler_y = scaler_y
        self.train_rmse_per_output = []
        self.val_rmse_per_output = []

    def on_epoch_end(self, epoch, logs=None):
        # 훈련 데이터 예측
        y_train_pred_scaled = self.model.predict(self.X_train, verbose=0)
        y_train_pred = self.scaler_y.inverse_transform(y_train_pred_scaled)
        y_train_true = self.scaler_y.inverse_transform(self.y_train_scaled)

        # 검증 데이터 예측
        y_val_pred_scaled = self.model.predict(self.X_val, verbose=0)
        y_val_pred = self.scaler_y.inverse_transform(y_val_pred_scaled)
        y_val_true = self.scaler_y.inverse_transform(self.y_val_scaled)

        # 출력 차원별 RMSE 계산
        train_rmse_per_output = np.sqrt(np.mean((y_train_true - y_train_pred) ** 2, axis=0))
        val_rmse_per_output = np.sqrt(np.mean((y_val_true - y_val_pred) ** 2, axis=0))

        # 손실 값 저장
        self.train_rmse_per_output.append(train_rmse_per_output)
        self.val_rmse_per_output.append(val_rmse_per_output)

        # 출력 차원별로 RMSE 출력
        for idx, (train_rmse, val_rmse) in enumerate(zip(train_rmse_per_output, val_rmse_per_output)):
            print(f"Epoch {epoch+1} - 출력 차원 {idx+1}: Train RMSE = {train_rmse:.4f}, Val RMSE = {val_rmse:.4f}")

# 콜백 인스턴스 생성
original_scale_loss = OriginalScaleLoss(
    X_train=X_train,            # 학습 데이터 입력
    y_train_scaled=y_train,     # 학습 데이터 출력 (스케일된 값)
    X_val=X_test,               # 검증 데이터 입력
    y_val_scaled=y_test,        # 검증 데이터 출력 (스케일된 값)
    scaler_y=scaler_y        # 출력값을 스케일링한 스케일러
)

In [None]:
# 조기 중단
callback2 = EarlyStopping(monitor='val_loss', patience=100)
callback3 = original_scale_loss

# 학습을 한다.
history = model.fit(X_train, y_train, validation_data=(X_val, y_val),
                    epochs=9999999999, batch_size=128, callbacks=[callback1, callback2, callback3])

# batch_size보다 데이터의 수가 작으면 데이터의 수만큼만 올라가서 상관없음
# 하지만 gpu를 많이 잡아먹음. gpu 여유있으면 batch size 크게 잡아주는 것이 좋음
# val_loss가 떨어지지 않으면 layer 수가 부족한 것이니 layer를 늘려주면 됨

In [None]:
# 오차에 관한 그래프
plt.plot(history.history['loss'][:-10], label='Train')
plt.plot(history.history['val_loss'][:-10], label='Validation')
# plt.ylim(0, 5)   # train loss 그래프가 구불부굴함 -> 학습 덜 된 것임 -> batch size 늘려서 완만해지게 만들어야 함
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend()
plt.show()

# 그래프가 구불구불한 것은 batch size 늘리면 나아짐 -> batch size 가 작으면 학습이 불안정해짐. 데이터 양이 많으면 한꺼번에 메모리에 올릴 수 없어서 쪼개서 올려주는 것임
# 선생님은 32 -> 50000으로 늘려서 진행함. 한번에 많이 늘리면 메모리 부족하다는 경고가 뜸. 천천히 늘려가면서 학습이 되면 더 늘려주면 됨
# train와 test loss 그래프가 맞닿아 있고 완만해 있으면 학습이 잘 되었다는 의미

In [None]:
# 5. Test set 평가
test_loss, test_mse = model.evaluate(X_test, y_test)
print(f"Test MSE: {test_mse}")

In [None]:
# 최종 평가를 수행한다.
# 저장한 학습 모델을 불러온다.
best_model = load_model('241122.keras')

# 데이터에 대한 예측값
y_train_pred = best_model.predict(X_train)
y_val_pred = best_model.predict(X_val)
y_test_pred = best_model.predict(X_test)

# 각 y값에 대한 MSE 계산 (학습 및 검증 데이터)
mse_train_list = []
mse_val_list = []
mse_test_list = []

for i, col in enumerate(y.columns):
    mse_train = mean_squared_error(y_train[:, i], y_train_pred[:, i])
    mse_val = mean_squared_error(y_test[:, i], y_val_pred[:, i])
    mse_test = mean_squared_error(y_test[:, i], y_test_pred[:, i])
    mse_train_list.append(round(mse_train, 2))
    mse_val_list.append(round(mse_val, 2))
    mse_test_list.append(round(mse_test, 2))

# 결과를 데이터프레임으로 출력
mse_df = pd.DataFrame({
    'Output': y.columns,
    'MSE_Train': mse_train_list,
    'MSE_Val': mse_val_list,
    'MSE_Test': mse_test_list
})

display(mse_df)

In [None]:
# 실제 값과 예측 값을 비교하는 그래프를 그리는 함수.

def plot_comparison(y_actual, y_pred, columns, scaler=None):
    # 스케일링 전 값으로 역변환
    if scaler is not None:
        y_actual = scaler.inverse_transform(y_actual)
        y_pred = scaler.inverse_transform(y_pred)
    
    for i, col in enumerate(columns):  # 열 이름을 가져옴
        # 서브플롯 설정 (1행 2열, 첫 번째 서브플롯은 라인 그래프, 두 번째는 산점도)
        fig, axes = plt.subplots(1, 2, figsize=(12, 4))

        # 왼쪽 서브플롯: 라인 그래프
        axes[0].plot(y_actual[:, i].reshape(-1), label=f'Actual {col}')
        axes[0].plot(y_pred[:, i].reshape(-1), label=f'Predict {col}', linestyle='--')
        axes[0].legend()
        axes[0].set_title(f'Actual vs Predicted (Line Plot) for {col}')

        # 오른쪽 서브플롯: 산점도 그래프
        axes[1].scatter(y_actual[:, i].reshape(-1), y_pred[:, i].reshape(-1))
        axes[1].plot([min(y_actual[:, i]), max(y_actual[:, i])], 
                     [min(y_actual[:, i]), max(y_actual[:, i])], 
                     color='red', linestyle='--')  # 대각선 (완벽한 예측선)
        axes[1].set_title(f'Actual vs Predicted (Scatter Plot) for {col}')
        axes[1].set_xlabel(f'Actual {col}')
        axes[1].set_ylabel(f'Predicted {col}')

        # 그래프 간격 조정
        plt.tight_layout()
        plt.subplots_adjust(wspace=0.4)  # 서브플롯 간의 가로 간격 조정 (wspace=0.4로 설정)
        plt.show()

In [None]:
# 예측 결과
y_pred = best_model.predict(X_train)
plot_comparison(y_train, y_pred, y.columns, scaler=scaler_y)

In [None]:
# 예측 결과
y_pred_val = best_model.predict(X_val)
plot_comparison(y_val, y_pred_val, y.columns, scaler=scaler_y)

In [None]:
# 학습이 완료된 모델을 저장해준다.
with open('best_DL_model_241122.pkl', 'wb') as fp:
    # 학습 모델 저장
    pickle.dump(best_model, fp)
    # 스케일러 저장
    pickle.dump(scaler_X, fp)
    pickle.dump(scaler_y, fp)
    # 인코더 저장
    pickle.dump(encoder, fp)

#### 5.1 딥러닝 예측

In [None]:
# 저장한 학습모델들을 복원한다.
with open('best_model_241122.pkl', 'rb') as fp:
    # 학습 모델 불러오기
    model = pickle.load(fp)
    # 스케일러 불러오기
    scaler1 = pickle.load(fp)
    scaler2 = pickle.load(fp)
    # 인코더 불러오기
    encoder1 = pickle.load(fp)

#### 5.1.1 값으로 예측

#### 5.1.2 csv파일로 예측

In [None]:
prediction = pd.read_csv('combinations_data.csv')
prediction.head(3)

In [None]:
prediction_encoded = encoder1.transform(prediction[['raw_material']])
# 결과를 pandas 데이터프레임으로 변환
prediction_encoded = pd.DataFrame(prediction_encoded, columns=encoder1.get_feature_names_out(['raw_material']))
prediction_encoded.head()

In [None]:
new_data = pd.concat([prediction.drop(columns=['raw_material']), prediction_encoded], axis=1)
new_data.head()

In [None]:
# 다중공선성 문제 해결을 위해 dummy column 1개 삭제
# 학습할 때와 동일하게 진행
new_data = new_data.drop(['raw_material_S5'], axis = 1)
new_data.head()

In [None]:
# 표준화
X = scaler1.transform(new_data)
X

In [None]:
# 예측
pred = model.predict(X)
pred

In [None]:
# 원래의 결과로 복원
result = scaler2.inverse_transform(pred)
result

In [None]:
# 예측 결과를 새로운 데이터프레임에 추가
# y_new_pred는 (샘플 수, 3) 크기의 배열이므로 이를 각각의 열로 분리하여 추가
prediction['Lithiation Capacity (예측값)'] = result[:, 0].astype(int)  # 첫 번째 출력값을 정수로 변환
prediction['Delithiation Capacity (예측값)'] = result[:, 1].astype(int)  # 두 번째 출력값을 정수로 변환
prediction['FCE (예측값)'] = result[:, 2].round(1)  # 세 번째 출력값은 소수점 첫째짜리 까지 반환
prediction.head(3)

In [None]:
# DeliCap을 기준으로 내림차순 정렬하고, 값이 동일하면 FCE를 기준으로 정렬
prediction_sorted = prediction.sort_values(by=['Delithiation Capacity (예측값)', 'FCE (예측값)'], ascending=[False, False])

# 결과를 csv 파일로 저장
prediction_sorted.to_csv('Taurus_Prediction_Result_241014.csv', encoding='utf-8-sig', index=False)

print("예측이 완료되었습니다. 결과는 'Taurus_Prediction_Result_241122.csv'에 저장되었습니다.")