In [None]:
import missingno as msno

In [51]:
import pandas as pd
import numpy as np

# 1. 파일 경로
env_path = "/content/서울특별시_국공립_초중고_환경위생관리현황.csv"
info_path = "/content/서울시_국공립_학교기본정보.csv"

# 2. 데이터 로드
env_df = pd.read_csv(env_path)
info_df = pd.read_csv(info_path)

print(f"환경위생 데이터 shape: {env_df.shape}")
print(f"학교정보 데이터 shape: {info_df.shape}")

union_school_code = set(env_df['SCHUL_CODE'].unique()) & set(info_df['SCHUL_CODE'].unique())

env_df = env_df[env_df['SCHUL_CODE'].isin(union_school_code)]
info_df = info_df[info_df['SCHUL_CODE'].isin(union_school_code)]

print(f"교집합 데이터 shape: {env_df.shape}")
print(f"교집합 데이터 shape: {info_df.shape}")

# 3. 설립연도 병합 (공시년도로 사용)
if 'SCHUL_CODE' in env_df.columns and 'SCHUL_CODE' in info_df.columns:
    env_df = pd.merge(env_df, info_df[['SCHUL_CODE', 'FOND_YMD']], on='SCHUL_CODE', how='left')
    env_df.rename(columns={'FOND_YMD': '공시년도'}, inplace=True)
    env_df['공시년도'] = env_df['공시년도'].apply(lambda x : int(str(x)[:4]) if x!=np.nan else np.nan) # pd.to_datetime(env_df['공시년도'], errors='coerce').dt.year
    print("✅ 설립연도(공시년도) 병합 완료")

# 4. 기준값 및 가중치 정의 (실제 컬럼명 기준)
standard_dict = {
    # 조도 관련
    'BLKB_ITENI': 300,  # 칠판면 조도
    'DES_ITENI': 300,   # 책상면 조도
    'ITENI': 3,         # 조도비 (최대/최소)

    # 소음
    'NSE': 55,          # 소음

    # 미세먼지
    'MNUT_DST': 75,     # 미세먼지 PM10 (일반)
    'GMNSM_MNUT_DST': 150,  # 미세먼지 PM10 (황사시)
    'ULTRA_DST': 35,    # 미세먼지 PM2.5

    # 화학물질
    'HCHO': 100,        # 폼알데하이드
    'CO2_1500': 1500,   # 이산화탄소 1500ppm
    'CO2_1000': 1000,   # 이산화탄소 1000ppm
    'AIR_BACT': 800,    # 총부유세균
    'RN': 148,          # 라돈
    'O3': 0.06,         # 오존
    'VOCS': 400,        # 총휘발성유기화합물
    'BENZENE': 30,      # 벤젠
    'TOLUENE': 1000,    # 톨루엔
    'ETHY_BENZENE': 360, # 에틸벤젠
    'XYLENE': 700,      # 자일렌
    'STYLENE': 300,     # 스티렌
    'ASBESTOS': 0.01,   # 석면
    'CO': 10,           # 일산화탄소
    'NO2': 0.05,        # 이산화질소
    'FALL_BACT': 800,   # 낙하세균
    'MITE': 100         # 진드기
}

# 가중치 정의 (1급 발암물질, 2급 발암물질, 기타)
weight_dict = {
    # 1급 발암물질
    'ASBESTOS': 3.0,    # 석면
    'RN': 3.0,          # 라돈
    'HCHO': 3.0,        # 폼알데하이드
    'BENZENE': 3.0,     # 벤젠

    # 2급 발암물질
    'TOLUENE': 2.0,     # 톨루엔
    'ETHY_BENZENE': 2.0, # 에틸벤젠
    'XYLENE': 2.0,      # 자일렌
    'STYLENE': 2.0,     # 스티렌

    # 공기 관련 물질
    'VOCS': 1.5,        # 총휘발성유기화합물
    'MNUT_DST': 1.0,    # 미세먼지
    'GMNSM_MNUT_DST': 1.0,
    'ULTRA_DST': 1.0,
    'CO': 1.0,          # 일산화탄소
    'NO2': 1.0,         # 이산화질소
    'O3': 1.0,          # 오존
    'CO2_1500': 1.0,    # 이산화탄소
    'CO2_1000': 1.0,
    'AIR_BACT': 1.0,    # 세균
    'FALL_BACT': 1.0,

    # 기타
    'BLKB_ITENI': 0.5,  # 조도
    'DES_ITENI': 0.5,
    'ITENI': 0.5,
    'NSE': 1.0,         # 소음
    'MITE': 1.0         # 진드기
}

# 등급 구간 정의
def get_grade(total_score):
    if total_score <= 0.4:
        return 'A'
    elif total_score <= 0.5:
        return 'B'
    elif total_score <= 0.55:
        return 'C'
    elif total_score <= 0.75:
        return 'D'
    else:
        return 'E'

# 5. 측정값 컬럼 탐색 및 그룹화
print("\n=== 측정값 컬럼 그룹화 ===")
material_groups = {}

for prefix in standard_dict.keys():
    # 1차 결과치와 최종 결과치 컬럼 찾기
    rslt_1 = f"{prefix}_RSLT_NMVL"
    rslt_2 = f"{prefix}_RSLT_NMVL_2"

    available_cols = []
    if rslt_1 in env_df.columns:
        available_cols.append(rslt_1)
    if rslt_2 in env_df.columns:
        available_cols.append(rslt_2)

    if available_cols:
        material_groups[prefix] = available_cols
        print(f"{prefix}: {available_cols}")

print(f"\n총 {len(material_groups)}개 물질 그룹 발견")

# 6. 텍스트 정제 및 수치 변환 함수
def clean_numeric_value(value):
    if pd.isna(value):
        return np.nan

    value_str = str(value).strip()

    # "해당없음" → 0
    if value_str in ['해당없음', '해당 없음', '해당사항없음']:
        return 0

    # "미실시", "정보없음" → NaN
    if value_str in ['미실시', '정보없음', '정보 없음', '', '-']:
        return np.nan

    # 숫자 추출
    try:
        return float(value_str)
    except:
        return np.nan

# 7. 물질별 최대값 계산 (1차, 최종 결과치 중 MAX)
print("\n=== 물질별 최대값 계산 ===")
for material, cols in material_groups.items():
    # 텍스트 정제
    for col in cols:
        env_df[col] = env_df[col].apply(clean_numeric_value)

    # 최대값 계산
    env_df[f"{material}_MAX"] = env_df[cols].max(axis=1)
    print(f"{material}: {len(cols)}개 컬럼 → MAX 계산 완료")

# 8. 공기관련/건물관련 물질 분류
air_related = ['MNUT_DST', 'GMNSM_MNUT_DST', 'ULTRA_DST', 'CO', 'NO2', 'O3',
               'CO2_1500', 'CO2_1000', 'AIR_BACT', 'FALL_BACT', 'VOCS']

building_related = ['ASBESTOS', 'RN', 'HCHO', 'BENZENE', 'TOLUENE',
                   'ETHY_BENZENE', 'XYLENE', 'STYLENE',
                    'BLKB_ITENI', 'DES_ITENI', 'ITENI', 'NSE', 'MITE']

# 실제 존재하는 컬럼만 필터링
air_related = [mat for mat in air_related if mat in material_groups]
building_related = [mat for mat in building_related if mat in material_groups]

print(f"\n공기관련 물질: {air_related}")
print(f"건물관련 물질: {building_related}")

# 9. 개선된 결측치 처리
print("\n=== 결측치 처리 ===")

# 공기관련 → 자치구 평균으로 대체
if 'ADRCD_NM' in env_df.columns:
    for material in air_related:
        col_name = f"{material}_MAX"
        if col_name in env_df.columns:
            before_null = env_df[col_name].isnull().sum()
            env_df[col_name] = env_df.groupby('ADRCD_NM')[col_name].transform(
                lambda x: x.fillna(x.mean())
            )
            after_null = env_df[col_name].isnull().sum()
            print(f"{material}: {before_null} → {after_null} (자치구 평균)")

# 건물관련 → 공시년도 평균 → 전체 평균 (2단계 처리)
if '공시년도' in env_df.columns:
    env_df['공시년도_그룹'] = (env_df['공시년도'] // 10) * 10  # 10년 단위 그룹

    for material in building_related:
        col_name = f"{material}_MAX"
        if col_name in env_df.columns:
            before_null = env_df[col_name].isnull().sum()

            # 1단계: 공시년도 그룹 평균으로 대체
            env_df[col_name] = env_df.groupby('공시년도_그룹')[col_name].transform(
                lambda x: x.fillna(x.mean())
            )
            middle_null = env_df[col_name].isnull().sum()

            # 2단계: 여전히 결측치인 경우 전체 평균으로 대체
            if middle_null > 0:
                overall_mean = env_df[col_name].mean()
                env_df[col_name] = env_df[col_name].fillna(overall_mean)
                after_null = env_df[col_name].isnull().sum()

                print(f"{material}: {before_null} → {middle_null} (공시년도 평균) → {after_null} (전체 평균)")
            else:
                print(f"{material}: {before_null} → {middle_null} (공시년도 평균)")

# 10. 최종 결측치 확인
print("\n=== 최종 결측치 확인 ===")
final_null_check = []
for material in material_groups.keys():
    col_name = f"{material}_MAX"
    if col_name in env_df.columns:
        null_count = env_df[col_name].isnull().sum()
        if null_count > 0:
            final_null_check.append(f"{material}: {null_count}개")

if final_null_check:
    print("⚠️ 여전히 결측치가 남은 항목:")
    for item in final_null_check:
        print(f"  - {item}")
else:
    print("✅ 모든 물질 항목 결측치 처리 완료")

# 11. DataFrame 조각화 방지를 위한 점수 계산 최적화
print("\n=== 점수 계산 ===")

def calculate_risk_score(value, standard):
    """
    기준값 대비 위험도 점수 계산
    - 기준과 동일 = 1
    - 기준 넘어가면 = 넘어가는 만큼 1이상
    - 기준보다 낮으면 = 낮은만큼 1이하
    """
    if pd.isna(value) or standard == 0:
        return 0
    return value / standard

# 점수 계산을 위한 새로운 DataFrame 생성 (조각화 방지)
score_data = {}
weighted_data = {}

for material in material_groups.keys():
    if material in standard_dict:
        max_col = f"{material}_MAX"

        if max_col in env_df.columns:
            # 기본 점수 계산
            scores = env_df[max_col].apply(
                lambda x: calculate_risk_score(x, standard_dict[material])
            )
            score_data[f"{material}_SCORE"] = scores

            # 가중치 적용
            weight = weight_dict.get(material, 1.0)
            weighted_data[f"{material}_WEIGHTED"] = scores * weight

            print(f"{material}: 기준값={standard_dict[material]}, 가중치={weight}")

# 한 번에 DataFrame에 추가
score_df = pd.DataFrame(score_data)
weighted_df = pd.DataFrame(weighted_data)

# 원본 DataFrame과 병합
env_df = pd.concat([env_df, score_df, weighted_df], axis=1)

# 12. 총점 및 등급 계산
weighted_score_cols = list(weighted_data.keys())
env_df['TOTAL_WEIGHTED_SCORE'] = env_df[weighted_score_cols].mean(axis=1) # 총점 평균 내기
env_df['SAFETY_GRADE'] = env_df['TOTAL_WEIGHTED_SCORE'].apply(get_grade)

print(f"\n총 {len(weighted_score_cols)}개 물질의 가중 점수 합계 계산 완료")

# 13. 등급별 분포 확인
grade_dist = env_df['SAFETY_GRADE'].value_counts().sort_index()
print(f"\n=== 안전등급 분포 ===")
for grade, count in grade_dist.items():
    percentage = (count / len(env_df)) * 100
    print(f"{grade}등급: {count}개교 ({percentage:.1f}%)")

# 14. 결과 데이터 구성
meta_cols = ['SCHUL_CODE', 'SCHUL_NM', 'ADRCD_NM', '공시년도']
max_cols = [f"{mat}_MAX" for mat in material_groups.keys()]
score_cols_final = list(score_data.keys())
weighted_cols_final = list(weighted_data.keys())

output_cols = (
    [col for col in meta_cols if col in env_df.columns] +
    max_cols +
    score_cols_final +
    weighted_cols_final +
    ['TOTAL_WEIGHTED_SCORE', 'SAFETY_GRADE']
)

# 15. 최종 결과 저장
final_df = env_df[output_cols]
output_path = "/content/환경위생_통합점수_최종처리_개선.csv"
final_df.to_csv(output_path, index=False, encoding='utf-8-sig')

print(f"\n✅ 최종 결과 저장 완료: {output_path}")
print(f"최종 데이터 shape: {final_df.shape}")

# 16. 결과 요약 출력
print(f"\n=== 처리 완료 요약 ===")
print(f"• 총 처리 학교 수: {len(final_df)}개교")
print(f"• 총 측정 물질 수: {len(material_groups)}개")
print(f"• 1급 발암물질: {len([m for m in building_related if weight_dict.get(m, 0) >= 3.0])}개")
print(f"• 2급 발암물질: {len([m for m in building_related if weight_dict.get(m, 0) == 2.0])}개")
print(f"• 공기관련 물질: {len(air_related)}개")

# 상위/하위 5개교 출력
print(f"\n=== 안전도 상위 5개교 ===")
top_5 = final_df.nsmallest(5, 'TOTAL_WEIGHTED_SCORE')[['SCHUL_NM', 'ADRCD_NM', 'TOTAL_WEIGHTED_SCORE', 'SAFETY_GRADE']]
for idx, row in top_5.iterrows():
    print(f"{row['SCHUL_NM']} ({row['ADRCD_NM']}): {row['TOTAL_WEIGHTED_SCORE']:.2f}점 ({row['SAFETY_GRADE']}등급)")

print(f"\n=== 안전도 하위 5개교 ===")
bottom_5 = final_df.nlargest(5, 'TOTAL_WEIGHTED_SCORE')[['SCHUL_NM', 'ADRCD_NM', 'TOTAL_WEIGHTED_SCORE', 'SAFETY_GRADE']]
for idx, row in bottom_5.iterrows():
    print(f"{row['SCHUL_NM']} ({row['ADRCD_NM']}): {row['TOTAL_WEIGHTED_SCORE']:.2f}점 ({row['SAFETY_GRADE']}등급)")

환경위생 데이터 shape: (3860, 139)
학교정보 데이터 shape: (970, 39)
교집합 데이터 shape: (3832, 139)
교집합 데이터 shape: (962, 39)
✅ 설립연도(공시년도) 병합 완료

=== 측정값 컬럼 그룹화 ===
BLKB_ITENI: ['BLKB_ITENI_RSLT_NMVL']
DES_ITENI: ['DES_ITENI_RSLT_NMVL']
ITENI: ['ITENI_RSLT_NMVL']
NSE: ['NSE_RSLT_NMVL']
MNUT_DST: ['MNUT_DST_RSLT_NMVL', 'MNUT_DST_RSLT_NMVL_2']
GMNSM_MNUT_DST: ['GMNSM_MNUT_DST_RSLT_NMVL', 'GMNSM_MNUT_DST_RSLT_NMVL_2']
ULTRA_DST: ['ULTRA_DST_RSLT_NMVL', 'ULTRA_DST_RSLT_NMVL_2']
HCHO: ['HCHO_RSLT_NMVL', 'HCHO_RSLT_NMVL_2']
AIR_BACT: ['AIR_BACT_RSLT_NMVL', 'AIR_BACT_RSLT_NMVL_2']
RN: ['RN_RSLT_NMVL', 'RN_RSLT_NMVL_2']
O3: ['O3_RSLT_NMVL', 'O3_RSLT_NMVL_2']
VOCS: ['VOCS_RSLT_NMVL', 'VOCS_RSLT_NMVL_2']
BENZENE: ['BENZENE_RSLT_NMVL', 'BENZENE_RSLT_NMVL_2']
TOLUENE: ['TOLUENE_RSLT_NMVL', 'TOLUENE_RSLT_NMVL_2']
ETHY_BENZENE: ['ETHY_BENZENE_RSLT_NMVL', 'ETHY_BENZENE_RSLT_NMVL_2']
XYLENE: ['XYLENE_RSLT_NMVL', 'XYLENE_RSLT_NMVL_2']
STYLENE: ['STYLENE_RSLT_NMVL', 'STYLENE_RSLT_NMVL_2']
ASBESTOS: ['ASBESTOS_RSLT_NMVL', '

In [55]:
final_df['SAFETY_GRADE'].value_counts()

Unnamed: 0_level_0,count
SAFETY_GRADE,Unnamed: 1_level_1
B,1662
A,1610
C,327
D,192
E,41


In [49]:
final_df['TOTAL_WEIGHTED_SCORE'].quantile(0.5)

np.float64(0.4150791452328789)

In [46]:
final_df['TOTAL_WEIGHTED_SCORE'].quantile(0.9)

np.float64(0.5208612932417229)

In [47]:
zfinal_df['TOTAL_WEIGHTED_SCORE'].quantile(0.95)

np.float64(0.5585477560840006)

In [48]:
final_df['TOTAL_WEIGHTED_SCORE'].quantile(0.99)

np.float64(0.7546408269345916)