# 위기 신호 지표 산출 노트북

EDA 결과를 기반으로 정의한 위기 신호 지표를 계산하고, 가맹점-월 단위의 위험 스코어를 생성합니다.



## 1. 환경 설정 및 데이터 로드
- CSV 데이터 경로와 컬럼 매핑을 정의합니다.
- 분석에 필요한 컬럼 이름을 한글로 정리하고, `기준년월`을 날짜 형식으로 변환합니다.



In [None]:

import pandas as pd
import numpy as np
from pathlib import Path

pd.set_option('display.max_columns', 80)
pd.set_option('display.width', 180)

DATA_PATH = Path('..') / 'data' / 'df_merged.csv'

rename_map = {
    'Unnamed: 0': '레코드ID',
    'ENCODED_MCT': '가맹점ID',
    'TA_YM': '기준년월',
    'MCT_OPE_MS_CN': '영업개월수_구간',
    'RC_M1_SAA': '최근1개월_이용금액_구간',
    'RC_M1_TO_UE_CT': '최근1개월_이용건수_구간',
    'RC_M1_UE_CUS_CN': '최근1개월_이용고객수_구간',
    'RC_M1_AV_NP_AT': '최근1개월_평균결제금액_구간',
    'APV_CE_RAT': '승인취소비율_구간',
    'DLV_SAA_RAT': '배달이용금액_비율',
    'M1_SME_RY_SAA_RAT': '1개월_동종업종_매출비율',
    'M1_SME_RY_CNT_RAT': '1개월_동종업종_건수비율',
    'M12_SME_RY_SAA_PCE_RT': '12개월_동종업종_매출순위',
    'M12_SME_BZN_SAA_PCE_RT': '12개월_동일상권_매출순위',
    'M12_SME_RY_ME_MCT_RAT': '12개월_동종업종_경쟁강도',
    'M12_SME_BZN_ME_MCT_RAT': '12개월_동일상권_경쟁강도',
    'MCT_UE_CLN_REU_RAT': '재방문고객비율',
    'MCT_UE_CLN_NEW_RAT': '신규고객비율',
    'RC_M1_SHC_RSD_UE_CLN_RAT': 'RC_거주지기준고객비율',
    'RC_M1_SHC_WP_UE_CLN_RAT': 'RC_직장인고객비율',
    'RC_M1_SHC_FLP_UE_CLN_RAT': 'RC_유동고객비율'
}

bucket_cols = [
    '영업개월수_구간',
    '최근1개월_이용금액_구간',
    '최근1개월_이용건수_구간',
    '최근1개월_이용고객수_구간',
    '최근1개월_평균결제금액_구간',
    '승인취소비율_구간'
]

raw_df = pd.read_csv(DATA_PATH, encoding='utf-8-sig')
df = raw_df.rename(columns=rename_map)
if '레코드ID' in df.columns:
    df = df.drop(columns=['레코드ID'])

df['기준년월'] = pd.to_datetime(df['기준년월'], errors='coerce')
df = df[df['기준년월'].notna()].copy()

numeric_cols = df.select_dtypes(include=['number']).columns
placeholder = -999999.9
placeholder_count = (df[numeric_cols] == placeholder).sum().sum()
df[numeric_cols] = df[numeric_cols].replace(placeholder, np.nan)

for col in bucket_cols:
    if col in df.columns:
        parts = df[col].astype(str).str.split('_', n=1, expand=True)
        df[f'{col}_순위'] = pd.to_numeric(parts[0], errors='coerce')
        if parts.shape[1] > 1:
            df[f'{col}_범주'] = parts[1].replace({'nan': np.nan})

print(f'data shape after cleaning: {df.shape}')
print(f'placeholder replacements: {placeholder_count:,}')
df.head()



## 2. 기본 정렬 및 공통 파생
- 가맹점과 기준년월 기준으로 정렬합니다.
- 분석에 자주 사용하는 순위 및 비율 컬럼을 준비합니다.



In [None]:

df_sorted = df.sort_values(['가맹점ID', '기준년월']).reset_index(drop=True)
rank_col = '12개월_동일상권_매출순위'

df_sorted['분기'] = df_sorted['기준년월'].dt.to_period('Q')

key_cols = ['가맹점ID', '기준년월', rank_col, '승인취소비율_구간_순위', '재방문고객비율', '신규고객비율', '배달이용금액_비율', '영업개월수_구간_순위']
df_sorted[key_cols].head()



## 3. 승인 취소 위험 지표
- 최근 1개월과 직전 3개월 평균 순위를 비교해 `승인취소비율_급등지수`를 계산합니다.
- 순위가 5 이상인 구간에서 정상화까지의 기간(`승인취소비율_정상화월수`)을 산출합니다.



In [None]:

rolling_mean = (
    df_sorted.groupby('가맹점ID')['승인취소비율_구간_순위']
    .transform(lambda s: s.shift(1).rolling(3, min_periods=2).mean())
)

df_sorted['승인취소비율_순위_roll3'] = rolling_mean
df_sorted['승인취소비율_급등지수'] = df_sorted['승인취소비율_구간_순위'] - rolling_mean

def compute_recovery(group: pd.DataFrame) -> pd.Series:
    values = group['승인취소비율_구간_순위'].to_numpy()
    result = np.full(values.shape, np.nan, dtype=float)
    for idx, val in enumerate(values):
        if np.isnan(val) or val < 5:
            continue
        later = np.where(~np.isnan(values[idx + 1:]) & (values[idx + 1:] <= 3))[0]
        if later.size > 0:
            result[idx] = float(later[0] + 1)
    return pd.Series(result, index=group.index)

recovery = df_sorted.groupby('가맹점ID', group_keys=False).apply(compute_recovery)
df_sorted['승인취소비율_정상화월수'] = recovery

df_sorted[['가맹점ID', '기준년월', '승인취소비율_구간_순위', '승인취소비율_급등지수', '승인취소비율_정상화월수']].head(10)



## 4. 고객 충성도/이탈 지표
- 재방문 및 신규 고객 비율에 대해 월별 분위 순위를 계산하고 음수 점수로 변환합니다.
- 두 점수를 합산한 `충성도_복합점수`와 3개월 대비 변동률 기반 `고객구조변화지수`를 산출합니다.



In [None]:

rebuy_score = df_sorted.groupby('기준년월')['재방문고객비율'].transform(
    lambda s: (s.rank(pct=True, method='min') - 1) * 100
)
new_score = df_sorted.groupby('기준년월')['신규고객비율'].transform(
    lambda s: (s.rank(pct=True, method='min') - 1) * 100
)

df_sorted['재방문_분위점수'] = rebuy_score
df_sorted['신규_분위점수'] = new_score

df_sorted['충성도_복합점수'] = df_sorted['재방문_분위점수'].fillna(0) + df_sorted['신규_분위점수'].fillna(0)
df_sorted['충성도위험_플래그'] = df_sorted['충성도_복합점수'] <= -30

rolling_new = df_sorted.groupby('가맹점ID')['신규고객비율'].transform(lambda s: s.rolling(3, min_periods=2).mean())
rolling_new_prev = df_sorted.groupby('가맹점ID')['신규고객비율'].transform(lambda s: s.shift(3).rolling(3, min_periods=2).mean())

change_ratio = (rolling_new - rolling_new_prev) / rolling_new_prev
change_ratio = change_ratio.replace([np.inf, -np.inf], np.nan)

df_sorted['신규고객비율_3M평균'] = rolling_new
df_sorted['신규고객비율_이전3M평균'] = rolling_new_prev
df_sorted['고객구조변화지수'] = change_ratio

df_sorted['고객구조변화_경보'] = df_sorted['고객구조변화지수'] <= -0.2

df_sorted[['가맹점ID', '기준년월', '충성도_복합점수', '충성도위험_플래그', '고객구조변화지수', '고객구조변화_경보']].head(10)



## 5. 매출 순위 및 경쟁 환경 지표
- 3개월 전 대비 동일상권 매출순위 변화를 통해 `순위급락강도점수`를 계산합니다.
- 분기별로 동일상권 경쟁강도와 매출순위의 상관계수를 `경쟁강도_영향도`로 정리합니다.



In [None]:

rank_diff_3m = df_sorted.groupby('가맹점ID')[rank_col].diff(periods=3)

df_sorted['순위급락강도점수'] = np.select(
    [rank_diff_3m >= 40, rank_diff_3m >= 20],
    [2, 1],
    default=0
)

df_sorted['동일상권_순위변화_3M'] = rank_diff_3m

competition_influence = (
    df_sorted[['분기', '12개월_동일상권_경쟁강도', rank_col]]
    .dropna()
    .groupby('분기')
    .apply(lambda g: g['12개월_동일상권_경쟁강도'].corr(g[rank_col]))
    .dropna()
    .rename('경쟁강도_상관')
    .reset_index()
)

competition_influence.tail()



## 6. 채널 믹스 지표
- 배달 비율의 3개월 변화량과 영업개월수 순위를 활용해 `배달의존도급변점수`를 계산합니다.
- 6개월 이동 표준편차 대비 평균으로 `배달의존도_안정성점수`를 구합니다.



In [None]:

delivery_change = df_sorted.groupby('가맹점ID')['배달이용금액_비율'].diff(periods=3)

rolling_std = df_sorted.groupby('가맹점ID')['배달이용금액_비율'].transform(lambda s: s.rolling(6, min_periods=3).std())
rolling_mean = df_sorted.groupby('가맹점ID')['배달이용금액_비율'].transform(lambda s: s.rolling(6, min_periods=3).mean())

stability_score = np.where(rolling_mean > 0, rolling_std / rolling_mean, np.nan)

df_sorted['배달비율변화_3M'] = delivery_change

df_sorted['배달의존도급변점수'] = np.select(
    [
        (df_sorted['영업개월수_구간_순위'] <= 2) & (delivery_change >= 30),
        (df_sorted['영업개월수_구간_순위'] <= 2) & (delivery_change >= 20)
    ],
    [2, 1],
    default=0
)

df_sorted['배달의존도_안정성점수'] = stability_score

df_sorted[['가맹점ID', '기준년월', '배달비율변화_3M', '배달의존도급변점수', '배달의존도_안정성점수']].head(10)



## 7. 복합 위기 스코어 산출
- 각 지표를 0~1 범위로 스케일링한 후 가중치(0.3, 0.3, 0.2, 0.2)를 적용해 `위기총점`을 계산합니다.
- 총점을 기준으로 `위기등급`을 구간화합니다.



In [None]:

cancel_component = np.clip(
    (df_sorted['승인취소비율_구간_순위'].fillna(0) / 10) + (df_sorted['승인취소비율_급등지수'].fillna(0) / 5),
    0,
    1
)
loyalty_component = np.clip(-df_sorted['충성도_복합점수'].fillna(0) / 60, 0, 1)
rank_component = np.clip(df_sorted['순위급락강도점수'] / 2, 0, 1)

channel_spike = np.clip(df_sorted['배달의존도급변점수'] / 2, 0, 1)
channel_stability = np.clip(df_sorted['배달의존도_안정성점수'].fillna(0), 0, 1)
channel_component = np.clip(0.5 * channel_spike + 0.5 * channel_stability, 0, 1)

weights = {
    'cancel': 0.3,
    'loyalty': 0.3,
    'rank': 0.2,
    'channel': 0.2
}

df_sorted['위기총점'] = (
    weights['cancel'] * cancel_component +
    weights['loyalty'] * loyalty_component +
    weights['rank'] * rank_component +
    weights['channel'] * channel_component
)

df_sorted['위기등급'] = pd.cut(
    df_sorted['위기총점'],
    bins=[0, 0.4, 0.6, 1.0],
    labels=['관찰', '주의', '위험'],
    right=False
)

df_sorted[['가맹점ID', '기준년월', '위기총점', '위기등급']].head(10)



## 8. 요약 및 저장 예시
- 최근 6개월 위기 등급 분포를 확인합니다.
- 필요하다면 CSV로 내보낼 수 있도록 예시 코드를 제공합니다.



In [None]:

recent_mask = df_sorted['기준년월'] >= (df_sorted['기준년월'].max() - pd.DateOffset(months=5))
summary = (
    df_sorted[recent_mask]
    .groupby(['기준년월', '위기등급'])['가맹점ID']
    .nunique()
    .unstack(fill_value=0)
    .sort_index()
)
summary



In [None]:

# 저장 예시 (필요시 주석 해제)
# output_path = Path('risk_indicator_snapshot.csv')
# df_sorted.to_csv(output_path, index=False)
# print(f'saved: {output_path.resolve()}')

