In [None]:
import warnings
from pathlib import Path

# Installed before the third-party imports so that import-time warnings raised
# by those packages stay silenced.
warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats
from catboost import CatBoostClassifier
from lightgbm import LGBMClassifier
from sklearn.ensemble import (
    AdaBoostClassifier,
    ExtraTreesClassifier,
    GradientBoostingClassifier,
    RandomForestClassifier,
    StackingClassifier,
)
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler
from xgboost import XGBClassifier

In [None]:
class DataPreprocessor:
    """Load a CSV into ``self.data`` and clean it step by step.

    Every method mutates ``self.data``. The notebook calls them in this order:
    ``encode_working`` -> ``preprocess_datetime`` ->
    ``remove_unnecessary_columns`` -> ``remove_missing_values`` ->
    ``remove_outliers`` -> ``perform_feature_selection`` ->
    ``get_processed_data``.
    """

    def __init__(self, file_path):
        """Read ``file_path`` as a cp949-encoded CSV, using its first column as the index."""
        self.data = pd.read_csv(file_path, encoding='cp949', index_col=0)

    def encode_working(self):
        """Add ``working_encoded``: 1 for '가동' (running), 0 for '정지' (stopped).

        Any other value of ``working`` maps to NaN.
        """
        self.data['working_encoded'] = self.data['working'].map({'가동': 1, '정지': 0})

    def preprocess_datetime(self):
        """Add ``datetime`` (parsed from ``date`` + ``time``) and ``datetime_int`` (epoch seconds)."""
        self.data['datetime'] = pd.to_datetime(self.data['date'] + ' ' + self.data['time'])
        # Epoch arithmetic instead of ``astype(int64) // 10**9``: the integer
        # view of a datetime column depends on its storage resolution
        # (ns/us/ms/s), so the fixed divisor is only right for nanoseconds.
        epoch = pd.Timestamp('1970-01-01')
        self.data['datetime_int'] = (self.data['datetime'] - epoch) // pd.Timedelta(seconds=1)

    def remove_unnecessary_columns(self):
        """Drop the listed columns (if present) and keep only numeric columns."""
        columns_to_drop = ['count', 'EMS_operation_time', 'mold_code']
        self.data = self.data.drop(columns_to_drop, axis=1, errors='ignore')
        self.data = self.data.select_dtypes(include=[np.number])

    def remove_missing_values(self):
        """Drop the ``molten_volume`` column (if present), then every row containing a NaN."""
        if 'molten_volume' in self.data.columns:
            self.data.drop('molten_volume', axis=1, inplace=True)
        self.data.dropna(axis=0, inplace=True)
        self.data.reset_index(drop=True, inplace=True)

    def remove_outliers(self, percentile_range=(0.1, 99.9)):
        """Keep only rows whose numeric values lie inside the given percentile band.

        Args:
            percentile_range: ``(lower, upper)`` percentiles in the 0-100 scale
                expected by ``np.percentile``.

        Columns are filtered one after another, so the percentiles of a later
        column are computed on the rows that survived the earlier columns.
        ``passorfail`` is never filtered.
        """
        for col in self.data.select_dtypes(include=np.number).columns:
            if col != 'passorfail':
                lower = np.percentile(self.data[col], percentile_range[0])
                upper = np.percentile(self.data[col], percentile_range[1])
                self.data = self.data[(self.data[col] >= lower) & (self.data[col] <= upper)]
        self.data.reset_index(drop=True, inplace=True)

    def perform_feature_selection(self, alpha=0.05):
        """Keep the columns whose mean differs between the two ``passorfail`` groups.

        A Welch t-test (``equal_var=False``) compares each column between the
        rows with ``passorfail == 1`` and those with ``passorfail == 0``.
        Columns with ``p < alpha`` are kept; ``passorfail`` is always kept and
        placed last.

        Args:
            alpha: Significance threshold; the default reproduces the
                previously hard-coded 0.05.
        """
        target = self.data['passorfail']
        # The two group slices do not depend on the column, so build them once.
        group_one = self.data[target == 1]
        group_zero = self.data[target == 0]

        selected = []
        for col in self.data.columns:
            if col == 'passorfail':
                continue
            _, p_value = scipy.stats.ttest_ind(group_one[col], group_zero[col], equal_var=False)
            # A NaN p-value (e.g. from a constant column) compares False and
            # the column is dropped.
            if p_value < alpha:
                selected.append(col)
        selected.append('passorfail')
        self.data = self.data[selected]

    def get_processed_data(self):
        """Return the current state of ``self.data``."""
        return self.data


In [None]:
class EnsembleStacker1:
    """Evaluate every (Factory Deterministic, Factory Complex) model pair as a stacked ensemble.

    The data is split 80/20 (stratified on ``y``) and standardised with a
    scaler fitted on the training part only. For each pair of candidate base
    models, each model is first scored on its own, then both are combined in a
    ``StackingClassifier`` with an ``XGBClassifier`` meta-learner (``cv=5``).
    """

    # (layer name used in the code, label used in the result columns)
    _LAYER_LABELS = (
        ("Factory Deterministic", "Layer 1"),
        ("Factory Complex", "Layer 2"),
    )
    _METRIC_NAMES = ('Accuracy', 'Precision', 'Recall', 'F1 Score', 'ROC AUC', 'FPR')

    def __init__(self, X, y, random_state=42):
        """Split and scale the data and register the candidate model names.

        Args:
            X: Feature table.
            y: Target labels; also used to stratify the split. The metrics
                below use scikit-learn's binary defaults (positive label 1).
            random_state: Seed for the train/test split and for the randomised
                base model. The default matches the split seed that used to be
                hard-coded.
        """
        self.X = X
        self.y = y
        self.random_state = random_state
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=0.2, random_state=random_state, stratify=y
        )
        self.scaler = StandardScaler()
        self.X_train_scaled = self.scaler.fit_transform(self.X_train)
        self.X_test_scaled = self.scaler.transform(self.X_test)

        # KNN removed from layer 1
        self.factory_deterministic_candidates = ["Logistic Regression", "GaussianNB"]
        self.factory_complex_candidates = ["Extra Trees", "KNN"]

    def create_model(self, model_name):
        """Return a fresh, unfitted estimator for ``model_name``.

        Raises:
            KeyError: If ``model_name`` is not a known model.
        """
        # Factories rather than instances: only the requested model is built.
        factories = {
            "Logistic Regression": LogisticRegression,
            "GaussianNB": GaussianNB,
            "KNN": KNeighborsClassifier,
            "Extra Trees": lambda: ExtraTreesClassifier(random_state=self.random_state),
        }
        return factories[model_name]()

    @staticmethod
    def _score(y_true, y_pred, y_score):
        """Compute the six reported metrics; ``ROC AUC`` is None when ``y_score`` is None."""
        cm = confusion_matrix(y_true, y_pred)
        return {
            'Accuracy': accuracy_score(y_true, y_pred),
            'Precision': precision_score(y_true, y_pred),
            'Recall': recall_score(y_true, y_pred),
            'F1 Score': f1_score(y_true, y_pred),
            'ROC AUC': roc_auc_score(y_true, y_score) if y_score is not None else None,
            # Row 0 of the confusion matrix holds the samples whose true label
            # sorts first; column 1 counts those predicted as the other label.
            'FPR': cm[0][1] / sum(cm[0]),
        }

    def evaluate_individual_model(self, model, X_train, X_test, y_train, y_test, model_name, layer_name):
        """Fit ``model`` on the training data and score it on the test data.

        ``layer_name`` is accepted for interface compatibility and is not used.

        Returns:
            dict with keys 'Model', 'Accuracy', 'Precision', 'Recall',
            'F1 Score', 'ROC AUC' (None if the model has no ``predict_proba``)
            and 'FPR'.
        """
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        y_score = model.predict_proba(X_test)[:, 1] if hasattr(model, 'predict_proba') else None
        return {'Model': model_name, **self._score(y_test, y_pred, y_score)}

    @classmethod
    def _result_columns(cls):
        """Return the result-table column names in their output order."""
        columns = ['Stage']
        for layer_name, label in cls._LAYER_LABELS:
            columns.append(f"{label} Model ({layer_name})")
            columns.extend(f"{label} {metric}" for metric in cls._METRIC_NAMES)
        columns.extend(f"Stacked Model {metric}" for metric in cls._METRIC_NAMES)
        return columns

    @classmethod
    def _build_row(cls, stage_number, layer_metrics, stacked_metrics):
        """Build one result row keyed by the final column names.

        Args:
            stage_number: Running number of the model combination.
            layer_metrics: Maps each layer name to the dict returned by
                ``evaluate_individual_model`` for that layer.
            stacked_metrics: Metric dict of the stacked model.
        """
        # Values are stored under their final column names, so no positional
        # renaming is needed afterwards and labels cannot drift from the data.
        row = {'Stage': stage_number}
        for layer_name, label in cls._LAYER_LABELS:
            metrics = layer_metrics[layer_name]
            row[f"{label} Model ({layer_name})"] = metrics['Model']
            for metric in cls._METRIC_NAMES:
                row[f"{label} {metric}"] = metrics[metric]
        for metric in cls._METRIC_NAMES:
            row[f"Stacked Model {metric}"] = stacked_metrics[metric]
        return row

    def run_all_combinations(self):
        """Score every candidate pair individually and as a stacked ensemble.

        Returns:
            DataFrame with one row per combination: 'Stage', the per-layer
            model name and metrics ('Layer 1 …', 'Layer 2 …') and the
            'Stacked Model …' metrics.
        """
        stage_results = []
        stage_number = 1

        for fd_model in self.factory_deterministic_candidates:
            for fc_model in self.factory_complex_candidates:
                print(f"\n=== Stage {stage_number} ===")

                # Model chosen for each layer in this combination
                layer_configs = {
                    "Factory Deterministic": fd_model,
                    "Factory Complex": fc_model,
                }

                # Score each layer's model on its own
                layer_metrics = {}
                estimators = []
                for layer_name, model_name in layer_configs.items():
                    model = self.create_model(model_name)
                    layer_metrics[layer_name] = self.evaluate_individual_model(
                        model, self.X_train_scaled, self.X_test_scaled,
                        self.y_train, self.y_test, model_name, layer_name
                    )
                    estimators.append((model_name, model))

                # Meta-model that stacks the layer models
                stacking_clf = StackingClassifier(
                    estimators=estimators,
                    final_estimator=XGBClassifier(),
                    cv=5
                )

                # Score the stacked model
                stacking_clf.fit(self.X_train_scaled, self.y_train)
                y_pred = stacking_clf.predict(self.X_test_scaled)
                y_pred_proba = stacking_clf.predict_proba(self.X_test_scaled)[:, 1]
                stacked = self._score(self.y_test, y_pred, y_pred_proba)

                print("\n--- Stacked Model Performance ---")
                print(f"Accuracy: {stacked['Accuracy']:.4f}, Precision: {stacked['Precision']:.4f}, Recall: {stacked['Recall']:.4f}, F1 Score: {stacked['F1 Score']:.4f}, ROC AUC: {stacked['ROC AUC']:.4f}, FPR: {stacked['FPR']:.4f}")

                # Store the results of this combination
                stage_results.append(self._build_row(stage_number, layer_metrics, stacked))
                stage_number += 1

        # Explicit columns keep the schema stable even when there are no rows.
        return pd.DataFrame(stage_results, columns=self._result_columns())

In [None]:
class EnsembleStacker2:
    """Evaluate every (Command Deterministic, Command Complex) model pair as a stacked ensemble.

    The data is split 80/20 (stratified on ``y``) and standardised with a
    scaler fitted on the training part only. For each pair of candidate base
    models, each model is first scored on its own, then both are combined in a
    ``StackingClassifier`` with an ``XGBClassifier`` meta-learner (``cv=5``).
    """

    # (layer name used in the code, label used in the result columns)
    _LAYER_LABELS = (
        ("Command Deterministic", "Layer 3"),
        ("Command Complex", "Layer 4"),
    )
    _METRIC_NAMES = ('Accuracy', 'Precision', 'Recall', 'F1 Score', 'ROC AUC', 'FPR')

    def __init__(self, X, y, random_state=42):
        """Split and scale the data and register the candidate model names.

        Args:
            X: Feature table.
            y: Target labels; also used to stratify the split. The metrics
                below use scikit-learn's binary defaults (positive label 1).
            random_state: Seed for the train/test split and for the randomised
                scikit-learn base models. The default matches the split seed
                that used to be hard-coded.
        """
        self.X = X
        self.y = y
        self.random_state = random_state
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=0.2, random_state=random_state, stratify=y
        )
        self.scaler = StandardScaler()
        self.X_train_scaled = self.scaler.fit_transform(self.X_train)
        self.X_test_scaled = self.scaler.transform(self.X_test)

        # Only the layer 3 and layer 4 candidate models are used
        self.command_deterministic_candidates = ["Random Forest", "XGBoost", "AdaBoost"]
        self.command_complex_candidates = ["LightGBM", "CatBoost", "GBM", "MLP"]

    def create_model(self, model_name):
        """Return a fresh, unfitted estimator for ``model_name``.

        Raises:
            KeyError: If ``model_name`` is not a known model.
        """
        seed = self.random_state
        # Factories rather than instances: only the requested model is built.
        factories = {
            "Random Forest": lambda: RandomForestClassifier(random_state=seed),
            "XGBoost": XGBClassifier,
            "AdaBoost": lambda: AdaBoostClassifier(random_state=seed),
            "LightGBM": LGBMClassifier,
            "CatBoost": lambda: CatBoostClassifier(verbose=0),
            # "GBM" used to build a second LGBMClassifier, which made it an
            # exact duplicate of the "LightGBM" candidate.
            "GBM": lambda: GradientBoostingClassifier(random_state=seed),
            "MLP": lambda: MLPClassifier(random_state=seed),
        }
        return factories[model_name]()

    @staticmethod
    def _score(y_true, y_pred, y_score):
        """Compute the six reported metrics; ``ROC AUC`` is None when ``y_score`` is None."""
        cm = confusion_matrix(y_true, y_pred)
        return {
            'Accuracy': accuracy_score(y_true, y_pred),
            'Precision': precision_score(y_true, y_pred),
            'Recall': recall_score(y_true, y_pred),
            'F1 Score': f1_score(y_true, y_pred),
            'ROC AUC': roc_auc_score(y_true, y_score) if y_score is not None else None,
            # Row 0 of the confusion matrix holds the samples whose true label
            # sorts first; column 1 counts those predicted as the other label.
            'FPR': cm[0][1] / sum(cm[0]),
        }

    def evaluate_individual_model(self, model, X_train, X_test, y_train, y_test, model_name, layer_name):
        """Fit ``model`` on the training data and score it on the test data.

        ``layer_name`` is accepted for interface compatibility and is not used.

        Returns:
            dict with keys 'Model', 'Accuracy', 'Precision', 'Recall',
            'F1 Score', 'ROC AUC' (None if the model has no ``predict_proba``)
            and 'FPR'.
        """
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        y_score = model.predict_proba(X_test)[:, 1] if hasattr(model, 'predict_proba') else None
        return {'Model': model_name, **self._score(y_test, y_pred, y_score)}

    @classmethod
    def _result_columns(cls):
        """Return the result-table column names in their output order."""
        columns = ['Stage']
        for layer_name, label in cls._LAYER_LABELS:
            columns.append(f"{label} Model ({layer_name})")
            columns.extend(f"{label} {metric}" for metric in cls._METRIC_NAMES)
        columns.extend(f"Stacked Model {metric}" for metric in cls._METRIC_NAMES)
        return columns

    @classmethod
    def _build_row(cls, stage_number, layer_metrics, stacked_metrics):
        """Build one result row keyed by the final column names.

        Args:
            stage_number: Running number of the model combination.
            layer_metrics: Maps each layer name to the dict returned by
                ``evaluate_individual_model`` for that layer.
            stacked_metrics: Metric dict of the stacked model.
        """
        # Values are stored under their final column names, so no positional
        # renaming is needed afterwards and labels cannot drift from the data.
        row = {'Stage': stage_number}
        for layer_name, label in cls._LAYER_LABELS:
            metrics = layer_metrics[layer_name]
            row[f"{label} Model ({layer_name})"] = metrics['Model']
            for metric in cls._METRIC_NAMES:
                row[f"{label} {metric}"] = metrics[metric]
        for metric in cls._METRIC_NAMES:
            row[f"Stacked Model {metric}"] = stacked_metrics[metric]
        return row

    def run_all_combinations(self):
        """Score every candidate pair individually and as a stacked ensemble.

        Returns:
            DataFrame with one row per combination: 'Stage', the per-layer
            model name and metrics ('Layer 3 …', 'Layer 4 …') and the
            'Stacked Model …' metrics.
        """
        stage_results = []
        stage_number = 1

        for cd_model in self.command_deterministic_candidates:
            for cc_model in self.command_complex_candidates:
                print(f"\n=== Stage {stage_number} ===")

                # Model chosen for each layer in this combination
                layer_configs = {
                    "Command Deterministic": cd_model,
                    "Command Complex": cc_model,
                }

                # Score each layer's model on its own
                layer_metrics = {}
                estimators = []
                for layer_name, model_name in layer_configs.items():
                    model = self.create_model(model_name)
                    layer_metrics[layer_name] = self.evaluate_individual_model(
                        model, self.X_train_scaled, self.X_test_scaled,
                        self.y_train, self.y_test, model_name, layer_name
                    )
                    estimators.append((model_name, model))

                # Meta-model that stacks the layer models
                stacking_clf = StackingClassifier(
                    estimators=estimators,
                    final_estimator=XGBClassifier(),
                    cv=5
                )

                # Score the stacked model
                stacking_clf.fit(self.X_train_scaled, self.y_train)
                y_pred = stacking_clf.predict(self.X_test_scaled)
                y_pred_proba = stacking_clf.predict_proba(self.X_test_scaled)[:, 1]
                stacked = self._score(self.y_test, y_pred, y_pred_proba)

                print("\n--- Stacked Model Performance ---")
                print(f"Accuracy: {stacked['Accuracy']:.4f}, Precision: {stacked['Precision']:.4f}, Recall: {stacked['Recall']:.4f}, F1 Score: {stacked['F1 Score']:.4f}, ROC AUC: {stacked['ROC AUC']:.4f}, FPR: {stacked['FPR']:.4f}")

                # Store the results of this combination
                stage_results.append(self._build_row(stage_number, layer_metrics, stacked))
                stage_number += 1

        # Explicit columns keep the schema stable even when there are no rows.
        return pd.DataFrame(stage_results, columns=self._result_columns())

In [None]:
class EnsembleStacker3:
    """Evaluate every four-layer model combination as a stacked ensemble.

    The data is split 80/20 (stratified on ``y``) and standardised with a
    scaler fitted on the training part only. One model is picked per layer
    (Factory Deterministic, Factory Complex, Command Deterministic, Command
    Complex); each is scored on its own and then all four are combined in a
    ``StackingClassifier`` with an ``XGBClassifier`` meta-learner (``cv=5``).
    """

    def __init__(self, X, y, random_state=42):
        """Split and scale the data and register the candidate model names.

        Args:
            X: Feature table.
            y: Target labels; also used to stratify the split. The metrics
                below use scikit-learn's binary defaults (positive label 1).
            random_state: Seed for the train/test split and for the randomised
                scikit-learn base models. The default matches the split seed
                that used to be hard-coded.
        """
        self.X = X
        self.y = y
        self.random_state = random_state
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=0.2, random_state=random_state, stratify=y
        )
        self.scaler = StandardScaler()
        self.X_train_scaled = self.scaler.fit_transform(self.X_train)
        self.X_test_scaled = self.scaler.transform(self.X_test)

        # Candidate models for each layer
        self.factory_deterministic_candidates = ["Logistic Regression", "GaussianNB", "KNN"]
        self.factory_complex_candidates = ["Extra Trees", "KNN"]
        self.command_deterministic_candidates = ["Random Forest", "XGBoost", "AdaBoost"]
        self.command_complex_candidates = ["LightGBM", "CatBoost", "GBM", "MLP"]

    def create_model(self, model_name):
        """Return a fresh, unfitted estimator for ``model_name``.

        Raises:
            KeyError: If ``model_name`` is not a known model.
        """
        seed = self.random_state
        # Factories rather than instances: only the requested model is built.
        factories = {
            "Logistic Regression": LogisticRegression,
            "GaussianNB": GaussianNB,
            "KNN": KNeighborsClassifier,
            "Extra Trees": lambda: ExtraTreesClassifier(random_state=seed),
            "Random Forest": lambda: RandomForestClassifier(random_state=seed),
            "XGBoost": XGBClassifier,
            "AdaBoost": lambda: AdaBoostClassifier(random_state=seed),
            "LightGBM": LGBMClassifier,
            "CatBoost": lambda: CatBoostClassifier(verbose=0),
            # "GBM" used to build a second LGBMClassifier, which made it an
            # exact duplicate of the "LightGBM" candidate.
            "GBM": lambda: GradientBoostingClassifier(random_state=seed),
            "MLP": lambda: MLPClassifier(random_state=seed),
        }
        return factories[model_name]()

    @staticmethod
    def _estimator_name(layer_name, model_name):
        """Return the name under which a layer's model is registered in the stack.

        The layer is part of the name because the same model (e.g. "KNN") can
        be chosen for two layers at once, and ``StackingClassifier`` rejects
        duplicate estimator names.
        """
        return f"{layer_name} - {model_name}"

    def evaluate_individual_model(self, model, X_train, X_test, y_train, y_test, model_name, layer_name):
        """Fit ``model``, score it on the test data and print the metrics.

        Returns:
            dict with keys 'Layer', 'Model', 'Accuracy', 'Precision', 'Recall',
            'F1 Score', 'ROC AUC' (None if the model has no ``predict_proba``)
            and 'FPR'.
        """
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        has_proba = hasattr(model, 'predict_proba')

        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred)
        recall = recall_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)
        roc_auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1]) if has_proba else None
        # Row 0 of the confusion matrix holds the samples whose true label
        # sorts first; column 1 counts those predicted as the other label.
        cm = confusion_matrix(y_test, y_pred)
        fpr = cm[0][1] / sum(cm[0])

        roc_text = f"{roc_auc:.4f}" if roc_auc is not None else "N/A"
        print(f"Layer: {layer_name}, Model: {model_name}")
        print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}, ROC AUC: {roc_text}, FPR: {fpr:.4f}")

        return {
            'Layer': layer_name,
            'Model': model_name,
            'Accuracy': accuracy,
            'Precision': precision,
            'Recall': recall,
            'F1 Score': f1,
            'ROC AUC': roc_auc,
            'FPR': fpr
        }

    def run_all_combinations(self):
        """Score every four-layer combination as a stacked ensemble.

        Per-layer metrics are only printed; the returned table holds the
        stacked model's metrics.

        Returns:
            DataFrame with one row per combination: 'Stage', the model chosen
            for each layer, 'Stacked Model' and the stacked metrics
            ('Accuracy', 'Precision', 'Recall', 'F1 Score', 'ROC AUC', 'FPR').
        """
        stage_results = []
        stage_number = 1

        for fd_model in self.factory_deterministic_candidates:
            for fc_model in self.factory_complex_candidates:
                for cd_model in self.command_deterministic_candidates:
                    for cc_model in self.command_complex_candidates:
                        print(f"\n=== Stage {stage_number} ===")

                        # Model chosen for each layer in this combination
                        layer_configs = {
                            "Factory Deterministic": fd_model,
                            "Factory Complex": fc_model,
                            "Command Deterministic": cd_model,
                            "Command Complex": cc_model
                        }

                        # Score (and print) each layer's model on its own
                        estimators = []
                        for layer_name, model_name in layer_configs.items():
                            model = self.create_model(model_name)
                            self.evaluate_individual_model(
                                model, self.X_train_scaled, self.X_test_scaled,
                                self.y_train, self.y_test, model_name, layer_name
                            )
                            estimators.append((self._estimator_name(layer_name, model_name), model))

                        # Meta-model that stacks the layer models
                        stacking_clf = StackingClassifier(
                            estimators=estimators,
                            final_estimator=XGBClassifier(),
                            cv=5
                        )

                        # Score the stacked model
                        stacking_clf.fit(self.X_train_scaled, self.y_train)
                        y_pred = stacking_clf.predict(self.X_test_scaled)
                        y_pred_proba = stacking_clf.predict_proba(self.X_test_scaled)[:, 1]

                        accuracy = accuracy_score(self.y_test, y_pred)
                        precision = precision_score(self.y_test, y_pred)
                        recall = recall_score(self.y_test, y_pred)
                        f1 = f1_score(self.y_test, y_pred)
                        roc_auc = roc_auc_score(self.y_test, y_pred_proba)
                        cm = confusion_matrix(self.y_test, y_pred)
                        fpr = cm[0][1] / sum(cm[0])

                        print("\n--- Stacked Model Performance ---")
                        print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}, ROC AUC: {roc_auc:.4f}, FPR: {fpr:.4f}")

                        # Store the results of this combination
                        stage_results.append({
                            'Stage': stage_number,
                            'Factory Deterministic': fd_model,
                            'Factory Complex': fc_model,
                            'Command Deterministic': cd_model,
                            'Command Complex': cc_model,
                            'Stacked Model': 'XGBoost',
                            'Accuracy': accuracy,
                            'Precision': precision,
                            'Recall': recall,
                            'F1 Score': f1,
                            'ROC AUC': roc_auc,
                            'FPR': fpr
                        })

                        stage_number += 1

        return pd.DataFrame(stage_results)

In [None]:
def plot_stacked_model_performance(results_df):
    """Draw a grouped bar chart of the stacked-model metrics for every stage.

    Args:
        results_df: Result table with a 'Stage' column and, for each of
            Accuracy, Precision, Recall, F1 Score and ROC AUC, either a
            'Stacked Model <metric>' column or a plain '<metric>' column.
            Both layouts are produced by the stacker classes in this notebook.

    Raises:
        KeyError: If 'Stage' or one of the metrics is missing in both spellings.
    """
    metric_names = ['Accuracy', 'Precision', 'Recall', 'F1 Score', 'ROC AUC']

    # Resolve each metric to (legend label, actual column). The prefixed name
    # wins; the plain name is the fallback for tables without the prefix.
    metrics = []
    for name in metric_names:
        label = f'Stacked Model {name}'
        if label in results_df.columns:
            metrics.append((label, label))
        elif name in results_df.columns:
            metrics.append((label, name))
        else:
            raise KeyError(f"results_df has neither a '{label}' nor a '{name}' column")

    # One figure comparing all metrics for each combination (stage)
    plt.figure(figsize=(12, 8))

    # Bar positions and width
    stages = results_df['Stage']
    bar_width = 0.15
    index = np.arange(len(stages))

    # One bar series per metric, shifted by one bar width each
    for i, (label, column) in enumerate(metrics):
        plt.bar(index + i * bar_width, results_df[column], bar_width, label=label)

    plt.xlabel('Stage (Model Combination)')
    plt.ylabel('Score')
    plt.title('Comparison of Stacked Model Performance by Metric')
    # Centre each tick under its group of bars
    plt.xticks(index + bar_width * (len(metrics) - 1) / 2, stages)
    plt.legend()
    plt.tight_layout()
    plt.show()

In [None]:
if __name__ == "__main__":
    # Load the raw CSV and run the preprocessing steps in their required order.
    preprocessor = DataPreprocessor('./data/경진대회용 주조 공정최적화 데이터셋.csv')
    preprocessor.encode_working()
    preprocessor.preprocess_datetime()
    preprocessor.remove_unnecessary_columns()
    preprocessor.remove_missing_values()
    preprocessor.remove_outliers()
    preprocessor.perform_feature_selection()
    data = preprocessor.get_processed_data()

    # Separate the target column from the features.
    y = data['passorfail']
    X = data.drop(columns='passorfail')

    # All three stackers are built up front on the same features and target.
    stacker1, stacker2, stacker3 = (
        EnsembleStacker1(X, y),
        EnsembleStacker2(X, y),
        EnsembleStacker3(X, y),
    )

    # Stacker 1: run every combination, print the table, then plot it.
    results_df1 = stacker1.run_all_combinations()
    print("\n전체 조합 성능 비교:", results_df1, sep="\n")
    plot_stacked_model_performance(results_df1)

    # Stacker 2: same sequence.
    results_df2 = stacker2.run_all_combinations()
    print("\n전체 조합 성능 비교:", results_df2, sep="\n")
    plot_stacked_model_performance(results_df2)

    # Stacker 3: same sequence.
    results_df3 = stacker3.run_all_combinations()
    print("\n전체 조합 성능 비교:", results_df3, sep="\n")
    plot_stacked_model_performance(results_df3)

In [None]:
# Write each result table to ./result/ as a UTF-8 CSV with BOM.
# The directory is created first: to_csv does not create missing parent
# directories and would fail on a fresh checkout.
result_dir = Path("./result")
result_dir.mkdir(parents=True, exist_ok=True)
results_df1.to_csv(result_dir / "results1.csv", index=False, encoding="utf-8-sig")
results_df2.to_csv(result_dir / "results2.csv", index=False, encoding="utf-8-sig")
results_df3.to_csv(result_dir / "results3.csv", index=False, encoding="utf-8-sig")