In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## 결측치 처리 버전

| 버전 | 결측치 처리 방식                       | 함수 이름                  |
|------|----------------------------------------|----------------------------|
| A    | 모든 결측치를 **0**으로 대체            | `filling_zero`             |
| B    | 모든 결측치를 **평균값**으로 대체        | `filling_mean`             |
| C    | 모든 결측치를 **중앙값**으로 대체        | `filling_median`           |
| D    | 모든 결측치를 **-1**로 대체             | `filling_minus1`           |
| E    | 모든 결측치를 **클러스터별 평균**으로 대체 | `filling_cluster_mean`     |
| F    | 모든 결측치를 **KNN 기반 보간**으로 대체  | `filling_knn`              |
| G    | 모든 결측치를 **최빈값**으로 대체        | `filling_mode`             |
| H    | 모든 결측치를 **앞의 값으로 채움**        | `filling_forward`          |
| I    | 모든 결측치를 **뒤의 값으로 채움**        | `filling_backward`         |
| J    | 모든 결측치를 **선형 보간법**으로 대체     | `filling_interpolate`      |
| K    | 모든 결측치를 **무작위 샘플로 대체**       | `filling_random_sample`    |
| L    | 모든 결측치를 **고정값(-999)**으로 대체    | `filling_constant`         |

### 결측치 설명(사전 작업 필요 결측치)

**E: 클러스터별 평균**

같은 클러스터(비슷한 기업 그룹) 안에서 평균을 계산해서 그 값으로 결측치를 채움

KMeans 알고리즘으로 모든 기업을 5개 클러스터로 나눔

컬럼에 결측치가 있으면 같은 클러스터에 속한 컬럼 평균을 구한 후 해당 결측치 채움

  클러스터별로 나누려면 cluster 열이 먼저 있어야 함

In [None]:
'''
from sklearn.cluster import KMeans

def assign_clusters(df, n_clusters=5):
    df = df.copy()
    cols = ['직원 수', '고객수(백만명)', '기업가치(백억원)']  # 수치형 중심
    for col in cols:
        df[col] = pd.to_numeric(df[col].astype(str).str.replace(',', ''), errors='coerce')
    df_cluster_base = df[cols].fillna(df[cols].mean())
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    df['cluster'] = kmeans.fit_predict(df_cluster_base)
    return df
'''

준비 코드

df = assign_clusters(df)

적용

from missingvalue import filling_cluster_mean
df = filling_cluster_mean(df)

**F: KNN 기반 보간**

수치형 변수들 간의 유사도를 이용해서 결측치를 채우는 방식

결측치가 있는 행(기업)을 주변 데이터들과 비교함

가장 유사한 기업 5개(n_neighbors=5)를 찾음

그 기업들의 해당 컬럼 값을 평균 내서 결측치를 채움

  KNNImputer는 데이터에 NaN이 너무 많거나 비어 있으면 작동안 함

In [None]:
'''
def fill_for_knn(df):
    df = df.copy()
    for col in df.select_dtypes(include='number'):
        if df[col].isnull().sum() == len(df):  # 전부 NaN이면
            df[col] = 0  # 또는 다른 안전한 값으로 채움
        elif df[col].isnull().sum() > 0:
            df[col] = df[col].fillna(df[col].mean())  # 임시 평균 대체
    return df
'''

준비 코드

df = fill_for_knn(df)

적용

from missingvalue import filling_knn

df = filling_knn(df)

**J: 선형 보간법**

값이 비어 있는 구간을 앞뒤 값의 선을 그어 예측해서 채움

숫자 값들이 시간이나 순서처럼 연속적이라고 가정

결측치 앞뒤에 있는 숫자 2개를 연결하는 직선을 그려 그 직선 위의 값을 계산해서 결측치에 넣음

ex) [10, NaN, NaN, 40] → [10, 20, 30, 40]

선형 보간은 수치형 + 정렬된 순서에서만 잘 작동

In [None]:
'''
def prepare_for_interpolation(df):
    df = df.copy()
    for col in df.columns:
        if df[col].dtype == 'object':
            df[col] = pd.to_numeric(df[col].astype(str).str.replace(',', ''), errors='coerce')
    df = df.sort_index()  # 순서 기반이면 정렬 필요
    return df
'''

준비 코드

df = prepare_for_interpolation(df)

적용

from missingvalue import filling_interpolate

df = filling_interpolate(df)

## 결측치 처리

In [1]:
"""
각 함수는 입력된 DataFrame을 복사하여, 수치형 또는 범주형 컬럼의 결측치를 처리해 반환
- apply_strategies_by_column(df, strategy_map) 함수를 통해 컬럼별 전략 매핑 지원

- apply_strategies_by_column(df, column_strategy_map)
  → 컬럼별로 다른 전략을 매핑하여 처리할 수 있는 메인 함수

"""

import pandas as pd
import numpy as np
from sklearn.impute import KNNImputer

# === 개별 처리 전략 ===

def filling_zero(df):
    """모든 결측치를 0으로 대체"""
    df = df.copy()
    return df.fillna(0)

def filling_minus1(df):
    """모든 결측치를 -1로 대체"""
    df = df.copy()
    return df.fillna(-1)

def filling_mean(df):
    """수치형 컬럼별 평균값으로 대체"""
    df = df.copy()
    for col in df.select_dtypes(include='number'):
        df[col] = df[col].fillna(df[col].mean())
    return df

def filling_median(df):
    """수치형 컬럼별 중앙값으로 대체"""
    df = df.copy()
    for col in df.select_dtypes(include='number'):
        df[col] = df[col].fillna(df[col].median())
    return df

def filling_cluster_mean(df):
    """
    클러스터 그룹별 평균값으로 수치형 결측치를 대체
     'cluster' 컬럼이 반드시 존재해야 하며,
    각 클러스터 내에서 평균을 구해 해당 그룹 내 결측치를 채
    """
    df = df.copy()
    if 'cluster' not in df.columns:
        raise ValueError("'cluster' column is required for filling_cluster_mean")
    for col in df.select_dtypes(include='number'):
        if col == 'cluster':
            continue
        for cl in df['cluster'].unique():
            mean_val = df[df['cluster'] == cl][col].mean()
            df.loc[(df['cluster'] == cl) & (df[col].isnull()), col] = mean_val
    return df

def filling_knn(df, n_neighbors=5):
    """
    수치형 컬럼에 대해 KNN 보간법으로 결측치를 대체
    유사한 데이터를 기준으로 가장 가까운 n개의 데이터를 기반으로 예측
    """
    df = df.copy()
    numeric_cols = df.select_dtypes(include='number').columns
    imputer = KNNImputer(n_neighbors=n_neighbors)
    df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
    return df

def filling_mode(df):
    """각 컬럼의 최빈값으로 결측치 대체"""
    df = df.copy()
    for col in df.columns:
        if df[col].isnull().sum() > 0:
            df[col] = df[col].fillna(df[col].mode()[0])
    return df

def filling_forward(df):
    """앞쪽 값으로 채움 (forward fill)"""
    return df.ffill()

def filling_backward(df):
    """뒤쪽 값으로 채움 (backward fill)"""
    return df.bfill()

def filling_interpolate(df):
    """
    선형 보간 (interpolation)
    앞뒤 숫자 값을 직선으로 연결해 그 사이의 결측치를 예측
    """
    df = df.copy()
    numeric_cols = df.select_dtypes(include='number').columns
    df[numeric_cols] = df[numeric_cols].interpolate(method='linear', limit_direction='both')
    return df

def filling_random_sample(df):
    """각 컬럼 내 기존 값 중 무작위 샘플로 대체"""
    df = df.copy()
    for col in df.columns:
        if df[col].isnull().sum() > 0:
            non_null = df[col].dropna().values
            sampled = np.random.choice(non_null, size=df[col].isnull().sum(), replace=True)
            df.loc[df[col].isnull(), col] = sampled
    return df

def filling_constant(df, constant=-999):
    """모든 결측치를 고정값으로 대체 (기본값: -999)"""
    df = df.copy()
    return df.fillna(constant)

# === 전략 코드 → 함수 매핑 딕셔너리 ===

strategy_dict = {
    'A': filling_zero,
    'B': filling_mean,
    'C': filling_median,
    'D': filling_minus1,
    'E': filling_cluster_mean,
    'F': filling_knn,
    'G': filling_mode,
    'H': filling_forward,
    'I': filling_backward,
    'J': filling_interpolate,
    'K': filling_random_sample,
    'L': filling_constant,
}

def apply_strategies_by_column(df, column_strategy_map):
    """
    컬럼별 전략 맵에 따라 결측치를 처리한 DataFrame을 반환
    예시:
        column_strategy_map = {
            '고객수(백만명)': 'J',
            '기업가치(백억원)': 'I',
            '분야': 'G',
            '직원 수': 'J'
        }
    """
    df_filled = df.copy()
    for col, strat_code in column_strategy_map.items():
        if col not in df_filled.columns:
            print(f"[경고] 컬럼 '{col}'은(는) 데이터프레임에 존재하지 않습니다. 건너뜁니다.")
            continue
        strategy_fn = strategy_dict.get(strat_code)
        if not strategy_fn:
            print(f"[경고] 전략 코드 '{strat_code}'는 존재하지 않습니다. 유효한 A~L 전략을 확인하세요.")
            continue
        # cluster_mean은 cluster 컬럼 필요
        if strat_code == 'E' and 'cluster' not in df_filled.columns:
            print("[경고] cluster_mean 전략을 사용하려면 'cluster' 컬럼이 먼저 생성되어야 합니다. 해당 컬럼 없음 → 건너뜀.")
            continue
        temp = strategy_fn(df_filled[[col, 'cluster']] if strat_code == 'E' else df_filled[[col]])
        df_filled[col] = temp[col]
    return df_filled