# OSS 언더샘플링 1:3

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import seaborn as sns
from datetime import datetime
import time
import warnings
warnings.filterwarnings("ignore")

from sklearn.preprocessing import MinMaxScaler, StandardScaler, RobustScaler
from sklearn.metrics import accuracy_score,f1_score,recall_score,precision_score,confusion_matrix,ConfusionMatrixDisplay,roc_curve,roc_auc_score,precision_recall_curve
from sklearn.ensemble import RandomForestClassifier , StackingClassifier
from sklearn.svm import SVC
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.tree import DecisionTreeClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression,Lasso
from sklearn.preprocessing import Binarizer
from sklearn.model_selection import cross_val_score,GridSearchCV

# Prevent Korean (Hangul) text from rendering as broken glyphs in matplotlib figures.
# NOTE(review): assumes a font family named 'malgun Gothic' is installed on the
# machine running this notebook — confirm before running elsewhere.
import matplotlib.pyplot as plt
plt.rcParams['font.family'] = 'malgun Gothic'

In [2]:
# Load the train and test splits from the undersampling dataset folder.
# Both CSV files are decoded with the same EUC-KR encoding.
CSV_ENCODING = 'euc-kr'
train = pd.read_csv(filepath_or_buffer='./Dataset/Undersampling/OSS_0.33_train.csv', encoding=CSV_ENCODING)
test = pd.read_csv(filepath_or_buffer='./Dataset/Undersampling/OSS_0.33_test.csv', encoding=CSV_ENCODING)

In [3]:
# Feature columns fed to the model; the same selection is applied to both splits.
feature_columns = [
    '유동자산회전률',
    '총자산대비잉여현금흐름',
    '자기자본구성비율',
    'log자산총계',
    '자기자본회전률',
    '순운전자본회전률',
    '자기자본증가율',
    '총자본증가율',
    '총자산대비현금흐름',
    '총자본투자효율',
]
# The label column is selected through a list so the result stays a one-column DataFrame.
target_columns = ['t-1감사의견코드']

X_train_sum, y_train = train[feature_columns], train[target_columns]
X_test_sum, y_test = test[feature_columns], test[target_columns]

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# DNN 모델 정의
class DNN(nn.Module):
    """Feed-forward network: a stack of Linear + ReLU hidden layers followed by
    a final Linear layer with no activation.

    Args:
        input_size: Number of input features per sample.
        hidden_sizes: Widths of the hidden layers, in order. Must contain at
            least one entry (an empty sequence raises ``IndexError``).
        output_size: Width of the final linear layer.
    """

    def __init__(self, input_size, hidden_sizes, output_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_sizes = hidden_sizes
        self.output_size = output_size

        # Chain the widths as (input, h0), (h0, h1), ... to get each hidden
        # layer's in/out dimensions. Hidden layers are created before the
        # output layer, in order.
        widths = [input_size, *hidden_sizes]
        self.hidden_layers = nn.ModuleList(
            [nn.Linear(fan_in, fan_out) for fan_in, fan_out in zip(widths[:-1], widths[1:])]
        )

        self.output_layer = nn.Linear(hidden_sizes[-1], output_size)

    def forward(self, x):
        """Apply every hidden layer followed by ReLU, then the output layer."""
        activations = x
        for hidden in self.hidden_layers:
            activations = torch.relu(hidden(activations))
        return self.output_layer(activations)

def train_dnn_with_kfold(X_train, y_train, X_test, y_test, k,
                         hidden_sizes=(64, 32), lr=0.008, num_epochs=1000,
                         threshold=0.5, seed=None):
    """Train one DNN per stratified fold, keep the best one, and score it on the test set.

    For each of the ``k`` stratified folds a fresh ``DNN`` is trained full-batch
    with Adam and ``BCEWithLogitsLoss``, then evaluated on the held-out fold.
    Per-fold metrics, their averages, and the test-set metrics of the fold model
    with the highest validation F1 are printed.

    Args:
        X_train: pandas DataFrame of training features.
        y_train: pandas object holding the binary training labels, indexable
            with ``.iloc`` and exposing ``.values``.
        X_test: pandas DataFrame of test features.
        y_test: Binary test labels accepted by the scikit-learn metric functions.
        k: Number of stratified folds.
        hidden_sizes: Hidden layer widths passed to ``DNN``.
        lr: Adam learning rate.
        num_epochs: Number of full-batch gradient steps per fold.
        threshold: Probability cut-off above which a sample is labelled positive.
        seed: Optional seed for ``torch.manual_seed``. When ``None`` the torch
            RNG is left untouched, so weight initialisation (and therefore the
            results) can differ between runs. The fold split itself always uses
            ``random_state=0``.

    Returns:
        The trained ``DNN`` (left in eval mode) from the fold with the highest
        validation F1 score.
    """
    if seed is not None:
        torch.manual_seed(seed)

    # Stratified k-fold cross-validation setup.
    cv = StratifiedKFold(n_splits=k, shuffle=True, random_state=0)

    # Resolved once up front: it is needed both inside the loop and for the
    # final test-set evaluation after the loop.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Per-fold metric histories, used for the averages printed at the end.
    accuracy_list = []
    precision_list = []
    recall_list = []
    f1_score_list = []

    best_f1_score = 0
    best_model = None

    for fold_idx, (train_idx, val_idx) in enumerate(cv.split(X_train, y_train), 1):
        X_train_fold, X_val_fold = X_train.iloc[train_idx], X_train.iloc[val_idx]
        y_train_fold, y_val_fold = y_train.iloc[train_idx], y_train.iloc[val_idx]

        # Fresh model per fold with a single output unit producing a logit.
        model = DNN(X_train_fold.shape[1], list(hidden_sizes), 1)
        model.to(device)

        criterion = nn.BCEWithLogitsLoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)

        X_train_tensor = torch.tensor(X_train_fold.values, dtype=torch.float32).to(device)
        y_train_tensor = torch.tensor(y_train_fold.values, dtype=torch.float32).view(-1, 1).to(device)

        # Full-batch training: every step uses the whole training fold.
        model.train()
        for _ in range(num_epochs):
            outputs = model(X_train_tensor)
            loss = criterion(outputs, y_train_tensor)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Predict and score the held-out fold.
        y_val_pred = _predict_labels(model, X_val_fold, device, threshold)
        accuracy, precision, recall, f1, conf_matrix = _classification_metrics(y_val_fold, y_val_pred)

        accuracy_list.append(accuracy)
        precision_list.append(precision)
        recall_list.append(recall)
        f1_score_list.append(f1)

        print(f"{fold_idx}번째 Fold")
        print("평가 지표")
        print("Accuracy:", accuracy)
        print("Precision:", precision)
        print("Recall:", recall)
        print("F1 Score:", f1)
        print("Confusion Matrix:")
        print(conf_matrix)
        print("------------------------------")

        # Keep the model with the best validation F1. The `is None` guard makes
        # sure a model is always retained, even if every fold scores F1 == 0.
        if best_model is None or f1 > best_f1_score:
            best_f1_score = f1
            best_model = model

    # Cross-validation summary.
    print("전체 교차 검증 결과")
    print("평균 Accuracy:", sum(accuracy_list) / len(accuracy_list))
    print("평균 Precision:", sum(precision_list) / len(precision_list))
    print("평균 Recall:", sum(recall_list) / len(recall_list))
    print("평균 F1 Score:", sum(f1_score_list) / len(f1_score_list))

    # Final evaluation of the best fold model on the test data.
    y_test_pred = _predict_labels(best_model, X_test, device, threshold)
    accuracy_final, precision_final, recall_final, f1_final, conf_matrix_final = (
        _classification_metrics(y_test, y_test_pred)
    )

    print("테스트 데이터 평가 결과")
    print("Accuracy:", accuracy_final)
    print("Precision:", precision_final)
    print("Recall:", recall_final)
    print("F1 Score:", f1_final)
    print("Confusion Matrix:")
    print(conf_matrix_final)

    return best_model


def _predict_labels(model, X, device, threshold):
    """Return a boolean NumPy array of class predictions for the rows of ``X``.

    The model emits raw logits (it is trained with ``BCEWithLogitsLoss``), so
    they are passed through a sigmoid before being compared with ``threshold``.
    """
    features = torch.tensor(X.values, dtype=torch.float32).to(device)
    model.eval()
    with torch.no_grad():
        probabilities = torch.sigmoid(model(features))
    return (probabilities >= threshold).view(-1).cpu().numpy()


def _classification_metrics(y_true, y_pred):
    """Return (accuracy, precision, recall, f1, confusion matrix) for the predictions."""
    return (
        accuracy_score(y_true, y_pred),
        precision_score(y_true, y_pred),
        recall_score(y_true, y_pred),
        f1_score(y_true, y_pred),
        confusion_matrix(y_true, y_pred),
    )


In [7]:
# Run 5-fold cross-validated training. The returned best-fold model is the
# cell's last expression, so the notebook displays it.
train_dnn_with_kfold(
    X_train=X_train_sum,
    y_train=y_train,
    X_test=X_test_sum,
    y_test=y_test,
    k=5,
)