In [3]:
# 기본 라이브러리
import os
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

# 전처리 및 평가
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import (
    f1_score,
    precision_recall_fscore_support as sk,
    accuracy_score
)
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer

# 모델 관련
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import GradientBoostingClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier
from imblearn.over_sampling import SMOTE

# PyTorch 관련
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

# Keras/TensorFlow 관련
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras import metrics

# TabNet
from pytorch_tabnet.tab_model import TabNetClassifier

# AutoML
import optuna

# 기타
import pickle

# 주피터 환경 설정 (주피터에서만 유효)
%matplotlib inline

In [4]:
# Switch the working directory to the parent of the directory the kernel
# started in, and keep that location in `path` for building file paths later.
#
# The start directory is captured only once per kernel session: a bare
# `os.chdir("..")` climbs one more level every time the cell is re-run, which
# silently changes `path`. Anchoring on the captured start directory makes the
# cell idempotent while giving the same result on the first run.
if "_NOTEBOOK_START_DIR" not in globals():
    _NOTEBOOK_START_DIR = os.getcwd()
os.chdir(os.path.join(_NOTEBOOK_START_DIR, ".."))
path = os.getcwd()

In [2]:
# Identifiers of the synthetic-data generators (iterated by the loading cell).
Synthetic_model = [
    'smote',
    'adasyn',
    'copulagan',
    'ctgan',
    'nbsynthetic',
]

In [None]:
# Load the train / validation / test splits that were exported as
# comma-separated text files under <path>/Synthetic_data/train_valid_test.
#
# NOTE(review): these reads used to sit inside `for name in Synthetic_model:`
# but `name` was never used, so the same six "Nbsynthetic" files were re-read
# once per generator with identical results. They are read once here instead.
# If one dataset per generator was intended, the file-name pattern of the
# other generators has to be confirmed first -- TODO confirm.
#
# Paths are assembled with os.path.join rather than hard-coded '\\' separators
# so the cell does not depend on the Windows path separator.
_split_dir = os.path.join(path, 'Synthetic_data', 'train_valid_test')


def _load_split(split_name):
    """Read one exported split (for example ``'X_train'``) with ``np.loadtxt``."""
    file_name = f'세명초_Nbsynthetic_{split_name}.txt'
    return np.loadtxt(os.path.join(_split_dir, file_name), delimiter=',')


# Training data
X_train = _load_split('X_train')
y_train = _load_split('y_train')

# Validation data
X_valid = _load_split('X_valid')
y_valid = _load_split('y_valid')

# Test data
X_test = _load_split('X_test')
y_test = _load_split('y_test')

In [None]:
import pandas as pd

# Collects one metrics dict per generation; converted to a DataFrame after the loop.
model_results_list = []


def _lookup_split(generation, split):
    """Return the notebook-level array named ``f"{generation}_{split}"``.

    Replaces the former ``eval(generation + '_X_train')`` style look-ups: only
    an existing global name is fetched, no string is evaluated as code.

    Args:
        generation: Prefix of the dataset variables (an entry of ``Generation_list``).
        split: Suffix such as ``'X_train'`` or ``'y_test'``.

    Raises:
        NameError: If the variable does not exist (the same exception type
            ``eval`` raised for an undefined name).
    """
    key = f'{generation}_{split}'
    try:
        return globals()[key]
    except KeyError:
        raise NameError(
            f"name '{key}' is not defined; load the '{generation}' splits before running this cell"
        ) from None


# NOTE(review): `Generation_list` and the `<generation>_X_train`-style arrays
# are not defined in the visible cells; they must already exist in the kernel.
for generation in Generation_list:

    # Training data
    X_train = _lookup_split(generation, 'X_train')
    y_train = _lookup_split(generation, 'y_train')

    # Validation data
    X_valid = _lookup_split(generation, 'X_valid')
    y_valid = _lookup_split(generation, 'y_valid')

    # Test data
    X_test = _lookup_split(generation, 'X_test')
    y_test = _lookup_split(generation, 'y_test')

    # Shift every label down by one (presumably 1-based -> 0-based class ids; TODO confirm).
    # The subtraction creates new arrays, so the looked-up originals are not modified.
    y_train = y_train - 1
    y_valid = y_valid - 1
    y_test = y_test - 1

    # Not referenced again in the visible cells; kept in case later cells use it.
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

    # NOTE(review): input_dim / output_dim are passed explicitly here; check
    # whether the installed pytorch-tabnet version infers them during fit().
    tabnet_params = {
        "n_d": 8,
        "n_a": 8,
        "n_steps": 5,
        "gamma": 1.3,
        "cat_idxs": [],
        "cat_dims": [],
        "cat_emb_dim": [],
        "n_independent": 2,
        "n_shared": 2,
        "epsilon": 1e-15,
        "momentum": 0.02,
        "lambda_sparse": 0.001,
        "seed": 0,
        "clip_value": 1,
        "verbose": 1,
        "optimizer_fn": torch.optim.Adam,
        "optimizer_params": {'lr': 0.02},
        "scheduler_fn": None,
        "scheduler_params": {},
        "mask_type": 'sparsemax',
        "input_dim": 6,
        "output_dim": [3],
        "device_name": 'auto',
        "n_shared_decoder": 1,
        "n_indep_decoder": 1,
        "grouped_features": []
    }
    tabnet_clf = TabNetClassifier(**tabnet_params)
    max_epochs = 300
    tabnet_clf.fit(
        X_train=X_train, y_train=y_train,
        eval_set=[(X_valid, y_valid)],
        max_epochs=max_epochs,
        patience=200,
        batch_size=5000,
        virtual_batch_size=5000,
        num_workers=1,
        drop_last=False,
    )

    prediction = tabnet_clf.predict(X_test)
    test_targets_np = y_test

    # Persist the fitted TabNet model for this generation.
    # NOTE(review): absolute, machine-specific output path.
    with open(f'C:\\jupyter\\Elementary-student-Weight\\GAN_XAI\\Experiment3(생성별_모델별)\\model_history\\{generation}_proposed_tabnetEmbedd.pickle', 'wb') as f:
        pickle.dump(tabnet_clf, f)

    # float32 tensors of the raw features, consumed by the extraction step below.
    X_train_torch = torch.from_numpy(X_train.astype(np.float32))
    X_test_torch = torch.from_numpy(X_test.astype(np.float32))
    
    def extract_tabnet_embeddings(
        tabnet_model,
        X_tensor,
        device=None,
        batch_size=5000,
        verbose=True,
    ):
        """Run ``X_tensor`` through ``tabnet_model.network.forward_masks`` in batches.

        For every batch the first element returned by ``forward_masks`` is
        reduced to a single tensor (see the branches below) and the per-batch
        results are concatenated along axis 0.

        NOTE(review): in pytorch-tabnet ``forward_masks`` appears to return the
        aggregated feature-importance mask plus the per-step masks rather than
        latent embeddings -- confirm against the installed version.

        Args:
            tabnet_model: Object exposing a torch module as ``.network`` that
                implements ``forward_masks``.
            X_tensor: Input tensor; rows are batched along the first dimension.
            device: Device for the network and each batch. Defaults to the
                device of the network's first parameter.
            batch_size: Number of rows per forward pass.
            verbose: When true (default, the previous behaviour) print the
                shape of every batch result and of the final array.

        Returns:
            ``numpy.ndarray`` with the concatenated per-batch results.

        Raises:
            ValueError: If ``X_tensor`` has no rows.
        """
        network = tabnet_model.network
        if device is None:
            device = next(network.parameters()).device

        network.eval()
        network.to(device)

        # Keep the full dataset where it is and move one batch at a time, so
        # device memory is bounded by `batch_size` rather than the dataset size.
        loader = DataLoader(TensorDataset(X_tensor), batch_size=batch_size, shuffle=False)

        batch_outputs = []
        with torch.no_grad():
            for (xb,) in loader:
                first_output, _ = network.forward_masks(xb.to(device))

                if isinstance(first_output, dict):
                    if 'embeddings' in first_output:
                        final_emb = first_output['embeddings'][-1]
                    else:
                        final_emb = list(first_output.values())[-1]
                elif isinstance(first_output, list):
                    # A list is treated as one entry per decision step; use the last.
                    final_emb = first_output[-1]
                else:
                    final_emb = first_output

                if verbose:
                    print("Shape of final_emb:", final_emb.shape)

                batch_outputs.append(final_emb.cpu().numpy())

        if not batch_outputs:
            raise ValueError("X_tensor contains no rows; nothing to extract")

        final_embeddings = np.concatenate(batch_outputs, axis=0)
        if verbose:
            print("Final embeddings shape:", final_embeddings.shape)
        return final_embeddings
    
    
    # Feed the train and test tensors through the fitted TabNet, on the GPU
    # when one is available and on the CPU otherwise.
    embed_device = 'cuda' if torch.cuda.is_available() else 'cpu'
    X_train_feature_attrs = extract_tabnet_embeddings(tabnet_clf, X_train_torch, device=embed_device)
    X_test_feature_attrs = extract_tabnet_embeddings(tabnet_clf, X_test_torch, device=embed_device)

    from sklearn.preprocessing import StandardScaler

    # Standardise the extracted features: the scaler is fitted on the training
    # features only and then applied unchanged to the test features.
    standard_scaler = StandardScaler()
    X_train_feature_attrs_standard = standard_scaler.fit_transform(X_train_feature_attrs)
    X_test_feature_attrs_standard = standard_scaler.transform(X_test_feature_attrs)

    # Persist the fitted scaler for this generation.
    scaler_pickle_path = f'C:\\jupyter\\Elementary-student-Weight\\GAN_XAI\\Experiment3(생성별_모델별)\\model_history\\{generation}_proposed_standard_scaler.pickle'
    with open(scaler_pickle_path, 'wb') as f:
        pickle.dump(standard_scaler, f)

    def objective_xgboost(trial, X_train, X_test, y_train, y_test):
        """Optuna objective: fit an XGBoost classifier and return its accuracy.

        Hyper-parameters are sampled from ``trial``, the model is fitted on
        ``(X_train, y_train)`` and scored on ``(X_test, y_test)``.

        ``suggest_float(..., log=True)`` / ``suggest_float(...)`` replace the
        deprecated ``suggest_loguniform`` / ``suggest_uniform`` calls; parameter
        names and ranges are unchanged.

        Args:
            trial: Optuna trial used to sample the hyper-parameters.
            X_train, y_train: Data the classifier is fitted on.
            X_test, y_test: Data the classifier is scored on.

        Returns:
            Accuracy of the fitted model on ``(X_test, y_test)``.
        """
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 50, 300),
            'max_depth': trial.suggest_int('max_depth', 3, 8),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
            'gamma': trial.suggest_float('gamma', 1e-8, 1.0, log=True),
            'min_child_weight': trial.suggest_int('min_child_weight', 1, 7),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),
            'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),
            'random_state': 42
        }

        model = XGBClassifier(**params)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        return accuracy_score(y_test, y_pred)

    # Hyper-parameter search, final fit and evaluation
    def optimize_and_compare_models(X_train, X_test, y_train, y_test, n_trials=100,
                                    X_valid=None, y_valid=None):
        """Tune XGBoost with Optuna, refit the best configuration and score it.

        Args:
            X_train, y_train: Data every candidate and the final model are fitted on.
            X_test, y_test: Held-out data the final model is evaluated on.
            n_trials: Number of Optuna trials.
            X_valid, y_valid: Optional data used to score the trials. When
                omitted, trials are scored on ``(X_test, y_test)`` as before;
                that selects hyper-parameters on the test split and makes the
                reported test metrics optimistically biased.

        Returns:
            ``(results, xgb_pred, best_xgb)``: a dict with the test accuracy
            and best parameters under ``'XGBoost'``, the test predictions, and
            the fitted classifier.

        Raises:
            ValueError: If only one of ``X_valid`` / ``y_valid`` is given.
        """
        if (X_valid is None) != (y_valid is None):
            raise ValueError("X_valid and y_valid must be provided together")
        if X_valid is None:
            # Backward-compatible fallback: score the trials on the test split.
            X_tune, y_tune = X_test, y_test
        else:
            X_tune, y_tune = X_valid, y_valid

        results = {}

        # XGBoost search
        study_xgb = optuna.create_study(direction='maximize')
        study_xgb.optimize(
            lambda trial: objective_xgboost(trial, X_train, X_tune, y_train, y_tune),
            n_trials=n_trials,
        )

        # Refit the best configuration and evaluate it on the test split.
        best_xgb = XGBClassifier(**study_xgb.best_params, random_state=42)
        best_xgb.fit(X_train, y_train)
        xgb_pred = best_xgb.predict(X_test)
        results['XGBoost'] = {
            'accuracy': accuracy_score(y_test, xgb_pred),
            'best_params': study_xgb.best_params
        }
        return results, xgb_pred, best_xgb

    # Build validation features with the same TabNet extraction and the scaler
    # already fitted on the training features, so the hyper-parameter search is
    # scored on the validation split and the test split is used only once.
    X_valid_torch = torch.from_numpy(X_valid.astype(np.float32))
    X_valid_feature_attrs = extract_tabnet_embeddings(
        tabnet_clf,
        X_valid_torch,
        device='cuda' if torch.cuda.is_available() else 'cpu'
    )
    X_valid_feature_attrs_standard = standard_scaler.transform(X_valid_feature_attrs)

    # Run the search and the final evaluation.
    results, xgb_pred, best_xgb = optimize_and_compare_models(
        X_train_feature_attrs_standard, X_test_feature_attrs_standard,
        y_train.astype(int), y_test.astype(int), n_trials=100,
        X_valid=X_valid_feature_attrs_standard, y_valid=y_valid.astype(int),
    )

    # Report the results.
    for model_name, model_results in results.items():
        print(f"\n{model_name} Results:")
        print(f"Best Accuracy: {model_results['accuracy']:.4f}")
        print("Best Parameters:")
        for param, value in model_results['best_params'].items():
            print(f"  {param}: {value}")

    # Persist the tuned XGBoost model for this generation.
    with open(f'C:\\jupyter\\Elementary-student-Weight\\GAN_XAI\\Experiment3(생성별_모델별)\\model_history\\{generation}_proposed.pickle', 'wb') as f:
        pickle.dump(best_xgb, f)

    # Test-set metrics (weighted averages across classes).
    accuracy = accuracy_score(y_test, xgb_pred)
    f1 = f1_score(y_test, xgb_pred, average='weighted')
    # `sk` is the notebook's alias for sklearn's precision_recall_fscore_support.
    precision, recall = sk(y_test, xgb_pred, beta=1, average='weighted')[:2]

    # One result row per generation.
    result_dict = {
        'Model': f"{generation}-proposed",
        'Accuracy': round(accuracy, 5),
        'F1score': round(f1, 5),
        'Precision': round(precision, 5),
        'Recall': round(recall, 5)
    }

    model_results_list.append(result_dict)

# Assemble the collected per-generation metric rows into a single table.
df_proposed = pd.DataFrame(data=model_results_list)

# Optionally persist the table as CSV:
# df_proposed.to_csv('model_metrics.csv')