# 02. 전처리 (Preprocessing)

SAPA 데이터의 품질 관리(QC)와 성격 척도 점수를 계산합니다.

## 학습 목표
- 응답 품질 관리 (QC) 방법 이해
- 역채점 처리 방법 이해
- Big Five, Ideology, Honesty-Humility 점수 계산

In [None]:
# 필요한 라이브러리 설치 (처음 한 번만 실행)
%pip install pandas numpy scipy -q

In [None]:
# 라이브러리 임포트
import pandas as pd
import numpy as np
from scipy import stats
import os

# 상위 폴더로 이동해서 데이터 접근
if os.path.basename(os.getcwd()) == 'notebooks':
    os.chdir('..')
print(f'작업 폴더: {os.getcwd()}')

## 1. 데이터 로드

In [None]:
# 데이터 로드
df = pd.read_csv('data/raw/sapa_data.csv')
keys = pd.read_csv('data/raw/superKey696.csv', index_col=0)

print(f'원본 데이터: {len(df):,}명')
print(f'채점 키: {len(keys)}개 문항, {len(keys.columns)}개 척도')

In [None]:
# 성격 문항 컬럼 추출
item_cols = [col for col in df.columns if col.startswith('q_')]
print(f'성격 문항: {len(item_cols)}개')

## 2. 품질 관리 (Quality Control)

### QC 기준
1. **응답 부족**: 10개 미만 응답자 제외
2. **Straight-lining**: 모든 응답이 동일한 경우 제외

In [None]:
# QC 1: 응답 부족 (10개 미만)
responses_per_person = df[item_cols].notna().sum(axis=1)
insufficient_response = responses_per_person < 10
print(f'응답 부족 (10개 미만): {insufficient_response.sum():,}명')

In [None]:
# QC 2: Straight-lining (모든 응답이 동일)
def check_straightlining(row):
    valid_responses = row.dropna()
    if len(valid_responses) < 2:
        return False
    return valid_responses.nunique() == 1

straightlining = df[item_cols].apply(check_straightlining, axis=1)
print(f'Straight-lining: {straightlining.sum():,}명')

In [None]:
# QC 적용
qc_exclude = insufficient_response | straightlining
df_clean = df[~qc_exclude].copy()

print(f'\n=== QC 결과 ===')
print(f'원본: {len(df):,}명')
print(f'제외: {qc_exclude.sum():,}명')
print(f'유효: {len(df_clean):,}명')

## 3. 척도 점수 계산 함수

In [None]:
def calculate_scale_score(df, keys, scale_name):
    """
    채점 키를 사용해 척도 점수 계산
    - 1: 정채점, -1: 역채점 (7 - 원점수), 0: 해당 없음
    """
    # 해당 척도에 속하는 문항 찾기
    scale_items = keys.index[keys[scale_name] != 0].tolist()
    weights = keys.loc[scale_items, scale_name]
    
    # 데이터에 있는 문항만 필터링
    available_items = [q for q in scale_items if q in df.columns]
    
    if not available_items:
        return pd.Series([np.nan] * len(df), index=df.index)
    
    # 역채점 적용 (6점 척도: 7 - 원점수)
    subset = df[available_items].copy()
    for item in available_items:
        if weights[item] == -1:
            subset[item] = 7 - subset[item]
    
    # 평균 계산 (결측 무시)
    return subset.mean(axis=1, skipna=True)

## 4. Big Five 점수 계산

In [None]:
# Big Five 점수 계산
scores = pd.DataFrame()
scores['RID'] = df_clean['RID']
scores['state'] = df_clean['state']  # 나중에 State 분석용

# NEO Big Five (약어 사용!)
scores['NEO_O'] = calculate_scale_score(df_clean, keys, 'NEO_O').values
scores['NEO_C'] = calculate_scale_score(df_clean, keys, 'NEO_C').values
scores['NEO_E'] = calculate_scale_score(df_clean, keys, 'NEO_E').values
scores['NEO_A'] = calculate_scale_score(df_clean, keys, 'NEO_A').values
scores['NEO_N'] = calculate_scale_score(df_clean, keys, 'NEO_N').values

print('Big Five 점수 계산 완료')

In [None]:
# Big Five 기술통계
big_five_cols = ['NEO_O', 'NEO_C', 'NEO_E', 'NEO_A', 'NEO_N']
big_five_names = ['Openness', 'Conscientiousness', 'Extraversion', 'Agreeableness', 'Neuroticism']

print('=== Big Five 기술통계 ===')
for col, name in zip(big_five_cols, big_five_names):
    n = scores[col].notna().sum()
    mean = scores[col].mean()
    sd = scores[col].std()
    print(f'{name}: N={n:,}, M={mean:.2f}, SD={sd:.2f}')

## 5. Ideology 점수 계산

**공식**: Ideology = mean( z(MPQ_Traditionalism), z(NEO_Liberalism) * -1 )

In [None]:
# Ideology 구성 요소 계산
scores['MPQ_Traditionalism'] = calculate_scale_score(df_clean, keys, 'MPQtr').values
scores['NEO_Liberalism'] = calculate_scale_score(df_clean, keys, 'NEOo6').values

# 유효한 값만 추출하여 z-score 계산 (index 유지 중요!)
valid_mask = scores['MPQ_Traditionalism'].notna() & scores['NEO_Liberalism'].notna()

if valid_mask.sum() > 0:
    mpqtr_valid = scores.loc[valid_mask, 'MPQ_Traditionalism']
    neo_lib_valid = scores.loc[valid_mask, 'NEO_Liberalism']
    
    # pd.Series로 변환하여 index 유지
    z_mpqtr = pd.Series(stats.zscore(mpqtr_valid.values), index=mpqtr_valid.index)
    z_neo_lib = pd.Series(stats.zscore(neo_lib_valid.values), index=neo_lib_valid.index)
    
    scores['Ideology'] = (z_mpqtr + z_neo_lib * -1) / 2

n = scores['Ideology'].notna().sum()
mean = scores['Ideology'].mean()
sd = scores['Ideology'].std()
print(f'Ideology: N={n:,}, M={mean:.2f}, SD={sd:.2f}')

## 6. Honesty-Humility 점수 계산

**공식**: H-H = mean( z(NEO_Morality[A2]), z(NEO_Modesty[A4]), z(HEXACO_H) )

In [None]:
# Honesty-Humility 구성 요소 계산
scores['NEO_Morality'] = calculate_scale_score(df_clean, keys, 'NEOa2').values
scores['NEO_Modesty'] = calculate_scale_score(df_clean, keys, 'NEOa4').values
scores['HEXACO_H'] = calculate_scale_score(df_clean, keys, 'HEXACO_H').values

# 세 구성 요소가 모두 유효한 경우에만 계산
valid_mask = (scores['NEO_Morality'].notna() & 
              scores['NEO_Modesty'].notna() & 
              scores['HEXACO_H'].notna())

if valid_mask.sum() > 0:
    neo_a2_valid = scores.loc[valid_mask, 'NEO_Morality']
    neo_a4_valid = scores.loc[valid_mask, 'NEO_Modesty']
    hexaco_h_valid = scores.loc[valid_mask, 'HEXACO_H']
    
    z_neo_a2 = pd.Series(stats.zscore(neo_a2_valid.values), index=neo_a2_valid.index)
    z_neo_a4 = pd.Series(stats.zscore(neo_a4_valid.values), index=neo_a4_valid.index)
    z_hexaco_h = pd.Series(stats.zscore(hexaco_h_valid.values), index=hexaco_h_valid.index)
    
    scores['Honesty_Humility'] = (z_neo_a2 + z_neo_a4 + z_hexaco_h) / 3

n = scores['Honesty_Humility'].notna().sum()
mean = scores['Honesty_Humility'].mean()
sd = scores['Honesty_Humility'].std()
print(f'Honesty-Humility: N={n:,}, M={mean:.2f}, SD={sd:.2f}')

## 7. 결과 저장

In [None]:
# processed 폴더 생성
os.makedirs('data/processed', exist_ok=True)

# 점수 저장
scores.to_csv('data/processed/sapa_scores.csv', index=False)
print(f'저장 완료: data/processed/sapa_scores.csv')
print(f'저장된 응답자 수: {len(scores):,}명')
print(f'저장된 컬럼: {list(scores.columns)}')

## 8. 최종 요약

In [None]:
print('=== 전처리 완료 요약 ===')
print(f'\n[QC 결과]')
print(f'원본: {len(df):,}명')
print(f'제외: {qc_exclude.sum():,}명 (응답부족: {insufficient_response.sum()}, Straight-lining: {straightlining.sum()})')
print(f'유효: {len(df_clean):,}명')

print(f'\n[척도별 기술통계]')
all_scales = ['NEO_O', 'NEO_C', 'NEO_E', 'NEO_A', 'NEO_N', 'Ideology', 'Honesty_Humility']
all_names = ['Openness', 'Conscientiousness', 'Extraversion', 'Agreeableness', 'Neuroticism', 'Ideology', 'Honesty-Humility']

for col, name in zip(all_scales, all_names):
    n = scores[col].notna().sum()
    mean = scores[col].mean()
    sd = scores[col].std()
    print(f'{name}: N={n:,}, M={mean:.2f}, SD={sd:.2f}')