# In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
MLflow-친화 파이프라인 (v4, 더 개선판)
- 문제점 수정:
  1) 미래 피처 생성 가능한 컬럼만 사용 (화이트리스트)
  2) 재귀 예측 시, 예측값으로 래그/롤링/증감/pct를 동적으로 갱신
- 구성: EDA -> FE -> 모델들(Linear/RF/(XGB)) 학습/검증/미래예측
"""

from pathlib import Path
import os, warnings, json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Ignore every Python warning for the rest of the process.
warnings.filterwarnings("ignore")
# Set GIT_PYTHON_REFRESH=quiet unless the variable is already defined.
# NOTE(review): presumably silences GitPython's startup message when MLflow
# imports it - confirm.
os.environ.setdefault("GIT_PYTHON_REFRESH", "quiet")

# ---------------- MLflow global imports ----------------
import mlflow
import mlflow.data
# The sklearn flavor is optional: when it cannot be imported, mlflow_sklearn is
# None and the code that uses it checks for that.
try:
    import mlflow.sklearn as mlflow_sklearn
except Exception:
    mlflow_sklearn = None

# Autolog is asked for input examples only; models are logged manually (faster).
# Any failure while enabling autolog is ignored on purpose.
try:
    if mlflow_sklearn is not None:
        mlflow_sklearn.autolog(log_models=False, log_input_examples=True, silent=True)
except Exception:
    pass

# ---------------- Paths / settings ----------------
PROC = Path("/root/covid_processed.csv")
FEAT = Path("/root/covid_features.csv")
OUTDIR = Path("/root")
TARGET = "new_cases"            # change if needed
HORIZON = 30                    # number of future days to forecast
TEST_DAYS = 60                  # fixed test window (most recent 60 days)
LOOKBACKS = [1, 7, 14]          # lags
ROLLS = [7, 14, 28]             # rolling windows
EXPERIMENT_NAME = "covid_timeseries_prophet_lstm"

# Select the MLflow experiment at import time.
mlflow.set_experiment(EXPERIMENT_NAME)

# ---------------- 로깅 유틸 ----------------
def log_df_preview_md(df: pd.DataFrame, name: str, n: int = 20):
    """Log the first ``n`` rows of ``df`` to MLflow as a Markdown table.

    The text artifact is named ``<name>_preview.md`` and the row index is
    omitted from the table.
    """
    preview_md = df.head(n).to_markdown(index=False)
    artifact_file = f"{name}_preview.md"
    mlflow.log_text(preview_md, artifact_file)

def lineplot(dates, series, title, fname):
    """Draw ``series`` against ``dates`` and log the figure to MLflow as ``fname``.

    The figure is closed after it has been logged.
    """
    fig = plt.figure()
    ax = fig.gca()
    ax.plot(dates, series)
    ax.set_title(title)
    ax.set_xlabel("date")
    ax.set_ylabel("value")
    mlflow.log_figure(fig, fname)
    plt.close(fig)

def dualplot(dates, y_true, y_pred, title, fname):
    """Overlay actual and predicted values on one axis and log the figure to MLflow.

    Both curves share ``dates`` as x values; the figure is closed after logging.
    """
    fig = plt.figure()
    ax = fig.gca()
    for values, label in ((y_true, "actual"), (y_pred, "pred")):
        ax.plot(dates, values, label=label)
    ax.legend()
    ax.set_title(title)
    ax.set_xlabel("date")
    ax.set_ylabel("value")
    mlflow.log_figure(fig, fname)
    plt.close(fig)

def residplot(y_true, y_pred, title, fname):
    """Scatter residuals (``y_true - y_pred``) against predictions and log to MLflow.

    A gray horizontal line marks zero residual; the figure is closed after
    logging.
    """
    residuals = np.array(y_true) - np.array(y_pred)
    fig = plt.figure()
    ax = fig.gca()
    ax.scatter(y_pred, residuals, s=8)
    ax.axhline(0, color="gray")
    ax.set_title(title)
    ax.set_xlabel("pred")
    ax.set_ylabel("residuals")
    mlflow.log_figure(fig, fname)
    plt.close(fig)

def corr_heatmap(df: pd.DataFrame, fname="eda_corr_heatmap.png", max_cols=30):
    """Best-effort correlation heatmap of the first ``max_cols`` numeric columns.

    Nothing is logged when fewer than two numeric columns are available. Any
    error (including seaborn being unavailable) is swallowed on purpose so the
    caller is never interrupted by this optional plot.
    """
    try:
        import seaborn as sns

        numeric_cols = list(df.select_dtypes(include=[np.number]).columns)[:max_cols]
        if len(numeric_cols) < 2:
            return
        fig = plt.figure(figsize=(6, 5))
        corr = df[numeric_cols].corr()
        sns.heatmap(corr, cmap="coolwarm", center=0)
        plt.title("Correlation (numeric, head)")
        mlflow.log_figure(fig, fname)
        plt.close(fig)
    except Exception:
        pass

# ---------------- FE ----------------
def add_time_features(df: pd.DataFrame, date_col="date"):
    """Add calendar features derived from ``date_col`` to ``df``.

    The frame is modified in place and also returned. Columns written:
    ``date`` (parsed datetimes), ``dow``, ``weekofyear``, ``dayofyear``,
    ``dow_sin``/``dow_cos`` and ``month_sin``/``month_cos``.

    Args:
        df: input frame.
        date_col: name of the column holding the dates to parse.

    Returns:
        ``df`` itself; unchanged when ``date_col`` is not a column.
    """
    if date_col not in df.columns:
        return df

    df["date"] = pd.to_datetime(df[date_col], errors="coerce")
    dates = df["date"].dt

    df["dow"] = dates.dayofweek
    week = dates.isocalendar().week
    # Unparseable dates become NaT and their ISO week is <NA>; casting <NA> to
    # int raises, so fall back to float (NaN) only when such rows exist.
    df["weekofyear"] = week.astype("float64") if week.isna().any() else week.astype(int)
    df["dayofyear"] = dates.dayofyear

    dow_angle = 2 * np.pi * df["dow"] / 7
    df["dow_sin"] = np.sin(dow_angle)
    df["dow_cos"] = np.cos(dow_angle)

    # Use an existing ``month`` column when present, otherwise derive the month
    # from the parsed date so the cyclic month features are always available.
    month = df["month"] if "month" in df.columns else dates.month
    month_angle = 2 * np.pi * month / 12
    df["month_sin"] = np.sin(month_angle)
    df["month_cos"] = np.cos(month_angle)
    return df

def add_lag_roll(df: pd.DataFrame, target: str, lags, rolls):
    """Return a date-sorted copy of ``df`` with lag/rolling/change features of ``target``.

    For every ``l`` in ``lags`` a ``{target}_lag{l}`` column is added; for every
    ``w`` in ``rolls`` the mean and standard deviation of the ``w`` values
    preceding each row are added as ``{target}_rollmean{w}`` and
    ``{target}_rollstd{w}``. ``{target}_diff1`` and ``{target}_pct`` hold the
    one-step difference and percentage change (infinite values become NaN).
    Remaining gaps are filled backward, then forward.
    """
    out = df.sort_values("date").reset_index(drop=True)
    series = out[target]
    previous = series.shift(1)  # excludes the current row from rolling windows

    for lag in lags:
        out[f"{target}_lag{lag}"] = series.shift(lag)

    for window in rolls:
        rolling = previous.rolling(window, min_periods=1)
        out[f"{target}_rollmean{window}"] = rolling.mean()
        out[f"{target}_rollstd{window}"] = rolling.std()

    out[f"{target}_diff1"] = series.diff(1)
    pct = series.pct_change()
    out[f"{target}_pct"] = pct.replace([np.inf, -np.inf], np.nan)

    return out.bfill().ffill()

def feature_engineer(in_path: Path, target: str, out_path: Path) -> pd.DataFrame:
    """Read the CSV at ``in_path``, build features, write them to ``out_path``.

    Calendar features and lag/rolling features of ``target`` (using the
    module-level ``LOOKBACKS`` and ``ROLLS``) are added, numeric columns are
    linearly interpolated in both directions, and any remaining gaps are filled
    backward then forward. The parent directory of ``out_path`` is created when
    missing.

    Raises:
        ValueError: if the CSV has no ``date`` column.
    """
    frame = pd.read_csv(in_path)
    if "date" not in frame.columns:
        raise ValueError("Need 'date' column.")

    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    frame = add_lag_roll(add_time_features(frame), target, LOOKBACKS, ROLLS)

    numeric = frame.select_dtypes(include=[np.number]).columns
    frame[numeric] = frame[numeric].interpolate("linear", limit_direction="both")
    frame = frame.bfill().ffill()

    out_path.parent.mkdir(parents=True, exist_ok=True)
    frame.to_csv(out_path, index=False)
    return frame

# ---------------- 피처 선택(화이트리스트) ----------------
DATE_FEATS = ["dow_sin","dow_cos","weekofyear","dayofyear","month_sin","month_cos"]

def select_future_aware_features(df: pd.DataFrame, target: str) -> list:
    """Return the numeric feature columns that can be regenerated for future dates.

    Only two groups are allowed:
    - engineered features of the target: ``{target}_lag<N>``,
      ``{target}_rollmean<N>``, ``{target}_rollstd<N>``, ``{target}_diff1`` and
      ``{target}_pct``;
    - the date-based cyclic features listed in ``DATE_FEATS``.

    A plain ``{target}_`` prefix test is not enough: raw input columns that
    merely share the prefix (for example ``new_cases_smoothed`` when the target
    is ``new_cases``) cannot be produced for future dates, so they are rejected.

    Args:
        df: feature frame.
        target: name of the target column.

    Returns:
        Sorted list of unique allowed column names; never contains ``target``
        itself or ``y_next``.
    """
    import re  # local import keeps this block self-contained

    derived = re.compile(
        rf"{re.escape(target)}_(?:lag\d+|rollmean\d+|rollstd\d+|diff1|pct)"
    )
    date_feats = set(DATE_FEATS)
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    allowed = {
        c
        for c in numeric_cols
        if c in date_feats or (isinstance(c, str) and derived.fullmatch(c))
    }
    return sorted(allowed)

# ---------------- 데이터셋/스플릿 ----------------
def time_split(df: pd.DataFrame, test_days: int):
    """Split ``df`` chronologically into ``(train, test)`` copies.

    The test part holds the rows dated within the last ``test_days`` days
    (inclusive of the latest date); everything earlier is the train part. The
    frame is sorted by ``date`` and re-indexed before splitting.
    """
    ordered = df.sort_values("date").reset_index(drop=True)
    dates = ordered["date"]
    cutoff = dates.max() - pd.Timedelta(days=test_days - 1)
    train = ordered.loc[dates.lt(cutoff)].copy()
    test = ordered.loc[dates.ge(cutoff)].copy()
    return train, test

def build_xy(df: pd.DataFrame, target: str, feat_list: list):
    """Build one-step-ahead training arrays from ``df``.

    Rows are ordered by ``date``; the label of each row is the ``target`` value
    of the following row, so the last row (which has no label) is dropped.

    Returns:
        ``(X, y, idx)`` where ``X`` holds the ``feat_list`` columns as float32,
        ``y`` the next-step targets as float32, and ``idx`` the index labels of
        the rows that were kept.
    """
    ordered = df.sort_values("date")
    ordered["y_next"] = ordered[target].shift(-1)  # 1-step ahead
    labelled = ordered[ordered["y_next"].notna()]

    features = labelled[feat_list].to_numpy().astype(np.float32)
    labels = labelled["y_next"].to_numpy().astype(np.float32)
    return features, labels, labelled.index

# ---------------- 평가 ----------------
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
def reg_metrics(y_true, y_pred):
    """Return MAE, RMSE, MAPE (in percent) and R2 as a dict of Python floats.

    The MAPE denominator is clamped to at least 1e-9 to avoid division by zero.
    """
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    abs_pct_err = np.abs((y_true - y_pred) / np.maximum(1e-9, np.abs(y_true)))
    mape = np.mean(abs_pct_err) * 100.0
    r2 = r2_score(y_true, y_pred)

    named = (("MAE", mae), ("RMSE", rmse), ("MAPE", mape), ("R2", r2))
    return {label: float(value) for label, value in named}

# ---------------- 동적 재귀 예측 ----------------
def _roll_stats(seq, w):
    """Return ``(mean, std)`` of the last ``w`` values of ``seq`` as floats.

    The standard deviation is the sample standard deviation (``ddof=1``), which
    is what pandas ``rolling(...).std()`` computes for the ``rollstd`` training
    features in ``add_lag_roll``; using the population form here would feed the
    model systematically smaller values at forecast time. With fewer than two
    values the sample deviation is undefined, so 0.0 is returned instead.
    """
    window = np.asarray(seq[-w:], dtype=float)
    mean = float(window.mean())
    std = float(window.std(ddof=1)) if window.size > 1 else 0.0
    return mean, std

def _date_feats(next_date):
    """Return the calendar features of ``next_date`` as a dict.

    Keys: ``dow_sin``, ``dow_cos`` (weekly cycle), ``weekofyear`` (ISO week),
    ``dayofyear``, ``month_sin`` and ``month_cos`` (yearly cycle by month).
    ``next_date`` must expose ``dayofweek`` (e.g. a ``pandas.Timestamp``).
    """
    two_pi = 2 * np.pi
    dow_angle = two_pi * next_date.dayofweek / 7.0
    month_angle = two_pi * int(next_date.month) / 12.0
    return {
        "dow_sin": np.sin(dow_angle),
        "dow_cos": np.cos(dow_angle),
        "weekofyear": int(next_date.isocalendar().week),
        "dayofyear": int(next_date.timetuple().tm_yday),
        "month_sin": np.sin(month_angle),
        "month_cos": np.cos(month_angle),
    }

def recursive_forecast_dynamic(df_feat: pd.DataFrame, target: str, feat_list: list, model, horizon: int) -> pd.DataFrame:
    """Forecast ``horizon`` future days, feeding every prediction back into the history.

    The models in this file are fitted on pairs "features of day t -> target of
    day t+1" (see ``build_xy``), where the features of day t are defined by
    ``add_lag_roll``: ``lag{l}`` is the value at t-l, the rolling statistics
    cover the ``w`` values before t, and ``diff1``/``pct`` compare t with t-1.
    Each step therefore rebuilds the feature row of the most recent day in the
    history (observed or already predicted) and uses it to predict the day
    after it. Lags and rolling windows come from the module-level
    ``LOOKBACKS`` and ``ROLLS``.

    Args:
        df_feat: feature frame containing ``date`` and ``target`` columns.
        target: name of the target column.
        feat_list: feature names in the column order the model was fitted with.
        model: fitted estimator exposing ``predict``.
        horizon: number of future days to generate.

    Returns:
        DataFrame with columns ``date`` and ``yhat``, one row per forecast day
        (empty when ``horizon`` is not positive).

    Raises:
        ValueError: if ``df_feat`` has no rows.
    """
    df = df_feat.sort_values("date")
    if df.empty:
        raise ValueError("df_feat must contain at least one row to forecast from.")

    last_date = df["date"].max()
    hist = df[target].tolist()  # target history; predictions are appended below

    # Requested features this function cannot regenerate keep their last
    # observed value instead of silently becoming 0.0.
    carried = {c: float(df[c].iloc[-1]) for c in feat_list if c in df.columns}

    one_day = pd.Timedelta(days=1)
    preds, dates = [], []

    for step in range(1, horizon + 1):
        target_date = last_date + pd.Timedelta(days=step)

        # Calendar features belong to the feature day, i.e. the day before the
        # one being predicted, exactly as in the training rows.
        feat_row = dict(_date_feats(target_date - one_day))

        n_hist = len(hist)
        for lag in LOOKBACKS:
            # hist[-1] is the feature day itself, so lag l sits l positions
            # further back; fall back to the oldest value on short histories.
            value = hist[-1 - lag] if n_hist > lag else hist[0]
            feat_row[f"{target}_lag{lag}"] = float(value)

        for w in ROLLS:
            # Training windows exclude the feature day (shift(1) before rolling).
            window = hist[-(w + 1):-1] or hist[-1:]
            mean_w, std_w = _roll_stats(window, len(window))
            feat_row[f"{target}_rollmean{w}"] = mean_w
            feat_row[f"{target}_rollstd{w}"] = std_w

        if n_hist >= 2:
            prev, cur = hist[-2], hist[-1]
            diff1 = cur - prev
            # Same formula as pandas pct_change; an undefined ratio (previous
            # value 0) is reported as 0.0 rather than a huge artificial number.
            pct = (cur - prev) / prev if prev != 0 else 0.0
        else:
            diff1, pct = 0.0, 0.0
        feat_row[f"{target}_diff1"] = float(diff1)
        feat_row[f"{target}_pct"] = float(pct)

        # Arrange the values in the column order the model expects.
        x = np.array(
            [[feat_row.get(c, carried.get(c, 0.0)) for c in feat_list]],
            dtype=np.float32,
        )
        yhat = float(model.predict(x)[0])

        preds.append(yhat)
        dates.append(target_date)
        hist.append(yhat)  # the prediction becomes history for the next step

    return pd.DataFrame({"date": dates, "yhat": preds})

# ---------------- Permutation Importance (경량화) ----------------
def permutation_importance_light(model, X, y, feats, n_repeats=3, max_rows=500):
    """Compute permutation importance on at most ``max_rows`` rows.

    When ``X`` has more than ``max_rows`` rows, a fixed-seed random subset
    (without replacement) is used. Returns a DataFrame with ``feature`` and
    ``importance`` columns sorted by descending importance, or ``None`` when
    anything fails (best-effort diagnostic).
    """
    try:
        from sklearn.inspection import permutation_importance as sk_perm

        X_eval, y_eval = X, y
        if len(X) > max_rows:
            rng = np.random.RandomState(42)
            rows = rng.choice(len(X), size=max_rows, replace=False)
            X_eval, y_eval = X[rows], y[rows]

        result = sk_perm(model, X_eval, y_eval, n_repeats=n_repeats, random_state=42, n_jobs=-1)
        table = pd.DataFrame({"feature": feats, "importance": result.importances_mean})
        return table.sort_values("importance", ascending=False)
    except Exception:
        return None

# ---------------- 안전한 모델 로깅 ----------------
def safe_log_sklearn_model(model, name: str, signature=None, input_example=None, pip_reqs_min=True):
    """Log a scikit-learn model to MLflow; do nothing when the sklearn flavor is unavailable.

    ``signature`` and ``input_example`` are forwarded only when given. With
    ``pip_reqs_min`` a minimal pip requirement list is pinned. The call is first
    made with the ``name=`` keyword and retried with ``artifact_path=`` when
    that raises ``TypeError``.
    """
    if mlflow_sklearn is None:
        return

    optional = {"signature": signature, "input_example": input_example}
    kwargs = {key: value for key, value in optional.items() if value is not None}
    if pip_reqs_min:
        kwargs["pip_requirements"] = ["mlflow", "scikit-learn"]

    try:
        mlflow_sklearn.log_model(sk_model=model, name=name, **kwargs)
    except TypeError:
        mlflow_sklearn.log_model(model, artifact_path=name, **kwargs)

def safe_log_xgb_model(model, name: str, signature=None, input_example=None, pip_reqs_min=True):
    """Log an XGBoost model to MLflow; do nothing when ``mlflow.xgboost`` cannot be imported.

    ``signature`` and ``input_example`` are forwarded only when given. With
    ``pip_reqs_min`` a minimal pip requirement list is pinned. The call is first
    made with the ``name=`` keyword and retried with ``artifact_path=`` when
    that raises ``TypeError``.
    """
    try:
        import mlflow.xgboost as mlflow_xgb
    except Exception:
        return

    optional = {"signature": signature, "input_example": input_example}
    kwargs = {key: value for key, value in optional.items() if value is not None}
    if pip_reqs_min:
        kwargs["pip_requirements"] = ["mlflow", "xgboost"]

    try:
        mlflow_xgb.log_model(model, name=name, **kwargs)
    except TypeError:
        mlflow_xgb.log_model(model, artifact_path=name, **kwargs)

# ==================== main ====================
def main():
    """Run the whole pipeline inside one parent MLflow run.

    Steps (each in its own nested run): EDA on the processed CSV, feature
    engineering, then training / test evaluation / recursive future forecast
    for LinearRegression, RandomForestRegressor and, when available, XGBoost.
    Forecast CSVs are written to ``OUTDIR`` and logged as artifacts.

    Raises:
        FileNotFoundError: if ``PROC`` does not exist.
        ValueError: if the input lacks the ``date`` or ``TARGET`` column.
    """
    # Explicit check instead of ``assert`` so the validation survives ``python -O``.
    if not PROC.exists():
        raise FileNotFoundError(f"Processed CSV not found: {PROC}")
    base_df = pd.read_csv(PROC)
    if "date" not in base_df.columns or TARGET not in base_df.columns:
        raise ValueError("Input must have 'date' and target.")

    with mlflow.start_run(run_name="experiment_run"):

        # ----- 0) EDA -----
        with mlflow.start_run(run_name="data_and_eda", nested=True):
            ds = mlflow.data.from_pandas(base_df, source=str(PROC), name="raw_processed")
            mlflow.log_input(ds, context="training")
            mlflow.log_metrics({
                "rows": len(base_df),
                "cols": base_df.shape[1],
                "missing_total": int(base_df.isnull().sum().sum())
            })
            # Plots are best-effort: a plotting failure must not stop the run.
            try:
                dfp = base_df.copy()
                dfp["date"] = pd.to_datetime(dfp["date"], errors="coerce")
                dfp = dfp.dropna(subset=["date"]).sort_values("date")
                lineplot(dfp["date"], dfp[TARGET], f"{TARGET} over time", "eda_target.png")
                corr_heatmap(dfp, "eda_corr_heatmap.png", max_cols=30)
            except Exception:
                pass
            log_df_preview_md(base_df, "raw_preview", 20)

        # ----- 1) Feature engineering -----
        with mlflow.start_run(run_name="feature_engineering", nested=True):
            df_feat = feature_engineer(PROC, TARGET, FEAT)
            ds = mlflow.data.from_pandas(df_feat, source=str(FEAT), name="feature_engineered")
            mlflow.log_input(ds, context="training")
            mlflow.log_metrics({"fe_rows": len(df_feat), "fe_cols": df_feat.shape[1]})
            mlflow.log_artifact(str(FEAT))
            log_df_preview_md(df_feat, "fe_preview", 20)

        # ----- 2) Model training / evaluation / future forecast -----
        df_feat = pd.read_csv(FEAT)
        df_feat["date"] = pd.to_datetime(df_feat["date"], errors="coerce")
        # Keep only features that can be regenerated for future dates.
        feat_list = select_future_aware_features(df_feat, TARGET)
        with mlflow.start_run(run_name="features_used", nested=True):
            mlflow.log_text("\n".join(feat_list), "features_used.txt")

        train_df, test_df = time_split(df_feat, TEST_DAYS)
        train_ds = mlflow.data.from_pandas(train_df, name="train_dataset")
        test_ds  = mlflow.data.from_pandas(test_df,  name="test_dataset")

        # X, y shared by all models
        X_train, y_train, idx_train = build_xy(train_df, TARGET, feat_list)
        X_test,  y_test,  idx_test  = build_xy(test_df,  TARGET, feat_list)

        # ---- Model 1: Linear Regression
        from sklearn.linear_model import LinearRegression
        with mlflow.start_run(run_name="model_linear", nested=True):
            mlflow.log_params({"algo":"LinearRegression","horizon":HORIZON,"test_days":TEST_DAYS,
                               "lookbacks":",".join(map(str,LOOKBACKS)),"rolls":",".join(map(str,ROLLS)),
                               "feat_count":len(feat_list)})
            mlflow.log_input(train_ds, context="training")
            mlflow.log_input(test_ds,  context="testing")

            mdl = LinearRegression()
            mdl.fit(X_train, y_train)
            pred_test = mdl.predict(X_test)
            m = reg_metrics(y_test, pred_test)
            mlflow.log_metrics({f"test_{k}":v for k,v in m.items()})

            dt_test = test_df.loc[idx_test, "date"]
            dualplot(dt_test, y_test, pred_test, "Linear: actual vs pred (test)", "lin_test_pred.png")
            residplot(y_test, pred_test, "Linear: residuals (test)", "lin_test_resid.png")

            imp = permutation_importance_light(mdl, X_test, y_test, feat_list, n_repeats=3, max_rows=500)
            if imp is not None:
                mlflow.log_text(imp.head(30).to_markdown(index=False), "lin_perm_importance.md")

            # Dynamic recursive forecast
            df_fore = recursive_forecast_dynamic(df_feat, TARGET, feat_list, mdl, HORIZON)
            path = OUTDIR / "linear_forecast.csv"
            df_fore.to_csv(path, index=False); mlflow.log_artifact(str(path))
            lineplot(df_fore["date"], df_fore["yhat"], "Linear forecast (future)", "lin_forecast.png")

            # Model logging is best-effort; the helper also retries with the
            # ``artifact_path=`` keyword when ``name=`` is rejected.
            try:
                sig = mlflow.models.infer_signature(X_train[:5], mdl.predict(X_train[:5]))
                safe_log_sklearn_model(mdl, "model_linear", signature=sig, input_example=X_train[:5])
            except Exception:
                pass

        # ---- Model 2: RandomForest
        from sklearn.ensemble import RandomForestRegressor
        with mlflow.start_run(run_name="model_rf", nested=True):
            params = {"algo":"RandomForestRegressor","n_estimators":400,"max_depth":12,"min_samples_leaf":2,
                      "n_jobs":-1,"random_state":42,"horizon":HORIZON,"test_days":TEST_DAYS,
                      "feat_count":len(feat_list)}
            mlflow.log_params(params)
            mlflow.log_input(train_ds, context="training")
            mlflow.log_input(test_ds,  context="testing")

            rf = RandomForestRegressor(
                n_estimators=params["n_estimators"],
                max_depth=params["max_depth"],
                min_samples_leaf=params["min_samples_leaf"],
                n_jobs=-1, random_state=42
            )
            rf.fit(X_train, y_train)
            pred_test = rf.predict(X_test)
            m = reg_metrics(y_test, pred_test)
            mlflow.log_metrics({f"test_{k}":v for k,v in m.items()})

            dt_test = test_df.loc[idx_test, "date"]
            dualplot(dt_test, y_test, pred_test, "RF: actual vs pred (test)", "rf_test_pred.png")
            residplot(y_test, pred_test, "RF: residuals (test)", "rf_test_resid.png")

            imp = permutation_importance_light(rf, X_test, y_test, feat_list, n_repeats=3, max_rows=500)
            if imp is not None:
                mlflow.log_text(imp.head(30).to_markdown(index=False), "rf_perm_importance.md")

            df_fore = recursive_forecast_dynamic(df_feat, TARGET, feat_list, rf, HORIZON)
            path = OUTDIR / "rf_forecast.csv"
            df_fore.to_csv(path, index=False); mlflow.log_artifact(str(path))
            lineplot(df_fore["date"], df_fore["yhat"], "RF forecast (future)", "rf_forecast.png")

            try:
                sig = mlflow.models.infer_signature(X_train[:5], rf.predict(X_train[:5]))
                safe_log_sklearn_model(rf, "model_rf", signature=sig, input_example=X_train[:5])
            except Exception:
                pass

        # ---- (optional) Model 3: XGBoost (when installed)
        try:
            from xgboost import XGBRegressor
            with mlflow.start_run(run_name="model_xgb", nested=True):
                params = {"algo":"XGBRegressor","n_estimators":600,"max_depth":6,"learning_rate":0.05,
                          "subsample":0.9,"colsample_bytree":0.9,"random_state":42,
                          "reg_lambda":1.0,"horizon":HORIZON,"test_days":TEST_DAYS,
                          "feat_count":len(feat_list)}
                mlflow.log_params(params)
                mlflow.log_input(train_ds, "training"); mlflow.log_input(test_ds, "testing")

                xgb = XGBRegressor(
                    n_estimators=params["n_estimators"], max_depth=params["max_depth"],
                    learning_rate=params["learning_rate"], subsample=params["subsample"],
                    colsample_bytree=params["colsample_bytree"], random_state=42, n_jobs=-1,
                    reg_lambda=params["reg_lambda"], tree_method="hist"
                )
                xgb.fit(X_train, y_train)
                pred_test = xgb.predict(X_test)
                m = reg_metrics(y_test, pred_test)
                mlflow.log_metrics({f"test_{k}":v for k,v in m.items()})

                dt_test = test_df.loc[idx_test, "date"]
                dualplot(dt_test, y_test, pred_test, "XGB: actual vs pred (test)", "xgb_test_pred.png")
                residplot(y_test, pred_test, "XGB: residuals (test)", "xgb_test_resid.png")

                df_fore = recursive_forecast_dynamic(df_feat, TARGET, feat_list, xgb, HORIZON)
                path = OUTDIR / "xgb_forecast.csv"
                df_fore.to_csv(path, index=False); mlflow.log_artifact(str(path))
                lineplot(df_fore["date"], df_fore["yhat"], "XGB forecast (future)", "xgb_forecast.png")

                try:
                    sig = mlflow.models.infer_signature(X_train[:5], xgb.predict(X_train[:5]))
                    safe_log_xgb_model(xgb, "model_xgb", signature=sig, input_example=X_train[:5])
                except Exception:
                    pass
        except ImportError:
            pass  # xgboost is not installed: skip this model
        except Exception as exc:
            # Still non-fatal, but a real failure is reported instead of being
            # indistinguishable from "xgboost not installed".
            print(f"[model_xgb] skipped after error: {exc!r}")

# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
