<a href="https://colab.research.google.com/github/tlsgptj/LG-Aimers/blob/main/LGAimers5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

하이퍼파라미터 튜닝:

GridSearchCV를 사용하여 n_estimators, max_depth, min_samples_split, min_samples_leaf, class_weight 등의 하이퍼파라미터를 최적화합니다.

데이터 불균형 처리:

SMOTE를 사용하여 Normal 클래스의 샘플을 증대시켜 데이터 불균형 문제를 완화합니다.

스케일링 적용:

StandardScaler를 사용하여 데이터의 스케일을 정규화합니다.

교차 검증 사용:

StratifiedKFold를 사용하여 더욱 견고한 성능 평가를 수행합니다.

클래스 가중치:

class_weight를 사용하여 불균형 데이터를 처리할 때 가중치를 부여합니다.

In [None]:
import os
from pprint import pprint

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE  # Imbalanced-learn 라이브러리 사용
from sklearn.model_selection import StratifiedKFold

ROOT_DIR = "data"
RANDOM_STATE = 110

# Load the labelled training data.
train_data = pd.read_csv(os.path.join(ROOT_DIR, "train.csv"))
normal_ratio = 1.0  # Normal rows kept per AbNormal row; 1.0 means a 1:1 ratio

df_normal = train_data[train_data["target"] == "Normal"]
df_abnormal = train_data[train_data["target"] == "AbNormal"]

num_normal = len(df_normal)
num_abnormal = len(df_abnormal)
print(f"  Total: Normal: {num_normal}, AbNormal: {num_abnormal}")

# Resample the Normal rows so that Normal : AbNormal == normal_ratio : 1.
#
# The earlier SMOTE call was removed: it was fitted on the Normal rows alone
# with an all-zero target, and SMOTE needs at least two classes (and numeric
# features) to interpolate, so that branch could only raise.
#
# Rows are drawn WITHOUT replacement whenever enough Normal rows exist, so the
# same row cannot be duplicated into both the training and validation splits.
# Replacement is used only when Normal has to be oversampled.
# NOTE(review): in the oversampling case duplicates can still straddle the
# split below; split first and resample the training part if that matters.
normal_sample_size = int(num_abnormal * normal_ratio)
df_normal = df_normal.sample(
    n=normal_sample_size,
    replace=normal_sample_size > num_normal,
    random_state=RANDOM_STATE,
)
df_concat = pd.concat([df_normal, df_abnormal], axis=0).reset_index(drop=True)
# Show the class balance (a bare expression in the middle of a cell displays nothing).
print(df_concat.value_counts("target"))

# Stratified 70/30 train/validation split of the balanced frame.
df_train, df_val = train_test_split(
    df_concat,
    test_size=0.3,
    stratify=df_concat["target"],
    random_state=RANDOM_STATE,
)


def print_stats(df: pd.DataFrame) -> None:
    """Print the Normal/AbNormal row counts of ``df`` and their ratio.

    Args:
        df: Frame with a ``target`` column holding the string labels
            ``"Normal"`` and ``"AbNormal"``.

    The printed ratio is AbNormal divided by Normal. When ``df`` has no
    Normal rows the ratio is printed as ``nan`` instead of raising
    ``ZeroDivisionError``.
    """
    labels = df["target"]
    num_normal = int((labels == "Normal").sum())
    num_abnormal = int((labels == "AbNormal").sum())

    # Guard the division: a split may contain no Normal rows at all.
    ratio = num_abnormal / num_normal if num_normal else float("nan")

    print(f"  Total: Normal: {num_normal}, AbNormal: {num_abnormal}" + f" ratio: {ratio}")


# Print statistics: first line is the training split, second the validation split.
print("  \tAbnormal\tNormal")
print_stats(df_train)
print_stats(df_val)

# RandomForestClassifier behind a StandardScaler, bundled into one pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('model', RandomForestClassifier(random_state=RANDOM_STATE))
])

# Hyperparameter grid; the "model__" prefix routes each key to the 'model' step
param_grid = {
    'model__n_estimators': [100, 200, 300],
    'model__max_depth': [None, 10, 20, 30],
    'model__min_samples_split': [2, 5, 10],
    'model__min_samples_leaf': [1, 2, 4],
    'model__class_weight': [None, 'balanced']  # also try class weighting
}

# Grid search scored by macro F1 over 5 shuffled stratified folds
grid_search = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE),
    n_jobs=-1,
    verbose=2,
    scoring='f1_macro'
)


def _cast_columns_to_int(frame: pd.DataFrame):
    """Return a copy of ``frame`` with every castable column converted to int.

    Args:
        frame: Feature columns to convert.

    Returns:
        A ``(converted, skipped)`` pair: ``converted`` is a new frame (the
        input is not modified) and ``skipped`` lists the columns whose cast
        failed; those columns are left unchanged in ``converted``.
    """
    converted = frame.copy()
    skipped = []
    for col in converted.columns:
        try:
            converted[col] = converted[col].astype(int)
        except (ValueError, TypeError, OverflowError):
            # Non-numeric text, NaN/inf or out-of-range values cannot become int.
            skipped.append(col)
    return converted, skipped


# Feature selection: keep the columns that can be cast to int on the training split.
train_candidates = df_train.drop(columns=["target"])
train_converted, train_skipped = _cast_columns_to_int(train_candidates)
skipped_lookup = set(train_skipped)
features = [col for col in train_candidates.columns if col not in skipped_lookup]

train_x = train_converted[features]
train_y = df_train["target"]

# Fit the grid search
grid_search.fit(train_x, train_y)

# Report the best hyperparameters
print("Best hyperparameters:")
pprint(grid_search.best_params_)

# Validation prediction. The same int cast as for training is applied so the
# model is scored on features preprocessed the way it was trained on.
val_x, val_skipped = _cast_columns_to_int(df_val[features])
if val_skipped:
    print(f"Warning: validation columns left uncast: {val_skipped}")
val_y = df_val["target"]

val_pred = grid_search.predict(val_x)

# Evaluate
print("Validation Results:")
print(classification_report(val_y, val_pred))
print("Confusion Matrix:")
print(confusion_matrix(val_y, val_pred))

# Test prediction
test_data = pd.read_csv(os.path.join(ROOT_DIR, "test.csv"))
df_test_x, test_skipped = _cast_columns_to_int(test_data[features])
if test_skipped:
    print(f"Warning: test columns left uncast: {test_skipped}")

test_pred = grid_search.predict(df_test_x)

# Read the submission template and fill in the predicted labels
df_sub = pd.read_csv("submission.csv")
df_sub["target"] = test_pred

# Save the submission file
df_sub.to_csv("submission.csv", index=False)

In [None]:
import os
from pprint import pprint

import numpy as np
import pandas as pd
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
)
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from imblearn.over_sampling import SMOTE  # Imbalanced-learn 라이브러리 사용
from sklearn.model_selection import StratifiedKFold

ROOT_DIR = "data"
RANDOM_STATE = 110

# Load the labelled training data.
train_data = pd.read_csv(os.path.join(ROOT_DIR, "train.csv"))
normal_ratio = 1.0  # Normal rows kept per AbNormal row; 1.0 means a 1:1 ratio

# Encode the target labels as integers: Normal -> 1, AbNormal -> 0
train_data['target'] = train_data['target'].map({'Normal': 1, 'AbNormal': 0})

df_normal = train_data[train_data["target"] == 1]
df_abnormal = train_data[train_data["target"] == 0]

# Class counts BEFORE resampling (also used later for scale_pos_weight).
num_normal = len(df_normal)
num_abnormal = len(df_abnormal)
print(f"  Total: Normal: {num_normal}, AbNormal: {num_abnormal}")

# Resample the Normal rows so that Normal : AbNormal == normal_ratio : 1.
#
# The earlier SMOTE call was removed: it was fitted on the Normal rows alone
# with an all-zero target, and SMOTE needs at least two classes (and numeric
# features) to interpolate, so that branch could only raise.
#
# Rows are drawn WITHOUT replacement whenever enough Normal rows exist, so the
# same row cannot be duplicated into both the training and validation splits.
# Replacement is used only when Normal has to be oversampled.
# NOTE(review): in the oversampling case duplicates can still straddle the
# split below; split first and resample the training part if that matters.
normal_sample_size = int(num_abnormal * normal_ratio)
df_normal = df_normal.sample(
    n=normal_sample_size,
    replace=normal_sample_size > num_normal,
    random_state=RANDOM_STATE,
)
df_concat = pd.concat([df_normal, df_abnormal], axis=0).reset_index(drop=True)
# Show the class balance (a bare expression in the middle of a cell displays nothing).
print(df_concat.value_counts("target"))

# Stratified 70/30 train/validation split of the balanced frame.
df_train, df_val = train_test_split(
    df_concat,
    test_size=0.3,
    stratify=df_concat["target"],
    random_state=RANDOM_STATE,
)


def print_stats(df: pd.DataFrame) -> None:
    """Print the Normal/AbNormal row counts of ``df`` and their ratio.

    Args:
        df: Frame with a ``target`` column where 1 marks a Normal row and
            0 marks an AbNormal row.

    The printed ratio is AbNormal divided by Normal. When ``df`` has no
    Normal rows the ratio is printed as ``nan`` instead of raising
    ``ZeroDivisionError``.
    """
    labels = df["target"]
    num_normal = int((labels == 1).sum())
    num_abnormal = int((labels == 0).sum())

    # Guard the division: a split may contain no Normal rows at all.
    ratio = num_abnormal / num_normal if num_normal else float("nan")

    print(f"  Total: Normal: {num_normal}, AbNormal: {num_abnormal}" + f" ratio: {ratio}")


# Print statistics: first line is the training split, second the validation split.
print("  \tAbnormal\tNormal")
print_stats(df_train)
print_stats(df_val)

# Feature selection: every column except the target
features = [col for col in df_train.columns if col != 'target']

# Split the FEATURE columns by dtype. The lists are derived from the feature
# columns only: the target is numeric in this cell (mapped to 0/1 above), and
# listing it here would make ColumnTransformer ask for a column that train_x
# does not contain.
categorical_cols = df_train[features].select_dtypes(include=['object']).columns.tolist()
numerical_cols = df_train[features].select_dtypes(include=[np.number]).columns.tolist()

# Placeholder category used for missing values in categorical columns.
MISSING_CATEGORY = "missing"


def _clean_features(frame: pd.DataFrame, numeric_cols, category_cols):
    """Return a copy of ``frame`` with missing and infinite values filled in.

    Args:
        frame: Feature columns to clean; not modified.
        numeric_cols: Columns where +/-inf and NaN are replaced with 0.
        category_cols: Columns where missing entries become
            ``MISSING_CATEGORY`` and every value is converted to ``str``.

    Returns:
        The cleaned copy.

    Categorical columns are kept uniformly string-typed because filling them
    with the integer 0 would mix ints and strings in one column, which
    OneHotEncoder cannot fit.
    """
    cleaned = frame.copy()
    if numeric_cols:
        cleaned[numeric_cols] = (
            cleaned[numeric_cols].replace([np.inf, -np.inf], np.nan).fillna(0)
        )
    if category_cols:
        as_object = cleaned[category_cols].astype(object)
        cleaned[category_cols] = as_object.where(
            as_object.notna(), MISSING_CATEGORY
        ).astype(str)
    return cleaned


# Handle missing and infinite values
train_x = _clean_features(df_train[features], numerical_cols, categorical_cols)
train_y = df_train["target"]

# Preprocessing: scale numeric columns, one-hot encode categorical columns
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numerical_cols),
        ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_cols)
    ]
)

# XGBClassifier behind the preprocessor, bundled into one pipeline
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('model', XGBClassifier(
        random_state=RANDOM_STATE,
        use_label_encoder=False,  # kept from the original to silence a warning
        eval_metric='logloss'     # evaluation metric
    ))
])

# Hyperparameter grid; the "model__" prefix routes each key to the 'model' step
param_grid = {
    'model__n_estimators': [100, 200, 300],
    'model__max_depth': [3, 5, 7, 10],
    'model__learning_rate': [0.01, 0.1, 0.2],
    'model__subsample': [0.8, 0.9, 1.0],
    'model__colsample_bytree': [0.8, 0.9, 1.0],
    # NOTE(review): these counts are taken before resampling, while the
    # training split is already balanced -- confirm this weight is intended.
    'model__scale_pos_weight': [1, num_abnormal/num_normal]
}

# Grid search scored by macro F1 over 5 shuffled stratified folds
grid_search = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE),
    n_jobs=-1,
    verbose=3,  # more progress output
    scoring='f1_macro',
    error_score='raise'  # surface fit errors instead of scoring them as NaN
)

# Fit the grid search
try:
    grid_search.fit(train_x, train_y)
except ValueError as e:
    print(f"Error during grid search: {e}")
    # Re-raise: every step below needs a fitted search, so continuing would
    # only fail later with a less helpful error.
    raise

# Report the best hyperparameters
print("Best hyperparameters:")
pprint(grid_search.best_params_)

# Validation prediction, cleaned the same way as the training features
val_x = _clean_features(df_val[features], numerical_cols, categorical_cols)
val_y = df_val["target"]

val_pred = grid_search.predict(val_x)

# Evaluate (labels are the integer encoding: 1 = Normal, 0 = AbNormal)
print("Validation Results:")
print(classification_report(val_y, val_pred))
print("Confusion Matrix:")
print(confusion_matrix(val_y, val_pred))

# Test prediction, cleaned the same way as the training features
test_data = pd.read_csv(os.path.join(ROOT_DIR, "test.csv"))
df_test_x = _clean_features(test_data[features], numerical_cols, categorical_cols)

test_pred = grid_search.predict(df_test_x)

# Read the submission template and fill in the predictions, decoded back to
# the original string labels so the file matches what the other cells write.
df_sub = pd.read_csv("submission.csv")
df_sub["target"] = np.where(test_pred == 1, "Normal", "AbNormal")

# Save the submission file
df_sub.to_csv("submission.csv", index=False)


In [None]:
#LightGBM
import os
from pprint import pprint

import numpy as np
import pandas as pd
from lightgbm import LGBMClassifier
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
)
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE  # Imbalanced-learn 라이브러리 사용

ROOT_DIR = "data"
RANDOM_STATE = 110

# Load the labelled training data.
train_data = pd.read_csv(os.path.join(ROOT_DIR, "train.csv"))
normal_ratio = 1.0  # Normal rows kept per AbNormal row; 1.0 means a 1:1 ratio

df_normal = train_data[train_data["target"] == "Normal"]
df_abnormal = train_data[train_data["target"] == "AbNormal"]

num_normal = len(df_normal)
num_abnormal = len(df_abnormal)
print(f"  Total: Normal: {num_normal}, AbNormal: {num_abnormal}")

# Resample the Normal rows so that Normal : AbNormal == normal_ratio : 1.
#
# The earlier SMOTE call was removed: it was fitted on the Normal rows alone
# with an all-zero target, and SMOTE needs at least two classes (and numeric
# features) to interpolate, so that branch could only raise.
#
# Rows are drawn WITHOUT replacement whenever enough Normal rows exist, so the
# same row cannot be duplicated into both the training and validation splits.
# Replacement is used only when Normal has to be oversampled.
# NOTE(review): in the oversampling case duplicates can still straddle the
# split below; split first and resample the training part if that matters.
normal_sample_size = int(num_abnormal * normal_ratio)
df_normal = df_normal.sample(
    n=normal_sample_size,
    replace=normal_sample_size > num_normal,
    random_state=RANDOM_STATE,
)
df_concat = pd.concat([df_normal, df_abnormal], axis=0).reset_index(drop=True)
# Show the class balance (a bare expression in the middle of a cell displays nothing).
print(df_concat.value_counts("target"))

# Stratified 70/30 train/validation split of the balanced frame.
df_train, df_val = train_test_split(
    df_concat,
    test_size=0.3,
    stratify=df_concat["target"],
    random_state=RANDOM_STATE,
)


def print_stats(df: pd.DataFrame) -> None:
    """Print how many Normal and AbNormal rows ``df`` holds, plus their ratio.

    Args:
        df: Frame with a ``target`` column holding the string labels
            ``"Normal"`` and ``"AbNormal"``.

    The ratio shown is the AbNormal count divided by the Normal count. A
    frame without any Normal rows prints ``nan`` for the ratio rather than
    raising ``ZeroDivisionError``.
    """
    target = df["target"]
    num_normal = int((target == "Normal").sum())
    num_abnormal = int((target == "AbNormal").sum())

    if num_normal:
        ratio = num_abnormal / num_normal
    else:
        # No Normal rows: the ratio is undefined.
        ratio = float("nan")

    print(f"  Total: Normal: {num_normal}, AbNormal: {num_abnormal}" + f" ratio: {ratio}")


# Print statistics: first line is the training split, second the validation split.
print("  \tAbnormal\tNormal")
print_stats(df_train)
print_stats(df_val)

# LGBMClassifier behind a StandardScaler, bundled into one pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('model', LGBMClassifier(random_state=RANDOM_STATE))
])

# Hyperparameter grid; the "model__" prefix routes each key to the 'model' step
param_grid = {
    'model__n_estimators': [100, 200, 300],
    'model__max_depth': [3, 5, 7, -1],  # -1 for no limit
    'model__learning_rate': [0.01, 0.1, 0.2],
    'model__num_leaves': [31, 63, 127],  # Larger num_leaves -> more complex model
    'model__boosting_type': ['gbdt', 'dart'],  # Different boosting methods
    'model__class_weight': [None, 'balanced']  # also try class weighting
}

# Grid search scored by macro F1 over 5 shuffled stratified folds
grid_search = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE),
    n_jobs=-1,
    verbose=2,
    scoring='f1_macro'
)


def _cast_columns_to_float(frame: pd.DataFrame):
    """Return a copy of ``frame`` with every castable column converted to float.

    Args:
        frame: Feature columns to convert.

    Returns:
        A ``(converted, skipped)`` pair: ``converted`` is a new frame (the
        input is not modified) and ``skipped`` lists the columns whose cast
        failed; those columns are left unchanged in ``converted``.
    """
    converted = frame.copy()
    skipped = []
    for col in converted.columns:
        try:
            converted[col] = converted[col].astype(float)
        except (ValueError, TypeError):
            # Non-numeric content cannot be represented as float.
            skipped.append(col)
    return converted, skipped


# Feature selection: keep the columns that can be cast to float on the training split.
train_candidates = df_train.drop(columns=["target"])
train_converted, train_skipped = _cast_columns_to_float(train_candidates)
skipped_lookup = set(train_skipped)
features = [col for col in train_candidates.columns if col not in skipped_lookup]

train_x = train_converted[features]
train_y = df_train["target"]

# Fit the grid search
grid_search.fit(train_x, train_y)

# Report the best hyperparameters
print("Best hyperparameters:")
pprint(grid_search.best_params_)

# Validation prediction. The same float cast as for training is applied so all
# three data sets reach the pipeline with identical preprocessing.
val_x, val_skipped = _cast_columns_to_float(df_val[features])
if val_skipped:
    print(f"Warning: validation columns left uncast: {val_skipped}")
val_y = df_val["target"]

val_pred = grid_search.predict(val_x)

# Evaluate
print("Validation Results:")
print(classification_report(val_y, val_pred))
print("Confusion Matrix:")
print(confusion_matrix(val_y, val_pred))

# Test prediction
test_data = pd.read_csv(os.path.join(ROOT_DIR, "test.csv"))
df_test_x, test_skipped = _cast_columns_to_float(test_data[features])
if test_skipped:
    print(f"Warning: test columns left uncast: {test_skipped}")

test_pred = grid_search.predict(df_test_x)

# Read the submission template and fill in the predicted labels
df_sub = pd.read_csv("submission.csv")
df_sub["target"] = test_pred

# Save the submission file
df_sub.to_csv("submission.csv", index=False)
