# 열처리 공정 불량률 최소화를 위한 공정별 최적 온도 범위 조합 도출 모듈

## 1. train.py — XGBoost 학습 로직
- **데이터 준비**: `load_dataset`으로 분할을 불러오고 `TargetScaler`로 불량률을 10⁵배 확장하여 수치 안정성 확보
- **모델 구성**: `DEFAULT_CONFIG`의 하이퍼파라미터로 `XGBRegressor` 생성, 검증 셋을 `eval_set`으로 등록해 학습 진행 상황을 모니터링
- **성능 평가**: 학습/검증(+선택적 테스트) 셋에 대해 MAE·RMSE·R²를 계산하여 JSON으로 저장
- **설명 가능성 확보**: XGBoost Booster 혹은 `feature_importances_`를 이용해 gain 기반 피처 중요도를 CSV로 출력
- **재현성 관리**: 실행마다 `run_id`를 생성(또는 인자로 전달받은 값을 사용)하고 최신 run 마커를 갱신하여, 다른 모듈이 동일한 run의 결과를 참조할 수 있도록 함


In [None]:
# train.py 전체 코드
from __future__ import annotations

import json
from pathlib import Path
from typing import Dict, Optional

import joblib
import numpy as np
import pandas as pd
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from xgboost import XGBRegressor

from common import (
    DatasetSplit,
    TargetScaler,
    ensure_logger,
    ensure_result_directories,
    generate_run_id,
    load_config,
    load_dataset,
    write_latest_run_marker,
)


def _scale_split(split: DatasetSplit, scaler: TargetScaler) -> DatasetSplit:
    """Return ``split`` with its target run through ``scaler.scale_series``.

    When ``scaler.enabled`` is falsy the very same ``split`` object is handed
    back. Otherwise a new ``DatasetSplit`` is created that reuses the original
    ``features`` and ``lots`` and carries the scaled target.

    NOTE(review): the original note says defect rates are around 1e-4 and are
    multiplied by 1e5 before training; the actual factor lives in
    ``TargetScaler`` and is not visible here.
    """
    if scaler.enabled:
        scaled_target = scaler.scale_series(split.target)
        return DatasetSplit(
            features=split.features,
            target=scaled_target,
            lots=split.lots,
        )
    return split


def _evaluate(model: XGBRegressor, split: DatasetSplit, scaler: TargetScaler) -> Dict[str, float]:
    """Score ``model`` on ``split`` and return MAE, RMSE and R² as floats.

    The model's predictions are passed through ``scaler.inverse_values``
    before being compared against ``split.target`` (used as stored in the
    split, converted to a float array).
    """
    y_true = split.target.to_numpy(dtype=float)
    y_pred = scaler.inverse_values(model.predict(split.features))

    # RMSE is derived from the mean squared error rather than requested directly.
    mean_sq_error = mean_squared_error(y_true, y_pred)
    return {
        "mae": float(mean_absolute_error(y_true, y_pred)),
        "rmse": float(np.sqrt(mean_sq_error)),
        "r2": float(r2_score(y_true, y_pred)),
    }


def _collect_feature_importances(model: XGBRegressor, feature_names, logger) -> pd.DataFrame:
    """Build a ``feature``/``importance`` table for ``model`` on a best-effort basis.

    Booster gain scores are tried first (sorted by descending score). If that
    fails for any reason, ``model.feature_importances_`` paired with
    ``feature_names`` is used instead. If both fail an empty table with the
    same two columns is returned. Failures are logged instead of being
    silently discarded.
    """
    try:
        scores = model.get_booster().get_score(importance_type="gain")
        if not scores:
            return pd.DataFrame(columns=["feature", "importance"])
        ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
        return pd.DataFrame(ranked, columns=["feature", "importance"])
    except Exception as exc:  # best effort: importances must never abort a finished training run
        logger.warning(
            "Booster gain importances unavailable (%s); falling back to feature_importances_.",
            exc,
        )

    try:
        importances = getattr(model, "feature_importances_", None)
        if importances is None:
            raise AttributeError("model exposes no feature_importances_")
        return pd.DataFrame(
            {
                "feature": feature_names,
                "importance": np.asarray(importances, dtype=float),
            }
        )
    except Exception as exc:  # best effort, see above
        logger.warning("Feature importances unavailable (%s); writing an empty table.", exc)
        return pd.DataFrame(columns=["feature", "importance"])


def train(
    *,
    config_path: Optional[Path] = None,
    run_id: Optional[str] = None,
    model_name: str = "xgb_baseline",
    save_feature_importances: bool = True,
) -> Dict[str, str]:
    """Train an XGBoost regressor and persist the model, metrics and importances.

    Args:
        config_path: Forwarded to ``load_config``.
        run_id: Identifier of the run directory; a falsy value triggers
            ``generate_run_id()``.
        model_name: Stem used for every artefact file name.
        save_feature_importances: When true, a ``feature``/``importance`` CSV
            is written next to the metrics report.

    Returns:
        A dict with ``run_id``, ``model_path`` and ``metrics_path``, plus
        ``feature_importances_path`` when the CSV was written.
    """

    logger = ensure_logger()
    config = load_config(config_path)

    base_result_dir = Path(str(config["paths"]["result_root"]))
    run_id = run_id or generate_run_id()
    run_paths = ensure_result_directories(base_result_dir, config, run_id)

    # 1. Load the data splits; the test split is optional.
    train_split = load_dataset("train", config)
    val_split = load_dataset("validation", config)
    try:
        test_split = load_dataset("test", config)
    except FileNotFoundError:
        test_split = None

    training_cfg = config["training"]
    scaler = TargetScaler.from_config(training_cfg.get("target_scaler", {}))

    # 2. Train on the (optionally) scaled target; metrics below use the raw splits.
    scaled_train = _scale_split(train_split, scaler)
    scaled_val = _scale_split(val_split, scaler)

    # 3. Hyper-parameters from the config, topped up with defaults.
    params = dict(training_cfg.get("model_params", {}))
    params.setdefault("random_state", training_cfg.get("random_seed", 42))
    params.setdefault("n_jobs", training_cfg.get("n_jobs", -1))
    eval_metric = training_cfg.get("eval_metric")
    if eval_metric and "eval_metric" not in params:
        params["eval_metric"] = eval_metric

    model = XGBRegressor(**params)

    # 4. Register train/validation as eval_set only when validation rows exist.
    has_validation = val_split.features.shape[0] > 0
    fit_kwargs: Dict[str, object] = {}
    if has_validation:
        fit_kwargs["eval_set"] = [
            (scaled_train.features, scaled_train.target.to_numpy(dtype=float)),
            (scaled_val.features, scaled_val.target.to_numpy(dtype=float)),
        ]

    # 5. Fit the model.
    # NOTE(review): any early-stopping behaviour would have to come from
    # ``model_params`` in the config; nothing here sets it explicitly.
    model.fit(
        scaled_train.features,
        scaled_train.target.to_numpy(dtype=float),
        **fit_kwargs,
    )

    # 6. Metrics on the original target scale. An empty validation split is
    #    skipped instead of being scored, mirroring the guard used for eval_set.
    metrics: Dict[str, Dict[str, float]] = {
        "train": _evaluate(model, train_split, scaler),
    }
    if has_validation:
        metrics["validation"] = _evaluate(model, val_split, scaler)
    else:
        logger.warning("Validation split is empty; validation metrics are skipped.")
    if test_split is not None:
        metrics["test"] = _evaluate(model, test_split, scaler)

    metrics_path = run_paths["reports"] / f"{model_name}_metrics.json"
    # ensure_ascii=False may emit non-ASCII text, so pin the file encoding.
    metrics_path.write_text(
        json.dumps(metrics, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )

    # 7. Persist the fitted model for the downstream modules.
    model_path = run_paths["models"] / f"{model_name}.joblib"
    joblib.dump(model, model_path)

    # 8. Export feature importances as CSV (best effort).
    feature_importance_path: Optional[Path] = None
    if save_feature_importances:
        records = _collect_feature_importances(model, train_split.features.columns, logger)
        feature_importance_path = run_paths["reports"] / f"{model_name}_feature_importances.csv"
        records.to_csv(feature_importance_path, index=False)

    # 9. Record this run as the latest one so other modules can look it up.
    write_latest_run_marker(base_result_dir, config, run_id)
    logger.info("Training completed. Run %s stored at %s", run_id, run_paths["run_dir"])

    result: Dict[str, str] = {
        "run_id": run_id,
        "model_path": str(model_path),
        "metrics_path": str(metrics_path),
    }
    if feature_importance_path is not None:
        result["feature_importances_path"] = str(feature_importance_path)
    return result


# Script entry point: run the training pipeline with every argument left at its default.
if __name__ == "__main__":
    train()



## 2. evaluate.py — 모델 성능 검증
- **예측값 평가**: 예측값에 대한 MAE/RMSE/R² 평가
- **검증 결과 저장**: LOT별 실제/예측값 비교표 및 그래프 저장
- **선택적 SHAP 평가 및 저장**: 선택적으로 LOT별 상위 SHAP 피처를 CSV로 저장


In [None]:
# evaluate.py 전체 코드
from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Dict, Optional

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import shap

from common import (
    TargetScaler,
    configure_matplotlib,
    ensure_logger,
    ensure_result_directories,
    load_config,
    load_dataset,
    read_latest_run_marker,
)


def _compute_metrics(df: pd.DataFrame) -> Dict[str, float]:
    """Derive MAE, RMSE and R² from a per-row prediction report.

    ``df`` must provide the columns ``absolute_error``, ``squared_error``,
    ``actual_defect_rate`` and ``predicted_defect_rate``. R² is reported as
    NaN when the actual values have zero total variation.
    """
    mean_abs_error = float(df["absolute_error"].mean())
    root_mean_sq_error = float(np.sqrt(df["squared_error"].mean()))

    y_true = df["actual_defect_rate"].values
    y_pred = df["predicted_defect_rate"].values
    residual_ss = float(np.sum((y_true - y_pred) ** 2))
    total_ss = float(np.sum((y_true - y_true.mean()) ** 2))

    if total_ss != 0:
        r_squared = 1.0 - residual_ss / total_ss
    else:
        # Constant actual values: R² is undefined.
        r_squared = float("nan")

    return {"mae": mean_abs_error, "rmse": root_mean_sq_error, "r2": r_squared}


def _save_comparison_plot(df: pd.DataFrame, destination: Path) -> Path:
    """Save a grouped bar chart of actual vs. predicted defect rate.

    Rows are ordered by ascending ``actual_defect_rate`` (not by LOT id); the
    x tick labels are the ``LOT_NO`` values. The parent directory of
    ``destination`` is created if needed and ``destination`` is returned.
    """

    destination.parent.mkdir(parents=True, exist_ok=True)

    ordered = df.sort_values("actual_defect_rate")
    positions = np.arange(len(ordered))
    bar_width = 0.35
    half_width = bar_width / 2

    plt.figure(figsize=(12, 6))
    # One bar series on each side of the tick position.
    series = (
        (-half_width, "actual_defect_rate", "Actual"),
        (half_width, "predicted_defect_rate", "Predicted"),
    )
    for offset, column, label in series:
        plt.bar(positions + offset, ordered[column], bar_width, label=label)

    plt.xticks(positions, ordered["LOT_NO"], rotation=60, ha="right")
    plt.ylabel("Defect rate")
    plt.title("LOT-level actual vs predicted defect rate")
    plt.legend()
    plt.tight_layout()
    plt.savefig(destination, dpi=300, bbox_inches="tight")
    plt.close()
    return destination


def _save_lot_shap_summary(
    model,
    scaler: TargetScaler,
    dataset,
    destination: Path,
    top_k: int = 5,
) -> Optional[Path]:
    """Write the ``top_k`` features with the largest mean |SHAP| per LOT to CSV.

    SHAP values are computed with ``shap.TreeExplainer`` on
    ``dataset.features``, passed through ``scaler.inverse_values``, taken in
    absolute value and averaged per ``LOT_NO``. The CSV has the columns
    ``LOT_NO``, ``rank`` (1-based), ``feature`` and ``mean_abs_shap``.

    Returns:
        ``destination`` after writing, or ``None`` when there is nothing to
        write (no file is created in that case).
    """

    tree_explainer = shap.TreeExplainer(model, feature_perturbation="tree_path_dependent")
    raw_values = tree_explainer(dataset.features).values
    magnitudes = np.abs(scaler.inverse_values(raw_values))

    per_row = pd.DataFrame(magnitudes, columns=dataset.features.columns)
    per_row["LOT_NO"] = dataset.lots.values
    per_lot = per_row.groupby("LOT_NO").mean()

    rows = []
    for lot_id, lot_scores in per_lot.iterrows():
        strongest = lot_scores.sort_values(ascending=False).head(top_k)
        rows.extend(
            {
                "LOT_NO": lot_id,
                "rank": position,
                "feature": name,
                "mean_abs_shap": float(score),
            }
            for position, (name, score) in enumerate(strongest.items(), start=1)
        )

    if not rows:
        return None

    destination.parent.mkdir(parents=True, exist_ok=True)
    pd.DataFrame(rows).to_csv(destination, index=False)
    return destination


def evaluate(
    *,
    config_path: Optional[Path] = None,
    run_id: Optional[str] = None,
    model_name: str = "xgb_baseline",
    split: str = "validation",
    compute_shap: bool = True,
) -> Dict[str, Optional[str]]:
    """Evaluate a stored model on one split and write the row-level reports.

    Args:
        config_path: Forwarded to ``load_config``.
        run_id: Run to evaluate; when ``None`` the latest run marker is read.
        model_name: Stem of the ``.joblib`` model file and of all outputs.
        split: Name of the dataset split passed to ``load_dataset``.
        compute_shap: When true, also write the per-LOT SHAP summary CSV.

    Returns:
        A dict with ``run_id``, ``report_path``, ``metrics_path`` and
        ``plot_path``, plus ``shap_summary_path`` when that file was written.

    Raises:
        ValueError: No ``run_id`` was given and no latest-run marker exists.
        FileNotFoundError: The model file is missing from the run directory.
    """

    # Use the project logger, like train(), rather than the root ``logging`` module.
    logger = ensure_logger()
    configure_matplotlib()
    config = load_config(config_path)
    base_result_dir = Path(str(config["paths"]["result_root"]))

    if run_id is None:
        run_id = read_latest_run_marker(base_result_dir, config)
        if run_id is None:
            raise ValueError("No recorded run. Train the model before running evaluation.")

    run_paths = ensure_result_directories(base_result_dir, config, run_id)
    model_path = run_paths["models"] / f"{model_name}.joblib"
    if not model_path.exists():
        raise FileNotFoundError(f"Model not found at {model_path}")

    # NOTE: joblib.load unpickles the file; only point this at run directories you trust.
    model = joblib.load(model_path)
    dataset = load_dataset(split, config)
    scaler = TargetScaler.from_config(config["training"].get("target_scaler", {}))

    # Predictions are mapped back through the scaler before comparison with the stored target.
    predictions = scaler.inverse_values(model.predict(dataset.features))
    report_df = pd.DataFrame(
        {
            "LOT_NO": dataset.lots.values,
            "actual_defect_rate": dataset.target.values,
            "predicted_defect_rate": predictions,
        }
    )
    residual = report_df["actual_defect_rate"] - report_df["predicted_defect_rate"]
    report_df["absolute_error"] = np.abs(residual)
    report_df["squared_error"] = residual ** 2

    metrics = _compute_metrics(report_df)

    reports_dir = run_paths["reports"]
    plots_dir = run_paths["plots"]

    report_path = reports_dir / f"{model_name}_{split}_lot_evaluation.csv"
    metrics_path = reports_dir / f"{model_name}_{split}_lot_metrics.json"
    plot_path = plots_dir / f"{model_name}_{split}_lot_comparison.png"

    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_df.to_csv(report_path, index=False)
    # ensure_ascii=False may emit non-ASCII text, so pin the file encoding.
    metrics_path.write_text(
        json.dumps(metrics, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    _save_comparison_plot(report_df, plot_path)

    shap_summary_path: Optional[Path] = None
    if compute_shap:
        shap_summary_path = _save_lot_shap_summary(
            model,
            scaler,
            dataset,
            reports_dir / f"{model_name}_{split}_lot_shap_summary.csv",
        )

    logger.info("Evaluation completed for run %s", run_id)

    result: Dict[str, Optional[str]] = {
        "run_id": run_id,
        "report_path": str(report_path),
        "metrics_path": str(metrics_path),
        "plot_path": str(plot_path),
    }
    if shap_summary_path is not None:
        result["shap_summary_path"] = str(shap_summary_path)
    return result


# Script entry point: evaluate the latest recorded run with every argument left at its default.
if __name__ == "__main__":
    evaluate()



## 3. analyze.py — SHAP 기반 심층 분석 상세 설명
- **샘플링 전략**: `max_samples` 설정으로 분석 비용을 제어하며, LOT 레이블을 유지한 상태로 샘플링
- **SHAP 계산**: `TreeExplainer`로 절대 SHAP·상호작용 값을 계산하고, 스케일러로 원 단위로 복원
- **전역/지역 시각화**: 요약 플롯, 의존성 플롯, LOT별 바 차트, 상호작용 히트맵을 생성해 영향도를 다면적으로 확인
- **표면 분석**: 상위 2개 피처를 활용해 예측 표면을 생성, 온도 조합에 따른 불량률 변화를 3D로 탐색


In [None]:
# analyze.py 전체 코드
from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import seaborn as sns
import shap

from common import (
    DatasetSplit,
    TargetScaler,
    compute_feature_ranges,
    configure_matplotlib,
    ensure_logger,
    ensure_result_directories,
    load_config,
    load_dataset,
    mean_absolute_shap,
    read_latest_run_marker,
)


def _sample_dataset(split: DatasetSplit, max_samples: Optional[int], random_state: int) -> DatasetSplit:
    """Down-sample ``split`` to at most ``max_samples`` rows.

    The split is returned unchanged when ``max_samples`` is ``None`` or the
    split already has no more rows than that. Otherwise rows are drawn with
    ``DataFrame.sample`` (seeded by ``random_state``); target and LOT labels
    are picked by the same row labels, and all three parts get a fresh
    0..n-1 index.
    """

    needs_sampling = max_samples is not None and len(split.features) > max_samples
    if not needs_sampling:
        return split

    picked = split.features.sample(n=max_samples, random_state=random_state)
    row_labels = picked.index  # keeps target and lots aligned with the sampled rows
    picked_target = split.target.loc[row_labels]
    picked_lots = split.lots.loc[row_labels]
    return DatasetSplit(
        features=picked.reset_index(drop=True),
        target=picked_target.reset_index(drop=True),
        lots=picked_lots.reset_index(drop=True),
    )


def _select_top_features(columns: Sequence[str], shap_values: np.ndarray, top_k: int) -> List[str]:
    """Return up to ``top_k`` column names, highest ``mean_absolute_shap`` score first.

    The number of names returned is capped at ``len(columns)``.
    """

    importance = mean_absolute_shap(shap_values)
    descending = np.argsort(importance)[::-1]  # argsort is ascending, so reverse it
    keep = min(top_k, len(columns))

    selected: List[str] = []
    for position in descending[:keep]:
        selected.append(columns[position])
    return selected


def _save_summary_plot(
    explanation: shap.Explanation,
    shap_values: np.ndarray,
    destination: Path,
    top_k: int,
) -> Path:
    """Render ``shap.summary_plot`` for ``shap_values`` and save it as an image.

    ``explanation.data`` supplies the feature values, ``top_k`` is passed as
    ``max_display``. The parent directory of ``destination`` is created if
    needed and ``destination`` is returned.
    """
    output_dir = destination.parent
    output_dir.mkdir(parents=True, exist_ok=True)

    # show=False: the figure is saved below instead of being displayed.
    shap.summary_plot(shap_values, explanation.data, max_display=top_k, show=False)
    plt.tight_layout()
    plt.savefig(destination, dpi=300, bbox_inches="tight")
    plt.close()
    return destination


def _save_summary_table(columns: Sequence[str], shap_values: np.ndarray, destination: Path) -> Path:
    """Write a ``feature``/``mean_abs_shap`` CSV sorted by descending score.

    Scores come from ``mean_absolute_shap(shap_values)`` and are paired with
    ``columns`` in order. The parent directory of ``destination`` is created
    if needed and ``destination`` is returned.
    """

    destination.parent.mkdir(parents=True, exist_ok=True)
    table = pd.DataFrame(
        {
            "feature": columns,
            "mean_abs_shap": mean_absolute_shap(shap_values),
        }
    )
    ranked = table.sort_values("mean_abs_shap", ascending=False)
    ranked.to_csv(destination, index=False)
    return destination


def _save_dependence_plots(
    shap_values: np.ndarray,
    feature_matrix: pd.DataFrame,
    features: Sequence[str],
    destination_dir: Path,
) -> List[Path]:
    """Save one ``shap.dependence_plot`` image per entry in ``features``.

    Files are named ``dependence_<feature>.png`` inside ``destination_dir``
    (created if needed). The written paths are returned in input order.
    """

    destination_dir.mkdir(parents=True, exist_ok=True)

    def _render(feature: str) -> Path:
        """Draw and save the dependence plot for a single feature."""
        shap.dependence_plot(
            feature,
            shap_values,
            feature_matrix,
            feature_names=list(feature_matrix.columns),
            show=False,
        )
        plt.tight_layout()
        target = destination_dir / f"dependence_{feature}.png"
        plt.savefig(target, dpi=300, bbox_inches="tight")
        plt.close()
        return target

    return [_render(feature) for feature in features]


def _save_interaction_heatmap(
    feature_names: Sequence[str],
    interaction_values: np.ndarray,
    destination: Path,
    top_k: Optional[int],
) -> Optional[Path]:
    """Save a heatmap of mean absolute SHAP interaction strength between features.

    ``interaction_values`` is averaged in absolute value over its first axis
    and the diagonal is zeroed, so only pairwise terms remain. When ``top_k``
    is given and smaller than the number of features, the matrix is reduced
    to the ``top_k`` features with the largest column totals.

    Returns:
        ``destination`` after saving, or ``None`` when ``interaction_values``
        is ``None`` (nothing is written in that case).
    """

    if interaction_values is None:
        return None

    destination.parent.mkdir(parents=True, exist_ok=True)

    pair_strength = np.abs(interaction_values).mean(axis=0)
    np.fill_diagonal(pair_strength, 0.0)  # drop each feature's term with itself
    matrix = pd.DataFrame(pair_strength, index=feature_names, columns=feature_names)

    should_trim = top_k is not None and top_k < len(feature_names)
    if should_trim:
        column_totals = matrix.abs().sum(axis=0)
        strongest = column_totals.sort_values(ascending=False).head(top_k).index
        matrix = matrix.loc[strongest, strongest]

    plt.figure(figsize=(12, 10))
    sns.heatmap(matrix, cmap="coolwarm", center=0.0)
    plt.title("Mean |SHAP interaction|")
    plt.tight_layout()
    plt.savefig(destination, dpi=300, bbox_inches="tight")
    plt.close()
    return destination


def _save_lot_barplots(
    shap_values: np.ndarray,
    columns: Sequence[str],
    lots: Sequence[str],
    destination: Path,
    top_k_features: int,
    top_k_lots: int,
) -> Optional[Path]:
    """Save stacked horizontal bar charts of the top mean |SHAP| features per LOT.

    Absolute SHAP values are averaged per ``LOT_NO``. The ``top_k_lots`` LOTs
    with the largest summed mean |SHAP| each get one subplot showing their
    ``top_k_features`` strongest features, largest at the top.

    Returns:
        ``destination`` after saving, or ``None`` when no LOT is available.
    """

    destination.parent.mkdir(parents=True, exist_ok=True)

    magnitudes = pd.DataFrame(np.abs(shap_values), columns=columns)
    magnitudes["LOT_NO"] = list(lots)
    per_lot = magnitudes.groupby("LOT_NO").mean()
    if per_lot.empty:
        return None

    lot_totals = per_lot.sum(axis=1).sort_values(ascending=False)
    chosen_lots = lot_totals.head(top_k_lots).index
    n_panels = len(chosen_lots)
    if n_panels == 0:
        return None

    # squeeze=False always yields a 2-D axes array, so a single panel needs no special case.
    _, axes_grid = plt.subplots(n_panels, 1, figsize=(12, 4 * n_panels), squeeze=False)
    for axis, lot in zip(axes_grid[:, 0], chosen_lots):
        strongest = per_lot.loc[lot].sort_values(ascending=False).head(top_k_features)
        # barh draws from the bottom up, so reverse to put the largest bar on top.
        bottom_up = strongest.iloc[::-1]
        axis.barh(list(bottom_up.index), list(bottom_up.values))
        axis.set_title(f"LOT {lot} | Top SHAP contributions")
        axis.set_xlabel("Mean |SHAP|")

    plt.tight_layout()
    plt.savefig(destination, dpi=300, bbox_inches="tight")
    plt.close()
    return destination


def _save_surface_plot(
    model,
    scaler: TargetScaler,
    columns: Sequence[str],
    feature_pair: Tuple[str, str],
    feature_ranges: Dict[str, Tuple[float, float]],
    baseline_row: pd.Series,
    destination: Path,
) -> Optional[Path]:
    """Save an interactive 3-D surface of predictions over a two-feature grid.

    Both features of ``feature_pair`` are swept over a 40-point grid spanning
    their entry in ``feature_ranges``; every other column keeps its value
    from ``baseline_row``. Predictions go through ``scaler.inverse_values``
    and the surface is written as HTML.

    Returns:
        ``destination`` after writing, or ``None`` when either feature has no
        entry in ``feature_ranges``.
    """

    first, second = feature_pair
    if not (first in feature_ranges and second in feature_ranges):
        return None

    destination.parent.mkdir(parents=True, exist_ok=True)

    first_low, first_high = feature_ranges[first][0], feature_ranges[first][1]
    second_low, second_high = feature_ranges[second][0], feature_ranges[second][1]
    axis_first = np.linspace(first_low, first_high, 40)
    axis_second = np.linspace(second_low, second_high, 40)
    grid_first, grid_second = np.meshgrid(axis_first, axis_second)

    # One copy of the baseline row per grid point; only the two swept columns change.
    baseline_matrix = baseline_row.to_frame().T.values
    grid_frame = pd.DataFrame(
        np.repeat(baseline_matrix, grid_first.size, axis=0),
        columns=columns,
    )
    grid_frame[first] = grid_first.ravel()
    grid_frame[second] = grid_second.ravel()

    raw_predictions = model.predict(grid_frame)
    heights = scaler.inverse_values(raw_predictions).reshape(grid_first.shape)

    surface_trace = go.Surface(
        x=axis_first,
        y=axis_second,
        z=heights,
        colorscale="Viridis",
        showscale=True,
    )
    figure = go.Figure(data=[surface_trace])
    figure.update_layout(
        title=f"Predicted defect rate surface ({first} vs {second})",
        scene=dict(xaxis_title=first, yaxis_title=second, zaxis_title="Predicted defect rate"),
    )
    figure.write_html(destination)
    return destination


def analyze(
    *,
    config_path: Optional[Path] = None,
    run_id: Optional[str] = None,
    model_name: str = "xgb_baseline",
    split: str = "train",
) -> Dict[str, Optional[str]]:
    """Generate the SHAP analysis artefacts for a stored model and return their paths.

    Args:
        config_path: Forwarded to ``load_config``.
        run_id: Run to analyse; when ``None`` the latest run marker is read.
        model_name: Stem of the ``.joblib`` model file and of all outputs.
        split: Name of the dataset split passed to ``load_dataset``.

    Returns:
        A dict with ``run_id``, ``summary_plot`` and ``summary_table``, plus
        ``dependence_plots`` (a JSON-encoded list of paths),
        ``interaction_heatmap``, ``lot_barplot`` and ``surface_plot`` for
        every artefact that was actually produced.

    Raises:
        ValueError: No ``run_id`` was given and no latest-run marker exists.
        FileNotFoundError: The model file is missing from the run directory.
    """

    # Use the project logger, like train(), rather than the root ``logging`` module.
    logger = ensure_logger()
    configure_matplotlib()
    config = load_config(config_path)

    base_result_dir = Path(str(config["paths"]["result_root"]))
    if run_id is None:
        run_id = read_latest_run_marker(base_result_dir, config)
        if run_id is None:
            raise ValueError("No recorded run. Execute training first or provide run_id explicitly.")

    run_paths = ensure_result_directories(base_result_dir, config, run_id)
    model_path = run_paths["models"] / f"{model_name}.joblib"
    if not model_path.exists():
        raise FileNotFoundError(f"Model not found at {model_path}")

    dataset = load_dataset(split, config)
    shap_cfg = config["shap"]
    sampled_dataset = _sample_dataset(
        dataset,
        shap_cfg.get("max_samples"),
        random_state=config["training"].get("random_seed", 42),
    )

    scaler = TargetScaler.from_config(config["training"].get("target_scaler", {}))
    # NOTE: joblib.load unpickles the file; only point this at run directories you trust.
    model = joblib.load(model_path)

    # 1. Per-feature and (optionally) interaction SHAP values, each passed
    #    through the scaler's inverse transform.
    explainer = shap.TreeExplainer(model, feature_perturbation="tree_path_dependent")
    explanation = explainer(sampled_dataset.features)
    shap_values = scaler.inverse_values(explanation.values)

    # A key that is present but null must behave like "disabled" instead of
    # raising TypeError on ``None > 0``.
    interaction_top_k = shap_cfg.get("interaction_top_k")
    interaction_values = None
    if (interaction_top_k or 0) > 0:
        raw_interactions = explainer.shap_interaction_values(sampled_dataset.features)
        interaction_values = scaler.inverse_values(raw_interactions)

    columns = list(sampled_dataset.features.columns)
    top_k_features = shap_cfg.get("top_k_features", 15)
    top_features = _select_top_features(columns, shap_values, top_k=top_k_features)

    # 2. Global summary plot and table.
    summary_plot = _save_summary_plot(
        explanation,
        shap_values,
        run_paths["shap"] / f"{model_name}_summary.png",
        top_k=top_k_features,
    )

    summary_table = _save_summary_table(
        columns,
        shap_values,
        run_paths["reports"] / f"{model_name}_shap_summary.csv",
    )

    # 3. Dependence plots for the leading features, interaction heatmap and per-LOT bars.
    dependence_features = top_features[: shap_cfg.get("dependence_max_features", 5)]
    dependence_paths = _save_dependence_plots(
        shap_values,
        sampled_dataset.features,
        dependence_features,
        run_paths["shap"] / "dependence",
    )

    heatmap_path = _save_interaction_heatmap(
        columns,
        interaction_values,
        run_paths["shap"] / f"{model_name}_interaction_heatmap.png",
        top_k=interaction_top_k,
    )

    lot_barplot_path = _save_lot_barplots(
        shap_values,
        columns,
        sampled_dataset.lots,
        run_paths["shap"] / f"{model_name}_lot_shap.png",
        top_k_features=shap_cfg.get("lot_top_features", 10),
        top_k_lots=shap_cfg.get("lot_top_lots", 5),
    )

    # 4. Prediction surface over the two strongest features. Ranges and the
    #    median baseline row come from the full (unsampled) split.
    surface_path: Optional[Path] = None
    if len(top_features) >= 2:
        feature_ranges = compute_feature_ranges(dataset, top_features[:2])
        baseline_row = dataset.features.median()
        surface_path = _save_surface_plot(
            model,
            scaler,
            columns,
            (top_features[0], top_features[1]),
            feature_ranges,
            baseline_row,
            run_paths["shap"] / f"{model_name}_surface.html",
        )

    logger.info("SHAP analysis completed for run %s", run_id)

    artefacts: Dict[str, Optional[str]] = {
        "run_id": run_id,
        "summary_plot": str(summary_plot),
        "summary_table": str(summary_table),
    }
    if dependence_paths:
        artefacts["dependence_plots"] = json.dumps([str(path) for path in dependence_paths], ensure_ascii=False)
    if heatmap_path:
        artefacts["interaction_heatmap"] = str(heatmap_path)
    if lot_barplot_path:
        artefacts["lot_barplot"] = str(lot_barplot_path)
    if surface_path:
        artefacts["surface_plot"] = str(surface_path)
    return artefacts


# Script entry point: analyse the latest recorded run with every argument left at its default.
if __name__ == "__main__":
    analyze()



## 4. optimize.py — Bayesian optimization 기반 불량률 최소화를 위한 설비별 최적 온도 평균, 표준 편차 도출
- **탐색 대상 선정**: LOT을 필터링하고, `target_temperature_features`를 데이터셋 컬럼과 대조하여 실제 존재하는 컬럼만 탐색 대상으로 설정
- **탐색 공간 구성**: 학습 데이터에서 온도 피처의 최소/최대값을 계산해 안전한 탐색 범위를 확보
- **Optuna 목적 함수**: 각 시도에서 피처 값을 조정해 모델 재예측 후 역스케일하여 불량률을 최소화
- **성과 기록**: LOT별 baseline/최적 예측, 개선 폭, 추천 파라미터를 CSV로 저장해 현장 적용 근거로 활용


In [None]:
# optimize.py 전체 코드
from __future__ import annotations

import logging
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple

import joblib
import numpy as np
import optuna
import pandas as pd

from common import (
    TargetScaler,
    compute_feature_ranges,
    ensure_logger,
    ensure_result_directories,
    list_temperature_features,
    load_config,
    load_dataset,
    read_latest_run_marker,
)


def _select_lots(series: pd.Series, lot_ids: Optional[Sequence[str]], max_lots: Optional[int]) -> List[str]:
    """Choose the LOT ids to process.

    Args:
        series: LOT label of every row.
        lot_ids: Explicit ids to keep, in the caller's order; ids that do not
            occur in ``series`` are dropped. A falsy value selects every
            distinct LOT in order of first appearance.
        max_lots: When not ``None``, the selection is cut to its first
            ``int(max_lots)`` entries.

    Returns:
        The selected LOT ids as a list.
    """

    if lot_ids:
        # Build the membership set once instead of once per requested id.
        available = set(series.values)
        selected = [lot for lot in lot_ids if lot in available]
    else:
        # dict.fromkeys keeps first-seen order while dropping duplicates.
        selected = list(dict.fromkeys(series.values))
    if max_lots is not None:
        selected = selected[: int(max_lots)]
    return selected


def _optimize_single_lot(
    model,
    scaler: TargetScaler,
    base_row: pd.Series,
    feature_ranges: Dict[str, Tuple[float, float]],
    opt_config: Dict[str, object],
) -> Tuple[Dict[str, float], float]:
    """Search the feature values that minimise the model's prediction for one row.

    An Optuna study with a seeded ``TPESampler`` proposes a value inside
    ``feature_ranges`` for each listed feature; all other entries of
    ``base_row`` stay fixed. The objective is the model prediction mapped
    through ``scaler.inverse_scalar``.

    Args:
        model: Fitted estimator exposing ``predict``.
        scaler: Provides ``inverse_scalar`` for the prediction.
        base_row: Feature values of the row being optimised.
        feature_ranges: ``(low, high)`` search bounds per tunable feature.
        opt_config: Reads ``seed`` (default 2024), ``n_trials`` (default 60),
            ``timeout_seconds`` and ``n_jobs`` (default 1).

    Returns:
        ``(best_params, best_value)`` of the finished study, as plain floats.
    """

    sampler = optuna.samplers.TPESampler(seed=opt_config.get("seed", 2024))
    study = optuna.create_study(direction="minimize", sampler=sampler)

    def objective(trial: optuna.trial.Trial) -> float:
        # Update the row by column label before framing it. The one-row frame
        # is indexed by ``base_row.name``, so writing to row label 0 only hit
        # the intended row when that label happened to be 0; for any other
        # label a new row was appended and the unmodified row was scored.
        candidate_row = base_row.copy()
        for feature_name, (low, high) in feature_ranges.items():
            candidate_row.loc[feature_name] = trial.suggest_float(feature_name, low, high)
        candidate = candidate_row.to_frame().T
        prediction = float(model.predict(candidate)[0])
        return scaler.inverse_scalar(prediction)

    study.optimize(
        objective,
        n_trials=int(opt_config.get("n_trials", 60)),
        timeout=opt_config.get("timeout_seconds"),
        n_jobs=opt_config.get("n_jobs", 1),
        show_progress_bar=False,
    )
    best_params = {key: float(value) for key, value in study.best_params.items()}
    best_value = float(study.best_value)
    return best_params, best_value


def optimize(
    *,
    config_path: Optional[Path] = None,
    run_id: Optional[str] = None,
    model_name: str = "xgb_baseline",
    split: str = "validation",
    lot_ids: Optional[Sequence[str]] = None,
    max_lots: Optional[int] = None,
) -> Path:
    """Run the per-LOT Optuna search and write the recommendations to CSV.

    For every selected LOT the first row of that LOT in ``split`` is used as
    the base row; its baseline prediction and the best value and parameters
    found by ``_optimize_single_lot`` are recorded.

    Args:
        config_path: Forwarded to ``load_config``.
        run_id: Run whose model is used; when ``None`` the latest run marker is read.
        model_name: Stem of the ``.joblib`` model file and of the output CSV.
        split: Name of the dataset split whose LOTs are optimised.
        lot_ids: Optional explicit LOT ids to process.
        max_lots: Cap on the number of LOTs; when ``None`` the ``max_lots``
            entry of the optimization config is used if it is set.

    Returns:
        Path of the written CSV.

    Raises:
        ValueError: No run is recorded, no candidate feature or LOT is
            available, or a candidate feature has no range.
        FileNotFoundError: The model file is missing from the run directory.
    """

    # Use the project logger, like train(), rather than the root ``logging`` module.
    logger = ensure_logger()
    config = load_config(config_path)
    base_result_dir = Path(str(config["paths"]["result_root"]))

    if run_id is None:
        run_id = read_latest_run_marker(base_result_dir, config)
        if run_id is None:
            raise ValueError("No recorded run. Train the model before running optimization.")

    run_paths = ensure_result_directories(base_result_dir, config, run_id)
    model_path = run_paths["models"] / f"{model_name}.joblib"
    if not model_path.exists():
        raise FileNotFoundError(f"Model not found at {model_path}")

    # NOTE: joblib.load unpickles the file; only point this at run directories you trust.
    model = joblib.load(model_path)
    scaler = TargetScaler.from_config(config["training"].get("target_scaler", {}))

    dataset = load_dataset(split, config)
    train_split = load_dataset("train", config)

    temperature_features = list_temperature_features(dataset.features.columns, config)

    opt_config = config["optimization"]
    target_feature_list = opt_config.get("target_temperature_features")
    candidate_features: Sequence[str]
    if target_feature_list:
        # Keep only the configured features that exist in this split.
        candidate_features = [
            feature for feature in target_feature_list if feature in dataset.features.columns
        ]
    else:
        max_feature_setting = opt_config.get("max_temperature_features")
        if max_feature_setting is None:
            candidate_features = temperature_features
        else:
            candidate_features = temperature_features[: int(max_feature_setting)]
    if not candidate_features:
        raise ValueError("No temperature features available for optimization.")

    # Search bounds come from the training split, not from the split being optimised.
    feature_ranges = compute_feature_ranges(train_split, candidate_features)
    missing = [feature for feature in candidate_features if feature not in feature_ranges]
    if missing:
        raise ValueError(f"Missing feature ranges for: {missing}")

    if max_lots is None:
        max_lots_setting = opt_config.get("max_lots")
        if max_lots_setting is not None:
            max_lots = int(max_lots_setting)

    selected_lots = _select_lots(dataset.lots, lot_ids, max_lots)
    if not selected_lots:
        raise ValueError("No lots selected for optimization.")

    # Records mix the LOT id with float metrics and parameters, hence ``object``.
    records: List[Dict[str, object]] = []
    for lot in selected_lots:
        mask = dataset.lots == lot
        if not mask.any():
            continue
        # The first row of the LOT serves as its representative base row.
        base_row = dataset.features.loc[mask].iloc[0].copy()
        baseline_prediction = float(model.predict(base_row.to_frame().T)[0])
        baseline_prediction = scaler.inverse_scalar(baseline_prediction)

        best_params, best_value = _optimize_single_lot(
            model,
            scaler,
            base_row,
            feature_ranges,
            opt_config,
        )

        record: Dict[str, object] = {
            "LOT_NO": lot,
            "predicted_defect_rate": float(best_value),
            "baseline_prediction": float(baseline_prediction),
            "improvement": float(baseline_prediction - best_value),
        }
        record.update(best_params)
        records.append(record)
        logger.info(
            "Optimized lot %s: baseline=%.6f, optimized=%.6f",
            lot,
            baseline_prediction,
            best_value,
        )

    # One row per LOT, best (lowest) optimised prediction first.
    output_path = run_paths["optimization"] / f"{model_name}_{split}_optimization.csv"
    result_df = pd.DataFrame(records)
    if not result_df.empty:
        result_df.sort_values("predicted_defect_rate", inplace=True)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    result_df.to_csv(output_path, index=False)

    return output_path


# Script entry point: optimise the latest recorded run with every argument left at its default.
if __name__ == "__main__":
    optimize()

