In [None]:
# -*- coding: utf-8 -*-
"""
ILI (주간, 시즌별 wide 형태) + 백신 접종률(시즌별) + 급성호흡기감염증 중 인플루엔자 환자수(연도/주차)
+ 주간 기상(yy년 ww주)을 병합하는 스크립트.

- weather.csv의 날짜 형식: "yy년 ww주" (예: "15년 03주")
- yy는 2자리 연도이므로 00~69 -> 2000~2069, 70~99 -> 1970~1999으로 해석
- 최종 데이터 범위: 2015년 1주차 ~ 2025년 26주차 만 남김
- ILI 시즌: "해당년도 36주" ~ "다음년도 35주" (시즌 주차 → 캘린더 주차로 변환)
"""

import pandas as pd
import numpy as np
import re
from datetime import date

# Input CSV locations (relative paths, resolved against the current working directory).
# Weekly ILI table, read by load_ili_long.
ILI_PATH = "data/raw/influenza_ili.csv"
# Vaccination table, read with read_csv_try and reduced by extract_vax_seasonal_rate.
VAX_PATH = "data/raw/vaccine.csv"
# Respiratory case counts, read by load_respiratory.
RESP_PATH = "data/raw/respiratory.csv"
# Weekly weather table, read by load_weather.
WEATHER_PATH = "data/raw/weather.csv"

# -------------------------
# 공통 유틸
# -------------------------
def read_csv_try(path: str) -> pd.DataFrame:
    """Read a CSV file, trying several text encodings in turn.

    The encodings are tried in the order euc-kr, cp949, utf-8-sig, utf-8 and
    the first one that decodes and parses successfully wins. If none of them
    works, the file is read as UTF-8 with undecodable bytes replaced by
    U+FFFD so that the caller still gets a frame.

    Args:
        path: Location of the CSV file.

    Returns:
        The parsed DataFrame (first row used as header).

    Raises:
        FileNotFoundError, PermissionError, pandas.errors.EmptyDataError:
            Problems unrelated to the encoding are propagated immediately
            instead of being retried.
    """
    for enc in ("euc-kr", "cp949", "utf-8-sig", "utf-8"):
        try:
            return pd.read_csv(path, encoding=enc)
        except (UnicodeError, pd.errors.ParserError):
            # Wrong guess for this file; move on to the next encoding.
            continue
    # read_csv has no ``errors`` keyword; ``encoding_errors`` is the supported
    # way to request lenient decoding.
    return pd.read_csv(path, encoding="utf-8", encoding_errors="replace")

def read_csv_try_headerless(path: str) -> pd.DataFrame:
    """Read a header-less CSV file, trying several text encodings in turn.

    Works like a plain ``pd.read_csv(path, header=None)`` but tries the
    encodings euc-kr, cp949, utf-8-sig and utf-8 in that order. If none of
    them decodes the file, it is read as UTF-8 with undecodable bytes
    replaced by U+FFFD.

    Args:
        path: Location of the CSV file.

    Returns:
        The parsed DataFrame with integer column labels (0, 1, 2, ...).

    Raises:
        FileNotFoundError, PermissionError, pandas.errors.EmptyDataError:
            Problems unrelated to the encoding are propagated immediately
            instead of being retried.
    """
    for enc in ("euc-kr", "cp949", "utf-8-sig", "utf-8"):
        try:
            return pd.read_csv(path, header=None, encoding=enc)
        except (UnicodeError, pd.errors.ParserError):
            # Wrong guess for this file; move on to the next encoding.
            continue
    # read_csv has no ``errors`` keyword; ``encoding_errors`` is the supported
    # way to request lenient decoding.
    return pd.read_csv(
        path, header=None, encoding="utf-8", encoding_errors="replace"
    )

def parse_season_years(season_str: str):
    """Split a season label such as "2015-2016" into its two years.

    The input is converted with ``str()`` first; whitespace around the label
    and around the hyphen is tolerated.

    Returns:
        A ``(first_year, second_year)`` tuple of ints, or ``(None, None)``
        when the text is not two four-digit numbers joined by a hyphen.
    """
    matched = re.match(r"^\s*(\d{4})\s*-\s*(\d{4})\s*$", str(season_str))
    if matched is None:
        return None, None
    first_year, second_year = (int(part) for part in matched.groups())
    return first_year, second_year

def iso_weeks_in_year(year: int) -> int:
    """Return the number of the last ISO week of *year* (52 or 53)."""
    # The ISO week containing December 28th is the final week of the year.
    _, last_week, _ = date(year, 12, 28).isocalendar()
    return last_week

# -------------------------
# ILI (wide -> long)  [★ 시즌 주차 → 캘린더 주차 반영]
# -------------------------
def load_ili_long(path: str) -> pd.DataFrame:
    """Load the wide, header-less ILI table and return it in long form.

    Column 0 of the file holds the season label (e.g. "2015-2016절기"); the
    remaining columns are read as consecutive season weeks from left to
    right. Season week 1 is ISO week 36 of the season's first year, and the
    season ends with ISO week 35 of its second year.

    Processing steps:
      * trailing columns that are entirely empty are ignored;
      * values are coerced to numbers and gaps are filled per season (row)
        by linear interpolation, including the leading/trailing ends;
      * each season week is converted to a calendar ISO week number;
      * rows whose season label cannot be parsed are dropped;
      * positions that fall after week 35 of the second year are dropped.
        They only exist because the table is as wide as its longest season
        and the two-sided fill invents a value for them; keeping them would
        duplicate the (season_norm, week) key of the season's first week.

    Args:
        path: Location of the ILI CSV file.

    Returns:
        DataFrame with columns ``season_norm`` ("YYYY-YYYY"), ``week``
        (calendar ISO week, int), ``label`` and ``ili``.
    """
    season_end_week = 35  # a season runs from ISO week 36 to ISO week 35
    result_cols = ["season_norm", "week", "label", "ili"]

    df = read_csv_try_headerless(path).copy()
    week_cols = [c for c in df.columns if c != 0]

    # Drop completely empty columns on the right-hand side.
    while week_cols and df[week_cols[-1]].isna().all():
        week_cols.pop()
    if not week_cols:
        return pd.DataFrame(columns=result_cols)

    # Coerce to numbers, then fill gaps row-wise (one row == one season) by
    # linear interpolation; limit_direction="both" also fills both ends.
    for c in week_cols:
        df[c] = pd.to_numeric(df[c], errors="coerce")
    df[week_cols] = df[week_cols].interpolate(
        axis=1, method="linear", limit_direction="both"
    )

    out = df.melt(id_vars=[0], value_vars=week_cols,
                  var_name="week_idx", value_name="ili")
    out = out.rename(columns={0: "season"})
    out["season_norm"] = (
        out["season"].astype(str).str.replace("절기", "", regex=False).str.strip()
    )

    # Position inside the season (1, 2, ...) == "season week".
    out = out.sort_values(["season_norm", "week_idx"])
    out["season_week"] = out.groupby("season_norm").cumcount() + 1

    def map_to_calendar_week(season, season_week):
        """Return the ISO week for a season week, or None if it has none."""
        start_y, _end_y = parse_season_years(season)
        if start_y is None:
            return None
        # Weeks 36..52/53 of the first year come first (17 or 18 weeks).
        first_segment_len = iso_weeks_in_year(start_y) - season_end_week
        if season_week <= first_segment_len:
            return season_end_week + season_week
        week = season_week - first_segment_len
        # Anything past week 35 of the second year is outside the season.
        return week if week <= season_end_week else None

    mapped = [
        map_to_calendar_week(season, int(season_week))
        for season, season_week in zip(out["season_norm"], out["season_week"])
    ]
    keep = np.array([week is not None for week in mapped], dtype=bool)
    out = out.loc[keep].copy()
    out["week"] = pd.Series(
        [week for week in mapped if week is not None],
        index=out.index, dtype="int64",
    )

    out["label"] = out["season_norm"] + " season - W" + out["week"].astype(str)
    return out[result_cols].reset_index(drop=True)

# -------------------------
# 백신 (시즌별)
# -------------------------
def extract_vax_seasonal_rate(df_raw: pd.DataFrame) -> pd.DataFrame:
    """Reduce a raw vaccination table to one rate per season.

    Candidate columns are the string-named columns containing "접종률"; if
    there are none, string-named columns containing "절기" are used instead.
    The season of a column is the "YYYY-YYYY" pair found in its name or,
    failing that, the text in front of "절기". Columns without a usable
    season are ignored.

    For each season the candidate columns are coerced to numbers, averaged
    per row, and the row averages are averaged again (NaN skipped).

    Returns:
        DataFrame with columns ``season_norm`` and ``vaccine_rate``; empty
        when no candidate column exists.
    """
    def string_columns_with(token):
        return [
            col for col in df_raw.columns
            if isinstance(col, str) and (token in col)
        ]

    candidates = string_columns_with("접종률") or string_columns_with("절기")
    if not candidates:
        return pd.DataFrame(columns=["season_norm","vaccine_rate"])

    # Group candidate columns by the season their name refers to.
    columns_by_season = {}
    for col in candidates:
        hit = re.search(r"(\d{4})\s*-\s*(\d{4})", col)
        if hit:
            season_label = f"{hit.group(1)}-{hit.group(2)}"
        elif "절기" in col:
            season_label = col.split("절기")[0].strip()
        else:
            season_label = None
        if not season_label:
            continue
        if season_label not in columns_by_season:
            columns_by_season[season_label] = []
        columns_by_season[season_label].append(col)

    seasons = []
    rates = []
    for season_label, cols in columns_by_season.items():
        numeric = df_raw[cols].apply(pd.to_numeric, errors="coerce")
        per_row = numeric.mean(axis=1, skipna=True)
        seasons.append(season_label)
        rates.append(float(per_row.mean(skipna=True)))

    return pd.DataFrame({"season_norm": seasons, "vaccine_rate": rates})

# -------------------------
# 호흡기 환자수 (year-week → season_norm)
# -------------------------
def load_respiratory(path: str) -> pd.DataFrame:
    """Load weekly respiratory case counts keyed by influenza season.

    The first three columns of the file are taken as calendar year, ISO week
    and case count. Rows whose year or week is not numeric are dropped.

    The season key follows the convention used for the ILI data: a season
    runs from ISO week 36 of one year to ISO week 35 of the next. A calendar
    week >= 36 therefore belongs to season "year-(year+1)", while weeks
    1..35 belong to season "(year-1)-year". Using "year-(year+1)" for every
    week would join weeks 1..35 to ILI rows of the following calendar year.

    Args:
        path: Location of the respiratory CSV file.

    Returns:
        DataFrame with columns ``season_norm``, ``week`` (int), ``label``
        and ``case_count``.
    """
    season_start_week = 36  # first ISO week of an influenza season

    df = read_csv_try(path).copy()
    df = df.iloc[:, :3].copy()
    df.columns = ["year", "week", "case_count"]
    df["year"] = pd.to_numeric(df["year"], errors="coerce")
    df["week"] = pd.to_numeric(df["week"], errors="coerce")

    # Rows without a usable year/week cannot be keyed (and would break the
    # integer casts below).
    df = df.dropna(subset=["year", "week"]).copy()
    df["year"] = df["year"].astype(int)
    df["week"] = df["week"].astype(int)

    # First year of the season the calendar week falls into.
    start_year = df["year"].where(df["week"] >= season_start_week, df["year"] - 1)
    df["season_norm"] = start_year.astype(str) + "-" + (start_year + 1).astype(str)
    df["label"] = df["season_norm"] + " season - W" + df["week"].astype(str)
    return df[["season_norm","week","label","case_count"]].reset_index(drop=True)

# -------------------------
# 날씨 (yy년 ww주 → year/week → season_norm)
# -------------------------
# Matches week labels written as "yy년 ww주" (two-digit year, one- or
# two-digit week), allowing whitespace between the parts.
YY_WW_PATTERN = re.compile(r"^\s*(\d{2})\s*년\s*(\d{1,2})\s*주\s*$")

def _yy_to_yyyy(yy: int) -> int:
    """Expand a two-digit year: 70..99 -> 1970..1999, otherwise 2000 + yy."""
    if yy >= 70:
        return 1900 + yy
    return 2000 + yy

def load_weather(path: str) -> pd.DataFrame:
    """Load weekly weather values keyed by influenza season.

    The date column is detected as the first column whose leading non-null
    values contain a "yy년 ww주" label. If the file read with a header has no
    such column, it is re-read without a header and searched again; in that
    case the detected column is renamed to "date_text".

    Rows whose date text does not match the pattern are dropped. Every other
    column is coerced to numeric, prefixed with "wx_" (unless it already has
    that prefix), and its missing values are filled with the column median.

    The season key follows the convention used for the ILI data: a season
    runs from ISO week 36 of one year to ISO week 35 of the next. A calendar
    week >= 36 therefore belongs to season "year-(year+1)", while weeks
    1..35 belong to season "(year-1)-year". Using "year-(year+1)" for every
    week would join weeks 1..35 to ILI rows of the following calendar year.

    Args:
        path: Location of the weather CSV file.

    Returns:
        DataFrame with columns ``season_norm``, ``week`` (int), ``label``
        followed by the ``wx_*`` value columns.

    Raises:
        ValueError: No column with "yy년 ww주" labels could be found.
    """
    season_start_week = 36  # first ISO week of an influenza season

    def find_date_column(frame):
        """Return the first column holding a 'yy년 ww주' label, else None."""
        for col in frame.columns:
            # Drop missing values *before* the string cast; afterwards they
            # would be the literal text "nan" and crowd out real samples.
            sample = frame[col].dropna().astype(str).head(20)
            if any(YY_WW_PATTERN.match(text) for text in sample):
                return col
        return None

    df = read_csv_try(path).copy()
    date_col = find_date_column(df)
    if date_col is None:
        df = read_csv_try_headerless(path).copy()
        found = find_date_column(df)
        if found is None:
            raise ValueError("weather.csv에서 'yy년 ww주' 형식의 날짜 컬럼을 찾지 못했습니다.")
        df = df.rename(columns={found: "date_text"})
        date_col = "date_text"

    # Parse the date text; rows that do not match are discarded.
    matches = [YY_WW_PATTERN.match(str(text)) for text in df[date_col]]
    keep = np.array([m is not None for m in matches], dtype=bool)
    hits = [m for m in matches if m is not None]
    df = df.loc[keep].copy()
    df["year"] = pd.Series(
        [_yy_to_yyyy(int(m.group(1))) for m in hits], index=df.index, dtype="int64"
    )
    df["week"] = pd.Series(
        [int(m.group(2)) for m in hits], index=df.index, dtype="int64"
    )

    # First year of the season the calendar week falls into.
    start_year = df["year"].where(df["week"] >= season_start_week, df["year"] - 1)
    df["season_norm"] = start_year.astype(str) + "-" + (start_year + 1).astype(str)
    df["label"] = df["season_norm"] + " season - W" + df["week"].astype(str)

    key_cols = {"season_norm", "week", "label", "year", date_col}
    value_cols = [c for c in df.columns if c not in key_cols]

    rename_map = {}
    for c in value_cols:
        df[c] = pd.to_numeric(df[c], errors="coerce")
        # Column labels are integers when the header-less fallback was used.
        name = str(c)
        rename_map[c] = name if name.startswith("wx_") else f"wx_{name}"

    df = df[["season_norm","week","label"] + value_cols].rename(columns=rename_map)
    wx_cols = [rename_map[c] for c in value_cols]

    for c in wx_cols:
        if df[c].isna().any():
            df[c] = df[c].fillna(df[c].median(skipna=True))

    return df.reset_index(drop=True)

# -------------------------
# 시간 정렬 키
# -------------------------
def add_time_order(df: pd.DataFrame) -> pd.DataFrame:
    """Add ``start_year``, ``end_year`` and ``tidx`` sort-key columns.

    The frame is modified in place and also returned. When a ``week`` column
    is present it is coerced to plain integers first. ``start_year`` and
    ``end_year`` come from parsing ``season_norm``; ``tidx`` is
    ``start_year * 10000 + end_year * 100 + week``.
    """
    if "week" in df.columns:
        numeric_week = pd.to_numeric(df["week"], errors="coerce")
        df["week"] = numeric_week.astype("Int64").astype(int)

    year_pairs = [parse_season_years(season) for season in df["season_norm"]]
    df["start_year"] = [first for first, _ in year_pairs]
    df["end_year"] = [second for _, second in year_pairs]

    season_part = df["start_year"] * 10000 + df["end_year"] * 100
    df["tidx"] = season_part + df["week"]
    return df

# -------------------------
# 메인 병합
# -------------------------
def main():
    """Merge ILI, vaccination, respiratory and weather data and save as CSV.

    Steps:
      1. Load the four inputs; ILI rows (season + calendar ISO week) are the
         left side of every join.
      2. Left-join the per-season vaccination rate, then respiratory counts
         and weather values on (season_norm, week, label).
      3. Keep calendar weeks 2015 W1 .. 2025 W26 and order rows
         chronologically. ``season_norm`` is a season spanning two calendar
         years (weeks 36+ lie in its first year, weeks 1..35 in its second),
         so both the cut-off and the ordering are computed from the calendar
         year of each row rather than from the season-based ``tidx``;
         otherwise weeks 1..35 would sort ahead of weeks 36+ of the same
         season and the range would be shifted by most of a year.
      4. Coerce value columns to numeric and fill remaining gaps with the
         column median.
      5. Write two CSV files under data/processed: one with the key and
         value columns only, one with every column (including the helper
         columns start_year, end_year and tidx).
    """
    import os  # local import: only needed to create the output directory

    season_start_week = 36            # first ISO week of an influenza season
    first_week_key = 2015 * 100 + 1   # calendar 2015 W1
    last_week_key = 2025 * 100 + 26   # calendar 2025 W26

    ili_long = load_ili_long(ILI_PATH)          # ILI weeks are calendar ISO weeks
    vax_raw = read_csv_try(VAX_PATH)
    vax_season = extract_vax_seasonal_rate(vax_raw)
    resp_df = load_respiratory(RESP_PATH)
    wx_df = load_weather(WEATHER_PATH)

    # ILI + VAX + RESP
    merged = pd.merge(ili_long, vax_season, on="season_norm", how="left")
    merged = pd.merge(merged, resp_df, on=["season_norm","week","label"], how="left")

    # + WEATHER
    merged = pd.merge(merged, wx_df, on=["season_norm","week","label"], how="left")

    merged = add_time_order(merged)

    # Calendar year of each row: first season year for weeks 36+, second
    # season year for weeks 1..35. Rows with an unparsable season yield NaN
    # and are removed by the range filter.
    cal_year = merged["start_year"].where(
        merged["week"] >= season_start_week, merged["end_year"]
    )
    cal_key = cal_year * 100 + merged["week"]
    in_range = cal_key.between(first_week_key, last_week_key)
    merged = (
        merged.loc[in_range]
        .assign(_cal_key=cal_key[in_range])
        .sort_values("_cal_key", kind="stable")
        .drop(columns="_cal_key")
        .reset_index(drop=True)
    )

    # Make sure the value columns are numeric.
    for col in ["ili","vaccine_rate","case_count"]:
        if col in merged.columns:
            merged[col] = pd.to_numeric(merged[col], errors="coerce")

    # Fill gaps in the weather columns (weeks without a weather match).
    wx_cols = [c for c in merged.columns if c.startswith("wx_")]
    for c in wx_cols:
        if merged[c].isna().any():
            merged[c] = merged[c].fillna(merged[c].median(skipna=True))

    # Fill gaps in the core columns.
    for col in ["ili","vaccine_rate","case_count"]:
        if col in merged.columns and merged[col].isna().any():
            merged[col] = merged[col].fillna(merged[col].median(skipna=True))

    # Rebuild the label so it always agrees with season_norm / week.
    merged["label"] = merged["season_norm"].astype(str) + " season - W" + merged["week"].astype(int).astype(str)

    # Save (create the output directory on first run).
    os.makedirs("data/processed", exist_ok=True)
    save_cols = ["season_norm","week","label","ili","vaccine_rate","case_count"] + wx_cols
    merged[save_cols].to_csv(
        "data/processed/3_merged_influenza_vaccine_respiratory_weather.csv", index=False, encoding="utf-8-sig"
    )
    merged.to_csv(
        "data/processed/3_merged_influenza_vaccine_respiratory_weather_filled.csv", index=False, encoding="utf-8-sig"
    )

    print("저장 완료 -> data/processed/3_merged_influenza_vaccine_respiratory_weather.csv")
    print("저장 완료 -> data/processed/3_merged_influenza_vaccine_respiratory_weather_filled.csv")

if __name__ == "__main__":
    main()

저장 완료 -> 3_merged_influenza_vaccine_respiratory_weather.csv
저장 완료 -> 3_merged_influenza_vaccine_respiratory_weather_filled.csv
