In [1]:
# ===== Cell 1: Imports =====
import os
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.model_selection import StratifiedKFold, KFold
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from imblearn.over_sampling import SMOTE

In [2]:
# ===== Cell 2: 컬럼명 및 문자열 정규화 함수 =====

def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """DataFrame의 컬럼명과 문자열 값을 소문자+언더스코어 형식으로 정규화하는 함수"""

    # 원본 DataFrame을 변경하지 않기 위해 복사본 생성
    df = df.copy()

    # 컬럼명 정규화
    df.columns = (
        df.columns
          .str.strip()  # 컬럼명 양쪽 공백 제거
          .str.lower()  # 소문자 변환
          .str.replace(' ', '_', regex=False)  # 공백을 언더스코어로 변경
          .str.replace(r'[^0-9a-zA-Z_]', '', regex=True)  # 알파벳, 숫자, 언더스코어 제외한 특수문자 제거
    )

    # 문자열 또는 카테고리 타입 컬럼 값 정규화
    for col in df.select_dtypes(include=['object', 'category']):
        df[col] = (
            df[col].astype(str)  # 값을 문자열로 변환
                   .str.strip()  # 값 양쪽 공백 제거
                   .str.replace(' ', '_', regex=False)  # 공백을 언더스코어로 변경
        )

    # 정규화된 DataFrame 반환
    return df


In [3]:
# ===== Cell 3: ID 및 결측치 비율 높은 컬럼 제거 함수 =====

def drop_id_unnamed_and_missing(df: pd.DataFrame, missing_thresh: float = 0.5) -> pd.DataFrame:
    """ID/Unnamed 컬럼 및 결측치 비율이 지정 기준 이상인 컬럼을 제거하는 함수"""

    # 원본 DataFrame을 변경하지 않기 위해 복사본 생성
    df = df.copy()

    # 'Unnamed'로 시작하거나 'id'로 끝나는 컬럼명을 찾기
    to_drop = df.columns[df.columns.str.contains(r'^(?:unnamed)|id$', case=False, regex=True)]

    # 해당 컬럼들 제거
    df = df.drop(columns=to_drop)

    # 컬럼별 결측치 비율 계산
    missing = df.isnull().mean()

    # 결측치 비율이 missing_thresh(기본값 0.5) 초과인 컬럼 제거
    df = df.drop(columns=missing[missing > missing_thresh].index)

    # 처리된 DataFrame 반환
    return df


In [4]:
# ===== Cell 4: 결측치 대치(Imputation) 함수 =====

def impute_missing(df: pd.DataFrame) -> pd.DataFrame:
    """수치형 컬럼은 중간값(median), 범주형 컬럼은 최빈값(most frequent)으로 결측치를 대치하는 함수"""

    # 원본 DataFrame을 변경하지 않기 위해 복사본 생성
    df = df.copy()

    # 수치형(numeric) 컬럼 목록 추출
    num_cols = df.select_dtypes(include=np.number).columns.tolist()

    # 범주형(object, category, bool) 컬럼 목록 추출
    cat_cols = df.select_dtypes(include=['object', 'category', 'bool']).columns.tolist()

    # 수치형 컬럼의 결측치: 중간값(median)으로 대치
    if num_cols:
        df[num_cols] = pd.DataFrame(
            SimpleImputer(strategy='median').fit_transform(df[num_cols]),
            columns=num_cols
        )

    # 범주형 컬럼의 결측치: 최빈값(most frequent)으로 대치
    if cat_cols:
        df[cat_cols] = pd.DataFrame(
            SimpleImputer(strategy='most_frequent').fit_transform(df[cat_cols]),
            columns=cat_cols
        )

    # 결측치가 대치된 DataFrame 반환
    return df


In [5]:
# ===== Cell 5: 이상치 제거 함수 (IQR 기준) =====

def remove_outliers(df: pd.DataFrame, max_removal: float = 0.2) -> pd.DataFrame:
    """IQR(Interquartile Range) 기준으로 이상치를 제거하되, 전체 데이터의 max_removal 비율 이하만 제거하는 함수"""

    # 원본 DataFrame을 변경하지 않기 위해 복사본 생성
    df = df.copy()

    # 전체 행(row) 수 저장
    total = len(df)

    # 수치형 컬럼들에 대해 이상치 제거 진행
    for col in df.select_dtypes(include=np.number):
        # 1사분위수(Q1)와 3사분위수(Q3) 계산
        Q1, Q3 = df[col].quantile([0.25, 0.75])

        # IQR(Interquartile Range) 계산
        IQR = Q3 - Q1

        # IQR 범위 내에 있는 데이터만 True인 mask 생성
        mask = df[col].between(Q1 - 1.5 * IQR, Q3 + 1.5 * IQR)

        # 남게 될 데이터가 전체의 (1 - max_removal) 이상인 경우에만 이상치 제거 수행
        if mask.sum() > (1 - max_removal) * total:
            df = df[mask]

    # 이상치가 제거된 DataFrame 반환
    return df


In [6]:
# ===== Cell 6: 높은 상관관계 컬럼 제거 함수 =====

def drop_highly_correlated(df: pd.DataFrame, threshold: float = 0.95) -> pd.DataFrame:
    """수치형 컬럼 중 상관계수(절댓값)가 threshold 이상인 컬럼을 제거하는 함수"""

    # 원본 DataFrame을 변경하지 않기 위해 복사본 생성
    df = df.copy()

    # 수치형 컬럼끼리의 상관계수 행렬 계산 (절댓값 기준)
    corr = df.select_dtypes(include=np.number).corr().abs()

    # 상삼각행렬(Upper Triangle)만 선택 (중복 비교 방지)
    upper = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool))

    # threshold를 초과하는 상관관계가 있는 컬럼 리스트 추출
    to_drop = [c for c in upper.columns if any(upper[c] > threshold)]

    # 해당 컬럼들 제거 후 반환
    return df.drop(columns=to_drop)


In [7]:
# ===== Cell 7: 범주형 인코딩 및 수치형 스케일링 함수 =====

def encode_and_normalize(X: pd.DataFrame, max_onehot: int = 10) -> pd.DataFrame:
    """범주형 변수는 Label/OneHot 인코딩, 수치형 변수는 StandardScaler로 정규화하는 함수"""

    # 원본 DataFrame을 변경하지 않기 위해 복사본 생성
    X = X.copy()

    # 범주형(object, category, bool) 컬럼 인코딩
    for col in X.select_dtypes(include=['object', 'category', 'bool']):
        nun = X[col].nunique()  # 고유값 개수

        if nun <= 2:
            # 고유값이 2개 이하 → Label Encoding
            X[col] = LabelEncoder().fit_transform(X[col])

        elif nun <= max_onehot:
            # 고유값이 max_onehot 이하 → One-Hot Encoding
            ohe = OneHotEncoder(sparse=False, handle_unknown='ignore')
            arr = ohe.fit_transform(X[[col]])
            cols = [f"{col}_{cat}" for cat in ohe.categories_[0]]  # 새 컬럼명 생성
            X[cols] = arr
            X = X.drop(columns=[col])

        else:
            # 고유값이 많으면 해당 컬럼 삭제
            X = X.drop(columns=[col])

    # 수치형 컬럼 정규화 (StandardScaler 적용)
    num_cols = X.select_dtypes(include=np.number).columns
    X[num_cols] = StandardScaler().fit_transform(X[num_cols])

    # 변환된 DataFrame 반환
    return X


In [8]:
# ===== Cell 8: 데이터 전처리 전체 프로세스 함수 =====

def preprocess(input_file: str, target_col: str,
               missing_thresh: float = 0.5,
               max_removal: float = 0.2,
               corr_thresh: float = 0.95,
               max_onehot: int = 10) -> str:
    """CSV 파일을 로드하고, 전처리 과정을 거쳐, 전처리된 CSV 파일로 저장하는 함수"""

    # CSV 파일 읽기
    df = pd.read_csv(input_file)

    # 타겟 컬럼(target_col)이 결측인 행 제거 후 인덱스 리셋
    df = df.dropna(subset=[target_col]).reset_index(drop=True)

    # 타겟 y 분리
    y = df[target_col]

    # 입력 피처 X 준비 (타겟 컬럼 제외)
    X = df.drop(columns=[target_col])

    # 1. 컬럼명 및 문자열 정규화
    X = standardize_columns(X)

    # 2. ID/Unnamed 컬럼 및 결측치 비율 높은 컬럼 제거
    X = drop_id_unnamed_and_missing(X, missing_thresh)

    # 3. 결측치 대치 (수치형: 중간값, 범주형: 최빈값)
    X = impute_missing(X)

    # 4. 이상치 제거 (IQR 기준)
    X = remove_outliers(X, max_removal)

    # 5. 높은 상관관계 컬럼 제거
    X = drop_highly_correlated(X, corr_thresh)

    # 6. 인코딩 및 스케일링
    X = encode_and_normalize(X, max_onehot)

    # y도 X의 인덱스에 맞춰 리셋
    y = y.loc[X.index].reset_index(drop=True)

    # 파일명 구성 (입력 파일명 기반)
    base = os.path.splitext(os.path.basename(input_file))[0]
    out_file = f"processed_{base}.csv"

    # X와 y를 합쳐서 CSV로 저장
    pd.concat([X.reset_index(drop=True), y], axis=1).to_csv(out_file, index=False)

    # 저장 경로 출력
    print(f"[INFO] Processed data saved to: {out_file}")

    return out_file


In [10]:
# ===== Cell 9: 메인 실행 코드 =====
input_file = '2_Card.csv'

processed_path = preprocess(input_file, 'default.payment.next.month')

[INFO] Processed data saved to: processed_2_Card.csv
