# KSC: LLM-Driven **Composite** Test Generation (GPT‑4o, Python)
**Repo:** `temporalio/money-transfer-project-template-python`  
**LLM:** OpenAI GPT‑4o  
**Notebook 목적:** *분기 의무 + Def‑Use 체인 + 예외 경로*를 통합한 테스트를 자동 생성·실행·증분합니다.

# 3-0. 런타임 & 의존성 준비

In [None]:
!rm -rf /contents/*

In [1]:
#@title 3-0) 런타임 & 의존성 준비  (동일 커버리지 범위 설정 포함)
import os, sys, subprocess, pathlib, getpass, textwrap, shutil

# 1) Base constants and paths
REPO_URL = "https://github.com/temporalio/money-transfer-project-template-python.git"
# Directory name derived from the last URL segment with ".git" removed.
PROJECT_NAME = REPO_URL.rstrip("/").split("/")[-1].replace(".git", "")
# Absolute path of the current working directory at cell-execution time.
ROOT = pathlib.Path(".").resolve()
# Expected location of the project checkout.
PROJ = ROOT / PROJECT_NAME

def sh(cmd: str, check: bool = True, cwd: pathlib.Path | None = None) -> None:
    print("> ", cmd)
    rc = subprocess.call(cmd, shell=True, cwd=str(cwd) if cwd else None)
    if check and rc != 0:
        raise RuntimeError(f"Command failed (rc={rc}): {cmd}")

print(f"Python {sys.version}")
print("ROOT:", ROOT)

# 2) Clone the repository, or update an existing checkout
if PROJ.exists():
    print(f"Repo exists at {PROJ}. Pulling latest…")
    sh(f"git -C {PROJ} fetch --all --prune")
    # A hard reset discards any local modifications in the checkout.
    sh(f"git -C {PROJ} reset --hard origin/main")
else:
    # Shallow clone; no cwd is passed, so git runs in the current directory.
    sh(f"git clone --depth=1 {REPO_URL}")
print("✅ 레포 클론/업데이트 완료")

# 3) 의존성 설치
def pip_install(pkgs):
    """Upgrade-install ``pkgs`` with pip under the running interpreter.

    Args:
        pkgs: List of pip requirement strings.

    Raises:
        subprocess.CalledProcessError: If pip exits with a non-zero status.
    """
    pip_prefix = [sys.executable, "-m", "pip", "install", "-U"]
    argv = pip_prefix + pkgs
    print("> ", " ".join(argv))
    subprocess.run(argv, check=True)

# Test tooling, coverage, the Temporal SDK and the static-analysis helpers.
pkgs_core = [
    "pytest", "pytest-asyncio", "pytest-mock", "pytest-cov",
    "coverage[toml]",
    "temporalio",
    "lxml",
    "asttokens", "libcst", "networkx",
    "rich", "pyyaml",
]
# tomli is added only on interpreters older than 3.11.
if sys.version_info < (3, 11):
    pkgs_core.append("tomli")

# LLM client plus its supporting utilities, each with a minimum version.
pkgs_llm = [
    "openai>=1.43.0",
    "httpx>=0.27.0",
    "backoff>=2.2.1",
    "tiktoken>=0.7.0",
    "aiolimiter>=1.1.0",
    "anyio>=4.4.0",
    "nest_asyncio>=1.6.0",
    "python-dotenv>=1.0.1",
    "tqdm>=4.66.5",
]

# pip itself is upgraded first, then the two package groups.
pip_install(["pip"])
pip_install(pkgs_core)
pip_install(pkgs_llm)
print("✅ 의존성 설치 완료")

# 4) .env (OPENAI_API_KEY etc.)
try:
    from dotenv import load_dotenv
except Exception:
    # Install python-dotenv on demand and retry the import.
    pip_install(["python-dotenv>=1.0.1"])
    from dotenv import load_dotenv

# Prompt without echoing the key to the notebook output.
openai_api_key = getpass.getpass("Enter OPENAI_API_KEY (필수): ").strip()
# Only OPENAI_API_KEY receives a value; the remaining keys are written empty.
env_lines = [
    f"OPENAI_API_KEY={openai_api_key}",
    "OPENAI_BASE_URL=",
    "OPENAI_API_VERSION=",
    "OPENAI_ORG_ID=",
    "AZURE_OPENAI_ENDPOINT=",
    "AZURE_OPENAI_DEPLOYMENT=",
]
# NOTE(review): the key is stored in plain text at ROOT/.env.
env_path = ROOT / ".env"
env_path.write_text("\n".join(env_lines) + "\n", encoding="utf-8")
load_dotenv(env_path)
# Set the key explicitly in this process's environment as well.
os.environ["OPENAI_API_KEY"] = openai_api_key
print("✅ .env 저장 및 환경 주입 완료")

# 5) PYTHONPATH setup (a src/ layout is supported too)
os.environ["PROJECT_PATH"] = str(PROJ)
py_paths = [str(PROJ)]
if (PROJ / "src").exists():
    py_paths.insert(0, str(PROJ / "src"))  # src takes precedence
prev_pp = os.environ.get("PYTHONPATH", "")
# Keep pre-existing entries, but skip ones already listed so that re-running
# this cell does not keep growing PYTHONPATH with duplicates.
prev_entries = [p for p in prev_pp.split(os.pathsep) if p and p not in py_paths]
# os.pathsep is the platform's path-list separator (":" on POSIX, ";" on Windows).
os.environ["PYTHONPATH"] = os.pathsep.join(py_paths + prev_entries)
print("✅ PYTHONPATH =", os.environ["PYTHONPATH"])

# 6) 커버리지 source 자동 탐색 → .coveragerc 생성
def detect_sources(proj: pathlib.Path) -> list[str]:
    """Detect the directories to use as coverage ``source`` entries.

    The search base is ``proj/src`` when that directory exists, otherwise
    ``proj`` itself. Only direct child directories of the base are considered,
    and test/artifact/virtualenv directories are always skipped.

    Selection order:
      1. Child directories that contain ``__init__.py`` (packages).
      2. If there are none, child directories holding at least two ``*.py``
         files directly inside them.
      3. If that is empty too, ``["."]``.

    Args:
        proj: Project root directory.

    Returns:
        Paths relative to ``proj`` (as strings), sorted by directory name, or
        ``["."]`` when nothing qualifies.
    """
    # One exclusion set shared by both passes, so generated HTML report
    # directories can never be picked up by the fallback pass either.
    excluded = {
        "tests", "generated_tests", ".venv", "venv", ".git",
        "run_artifacts", "htmlcov", "htmlcov_gen", "htmlcov_gen_pass",
    }
    base = proj / "src" if (proj / "src").exists() else proj
    subdirs = [
        child for child in sorted(base.iterdir())
        if child.is_dir() and child.name not in excluded
    ]

    # Coverage source paths are written relative to the project root.
    packages = [
        str(child.relative_to(proj))
        for child in subdirs
        if (child / "__init__.py").exists()
    ]
    if packages:
        return packages

    # Fallback: directories without __init__.py that still hold several modules.
    fallback = [
        str(child.relative_to(proj))
        for child in subdirs
        if sum(1 for _ in child.glob("*.py")) >= 2
    ]
    return fallback or ["."]

sources = detect_sources(PROJ)

# Glob patterns written to the generated config's omit list.
omit_patterns = [
    "tests/*",
    "generated_tests/*",
    "run_artifacts/*",
    "*/site-packages/*",
    ".venv/*",
    "venv/*",
]

coveragerc_text = "[run]\nbranch = True\n"
# Write the source list explicitly, one entry per line
coveragerc_text += "source = \n" + "\n".join(f"    {s}" for s in sources) + "\n\n"
coveragerc_text += "[report]\nshow_missing = True\nskip_covered = True\n"
# The omit list is appended after the [report] header, i.e. inside that section.
coveragerc_text += "omit = \n" + "\n".join(f"    {p}" for p in omit_patterns) + "\n"

(PROJ / ".coveragerc").write_text(coveragerc_text, encoding="utf-8")
os.environ["COVERAGE_RCFILE"] = str(PROJ / ".coveragerc")  # same settings for every later step
print("✅ .coveragerc 생성/적용 완료")
print("  - source =", sources)
print("  - omit   =", omit_patterns)

# 7) Prepare result directories
for d in ["run_artifacts", "htmlcov", "reports"]:
    (PROJ / d).mkdir(parents=True, exist_ok=True)
print("✅ 결과 디렉토리 준비 완료")


Python 3.12.12 (main, Oct 10 2025, 08:52:57) [GCC 11.4.0]
ROOT: /content
>  git clone --depth=1 https://github.com/temporalio/money-transfer-project-template-python.git
✅ 레포 클론/업데이트 완료
>  /usr/bin/python3 -m pip install -U pip
>  /usr/bin/python3 -m pip install -U pytest pytest-asyncio pytest-mock pytest-cov coverage[toml] temporalio lxml asttokens libcst networkx rich pyyaml
>  /usr/bin/python3 -m pip install -U openai>=1.43.0 httpx>=0.27.0 backoff>=2.2.1 tiktoken>=0.7.0 aiolimiter>=1.1.0 anyio>=4.4.0 nest_asyncio>=1.6.0 python-dotenv>=1.0.1 tqdm>=4.66.5
✅ 의존성 설치 완료
Enter OPENAI_API_KEY (필수): ··········
✅ .env 저장 및 환경 주입 완료
✅ PYTHONPATH = /content/money-transfer-project-template-python:/env/python
✅ .coveragerc 생성/적용 완료
  - source = ['.']
  - omit   = ['tests/*', 'generated_tests/*', 'run_artifacts/*', '*/site-packages/*', '.venv/*', 'venv/*']
✅ 결과 디렉토리 준비 완료


In [2]:
#@title 3-1) 기준선 측정 – 기존 테스트 실행 및 커버리지 수집 (rcfile 고정 + 스코프 검증 · 견고화)
from pathlib import Path
import subprocess, sys, json, shutil, os, re
from lxml import etree

# ========= 0) Paths / utilities =========
# PROJ must already exist as a global (set by step 3-0).
assert 'PROJ' in globals(), "3-0 단계에서 PROJ 변수가 설정되어 있어야 합니다."
PROJ = Path(PROJ).resolve()
# Artifacts of this baseline run and the HTML coverage report directory.
ART_DIR  = PROJ / "run_artifacts" / "run1"
HTML_DIR = PROJ / "htmlcov"
ART_DIR.mkdir(parents=True, exist_ok=True)
HTML_DIR.mkdir(parents=True, exist_ok=True)

def sh(cmd: str, check: bool = True, cwd: Path | None = None) -> int:
    """간단 셸 실행 (반환코드만 사용), 실패시 메시지."""
    print("> ", cmd)
    rc = subprocess.call(cmd, shell=True, cwd=str(cwd or PROJ))
    if check and rc != 0:
        raise RuntimeError(f"Command failed (rc={rc}): {cmd}")
    return rc

def norm_path(fp: str) -> str:
    """Return ``fp`` as a path string, anchoring relative paths at ``PROJ``.

    Absolute inputs are returned as given (they are not resolved); relative
    inputs are joined onto ``PROJ`` and resolved.
    """
    candidate = Path(fp)
    if candidate.is_absolute():
        return str(candidate)
    return str((PROJ / candidate).resolve())

def in_project(abs_path: str) -> bool:
    """Return True when ``abs_path`` resolves to ``PROJ`` or a path beneath it.

    Containment is decided on path components, not on substring matching, so
    a sibling such as ``<PROJ>_backup`` or a path that merely embeds the
    project path somewhere inside it is rejected.

    Args:
        abs_path: Path to test; it is resolved before the comparison.

    Returns:
        False when the path lies outside ``PROJ`` or cannot be resolved.
    """
    try:
        return Path(abs_path).resolve().is_relative_to(PROJ)
    except (OSError, RuntimeError, TypeError, ValueError):
        # Best effort: a path that cannot be resolved is treated as outside.
        return False

# ========= 1) Pin / create .coveragerc =========
RCFILE = PROJ / ".coveragerc"
if not RCFILE.exists():
    print("⚠️ .coveragerc가 없어 기본 템플릿을 생성합니다.")
    RCFILE.write_text(
        "[run]\n"
        "branch = True\n"
        f"data_file = {(ART_DIR / '.coverage.baseline').as_posix()}\n"
        # omit has to live inside [run] to take effect during measurement;
        # "[run:omit]" is not a section coverage.py reads, so patterns placed
        # there were silently ignored.
        # Excludes tests, generated artifacts and installed packages.
        "omit =\n"
        "    */tests/*\n"
        "    tests/*\n"
        "    */generated_tests/*\n"
        "    */run_artifacts/*\n"
        "    */htmlcov/*\n"
        "    */.venv/*\n"
        "    */site-packages/*\n"
        "\n"
        "[report]\n"
        "exclude_lines =\n"
        "    pragma: no cover\n"
        "\n"
        "[html]\n"
        f"directory = {HTML_DIR.as_posix()}\n"
        "\n"
        "[paths]\n"
        f"source =\n    {PROJ.as_posix()}\n",
        encoding="utf-8"
    )
# Appended to every coverage command so all of them share this config file.
rc_opt = f" --rcfile {RCFILE}"

# ========= 2) Baseline run =========
# Always invoke coverage through the running interpreter so that every step
# (erase/run/json/xml/html) uses the same coverage installation; a bare
# ``coverage`` on PATH may belong to a different Python environment.
cov_cmd = f"{sys.executable} -m coverage"

# Clear data left over from earlier runs.
sh(f"{cov_cmd} erase{rc_opt}", check=False)

# Run pytest; failures are tolerated so coverage is still produced.
rc_run = sh(f"{cov_cmd} run{rc_opt} -m pytest -q", check=False)

# Produce the coverage reports (always attempted).
json_path = PROJ / "coverage_base.json"
xml_path  = PROJ / "coverage_base.xml"
sh(f"{cov_cmd} json -o {json_path.name}{rc_opt}", check=False)
sh(f"{cov_cmd} xml -o {xml_path.name}{rc_opt}", check=False)
sh(f"{cov_cmd} html{rc_opt}", check=False)

# Back up the coverage data file when it exists.
cov_data_file = ART_DIR / ".coverage.baseline"
if not cov_data_file.exists():
    # The fallback rcfile template points data_file at this location, so the
    # file may already be here; otherwise copy the one coverage wrote to the
    # project root.
    root_cov = PROJ / ".coverage"
    if root_cov.exists():
        shutil.copy2(root_cov, cov_data_file)

# ========= 3) Archive reports and guard against "No data" =========
if not json_path.exists():
    # No coverage JSON was produced at all: write an empty skeleton instead.
    print("⚠️ coverage_base.json이 생성되지 않아 빈 구조를 만듭니다.")
    json_path.write_text(json.dumps({"files": {}}, indent=2), encoding="utf-8")
if not xml_path.exists():
    # Same placeholder treatment for the XML report.
    print("⚠️ coverage_base.xml이 생성되지 않아 빈 구조를 만듭니다.")
    xml_path.write_text("<coverage></coverage>", encoding="utf-8")

# Archive copies of both reports under the run's artifact directory.
for src in [json_path, xml_path]:
    if src.exists():
        shutil.copy2(src, ART_DIR / src.name)

print("✅ 기존 테스트 실행 및 커버리지 수집 완료 (기준선)")
print(" - JSON :", ART_DIR / 'coverage_base.json')
print(" - XML  :", ART_DIR / 'coverage_base.xml')
print(" - HTML :", HTML_DIR / 'index.html')

# ========= 4) Build uncovered_map =========
# Directory names (anywhere below PROJ) whose files never count as source
# under test.
_SCOPE_EXCLUDED_DIRS = {"tests", "generated_tests", "run_artifacts", "htmlcov"}

data = {}
try:
    data = json.loads(json_path.read_text(encoding="utf-8"))
except (OSError, ValueError) as e:
    # ValueError covers both JSON decoding and text decoding failures.
    print("⚠️ coverage_base.json 파싱 실패, 빈 구조로 진행:", e)
files = data.get("files", {}) or {}

# Absolute file path -> sorted, de-duplicated uncovered line numbers.
uncovered_map: dict[str, list[int]] = {}
for fpath, finfo in files.items():
    # Python files only.
    if not str(fpath).lower().endswith(".py"):
        continue
    abs_path = norm_path(fpath)
    # Skip files outside the project.
    if not in_project(abs_path):
        continue
    # Re-check the omit scope on directory components relative to PROJ.
    # Matching whole components avoids the false positives of a substring
    # test on the absolute path (e.g. "unittests/", or an ancestor directory
    # of the project that happens to be called "tests").
    try:
        rel_dirs = Path(abs_path).resolve().relative_to(PROJ).parts[:-1]
    except ValueError:
        continue  # not actually located under PROJ
    if _SCOPE_EXCLUDED_DIRS.intersection(rel_dirs):
        continue
    miss = finfo.get("missing_lines", []) or []
    if miss:
        try:
            uncovered_map[abs_path] = sorted({int(x) for x in miss})
        except (TypeError, ValueError):
            # Defensive path for unexpected entries: keep digit-only values.
            uncovered_map[abs_path] = sorted({int(x) for x in map(str, miss) if str(x).isdigit()})

(ART_DIR / "uncovered_map_base.json").write_text(
    json.dumps(uncovered_map, indent=2, ensure_ascii=False), encoding="utf-8"
)

print(f"총 파일 수: {len(files)} / 미커버 파일 수: {len(uncovered_map)}")
miss_total = sum(len(v) for v in uncovered_map.values())
print(f"미커버 라인 수 총합: {miss_total}")
print("✅ uncovered_map_base.json 생성 완료 →", ART_DIR / "uncovered_map_base.json")

# ========= 4-a) Scope check (detect test files that leaked into the baseline) =========
bad = [p for p in files
       if "generated_tests/" in p or "/tests/" in p or str(p).startswith("tests/")]
if bad:
    print("⚠️ baseline에 테스트/생성 테스트 파일이 포함되었습니다. .coveragerc의 source/omit을 확인하세요.")
    for b in bad[:20]:
        print(" -", b)

# ========= 5) observed_outcomes (branch observations) =========
# Directory names (anywhere below PROJ) whose files never count as source
# under test.
_SCOPE_EXCLUDED_DIRS = {"tests", "generated_tests", "run_artifacts", "htmlcov"}
# Extracts "(covered/total)" from a condition-coverage value such as "50% (1/2)".
_COND_RE = re.compile(r"\((\d+)\s*/\s*(\d+)\)")

# Absolute file path -> {line number -> {"covered", "total", "ratio"}}.
observed_outcomes: dict[str, dict[int, dict]] = {}
try:
    xml_root = etree.parse(str(xml_path)).getroot()
    for cls in xml_root.findall(".//class"):
        filename = cls.get("filename") or ""
        if not filename:
            continue
        abs_path = norm_path(filename)
        if not in_project(abs_path):
            continue
        # Re-check the omit scope on directory components relative to PROJ;
        # a substring test on the absolute path also matched names such as
        # "unittests/" or an ancestor directory called "tests".
        try:
            rel_dirs = Path(abs_path).resolve().relative_to(PROJ).parts[:-1]
        except ValueError:
            continue  # not actually located under PROJ
        if _SCOPE_EXCLUDED_DIRS.intersection(rel_dirs):
            continue

        for line in cls.findall("./lines/line"):
            if line.get("branch") != "true":
                continue
            try:
                num = int(line.get("number"))
            except (TypeError, ValueError):
                continue
            cond = line.get("condition-coverage")  # e.g. "50% (1/2)"
            covered = total = 0
            if cond:
                m = _COND_RE.search(cond)
                if m:
                    covered, total = int(m.group(1)), int(m.group(2))
            if total == 0:
                continue
            observed_outcomes.setdefault(abs_path, {})[num] = {
                "covered": covered,
                "total": total,
                "ratio": round(covered / total, 3)
            }
except Exception as e:
    # Deliberate best effort: any parsing problem leaves the results gathered
    # so far (possibly none) and the notebook continues.
    print("⚠️ coverage_base.xml 파싱 중 문제가 발생했습니다. 빈 결과로 계속합니다:", e)

(ART_DIR / "observed_outcomes_base.json").write_text(
    json.dumps(observed_outcomes, indent=2, ensure_ascii=False), encoding="utf-8"
)

# Summary statistics over every recorded branch line.
_all_meta = [meta for per_file in observed_outcomes.values() for meta in per_file.values()]
branch_points = len(_all_meta)
full_hit = sum(1 for meta in _all_meta if meta["covered"] == meta["total"])
half_hit = sum(1 for meta in _all_meta if 0 < meta["covered"] < meta["total"])
zero_hit = sum(1 for meta in _all_meta if meta["covered"] == 0)

print("✅ observed_outcomes_base.json 생성 완료 →", ART_DIR / "observed_outcomes_base.json")
print(f" - 분기 포인트 수: {branch_points}")
print(f" - Full-hit  (양쪽 관측): {full_hit}")
print(f" - Half-hit  (한쪽 관측): {half_hit}")
print(f" - Zero-hit  (관측 0 / 미계측): {zero_hit}")

# ========= 6) Notice =========
if rc_run != 0:
    print("ℹ️ 참고: 기준선 테스트 실행에서 실패가 있었지만, 커버리지 산출과 파싱은 계속 진행했습니다.")


>  coverage erase --rcfile /content/money-transfer-project-template-python/.coveragerc
>  /usr/bin/python3 -m coverage run --rcfile /content/money-transfer-project-template-python/.coveragerc -m pytest -q
>  coverage json -o coverage_base.json --rcfile /content/money-transfer-project-template-python/.coveragerc
>  coverage xml  -o coverage_base.xml --rcfile /content/money-transfer-project-template-python/.coveragerc
>  coverage html --rcfile /content/money-transfer-project-template-python/.coveragerc
✅ 기존 테스트 실행 및 커버리지 수집 완료 (기준선)
 - JSON : /content/money-transfer-project-template-python/run_artifacts/run1/coverage_base.json
 - XML  : /content/money-transfer-project-template-python/run_artifacts/run1/coverage_base.xml
 - HTML : /content/money-transfer-project-template-python/htmlcov/index.html
총 파일 수: 6 / 미커버 파일 수: 5
미커버 라인 수 총합: 54
✅ uncovered_map_base.json 생성 완료 → /content/money-transfer-project-template-python/run_artifacts/run1/uncovered_map_base.json
✅ observed_outcomes_base.json 

In [3]:
#@title 3-2) 복합 목표 생성 – AST 분석 및 목표 구조화 (모킹 플랜 자동추론 강화판)
import ast
import json
from pathlib import Path
from typing import Dict, List, Tuple, Set, Optional

# Resolved project root and the artifact directory of run 1.
PROJ_PATH = Path(PROJ).resolve()
ART_DIR = PROJ_PATH / "run_artifacts" / "run1"

# Use the baseline artifacts as inputs
UNCOVERED_JSON = ART_DIR / "uncovered_map_base.json"
OBSERVED_JSON  = ART_DIR / "observed_outcomes_base.json"

# Both input files must exist before this cell can proceed.
assert UNCOVERED_JSON.exists(), "uncovered_map_base.json이 없습니다. 3-1(기준선) 단계를 먼저 실행하세요."
assert OBSERVED_JSON.exists(),  "observed_outcomes_base.json이 없습니다. 3-1(기준선) 단계를 먼저 실행하세요."

# ---------------- utils ----------------
def norm_abs(p: str) -> str:
    """Return ``p`` as a resolved absolute path string.

    Relative inputs are anchored at ``PROJ_PATH`` before being resolved;
    absolute inputs are resolved as they are.
    """
    path = Path(p)
    anchored = path if path.is_absolute() else PROJ_PATH / path
    return str(anchored.resolve())

def rel_from_proj(abs_path: str) -> str:
    """Return ``abs_path`` relative to ``PROJ_PATH``.

    When the path cannot be expressed relative to the project root, the
    input string is returned unchanged.
    """
    try:
        relative = Path(abs_path).resolve().relative_to(PROJ_PATH)
    except Exception:
        return abs_path
    return str(relative)

def is_source(abs_path: str) -> bool:
    """Tell whether ``abs_path`` is a project source file eligible for analysis.

    A file qualifies when it lies under ``PROJ_PATH``, ends in ``.py`` and its
    project-relative path does not start with a test, artifact or
    virtual-environment directory.
    """
    try:
        rel = Path(abs_path).resolve().relative_to(PROJ_PATH)
    except Exception:
        return False
    posix = str(rel).replace("\\", "/")
    excluded_roots = ("generated_tests/", "tests/", ".venv/", "venv/", "run_artifacts/", "htmlcov/")
    return posix.endswith(".py") and not posix.startswith(excluded_roots)

def try_unparse(node: ast.AST) -> Optional[str]:
    """Render ``node`` back to source text on a best-effort basis.

    Returns:
        The unparsed source, or ``None`` when ``ast.unparse`` is not
        available or raises for this node.
    """
    unparse = getattr(ast, "unparse", None)
    if unparse is None:
        return None
    try:
        return unparse(node)
    except Exception:
        return None

# ---------------- load inputs ----------------
# Uncovered lines per file, re-keyed by resolved absolute path and limited to
# analyzable sources.
raw_uncovered: Dict[str, List[int]] = json.loads(UNCOVERED_JSON.read_text(encoding="utf-8"))
uncovered_map: Dict[str, List[int]] = {}
for raw_path, raw_lines in raw_uncovered.items():
    resolved = norm_abs(raw_path)
    if not is_source(resolved):
        continue
    uncovered_map[resolved] = sorted({int(n) for n in raw_lines})

# Branch observations per file; JSON object keys are strings, so line numbers
# are converted back to int and entries with non-numeric keys are dropped.
raw_observed: Dict[str, Dict[str, Dict[str, float]]] = json.loads(OBSERVED_JSON.read_text(encoding="utf-8"))
observed_outcomes: Dict[str, Dict[int, Dict[str, float]]] = {}
for raw_path, per_line in raw_observed.items():
    resolved = norm_abs(raw_path)
    if not is_source(resolved):
        continue
    by_line = {}
    for line_key, outcome in per_line.items():
        try:
            by_line[int(line_key)] = outcome
        except Exception:
            continue
    observed_outcomes[resolved] = by_line

# Half-hit lines per file (int line numbers): some, but not all, branch
# outcomes were observed.
half_hit_map: Dict[str, Set[int]] = {}
for src_path, outcomes in observed_outcomes.items():
    partial: Set[int] = set()
    for line_no, outcome in outcomes.items():
        seen = int(outcome.get("covered", 0))
        possible = int(outcome.get("total", 0))
        if 0 < seen < possible:
            partial.add(line_no)
    if partial:
        half_hit_map[src_path] = partial

# ---------------- Mock-plan auto-inference: settings ----------------
# Module/symbol catalog: top-level module names grouped by side-effect family.
MOD_REQS = {"requests"}
MOD_HTTPX = {"httpx"}
MOD_TEMPORAL = {"temporalio"}
MOD_ASYNCIO = {"asyncio"}
MOD_TIME = {"time"}
MOD_DATETIME = {"datetime"}
MOD_OS = {"os"}
MOD_SUBPROCESS = {"subprocess"}
MOD_SYS = {"sys"}
MOD_BUILTINS = {"builtins"}

# Target symbol patterns, each written as a tuple of dotted-name parts.
SYM_ASYNCIO_RUN = {("asyncio", "run")}
SYM_TIME_SLEEP = {("time", "sleep")}
SYM_DATETIME_NOW = {("datetime", "datetime", "now")}
SYM_OS_ENVIRON = {("os", "environ")}
SYM_SYS_EXIT = {("sys", "exit")}
# Temporal common surfaces: dotted-name prefixes of Temporal SDK submodules.
TEMPORAL_PREFIXES = (
    "temporalio.client.", "temporalio.worker.", "temporalio.workflow."
)

# ---------------- AST analysis ----------------
class FunctionInfo:
    """Analysis facts gathered for one function, or for module-level code.

    Attributes are plain containers filled in by the AST walk:
    branch-like statement lines, variable definition/use lines, exception
    sites ``(kind, line, hint)`` and side-effect calls ``(kind, line)``.
    """

    def __init__(self, name: str, lineno: int):
        # An empty or missing name denotes module-level code.
        self.name = name if name else "<module>"
        self.lineno = lineno
        self.branches: List[int] = []
        self.defs: Dict[str, List[int]] = {}
        self.uses: Dict[str, List[int]] = {}
        self.exceptions: List[Tuple[str, int, dict]] = []
        self.side_effect_calls: List[Tuple[str, int]] = []  # (kind, line)

    def add_def(self, var: str, line: int):
        """Record that ``var`` is assigned on ``line``."""
        lines = self.defs.get(var)
        if lines is None:
            self.defs[var] = [line]
        else:
            lines.append(line)

    def add_use(self, var: str, line: int):
        """Record that ``var`` is read on ``line``."""
        lines = self.uses.get(var)
        if lines is None:
            self.uses[var] = [line]
        else:
            lines.append(line)

class ASTVisitor(ast.NodeVisitor):
    """Collect per-scope analysis facts from one module's AST.

    For every function (and for module-level code) the walk records
    branch-like statement lines, variable definition/use lines, exception
    sites and calls that look like external side effects. Import aliases are
    tracked so that a call can be mapped back to its fully qualified name,
    which is what the mock-kind detection matches against.

    After ``visit(tree)``, ``funcs`` holds one ``FunctionInfo`` per scope; a
    ``"<module>"`` entry is inserted at index 0 the first time module-level
    code records something.
    """

    def __init__(self, file_path: str):
        self.file_path = file_path
        self.stack: List[FunctionInfo] = []
        self.funcs: List[FunctionInfo] = []
        # Local name bound by ``import ...`` -> module that name refers to.
        self.alias_to_module: Dict[str, str] = {}
        # Local name bound by ``from m import ...`` -> source module ``m``.
        self.symbol_to_module: Dict[str, str] = {}
        # Local name bound by ``from m import y [as z]`` -> qualified ``m.y``.
        self.symbol_to_qualname: Dict[str, str] = {}

    def current(self) -> FunctionInfo:
        """Return the innermost open scope, creating ``<module>`` on demand."""
        if not self.stack:
            if not self.funcs or self.funcs[0].name != "<module>":
                fi = FunctionInfo("<module>", 1)
                self.funcs.insert(0, fi)
            return self.funcs[0]
        return self.stack[-1]

    # ---- import tracking ----
    def visit_Import(self, node: ast.Import):
        for alias in node.names:
            mod = alias.name  # e.g., "requests" or "os.path"
            if alias.asname:
                # ``import a.b as x`` binds x to the submodule a.b.
                self.alias_to_module[alias.asname] = mod
            else:
                # ``import a.b`` binds only the top-level name ``a`` (to
                # module a), so ``a.c`` must keep resolving to "a.c".
                top = mod.split(".")[0]
                self.alias_to_module[top] = top
        self.generic_visit(node)

    def visit_ImportFrom(self, node: ast.ImportFrom):
        mod = node.module or ""
        for alias in node.names:
            asname = alias.asname or alias.name
            # The symbol is attributed to its source module.
            self.symbol_to_module[asname] = mod
            # Remember the original symbol name so that aliased imports
            # (``from m import y as z``) resolve to "m.y" rather than "m.z".
            self.symbol_to_qualname[asname] = f"{mod}.{alias.name}" if mod else alias.name
        self.generic_visit(node)

    # ---- function scopes ----
    def visit_FunctionDef(self, node: ast.FunctionDef):
        fi = FunctionInfo(node.name, node.lineno)
        self.stack.append(fi)
        self.generic_visit(node)
        self.stack.pop()
        # Appended after the body walk, so nested functions precede their parent.
        self.funcs.append(fi)

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef):
        self.visit_FunctionDef(node)

    # ---- branch-like points ----
    def visit_If(self, node: ast.If):
        self.current().branches.append(node.lineno)
        self.generic_visit(node)

    def visit_While(self, node: ast.While):
        self.current().branches.append(node.lineno)
        self.generic_visit(node)

    def visit_For(self, node: ast.For):
        self.current().branches.append(node.lineno)
        self.generic_visit(node)

    def visit_AsyncFor(self, node: ast.AsyncFor):
        self.current().branches.append(node.lineno)
        self.generic_visit(node)

    def visit_With(self, node: ast.With):
        self.current().branches.append(node.lineno)
        self.generic_visit(node)

    def visit_Try(self, node: ast.Try):
        self.current().branches.append(node.lineno)
        self.current().exceptions.append(("try", node.lineno, {}))
        self.generic_visit(node)

    def visit_ExceptHandler(self, node: ast.ExceptHandler):
        hint = {}
        if node.type is not None:
            # Prefer the unparsed expression; fall back to the bare name of a
            # simple ``except Name:`` handler when unparsing is unavailable.
            typ = try_unparse(node.type) or getattr(node.type, "id", None)
            if typ:
                hint["exception_type"] = typ
        self.current().branches.append(node.lineno)
        self.current().exceptions.append(("except", node.lineno, hint))
        self.generic_visit(node)

    # ---- exceptions ----
    def visit_Raise(self, node: ast.Raise):
        hint = {}
        if node.exc is not None:
            text = try_unparse(node.exc)
            if text:
                hint["expr"] = text
            if isinstance(node.exc, ast.Call):
                if isinstance(node.exc.func, ast.Name):
                    hint["exception_type"] = node.exc.func.id
                elif isinstance(node.exc.func, ast.Attribute):
                    hint["exception_type"] = node.exc.func.attr
                if node.exc.args:
                    a0 = node.exc.args[0]
                    if isinstance(a0, ast.Constant) and isinstance(a0.value, str):
                        # Keep at most 80 characters of a literal message.
                        hint["message_contains"] = a0.value[:80]
        self.current().exceptions.append(("raise", node.lineno, hint))
        self.generic_visit(node)

    def visit_Assert(self, node: ast.Assert):
        hint = {}
        if node.msg and isinstance(node.msg, ast.Constant) and isinstance(node.msg.value, str):
            hint["message_contains"] = node.msg.value[:80]
        self.current().exceptions.append(("assert", node.lineno, hint))
        self.generic_visit(node)

    # ---- defs/uses ----
    def visit_Name(self, node: ast.Name):
        if isinstance(node.ctx, ast.Store):
            self.current().add_def(node.id, node.lineno)
        elif isinstance(node.ctx, ast.Load):
            self.current().add_use(node.id, node.lineno)
        self.generic_visit(node)

    # ---- helper: reconstruct dotted name ----
    def _dotted_name(self, node: ast.AST) -> Optional[str]:
        """Return the qualified dotted name for a Name/Attribute chain.

        Imported names are expanded through the alias tables. Returns
        ``None`` for expressions that are not plain Name/Attribute chains
        (for example the result of a call or a subscript).
        """
        # Name → "x"
        if isinstance(node, ast.Name):
            name = node.id
            # Resolve an alias/symbol to its module if known.
            if name in self.alias_to_module:
                return self.alias_to_module[name]
            if name in self.symbol_to_qualname:
                return self.symbol_to_qualname[name]
            if name in self.symbol_to_module:
                # Entry added directly to symbol_to_module without a recorded
                # qualified name: fall back to module + local name.
                return f"{self.symbol_to_module[name]}.{name}"
            return name
        # Attribute → "<base>.<attr>"
        if isinstance(node, ast.Attribute):
            base = self._dotted_name(node.value)
            if base:
                return f"{base}.{node.attr}"
        return None

    # ---- side-effects heuristics ----
    def visit_Call(self, node: ast.Call):
        qn = self._dotted_name(node.func) or ""

        # builtins.open
        if qn in ("open", "builtins.open"):
            self.current().side_effect_calls.append(("io_open", node.lineno))

        # requests.*, httpx.*
        if any(qn.startswith(m + ".") for m in MOD_REQS):
            self.current().side_effect_calls.append(("net_requests", node.lineno))
        if any(qn.startswith(m + ".") for m in MOD_HTTPX):
            self.current().side_effect_calls.append(("net_httpx", node.lineno))

        # asyncio.run
        if qn == "asyncio.run":
            self.current().side_effect_calls.append(("asyncio_run", node.lineno))

        # time.sleep
        if qn == "time.sleep":
            self.current().side_effect_calls.append(("time_sleep", node.lineno))

        # datetime.datetime.now
        if qn == "datetime.datetime.now":
            self.current().side_effect_calls.append(("datetime_now", node.lineno))

        # os.environ[...] / os.environ.get(...)
        if qn.startswith("os.environ"):
            self.current().side_effect_calls.append(("env_access", node.lineno))

        # subprocess.*
        if any(qn.startswith(m + ".") for m in MOD_SUBPROCESS):
            self.current().side_effect_calls.append(("subprocess", node.lineno))

        # sys.exit
        if qn == "sys.exit":
            self.current().side_effect_calls.append(("sys_exit", node.lineno))

        # temporalio.*: classified by the submodule appearing in the name.
        if any(qn.startswith(pref) for pref in TEMPORAL_PREFIXES) or any(qn.startswith(m + ".") for m in MOD_TEMPORAL):
            kind = "temporal_generic"
            if ".client." in qn:
                kind = "temporal_client"
            elif ".worker." in qn:
                kind = "temporal_worker"
            elif ".workflow." in qn:
                kind = "temporal_workflow"
            self.current().side_effect_calls.append((kind, node.lineno))

        self.generic_visit(node)

    def visit_Attribute(self, node: ast.Attribute):
        qn = self._dotted_name(node) or ""
        # os.environ (plain attribute references are recorded as well)
        if qn.startswith("os.environ"):
            self.current().side_effect_calls.append(("env_access", node.lineno))
        self.generic_visit(node)

# ---------------- collect candidate files ----------------
# Every analyzable source file that has uncovered lines or observed branch
# data, as a sorted list of absolute paths without duplicates.
candidate_files: List[str] = sorted(
    fp for fp in {*uncovered_map, *observed_outcomes} if is_source(fp)
)

# NOTE: an earlier per-file AST pass used to run here. It computed a strict
# subset of the "per file AST → materials" pass further down, which rebinds
# ``file_infos`` from scratch before anything reads it. Running both parsed
# every file twice and printed each parse warning twice, so only the later
# pass is kept.

# ---------------- goal builders ----------------
# NOTE(review): MAX_NEAR_GAP is not referenced in the visible part of this cell — confirm it is used later.
MAX_NEAR_GAP = 8

# ==== [New settings] Algorithm 1 hyperparameters ====
# Coverage-gain weights (branch / def-use / exception) — chosen to sum to 1
ALPHA_BRANCH = 0.45   # α
BETA_DU      = 0.25   # β
GAMMA_EXC    = 0.30   # γ

# Generation-cost weights (constant / context / target count) — chosen to sum to 1
LAMBDA_1_CONST   = 0.20  # λ1 · 1.0
LAMBDA_2_CONTEXT = 0.60  # λ2 · |Context_g|
LAMBDA_3_TARGETS = 0.20  # λ3 · |T_g|

# Budget in cost units. E.g. with 50, goals are selected as long as the summed cost does not exceed 50
BUDGET_B = 50.0

# Overlap threshold θ — a candidate g is excluded when the share of its
# target lines that overlap already-selected goals is at least θ
OVERLAP_THETA = 0.50  # 0.0 (permissive) ~ 1.0 (strict exclusion)

# ---- coverage gain / cost / score ----
def coverage_gain_structural(b_cnt: int, du_cnt: int, exc_cnt: int) -> float:
    """Weighted structural relevance: rel(g) = α|B_g| + β|D_g| + γ|E_g|.

    Args:
        b_cnt: Number of branch targets.
        du_cnt: Number of def-use pairs.
        exc_cnt: Number of exception targets.
    """
    branch_term = ALPHA_BRANCH * b_cnt
    def_use_term = BETA_DU * du_cnt
    exception_term = GAMMA_EXC * exc_cnt
    return branch_term + def_use_term + exception_term

def coverage_gain_total(target_lines: Set[int], b_cnt: int, du_cnt: int, exc_cnt: int) -> float:
    """Total gain of a goal: gain(g) = |T_g| + rel(g).

    The number of target lines is added to the weighted structural relevance
    of the goal's branch, def-use and exception counts.
    """
    line_gain = float(len(target_lines))
    structural_gain = coverage_gain_structural(b_cnt, du_cnt, exc_cnt)
    return line_gain + structural_gain

def generation_cost(context_size: int, target_count: int) -> float:
    """Generation cost of a goal: cost(g) = λ1*1.0 + λ2*|Context_g| + λ3*|T_g|.

    Args:
        context_size: Context size |Context_g| of the goal's function.
        target_count: Number of target lines |T_g|.
    """
    constant_term = LAMBDA_1_CONST * 1.0
    context_term = LAMBDA_2_CONTEXT * float(context_size)
    target_term = LAMBDA_3_TARGETS * float(target_count)
    return constant_term + context_term + target_term

def compute_context_size(fi: FunctionInfo) -> int:
    """Return |Context_g|: the number of distinct lines in ``fi`` on which any
    variable is defined or used."""
    touched: Set[int] = set()
    for per_variable in (fi.defs.values(), fi.uses.values()):
        for lines in per_variable:
            touched.update(lines)
    return len(touched)

# ---------------- per file AST → materials (extended: stores context size) ----------------
def _nearest_def_use_pairs(fi: FunctionInfo, miss_lines: Set[int],
                           max_gap: int = 40) -> List[Tuple[str, int, int]]:
    """Pair each uncovered use of a variable with its closest preceding definition.

    For every use line that is in ``miss_lines``, the latest definition line
    of the same variable that is not after the use is selected. Pairs whose
    lines are more than ``max_gap`` apart are dropped. Because the selected
    definition is already the latest one at or before the use, no other
    definition can lie strictly between the two, so no separate
    "intervening definition" filter is needed.

    Returns:
        ``(var, def_line, use_line)`` tuples sorted by use line, then
        definition line, then variable name.
    """
    pairs: List[Tuple[str, int, int]] = []
    for var, defs in fi.defs.items():
        defs_sorted = sorted(defs)
        for u in sorted(fi.uses.get(var, [])):
            if u not in miss_lines:
                continue
            preceding = [d for d in defs_sorted if d <= u]
            if not preceding:
                continue
            d = preceding[-1]  # latest definition at or before the use
            if (u - d) > max_gap:
                continue
            pairs.append((var, d, u))
    return sorted(pairs, key=lambda x: (x[2], x[1], x[0]))


# Keyed by (project-relative file path, function name).
# NOTE(review): functions that share a name within one file (e.g. methods of
# different classes) map to the same key, so the later entry overwrites the
# earlier one.
file_infos: Dict[Tuple[str, str], Dict[str, List]] = {}

for file_abs in candidate_files:
    file_path = Path(file_abs)
    try:
        source = file_path.read_text(encoding="utf-8")
        tree = ast.parse(source)
    except Exception as e:
        # Best effort: report the unreadable/unparsable file and move on.
        print(f"⚠️ AST parse failed for {rel_from_proj(file_abs)}: {e}")
        continue

    visitor = ASTVisitor(rel_from_proj(file_abs))
    visitor.visit(tree)

    miss_lines: Set[int] = set(uncovered_map.get(file_abs, []))
    half_lines: Set[int] = half_hit_map.get(file_abs, set())

    for fi in visitor.funcs:
        # ---- Branch filter: keep only uncovered or half-hit lines
        tb = sorted(ln for ln in fi.branches if (ln in miss_lines or ln in half_lines))

        # ---- Def-use pairs whose use line is uncovered
        tu = _nearest_def_use_pairs(fi, miss_lines)

        # ---- Exceptions: keep only sites on uncovered lines
        te = sorted(
            [(kind, line, hint) for (kind, line, hint) in fi.exceptions if line in miss_lines],
            key=lambda x: (x[1], x[0])
        )

        # ---- Side effects → mock kinds (kept as metadata)
        mock_kinds = sorted(set(k for (k, _) in fi.side_effect_calls))
        needs_mock = bool(mock_kinds)

        # ---- Context size (|Context_g|)
        ctx_size = compute_context_size(fi)

        # Only scopes with at least one target are recorded.
        if tb or tu or te:
            file_infos[(rel_from_proj(file_abs), fi.name)] = {
                "branches": tb,
                "def_uses": tu,
                "exceptions": te,
                "func_lineno": fi.lineno,
                "needs_mock": needs_mock,
                "mock_kinds": mock_kinds,
                "context_size": ctx_size,
            }

# ---------------- goal builders (후보 생성) ----------------
def make_goal(file_rel: str, func_name: str, func_lineno: int,
              branches: List[int], def_uses: List[Tuple[str,int,int]],
              exceptions: List[Tuple[str,int,dict]], needs_mock: bool, mock_kinds: List[str],
              context_size: int) -> dict:
    """Assemble one candidate goal record for a function.

    Args:
        file_rel: Project-relative path of the file that holds the function.
        func_name: Name of the target function.
        func_lineno: Line on which the function is defined.
        branches: Branch lines included in this goal.
        def_uses: ``(var, def_line, use_line)`` triples included in this goal.
        exceptions: ``(kind, line, hint)`` triples included in this goal.
        needs_mock: Whether the function performs side-effecting calls.
        mock_kinds: Kinds of side effects, forwarded as the mock plan.
        context_size: Context size of the function, used by the cost model.

    Returns:
        A dict with the goal components, the sorted target lines, hints for
        the test generator and the rounded gain / cost / score. ``id`` is left
        as ``None``; it is assigned after selection.
    """
    # Target line set T_g: every line touched by any component of the goal.
    target_lines: Set[int] = set(branches)
    target_lines |= {ln for _, d_line, u_line in def_uses for ln in (d_line, u_line)}
    target_lines |= {exc_line for _, exc_line, _ in exceptions}

    # Gain, cost and the gain-per-cost score (0.0 when the cost is not positive).
    gain = coverage_gain_total(target_lines, len(branches), len(def_uses), len(exceptions))
    cost = generation_cost(context_size, len(target_lines))
    score = gain / cost if cost > 0 else 0.0

    # Hint: does any selected branch line appear in the half-hit map for this file?
    half_hit_lines = half_hit_map.get(norm_abs(file_rel), set())
    need_two = any(ln in half_hit_lines for ln in branches)

    # Flatten each exception triple into one dict; hint keys are merged last.
    exc_hints = [
        {"kind": kind, "line": line, **(hint or {})}
        for kind, line, hint in exceptions
    ]

    components = {
        "branches": [{"line": ln} for ln in branches],
        "def_uses": [
            {"var": var, "def_line": d_line, "use_line": u_line}
            for var, d_line, u_line in def_uses
        ],
        "exceptions": exc_hints,
    }
    hints = {
        "need_two_sides_for_half_hit": need_two,
        "needs_mock": needs_mock,
        "mock_plan": mock_kinds,
        "exception_hint": exc_hints,
    }
    return {
        "id": None,
        "file": file_rel,
        "function": {"name": func_name, "lineno": func_lineno},
        "components": components,
        "target_lines": sorted(target_lines),
        "hints": hints,
        "coverage_gain": round(gain, 6),
        "generation_cost": round(cost, 6),
        "score": round(score, 6),
        "context_size": context_size,
    }

# Candidate generation: for every function recorded in file_infos, emit
# single-component goals plus 2- and 3-component combinations whose lines
# lie within MAX_NEAR_GAP of each other.
candidates: List[dict] = []

for (file_rel, func_name), info in file_infos.items():
    func_lineno = info["func_lineno"]
    tb = info["branches"]      # branch lines
    tu = info["def_uses"]      # (var, def_line, use_line) triples
    te = info["exceptions"]    # (kind, line, hint) triples
    needs_mock = info["needs_mock"]
    mock_kinds = info["mock_kinds"]
    ctx_size = info["context_size"]

    # Singletons: one goal per branch, per def-use pair, per exception site
    for b in tb:
        candidates.append(make_goal(file_rel, func_name, func_lineno, [b], [], [], needs_mock, mock_kinds, ctx_size))
    for var, d, u in tu:
        candidates.append(make_goal(file_rel, func_name, func_lineno, [], [(var, d, u)], [], needs_mock, mock_kinds, ctx_size))
    for kind, line, hint in te:
        candidates.append(make_goal(file_rel, func_name, func_lineno, [], [], [(kind, line, hint)], needs_mock, mock_kinds, ctx_size))

    # Pairs (proximity-linked): the span covering both components must not
    # exceed MAX_NEAR_GAP lines
    for b in tb:
        for var, d, u in tu:
            if max(b, u) - min(b, d) <= MAX_NEAR_GAP:
                candidates.append(make_goal(file_rel, func_name, func_lineno, [b], [(var, d, u)], [], needs_mock, mock_kinds, ctx_size))
    for b in tb:
        for kind, line, hint in te:
            if abs(b - line) <= MAX_NEAR_GAP:
                candidates.append(make_goal(file_rel, func_name, func_lineno, [b], [], [(kind, line, hint)], needs_mock, mock_kinds, ctx_size))
    for var, d, u in tu:
        for kind, line, hint in te:
            if max(u, line) - min(d, line) <= MAX_NEAR_GAP:
                candidates.append(make_goal(file_rel, func_name, func_lineno, [], [(var, d, u)], [(kind, line, hint)], needs_mock, mock_kinds, ctx_size))

    # Triples (proximity-linked): branch + def-use + exception whose overall
    # line span fits within MAX_NEAR_GAP
    for b in tb:
        for var, d, u in tu:
            for kind, line, hint in te:
                lines = [b, d, u, line]
                if max(lines) - min(lines) <= MAX_NEAR_GAP:
                    candidates.append(make_goal(file_rel, func_name, func_lineno, [b], [(var, d, u)], [(kind, line, hint)], needs_mock, mock_kinds, ctx_size))

# ---------------- 후보 중복 제거(동등 구성은 score 높은 것만) ----------------
def goal_key(g):
    """Return a hashable identity for a goal.

    Two goals with the same branch lines, def-use triples, exception
    (kind, line) pairs, file and function name yield the same key,
    regardless of the order in which the components are listed.
    """
    parts = g["components"]
    branch_lines = sorted(entry["line"] for entry in parts["branches"])
    du_triples = sorted(
        (entry["var"], entry["def_line"], entry["use_line"])
        for entry in parts["def_uses"]
    )
    exc_pairs = sorted(
        (entry.get("kind"), entry.get("line")) for entry in parts["exceptions"]
    )
    return (
        tuple(branch_lines),
        tuple(du_triples),
        tuple(exc_pairs),
        g["file"],
        g["function"]["name"],
    )

# Keep one goal per goal_key: the first one seen, replaced only by a later
# goal with a strictly higher score.
unique_map = {}
for g in candidates:
    k = goal_key(g)
    if k not in unique_map or g["score"] > unique_map[k]["score"]:
        unique_map[k] = g

# dict preserves insertion order, so the surviving goals keep first-seen order.
candidates = list(unique_map.values())

# ---------------- 예산 기반 선택 (Algorithm 1: lines 12~15) ----------------
def overlap_ratio(g: dict, selected: List[dict]) -> float:
    """Return |Tg ∩ (⋃Ts)| / |Tg| for goal *g* against the selected goals.

    Returns 0.0 when nothing has been selected yet, and 1.0 when *g* has no
    target lines (an empty goal is treated as fully overlapping).
    """
    if not selected:
        return 0.0
    own_lines = set(g["target_lines"])
    if not own_lines:
        # An empty goal carries no value, so report full overlap.
        return 1.0
    already_covered = set().union(*(s["target_lines"] for s in selected))
    return len(own_lines & already_covered) / len(own_lines)

# Sort by score descending; ties are broken by larger coverage_gain, then
# smaller generation_cost, then file and function name.
# NOTE(review): the original comment described the third tie-break as
# "smaller target first", but the key actually uses generation_cost.
candidates.sort(key=lambda x: (-x["score"], -x["coverage_gain"], x["generation_cost"], x["file"], x["function"]["name"]))

selected: List[dict] = []
budget = float(BUDGET_B)

# Greedy selection: walk the candidates in rank order and take each one that
# has a positive score, overlaps the already selected target lines by less
# than OVERLAP_THETA, and still fits into the remaining budget.
rem = candidates[:]  # remaining candidates (copy; `candidates` itself is kept intact)
while budget > 0.0 and rem:
    g = rem.pop(0)  # current best-scoring candidate
    if g["score"] <= 0.0:
        continue
    if overlap_ratio(g, selected) < OVERLAP_THETA and g["generation_cost"] <= budget:
        selected.append(g)
        budget -= g["generation_cost"]

# ---------------- assign ids and persist ----------------
# Ids are 1-based, zero-padded to four digits, in selection order.
for i, g in enumerate(selected, 1):
    g["id"] = f"{i:04d}"

# Full-precision selection result
(ART_DIR / "goals_raw.json").write_text(
    json.dumps(selected, indent=2, ensure_ascii=False),
    encoding="utf-8"
)

# Ranked output: same goals in selection order, with gain/cost/score rounded
# to three decimals
ranked = [
    {
        "id": g["id"],
        "file": g["file"],
        "function": g["function"],
        "components": g["components"],
        "target_lines": g["target_lines"],
        "coverage_gain": round(float(g["coverage_gain"]), 3),
        "generation_cost": round(float(g["generation_cost"]), 3),
        "score": round(float(g["score"]), 3),
        "hints": g["hints"],
        "context_size": g["context_size"],
    }
    for g in selected
]
(ART_DIR / "goals_ranked.json").write_text(
    json.dumps(ranked, indent=2, ensure_ascii=False),
    encoding="utf-8"
)

# Summary: number selected, total candidates (after de-duplication), spent
# and remaining budget
print(f"✅ 복합 목표 생성 완료: {len(selected)}개 선택 (예산 B={BUDGET_B}, θ={OVERLAP_THETA})")
print(" - 입력: uncovered_map_base.json, observed_outcomes_base.json")
print(" - 저장: goals_raw.json, goals_ranked.json")
print(f" - 후보 총수: {len(candidates)} / 선택 총비용: {round(BUDGET_B - budget, 3)} / 남은 예산: {round(budget, 3)}")



✅ 복합 목표 생성 완료: 10개 선택 (예산 B=50.0, θ=0.5)
 - 입력: uncovered_map_base.json, observed_outcomes_base.json
 - 저장: goals_raw.json, goals_ranked.json
 - 후보 총수: 94 / 선택 총비용: 44.8 / 남은 예산: 5.2


In [4]:
#@title 3-3) LLM 프롬프트 생성 – 시스템/사용자 지시부 이원화(우선순위/식별자 포함)
import json, re
from pathlib import Path

# Locations of this run's artifacts, all under <project>/run_artifacts/run1
PROJ_PATH = Path(PROJ).resolve()
ART_DIR = PROJ_PATH / "run_artifacts" / "run1"
GOALS_FILE = ART_DIR / "goals_ranked.json"      # input: ranked goals
LLM_PROMPTS_PATH = ART_DIR / "llm_prompts.jsonl"  # output: one prompt record per line

# Fail early when the ranked goals file has not been produced yet
assert GOALS_FILE.exists(), "goals_ranked.json이 없습니다. 3-2 단계를 먼저 실행하세요."
goals = json.loads(GOALS_FILE.read_text(encoding="utf-8"))

def to_mod_name(file_rel: str) -> str:
    """Convert a relative file path into a dotted module name.

    Backslashes are treated as path separators and a trailing ``.py`` is
    dropped, e.g. ``pkg/sub/mod.py`` -> ``pkg.sub.mod``.
    """
    posix = file_rel.replace("\\", "/")
    stem = posix[:-3] if posix.endswith(".py") else posix
    return ".".join(stem.split("/"))

def suggest_filename(goal):
    """Propose a test file name of the form ``test_gen_<id>_<module>_<function>.py``.

    Only the last dotted component of the module name is used, and every run
    of characters outside ``[a-zA-Z0-9_]`` in each part is replaced by ``_``.
    """
    def _safe(value) -> str:
        return re.sub(r"[^a-zA-Z0-9_]+", "_", str(value))

    leaf_module = to_mod_name(goal["file"]).rsplit(".", 1)[-1]
    func_name = goal["function"]["name"]
    goal_id = goal["id"]
    return f"test_gen_{_safe(goal_id)}_{_safe(leaf_module)}_{_safe(func_name)}.py"

# ---------------- System instruction (fixed role / output format / constraints) ----------------
# Sent verbatim as the "system" message of every prompt record. The text is
# part of the program's runtime behaviour and is kept in Korean as-is. It
# defines the generator role, the single-JSON-object output schema
# ({"filename", "tests": [{"name", "code"}]}) and the strict rules on
# importlib loading, guarded pytest.skip, mandatory asserts, mocking of
# external access, branch / def-use / exception verification and test naming.
SYSTEM_INSTR = (
    "역할: 당신은 주어진 목표(분기/정의-사용/예외)를 실제로 실행하는 PyTest 테스트 코드를 생성하는 '테스트 생성기'입니다.\n"
    "출력 형식: 마크다운/주석 없이 오직 하나의 JSON 객체로만 응답하세요.\n"
    "출력 스키마:\n"
    "{\n"
    '  "filename": "test_*.py",\n'
    '  "tests": [\n'
    '    {"name": "test_*", "code": "<pytest 테스트 파일 전체 코드 문자열>"}\n'
    "  ]\n"
    "}\n"
    "행동 제약(엄격):\n"
    "• 원본 코드는 수정 금지, 테스트 파일만 작성\n"
    "• importlib로 모듈 로드 후 getattr로 심볼 접근\n"
    "  └ 심볼이 '정말로' 없을 때만 다음 가드 패턴으로 skip 허용:\n"
    "     >>> tgt = getattr(mod, 'symbol', None)\n"
    "     >>> if tgt is None:\n"
    "     >>>     pytest.skip('symbol missing')\n"
    "  └ 위 가드 없이 호출되는 모든 skip은 무조건 금지(검증기에서 즉시 탈락)\n"
    "• 각 테스트는 최소 1개 이상의 assert 또는 pytest.raises(...)를 포함해야 함(없으면 탈락)\n"
    "• 파일/네트워크/시간/환경/비동기 루프/Temporal 등 외부 접근은 mock/monkeypatch로 대체\n"
    "• 분기는 양 경로를 모두 검증(half-hit 지시 시 독립 테스트 2개)\n"
    "• def-use는 (def_line→use_line) 효과를 관측 가능한 assert로 입증\n"
    "• 예외 경로는 with pytest.raises(...)로 타입/가능하면 메시지를 검증\n"
    "• 전역 상태 잔존 금지, 불필요한 광범위 try/except 금지, 무관한 assert 금지\n"
    "• 테스트 이름에 타격 라인 포함: `..._hits_L<line>` 형태 권장\n"
)


# 우선순위 기준: score가 있으면 score, 없으면 coverage_gain
def priority_of(g: dict) -> float:
    """Return the goal's priority: its ``score`` if present, else its ``coverage_gain`` (default 0.0)."""
    fallback = g.get("coverage_gain", 0.0)
    chosen = g.get("score", fallback)
    return float(chosen)

# Assign ranks. The goals are re-sorted defensively (priority descending, then
# coverage_gain descending) even if goals_ranked.json is already ordered.
goals_sorted = sorted(goals, key=lambda x: (-priority_of(x), -float(x.get("coverage_gain", 0.0))))

# Write one JSON record per goal to llm_prompts.jsonl. Each record carries
# bookkeeping metadata plus the chat messages (fixed system instruction +
# goal-specific user payload serialized as JSON).
ART_DIR.mkdir(parents=True, exist_ok=True)
with LLM_PROMPTS_PATH.open("w", encoding="utf-8") as outf:
    for rank, goal in enumerate(goals_sorted, start=1):
        module_name = to_mod_name(goal["file"])
        suggested = suggest_filename(goal)

        # Missing or null sections fall back to empty containers.
        comps = goal.get("components", {}) or {}
        hints = goal.get("hints", {}) or {}
        mock_plan = hints.get("mock_plan", []) or []
        need_two  = bool(hints.get("need_two_sides_for_half_hit", False))

        # ---------------- User payload (execution path / verification target spec) ----------------
        USER_PAYLOAD = {
            "schema_version": "v1",
            "identifier": {
                "id": goal["id"],
                "rank": rank,
                "priority": priority_of(goal),           # usable as a dynamic priority in the execution stage
                "basis": "score" if "score" in goal else "coverage_gain"
            },
            "project": {
                "root": str(PROJ_PATH),
                "module": module_name
            },
            "goal": {
                "file": goal["file"],
                "function": goal["function"],             # {"name": ..., "lineno": ...}
                "components": {
                    "branches": comps.get("branches", []),
                    "def_uses": comps.get("def_uses", []),
                    "exceptions": comps.get("exceptions", [])
                },
                "target_lines": goal.get("target_lines", [])
            },
            "constraints": {
                "filename_suggestion": suggested,
                "import_policy": {
                    "strategy": "importlib_only",
                    "on_missing": "pytest.skip"           # skip is allowed only when the attribute is missing
                },
                "isolation_policy": {
                    "no_fs_no_net": True,
                    "patch_time": True,
                    "forbid_asyncio_run": True,
                    "forbid_temporal_real_runs": True,
                    "patch_env": True
                },
                "execution_contract": {
                    "must_hit_at_least_n_target_lines": 1,
                    "require_two_tests_for_half_hit": need_two,
                    "test_name_must_include_hit_lines": True
                },
                "assert_policy": {
                    "prefer_pytest_raises": True,
                    "prefer_explicit_asserts": True,
                    "no_unrelated_asserts": True
                }
            },
            "hints": {
                "needs_mock": bool(hints.get("needs_mock", False) or mock_plan),
                "mock_plan": mock_plan,                    # ["io_open","net_requests","env_access","time_sleep","datetime_now",...]
                "exception_hint": comps.get("exceptions", [])
            },
            # Test scaffolding (only a guide to the order of generation steps)
            "scaffolding": [
                "1) importlib로 모듈 로드, getattr로 심볼 확보(없으면만 skip).",
                "2) 목표 경로(분기/def-use/예외)를 만족하는 입력 벡터 구성.",
                "3) 외부 의존은 monkeypatch/더블로 대체.",
                "4) 호출로 target_lines를 실제 타격.",
                "5) 관측 가능한 assert 작성(반환/상태/호출/예외).",
                # Fix: the comma after item 6 was missing, so items 6 and 7
                # were implicitly concatenated into a single list element.
                "6) 테스트 이름에 hits_L<line> 포함.",
                "7) 예시: tgt = getattr(mod, 'symbol', None);  if tgt is None: pytest.skip('symbol missing')"
            ],
            # Minimal instructions for the model, without redundancy
            "instructions": [
                "오직 지정된 JSON 스키마만 반환하세요.",
                "각 테스트는 target_lines 중 최소 1줄을 실행해야 하며, 분기는 양 경로를 검증하세요.",
                "def-use는 (def_line→use_line)의 효과를 관측 가능하게 검증하고, 예외는 타입/메시지를 검증하세요.",
                "외부 접근은 모두 mock/monkeypatch로 대체하세요.",
                "각 테스트에 최소 1개 이상의 assert 또는 pytest.raises(...)를 반드시 포함하세요.",
                "pytest.skip()는 심볼이 없는 경우의 '가드형' 패턴에서만 허용되며, 그 외 사용 시 테스트는 거부됩니다."
                ]
        }

        record = {
            "meta": {
                "id": goal["id"],
                "rank": rank,
                "priority": USER_PAYLOAD["identifier"]["priority"],
                "priority_basis": USER_PAYLOAD["identifier"]["basis"],
                "file": goal["file"],
                "function": goal["function"]["name"],
                "coverage_gain": float(goal.get("coverage_gain", 0.0)),
                "score": float(goal.get("score", USER_PAYLOAD["identifier"]["priority"])),
                "suggested_filename": suggested,
                "module": module_name
            },
            "messages": [
                {"role": "system", "content": SYSTEM_INSTR},
                {"role": "user", "content": json.dumps(USER_PAYLOAD, ensure_ascii=False, indent=2)}
            ]
        }
        outf.write(json.dumps(record, ensure_ascii=False) + "\n")

# Summary. The `x and x[0]...` form prints the empty list itself when there
# are no goals.
print(f"✅ LLM 프롬프트 생성 완료 → {LLM_PROMPTS_PATH}")
print("   - 총 목표 수:", len(goals_sorted))
print("   - 최상위 우선순위 목표 ID:", goals_sorted and goals_sorted[0].get("id"))
print("   - 예시 파일명:", goals_sorted and suggest_filename(goals_sorted[0]))


✅ LLM 프롬프트 생성 완료 → /content/money-transfer-project-template-python/run_artifacts/run1/llm_prompts.jsonl
   - 총 목표 수: 10
   - 최상위 우선순위 목표 ID: 0001
   - 예시 파일명: test_gen_0001_banking_service_deposit.py


In [5]:
#@title 3-4) 테스트 코드 생성 – 스키마/구조/최소 실행 요건 검증 후 저장
import os
import re
import json
import time
import ast
from pathlib import Path
import httpx
import backoff
from openai import OpenAI, APIError, RateLimitError, APIConnectionError

# ---------- Path setup ----------
ART_DIR = Path(PROJ) / "run_artifacts" / "run1"
LLM_PROMPTS_PATH = ART_DIR / "llm_prompts.jsonl"   # input: prompt records
GEN_DIR = Path(PROJ) / "generated_tests"           # output: accepted test files
RAW_DIR = ART_DIR / "_raw"                         # raw model responses
ERR_DIR = ART_DIR / "_errors"                      # error / exclusion reports
GEN_DIR.mkdir(parents=True, exist_ok=True)
RAW_DIR.mkdir(parents=True, exist_ok=True)
ERR_DIR.mkdir(parents=True, exist_ok=True)

# ---------- OpenAI client ----------
# Abort immediately when no API key is available in the environment.
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY가 설정되지 않았습니다. 3-0 단계에서 .env를 로드했는지 확인하세요.")

# Dedicated HTTP client: 180 s timeout, a single connection with no
# keep-alive pool, and up to 5 retries at the transport level.
http_client = httpx.Client(
    timeout=180.0,
    follow_redirects=True,
    limits=httpx.Limits(max_connections=1, max_keepalive_connections=0),
    transport=httpx.HTTPTransport(retries=5),
)
client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url="https://api.openai.com/v1",
    http_client=http_client,
)

# ---------- 유틸 ----------
# Anything other than lowercase letters, digits and underscores.
_slug_re = re.compile(r"[^a-z0-9_]+")
def slugify(s: str, maxlen: int = 40) -> str:
    """Turn *s* into a lowercase ``[a-z0-9_]`` slug of at most *maxlen* characters.

    Hyphens, spaces and any other disallowed characters become underscores,
    runs of underscores are collapsed, and leading/trailing underscores are
    removed before truncation. Returns ``"t"`` when nothing is left.
    """
    lowered = s.lower().strip()
    for separator in ("-", " "):
        lowered = lowered.replace(separator, "_")
    collapsed = re.sub(r"_+", "_", _slug_re.sub("_", lowered))
    shortened = collapsed.strip("_")[:maxlen]
    return shortened if shortened else "t"

def strip_fences(s: str) -> str:
    """Remove a leading and a trailing Markdown code fence from *s*.

    The input is stripped of surrounding whitespace first; an opening fence
    with an optional alphanumeric language tag and a closing fence at the end
    are dropped.
    """
    body = s.strip()
    without_opening = re.sub(r"^```[a-zA-Z0-9]*\s*", "", body)
    return re.sub(r"\s*```$", "", without_opening)

def ensure_unique_path(base: Path) -> Path:
    """Return *base* if it does not exist, else the first free ``<stem>_<n><suffix>`` sibling (n = 2, 3, ...)."""
    candidate = base
    counter = 1
    while candidate.exists():
        counter += 1
        candidate = base.with_name(f"{base.stem}_{counter}{base.suffix}")
    return candidate

def write_error(goal_id: str, kind: str, payload: dict, idx: int | None = None):
    """Write *payload* as pretty-printed JSON into ERR_DIR.

    The file is named ``goal_<goal_id>_<kind>.json``, or
    ``goal_<goal_id>_t<idx>_<kind>.json`` when *idx* is given.
    """
    if idx is None:
        tag = f"goal_{goal_id}_{kind}"
    else:
        tag = f"goal_{goal_id}_t{idx}_{kind}"
    destination = ERR_DIR / f"{tag}.json"
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    destination.write_text(serialized, encoding="utf-8")

# ---------- 보정기: runner 제거 ----------
def sanitize_test_code(code: str) -> str:
    """
    테스트 파일 안의 runner 호출 제거:
      - if __name__ == "__main__": ... pytest.main()/unittest.main()
      - 파일 어디든 있는 pytest.main(...), unittest.main(...)
    """
    patterns = [
        re.compile(r"(?ms)^\s*if\s+__name__\s*==\s*['\"]__main__['\"]\s*:\s*\n(?:\s+.*\n?)+$"),
        re.compile(r"(?m)^\s*pytest\.main\s*\(.*?\)\s*$"),
        re.compile(r"(?m)^\s*unittest\.main\s*\(.*?\)\s*$"),
    ]
    new = code
    for pat in patterns:
        new = pat.sub("", new)
    return new.strip() + "\n"

# ---------- 검증 로직 ----------
# Textual markers searched for in generated test source:
# a call to importlib.import_module( ... )
RE_IMPORTLIB = re.compile(r"\bimportlib\.import_module\s*\(")
# a call to pytest.raises( ... )
RE_PYTEST_RAISES = re.compile(r"\bpytest\.raises\s*\(")

def validate_json_schema(result: dict) -> tuple[bool, list[str]]:
    """Check that a parsed model response has the expected shape.

    Expected: a JSON object with a non-empty ``tests`` list whose items are
    objects with a ``name`` and a string ``code``; an optional ``filename``
    must end in ``.py``.

    Args:
        result: The decoded JSON value returned by the model.

    Returns:
        ``(ok, reasons)`` where ``ok`` is True when no problem was found and
        ``reasons`` lists one short code per problem.
    """
    if not isinstance(result, dict):
        return False, ["not_a_json_object"]

    reasons: list[str] = []
    if "tests" not in result:
        reasons.append("missing_tests")
    elif not isinstance(result["tests"], list) or not result["tests"]:
        reasons.append("tests_empty_or_not_list")
    else:
        for i, t in enumerate(result["tests"], start=1):
            if not isinstance(t, dict):
                reasons.append(f"test_{i}_not_object")
                continue
            if "code" not in t:
                reasons.append(f"test_{i}_missing_code")
            elif not isinstance(t["code"], str):
                # A null / list / number here would otherwise reach the string
                # helpers downstream and abort the whole generation loop.
                reasons.append(f"test_{i}_code_not_str")
            if "name" not in t:
                reasons.append(f"test_{i}_missing_name")

    if "filename" in result and not str(result["filename"]).endswith(".py"):
        reasons.append("filename_not_py")
    return (len(reasons) == 0), reasons

def parse_ast_or_error(code: str):
    """Parse *code* and return ``(tree, None)``, or ``(None, "syntax_error:<msg>@L<line>")`` on a SyntaxError."""
    try:
        tree = ast.parse(code)
    except SyntaxError as exc:
        return None, f"syntax_error:{exc.msg}@L{exc.lineno}"
    else:
        return tree, None

def extract_test_funcs(tree: ast.AST) -> list[ast.FunctionDef]:
    return [n for n in ast.walk(tree) if isinstance(n, ast.FunctionDef) and n.name.startswith("test_")]

def has_assert_or_raises(tree: ast.AST, code: str) -> bool:
    """Return True if *tree* contains an ``assert`` statement or *code* contains a ``pytest.raises(`` call."""
    for node in ast.walk(tree):
        if isinstance(node, ast.Assert):
            return True
    return RE_PYTEST_RAISES.search(code) is not None

def uses_importlib(code: str) -> bool:
    """Return True if *code* contains an ``importlib.import_module(`` call."""
    match = RE_IMPORTLIB.search(code)
    return match is not None

# ===== Guarded-skip detection (AST based) =====
def _parent_map(tree: ast.AST):
    """Map every node of *tree* to its direct parent node."""
    parent = {}
    for node in ast.walk(tree):
        for child in ast.iter_child_nodes(node):
            parent[child] = node
    return parent

def _is_none_check(test: ast.AST) -> bool:
    """Return True if *test* is a single ``<expr> is None`` comparison."""
    return (
        isinstance(test, ast.Compare)
        and len(test.ops) == 1
        and isinstance(test.ops[0], ast.Is)
        and len(test.comparators) == 1
        and isinstance(test.comparators[0], ast.Constant)
        and test.comparators[0].value is None
    )

def _handles_missing_symbol(exc_type) -> bool:
    """Return True if an ``except`` clause type names ImportError or NameError."""
    allowed = {"ImportError", "NameError"}
    if isinstance(exc_type, ast.Name):
        return exc_type.id in allowed
    if isinstance(exc_type, ast.Tuple):
        return any(isinstance(e, ast.Name) and e.id in allowed for e in exc_type.elts)
    return False

def _is_pytest_mark_skip(node: ast.AST) -> bool:
    """Return True if *node* is the attribute expression ``pytest.mark.skip``."""
    return (
        isinstance(node, ast.Attribute)
        and node.attr == "skip"
        and isinstance(node.value, ast.Attribute)
        and node.value.attr == "mark"
        and isinstance(node.value.value, ast.Name)
        and node.value.value.id == "pytest"
    )

def _is_guarded_skip(call: ast.Call, parent_map, src: str) -> bool:
    """Return True if a ``pytest.skip(...)`` call sits behind an accepted guard.

    Accepted guards:
      (1) the *body* of ``if <expr> is None:``
      (2) the handler of ``except ImportError`` / ``except NameError``
          (alone or inside a tuple of exception types)

    A call in the ``else`` / ``elif`` part of an ``is None`` check is NOT
    guarded: that branch runs precisely when the symbol exists.

    Args:
        call: The ``pytest.skip`` call node.
        parent_map: Child-to-parent mapping produced by ``_parent_map``.
        src: Source text; unused, kept for interface compatibility.
    """
    child = call
    while child in parent_map:
        node = parent_map[child]
        if isinstance(node, ast.If):
            # Only count the guard when we climbed out of the `if` body,
            # not out of its `orelse` branch.
            if _is_none_check(node.test) and any(child is stmt for stmt in node.body):
                return True
        elif isinstance(node, ast.ExceptHandler):
            if _handles_missing_symbol(node.type):
                return True
        child = node
    return False

def has_unconditional_skip(code: str) -> tuple[bool, list[int]]:
    """Find skips in *code* that are not protected by an accepted guard.

    Flags ``pytest.skip(...)`` calls for which ``_is_guarded_skip`` is False,
    and every use of ``pytest.mark.skip`` (with or without arguments), which
    skips a test without any runtime condition.

    Returns:
        ``(found, lines)``: whether any offending skip exists and the line
        numbers of the offending nodes, in AST walk order. Unparsable code
        yields ``(False, [])``; syntax errors are reported by other checks.
    """
    try:
        tree = ast.parse(code)
    except SyntaxError:
        return (False, [])
    parent = _parent_map(tree)
    bad_lines = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Call):
            f = node.func
            if isinstance(f, ast.Attribute) and isinstance(f.value, ast.Name) and f.value.id == "pytest" and f.attr == "skip":
                if not _is_guarded_skip(node, parent, code):
                    bad_lines.append(getattr(node, "lineno", -1))
        elif _is_pytest_mark_skip(node):
            bad_lines.append(getattr(node, "lineno", -1))
    return (len(bad_lines) > 0, bad_lines)

# ---------- 최소 실행 요건 ----------
def minimal_viability_checks(code: str) -> tuple[bool, list[str], dict]:
    """Run the minimal structural checks on one generated test file.

    Returns:
        ``(ok, reasons, meta)``: ``ok`` is True when no rejection reason was
        collected, ``reasons`` lists the rejection codes, and
        ``meta["warnings"]`` lists non-fatal findings. A syntax error ends the
        checks early, because the remaining ones need a parsed tree.
    """
    problems: list[str] = []
    warnings: list[str] = []
    meta = {"warnings": warnings}

    if len(code.strip()) < 60:
        problems.append("too_short")

    tree, syntax_problem = parse_ast_or_error(code)
    if syntax_problem:
        problems.append(syntax_problem)
        return False, problems, meta

    test_funcs = extract_test_funcs(tree)
    if not test_funcs:
        problems.append("no_test_functions")
    if not has_assert_or_raises(tree, code):
        problems.append("no_assert_or_raises")
    if not uses_importlib(code):
        problems.append("no_importlib_import_module")

    # Naming convention is advisory only: warn when no test name carries "hits_L".
    if test_funcs and all("hits_L" not in fn.name for fn in test_funcs):
        warnings.append("missing_hits_L_in_test_name")

    # Reject skips that are not behind an accepted guard.
    skip_found, skip_lines = has_unconditional_skip(code)
    if skip_found:
        problems.append(f"unconditional_skip_detected@{skip_lines}")

    # Reject explicit runner invocations; a main guard alone is only a warning.
    runner_checks = (
        (r"(?m)^\s*pytest\.main\s*\(", "forbidden_runner_invocation:pytest.main"),
        (r"(?m)^\s*unittest\.main\s*\(", "forbidden_runner_invocation:unittest.main"),
    )
    for pattern, reason in runner_checks:
        if re.search(pattern, code):
            problems.append(reason)
    if re.search(r"(?ms)^\s*if\s+__name__\s*==\s*['\"]__main__['\"]\s*:", code):
        warnings.append("sanitizable_main_guard_present")

    return (not problems), problems, meta

# ---------- OpenAI 호출 ----------
@backoff.on_exception(backoff.expo, (APIConnectionError, APIError, RateLimitError), max_tries=8, max_time=300)
def call_openai_with_retry(messages):
    """Send *messages* to gpt-4o in JSON-object mode and return the reply text.

    Retried with exponential backoff (at most 8 tries / 300 s) on
    APIConnectionError, APIError and RateLimitError.

    Args:
        messages: Chat messages in the OpenAI chat-completions format.

    Returns:
        The content of the first choice as a string. A missing content
        (``None``) is returned as ``""`` so that the caller, which writes the
        text to disk and parses it as JSON, records a JSON parse failure for
        this goal instead of crashing on a non-string value.
    """
    resp = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        response_format={"type": "json_object"},
        timeout=180.0,
    )
    content = resp.choices[0].message.content
    return content if content is not None else ""

# ---------- Main loop ----------
# For every prompt record: call the model, keep the raw reply, parse and
# validate it, then store each test that passes the minimal checks. A goal
# counts as a success when at least one of its tests was saved.
gen_log_path = ART_DIR / "gen_log.jsonl"
ok_count = 0
fail_count = 0

with LLM_PROMPTS_PATH.open("r", encoding="utf-8") as f_in, gen_log_path.open("w", encoding="utf-8") as f_log:
    for line in f_in:
        rec = json.loads(line)
        goal_id = rec["meta"]["id"]
        messages = rec["messages"]
        print(f"\n🚀 Goal {goal_id} 테스트 생성 요청…")

        # 1) Call the model; any failure is recorded and the goal is skipped
        try:
            out_text = call_openai_with_retry(messages)
        except Exception as e:
            fail_count += 1
            write_error(goal_id, "request_error", {"error": str(e)})
            print(f"❌ Goal {goal_id} 요청 실패: {e}")
            continue

        # 2) Keep the raw response
        (RAW_DIR / f"goal_{goal_id}_raw.json").write_text(out_text, encoding="utf-8")

        # 3) Parse JSON (after removing Markdown fences, if any)
        try:
            cleaned = strip_fences(out_text)
            result = json.loads(cleaned)
        except Exception as e:
            fail_count += 1
            write_error(goal_id, "json_parse_error", {"error": str(e), "raw": out_text[:2000]})
            print(f"❌ Goal {goal_id} JSON 파싱 실패: {e}")
            continue

        # 4) Validate the response schema
        ok_schema, schema_reasons = validate_json_schema(result)
        if not ok_schema:
            fail_count += 1
            write_error(goal_id, "schema_error", {"reasons": schema_reasons, "result": result})
            print(f"❌ Goal {goal_id} 스키마 오류: {schema_reasons}")
            continue

        tests = result.get("tests", [])
        base_name = result.get("filename")
        suffix = ".py"
        if base_name:
            # Keep only the final path component of the model-supplied name
            base_name = Path(base_name).name
            suffix = Path(base_name).suffix or ".py"

        # 5) Per-test structure / requirement checks, then save
        saved_files = []
        excluded = []
        for idx, t in enumerate(tests, start=1):
            name = t.get("name", f"test_{goal_id}_{idx}")
            code = strip_fences(t.get("code", ""))
            code = sanitize_test_code(code)  # strip runner invocations

            ok_min, reasons, meta = minimal_viability_checks(code)
            if not ok_min:
                excluded.append({"index": idx, "name": name, "reasons": reasons, **meta})
                write_error(goal_id, "min_viability", {"index": idx, "name": name, "reasons": reasons, **meta}, idx=idx)
                print(f"⚠️ Goal {goal_id} 테스트 #{idx} 제외: {reasons}")
                continue

            # File name: a single test with a model-supplied filename keeps
            # that name; otherwise build test_<goal>_<slug>[_<idx>]
            if len(tests) == 1 and result.get("filename"):
                out_stem = Path(base_name).stem
            else:
                slug = slugify(result.get("filename", f"goal_{goal_id}"))
                out_stem = f"test_{goal_id}_{slug}_{idx}" if len(tests) > 1 else f"test_{goal_id}_{slug}"

            # Never overwrite an existing file; a numeric suffix is added instead
            out_path = ensure_unique_path(GEN_DIR / f"{out_stem}{suffix}")
            out_path.write_text(code, encoding="utf-8")
            saved_files.append(out_path.name)
            print(f"✅ 저장: {out_path.name}  (warnings: {','.join(meta.get('warnings', [])) or '없음'})")

        # 6) Summarize the goal: log saved files, or record that nothing was usable
        if saved_files:
            ok_count += 1
            f_log.write(json.dumps({
                "goal_id": goal_id,
                "saved_files": saved_files,
                "excluded_tests": excluded,
            }, ensure_ascii=False) + "\n")
        else:
            fail_count += 1
            write_error(goal_id, "no_valid_tests", {
                "result_head": result if len(json.dumps(result)) < 4000 else "omitted(large)",
                "excluded_tests": excluded
            })
            print(f"❌ Goal {goal_id} 유효 테스트 없음 → 기록만 남김")

# Final summary of the generation stage
print(f"\n✅ 생성 단계 종료: 성공 {ok_count} / 실패 {fail_count}")
print(f"   • 저장 폴더 : {GEN_DIR}")
print(f"   • 원문 보관 : {RAW_DIR}")
print(f"   • 에러/제외 : {ERR_DIR}")
print(f"   • 로그 파일 : {gen_log_path}")



🚀 Goal 0001 테스트 생성 요청…

🚀 Goal 0002 테스트 생성 요청…

🚀 Goal 0003 테스트 생성 요청…

🚀 Goal 0004 테스트 생성 요청…

🚀 Goal 0005 테스트 생성 요청…

🚀 Goal 0006 테스트 생성 요청…

🚀 Goal 0007 테스트 생성 요청…

🚀 Goal 0008 테스트 생성 요청…

🚀 Goal 0009 테스트 생성 요청…

🚀 Goal 0010 테스트 생성 요청…

✅ 생성 단계 종료: 성공 10 / 실패 0
   • 저장 폴더 : /content/money-transfer-project-template-python/generated_tests
   • 원문 보관 : /content/money-transfer-project-template-python/run_artifacts/run1/_raw
   • 에러/제외 : /content/money-transfer-project-template-python/run_artifacts/run1/_errors
   • 로그 파일 : /content/money-transfer-project-template-python/run_artifacts/run1/gen_log.jsonl


In [6]:
#@title 3-5) 기존 tests + 생성 tests 격리 실행 · 로그 수집 · 샤드 결합 · 향상치 계산
import os, sys, json, re, time, subprocess, shutil, shlex
from pathlib import Path
from datetime import datetime, timezone
from lxml import etree

# ==== Paths / constants ====
# PROJ must already exist as a notebook global before this cell runs.
assert 'PROJ' in globals(), "3-0 단계를 먼저 실행하세요."
PROJ = Path(PROJ).resolve()
ART_DIR = PROJ / "run_artifacts" / "run1"
GEN_DIR = PROJ / "generated_tests"
TESTS_DIR = PROJ / "tests"  # root of the pre-existing tests
LOG_DIR = ART_DIR / "logs"
COV_SHARDS_DIR = ART_DIR / "cov_shards"
HTML_DIR_GEN = PROJ / "htmlcov_gen"

ART_DIR.mkdir(parents=True, exist_ok=True)
LOG_DIR.mkdir(parents=True, exist_ok=True)
COV_SHARDS_DIR.mkdir(parents=True, exist_ok=True)
HTML_DIR_GEN.mkdir(parents=True, exist_ok=True)

# Pass --rcfile to coverage only when the project ships a .coveragerc
RCFILE = PROJ / ".coveragerc"
rc_opt = f" --rcfile {RCFILE}" if RCFILE.exists() else ""

# Execution parameters
PY_EXE = sys.executable
TIMEOUT_SEC_GEN = 30            # timeout per generated test file
TIMEOUT_SEC_BASE = 120          # timeout for one run of the whole existing tests/ suite
PYTEST_FLAGS = "-q -s"
ENV_BASE = os.environ.copy()    # snapshot of the environment used as the default for subprocesses

# goal_id 추출
RE_GOAL = re.compile(r"(?:^|[_-])(?P<gid>\d{4})(?:[_-]|$)")

# ==== 유틸 ====
def goal_id_from_name(name: str) -> str | None:
    m = RE_GOAL.search(name)
    return m.group("gid") if m else None

def sh(cmd: str, cwd: Path | None = None, timeout: int | None = None, env: dict | None = None):
    try:
        p = subprocess.run(
            cmd, cwd=str(cwd or PROJ), env=env or ENV_BASE,
            shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            timeout=timeout, text=True
        )
        return p.returncode, p.stdout, p.stderr, False
    except subprocess.TimeoutExpired as e:
        return 124, e.stdout or "", e.stderr or "", True

def list_generated_test_files() -> list[Path]:
    """Return the ``*.py`` files directly inside GEN_DIR, sorted; empty list if GEN_DIR is missing."""
    if GEN_DIR.exists():
        return sorted(entry for entry in GEN_DIR.glob("*.py") if entry.is_file())
    return []

def rel_to_proj(p: Path) -> str:
    """Return *p* relative to PROJ as a string, or its resolved absolute path if it is outside PROJ."""
    resolved = p.resolve()
    try:
        relative = resolved.relative_to(PROJ)
    except Exception:
        return str(resolved)
    return str(relative)

# ==== 0) Output artifact paths ====
results_jsonl = ART_DIR / "results.jsonl"
manifest_path = ART_DIR / "manifest.json"
coverage_json_path = ART_DIR / "coverage_gen.json"     # combined coverage for this round
coverage_xml_path  = ART_DIR / "coverage_gen.xml"
coverage_base_json = ART_DIR / "coverage_base.json"    # baseline produced in step 3-1

# Remove results of a previous execution so this run starts clean.
for old in [results_jsonl, coverage_json_path, coverage_xml_path]:
    if old.exists():
        old.unlink()

# Per-run records and pass / fail / timeout counters shared by the steps below.
runs = []
ok = fail = to_cnt = 0

# ==== 1) Run the existing tests/ suite once -> baseline coverage shard ====
if TESTS_DIR.exists() and any(TESTS_DIR.glob("test*.py")):
    shard_base = COV_SHARDS_DIR / ".coverage.__baseline_tests__"
    env = ENV_BASE.copy()
    env["PYTHONPATH"] = f"{PROJ}:{env.get('PYTHONPATH','')}"
    env["COVERAGE_FILE"] = str(shard_base)  # coverage writes its data file here
    env.setdefault("NO_PROXY", "*")

    target = rel_to_proj(TESTS_DIR)  # "tests"
    cmd = f"{PY_EXE} -m coverage run{rc_opt} -m pytest {PYTEST_FLAGS} {shlex.quote(target)}"

    start = time.time()
    ts_start = datetime.now(timezone.utc).isoformat()
    rc, out, err, timed_out = sh(cmd, cwd=PROJ, timeout=TIMEOUT_SEC_BASE, env=env)
    dur = round(time.time() - start, 3)
    ts_end = datetime.now(timezone.utc).isoformat()

    # Keep the raw logs
    (LOG_DIR / "__baseline_tests__.out.txt").write_text(out, encoding="utf-8")
    (LOG_DIR / "__baseline_tests__.err.txt").write_text(err, encoding="utf-8")

    runs.append({
        "test_file": "__BASELINE_SUITE__",
        "goal_id": None,
        "start_utc": ts_start,
        "end_utc": ts_end,
        "duration_sec": dur,
        "returncode": rc,
        "timed_out": timed_out,
        "stdout_len": len(out),
        "stderr_len": len(err),
        "shard_path": str(shard_base),
        "invoked_path": target,
    })
    if timed_out:
        to_cnt += 1
        print(f"⏱️ TIMEOUT __BASELINE_SUITE__ ({dur}s)")
    elif rc == 0:
        ok += 1
        print(f"✅ PASS   __BASELINE_SUITE__ ({dur}s)")
    else:
        fail += 1
        # pytest reports failures on stdout, so stderr is usually empty; fall
        # back to the last stdout line (pytest's summary) for the one-line hint.
        err_lines = err.strip().splitlines()
        out_lines = out.strip().splitlines()
        first_err = err_lines[0] if err_lines else (out_lines[-1] if out_lines else "")
        print(f"❌ FAIL   __BASELINE_SUITE__ (rc={rc}, {dur}s) :: {first_err}")
else:
    print("ℹ️ tests/ 디렉터리 또는 테스트 파일이 없어 베이스라인 개별 실행을 건너뜀.")

# ==== 2) Run every generated test file in its own isolated process ====
test_files = list_generated_test_files()
print(f"🧪 생성 테스트 파일: {len(test_files)}개")
for tf in test_files:
    name = tf.name
    gid = goal_id_from_name(name) or "----"
    shard = COV_SHARDS_DIR / f".coverage.{name}"  # one coverage data file per test file

    env = ENV_BASE.copy()
    env["PYTHONPATH"] = f"{PROJ}:{env.get('PYTHONPATH','')}"
    env["COVERAGE_FILE"] = str(shard)
    env.setdefault("NO_PROXY", "*")

    target = rel_to_proj(tf)   # e.g. "generated_tests/test_0001_...py"
    cmd = f"{PY_EXE} -m coverage run{rc_opt} -m pytest {PYTEST_FLAGS} {shlex.quote(target)}"

    start = time.time()
    ts_start = datetime.now(timezone.utc).isoformat()
    rc, out, err, timed_out = sh(cmd, cwd=PROJ, timeout=TIMEOUT_SEC_GEN, env=env)
    dur = round(time.time() - start, 3)
    ts_end = datetime.now(timezone.utc).isoformat()

    # Save the raw logs
    (LOG_DIR / f"{name}.out.txt").write_text(out, encoding="utf-8")
    (LOG_DIR / f"{name}.err.txt").write_text(err, encoding="utf-8")

    runs.append({
        "test_file": name,
        "goal_id": gid,
        "start_utc": ts_start,
        "end_utc": ts_end,
        "duration_sec": dur,
        "returncode": rc,
        "timed_out": timed_out,
        "stdout_len": len(out),
        "stderr_len": len(err),
        "shard_path": str(shard),
        "invoked_path": target,
    })

    if timed_out:
        to_cnt += 1
        print(f"⏱️ TIMEOUT {name} ({dur}s)")
    elif rc == 0:
        ok += 1
        print(f"✅ PASS   {name} ({dur}s)")
    else:
        fail += 1
        # pytest reports failures on stdout, so stderr is usually empty; fall
        # back to the last stdout line (pytest's summary) for the one-line hint.
        err_lines = err.strip().splitlines()
        out_lines = out.strip().splitlines()
        first_err = err_lines[0] if err_lines else (out_lines[-1] if out_lines else "")
        print(f"❌ FAIL   {name} (rc={rc}, {dur}s) :: {first_err}")

# Persist the per-run records, one JSON object per line
with results_jsonl.open("w", encoding="utf-8") as f:
    for r in runs:
        f.write(json.dumps(r, ensure_ascii=False) + "\n")

# Run-level summary; tests_total counts the baseline suite as one extra entry when it ran.
manifest = {
    "generated_at_utc": datetime.now(timezone.utc).isoformat(),
    "project": str(PROJ),
    "run_dir": str(ART_DIR),
    "tests_total": (len(test_files) + (1 if any(x['test_file']=="__BASELINE_SUITE__" for x in runs) else 0)),
    "pass": ok,
    "fail": fail,
    "timeout": to_cnt,
    "logs_dir": str(LOG_DIR),
    "cov_shards_dir": str(COV_SHARDS_DIR),
}
manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")
print("\n📦 실행 요약:", json.dumps({"pass": ok, "fail": fail, "timeout": to_cnt, "total": manifest['tests_total']}, ensure_ascii=False))

# ==== 3) Combine coverage shards (JSON/XML/HTML) ====
shards = sorted([p for p in COV_SHARDS_DIR.iterdir() if p.name.startswith(".coverage.")])
if not shards:
    print("⚠️ 커버리지 샤드가 없습니다. 테스트가 즉시 실패했을 수 있습니다.")
else:
    # Drive coverage through the same interpreter that produced the shards
    # rather than whatever `coverage` executable happens to be first on PATH.
    cov_cli = f"{shlex.quote(PY_EXE)} -m coverage"

    # Reports are first written into PROJ and then copied to run_artifacts.
    # Delete leftovers from an earlier run so a failed report step cannot
    # cause stale data to be copied and presented as this run's result.
    src_json = PROJ / coverage_json_path.name
    src_xml  = PROJ / coverage_xml_path.name
    for stale in (src_json, src_xml):
        if stale.exists():
            stale.unlink()

    subprocess.call(f"{cov_cli} erase{rc_opt}", shell=True, cwd=str(PROJ))
    combine_cmd = f"{cov_cli} combine" + rc_opt + " " + " ".join(shlex.quote(str(p)) for p in shards)
    print("> ", combine_cmd)
    # Dict literals evaluate in order, so the steps run combine -> json -> xml -> html.
    step_status = {
        "combine": subprocess.call(combine_cmd, shell=True, cwd=str(PROJ)),
        "json": subprocess.call(f"{cov_cli} json -o {coverage_json_path.name}{rc_opt}", shell=True, cwd=str(PROJ)),
        "xml": subprocess.call(f"{cov_cli} xml  -o {coverage_xml_path.name}{rc_opt}", shell=True, cwd=str(PROJ)),
        "html": subprocess.call(f"{cov_cli} html -d {HTML_DIR_GEN.name}{rc_opt}", shell=True, cwd=str(PROJ)),
    }

    # Copy the reports into run_artifacts for safekeeping
    if src_json.exists(): shutil.copy2(src_json, coverage_json_path)
    if src_xml.exists():  shutil.copy2(src_xml,  coverage_xml_path)

    failed_steps = [step for step, status in step_status.items() if status != 0]
    if failed_steps:
        print("⚠️ 커버리지 결합/리포트 일부 단계 실패:", ", ".join(failed_steps))
    else:
        print("✅ 커버리지 결합 완료")
    print(" - JSON :", coverage_json_path)
    print(" - XML  :", coverage_xml_path)
    print(" - HTML :", HTML_DIR_GEN / "index.html")

# ==== 4) Branch observation from the combined coverage XML ====
# observed_outcomes_gen maps file path -> line number -> {"covered", "total", "ratio"}.
observed_outcomes_gen = {}
branch_points = full_hit = half_hit = zero_hit = 0

if coverage_xml_path.exists():
    try:
        xml_root = etree.parse(str(coverage_xml_path)).getroot()
        for cls in xml_root.findall(".//class"):
            filename = cls.get("filename") or ""
            if not filename:
                continue
            # NOTE(review): relative filenames are resolved against PROJ — confirm this matches the report's source root.
            abs_path = (PROJ / filename).resolve() if not Path(filename).is_absolute() else Path(filename)
            for line in cls.findall("./lines/line"):
                # Only lines flagged as branch points are counted
                if line.get("branch") != "true":
                    continue
                try:
                    num = int(line.get("number"))
                except Exception:
                    continue
                cond = line.get("condition-coverage")  # "50% (1/2)"
                covered = total = 0
                if cond:
                    # Pull the "(covered/total)" pair out of the attribute text
                    m = re.search(r"\((\d+)\s*/\s*(\d+)\)", cond)
                    if m:
                        covered, total = int(m.group(1)), int(m.group(2))
                if total == 0:
                    continue
                observed_outcomes_gen.setdefault(str(abs_path), {})[num] = {
                    "covered": covered, "total": total, "ratio": round(covered/total, 3)
                }
                branch_points += 1
                # Classify: no outcome hit / every outcome hit / partially hit
                if covered == 0: zero_hit += 1
                elif covered == total: full_hit += 1
                else: half_hit += 1
    except Exception as e:
        print("⚠️ coverage_xml 파싱 실패:", e)

# Written even when the XML was missing or unparsable (then it holds whatever was collected).
(ART_DIR / "observed_outcomes_gen.json").write_text(
    json.dumps(observed_outcomes_gen, ensure_ascii=False, indent=2),
    encoding="utf-8"
)

print(f"🧮 분기 관측 요약 → total:{branch_points}, full:{full_hit}, half:{half_hit}, zero:{zero_hit}")

# ==== 5) Goal achievement rate (basic) ====
GOALS_FILE = ART_DIR / "goals_ranked.json"
if GOALS_FILE.exists() and coverage_json_path.exists():
    cov_json = json.loads((coverage_json_path).read_text(encoding="utf-8"))
    files_map = cov_json.get("files", {})
    # Executed-line sets are built once per looked-up path instead of once per
    # target line (the lookup is invariant across the lines of a goal).
    _executed_by_path = {}

    def line_hit(fpath: str, ln: int) -> bool:
        """Return True when line *ln* of *fpath* is in the combined executed lines.

        *fpath* is looked up as given, then as its resolved absolute path.
        """
        executed = _executed_by_path.get(fpath)
        if executed is None:
            finfo = files_map.get(fpath) or files_map.get(str(Path(fpath).resolve()))
            executed = set((finfo or {}).get("executed_lines", []) or [])
            _executed_by_path[fpath] = executed
        return ln in executed

    goals = json.loads(GOALS_FILE.read_text(encoding="utf-8"))
    goal_stats = []
    for g in goals:
        f = g["file"]
        abs1 = str((PROJ / f).resolve())  # goal path taken relative to the project root
        abs2 = f                          # goal path exactly as stored
        hit = sum(1 for ln in g.get("target_lines", []) if line_hit(abs1, ln) or line_hit(abs2, ln))
        total = len(g.get("target_lines", [])) or 1  # avoid dividing by zero for goals without target lines
        goal_stats.append({"id": g["id"], "hit": hit, "total": total, "rate": round(hit/total, 3)})

    (ART_DIR / "goal_achievements.json").write_text(
        json.dumps(goal_stats, ensure_ascii=False, indent=2),
        encoding="utf-8"
    )
    hit_goals = sum(1 for s in goal_stats if s["hit"] > 0)
    print(f"🎯 목표 달성률: {hit_goals}/{len(goal_stats)} 목표가 ≥1 라인 도달")

# ==== 6) Improvement (delta) relative to the baseline ====
def load_json(p: Path, default=None):
    """Parse the UTF-8 JSON file at *p*; return *default* if reading or parsing fails."""
    try:
        raw = p.read_text(encoding="utf-8")
        parsed = json.loads(raw)
    except Exception:
        return default
    return parsed

# Both reports fall back to an empty {"files": {}} when the file is missing, unparsable or empty.
base = load_json(coverage_base_json, {"files": {}}) or {"files": {}}
gen  = load_json(coverage_json_path, {"files": {}}) or {"files": {}}
base_files = base.get("files", {})
gen_files  = gen.get("files", {})

def _sum_len(key, d):
    return sum(len((d.get(f, {}) or {}).get(key, []) or []) for f in d.keys())

# Totals are summed over all files present in each report.
base_exec = _sum_len("executed_lines", base_files)
base_miss = _sum_len("missing_lines",  base_files)
gen_exec  = _sum_len("executed_lines", gen_files)
gen_miss  = _sum_len("missing_lines",  gen_files)

delta = {
    "executed_lines_delta": gen_exec - base_exec,
    "missing_lines_delta":  base_miss - gen_miss,   # positive means fewer missing lines
    "base_executed": base_exec,
    "gen_executed":  gen_exec,
    "base_missing":  base_miss,
    "gen_missing":   gen_miss,
}
(ART_DIR / "coverage_delta.json").write_text(json.dumps(delta, ensure_ascii=False, indent=2), encoding="utf-8")
print("📈 베이스라인 대비 향상치:", json.dumps(delta, ensure_ascii=False))

print("✅ 3-5 완료: (기존+생성) 격리 실행/샤드 결합/분기·목표·향상치 산출")


✅ PASS   __BASELINE_SUITE__ (14.487s)
🧪 생성 테스트 파일: 18개
❌ FAIL   test_0001_test_gen_0001_banking_service_deposit_py_1.py (rc=2, 12.028s) :: 
❌ FAIL   test_0001_test_gen_0001_banking_service_deposit_py_2.py (rc=2, 11.899s) :: 
❌ FAIL   test_0002_test_gen_0002_run_worker_module_py_1.py (rc=1, 14.684s) :: 
❌ FAIL   test_0002_test_gen_0002_run_worker_module_py_2.py (rc=1, 13.682s) :: 
✅ PASS   test_0003_test_gen_0003_run_workflow_module_py_1.py (13.209s)
✅ PASS   test_0003_test_gen_0003_run_workflow_module_py_2.py (13.079s)
❌ FAIL   test_0004_test_gen_0004_activities_refund_py_1.py (rc=1, 12.762s) :: 
❌ FAIL   test_0004_test_gen_0004_activities_refund_py_2.py (rc=1, 12.69s) :: 
❌ FAIL   test_0005_test_gen_0005_run_workflow_main_py_1.py (rc=1, 13.194s) :: 
❌ FAIL   test_0005_test_gen_0005_run_workflow_main_py_2.py (rc=1, 13.302s) :: 
✅ PASS   test_0007_test_gen_0007_activities_deposit_py_1.py (12.812s)
✅ PASS   test_0007_test_gen_0007_activities_deposit_py_2.py (12.81s)
❌ FAIL   test_0008_te

In [20]:
  #@title 3-6) 테스트 보완 – Refinement-based Adaptive Round (coverage_gen 기반 미도달·분포 주입)
import os, sys, json, re
from pathlib import Path

# ==== Paths / constants ====
# PROJ must already exist as a notebook global (the assert message points at step 3-0).
assert 'PROJ' in globals(), "3-0 단계를 먼저 실행하세요."
PROJ = Path(PROJ).resolve()
ART_DIR = PROJ / "run_artifacts" / "run1"
GEN_DIR = PROJ / "generated_tests"
LOG_DIR = ART_DIR / "logs"
HTML_GEN_DIR = PROJ / "htmlcov_gen"

COV_BASE_JSON = ART_DIR / "coverage_base.json"       # baseline from 3-1 (reference only)
COV_GEN_JSON  = ART_DIR / "coverage_gen.json"        # combined coverage from 3-5/3-8 (required)
GOALS_FILE    = ART_DIR / "goals_ranked.json"        # goals from 3-2
UNCV_MAP_JSON = ART_DIR / "uncovered_map_base.json"  # uncovered lines from 3-1 (reference only)
RESULTS_JL    = ART_DIR / "results.jsonl"            # run-result index from 3-5/3-8 (reference only)

# Selection / batching parameters
NEAR_MISS_WINDOW = 2
BATCH_SIZE = 3
MAX_ROUNDS = 5
TOPK_FILES = 10   # number of files listed in the global uncovered summary

# ==== Auto-incrementing round directory ====
def next_round_dir(base: Path) -> Path:
    """Create and return the first ``refine_round<N>`` (N = 1, 2, ...) under *base* that does not exist yet."""
    idx = 1
    candidate = base / f"refine_round{idx}"
    while candidate.exists():
        idx += 1
        candidate = base / f"refine_round{idx}"
    candidate.mkdir(parents=True, exist_ok=True)
    return candidate

# NOTE(review): the round directory is created here, before the inputs are validated further down,
# so a run that aborts on missing inputs still leaves an empty refine_roundN behind.
REFINE_DIR = next_round_dir(ART_DIR)
REFINE_PROMPTS = REFINE_DIR / "llm_refine_prompts.jsonl"
REFINE_SUMMARY = REFINE_DIR / "refine_selection.json"
REFINE_TEST_EXPORT = REFINE_DIR / "selected_tests_dump.json"

# ==== Utilities ====
def load_json(p: Path, default=None):
    """Return the parsed UTF-8 JSON content of *p*, or *default* when it cannot be read or parsed."""
    try:
        text = p.read_text(encoding="utf-8")
        data = json.loads(text)
    except Exception:
        data = default
    return data

def read_text_safe(p: Path) -> str:
    """Return the UTF-8 text of *p*, or an empty string when it cannot be read."""
    try:
        content = p.read_text(encoding="utf-8")
    except Exception:
        content = ""
    return content

def first_lines(s: str, n=2000):
    """Return the first *n* characters of *s*, adding a truncation marker when *s* is longer.

    Despite the name, the limit is counted in characters, not lines.
    """
    if not s:
        return ""
    if len(s) <= n:
        return s[:n]
    return s[:n] + "\n...<truncated>..."

RE_GID = re.compile(r"(?:^|[_-])(?P<gid>\d{4})(?:[_-]|$)")
def goal_id_from_name(name: str) -> str | None:
    m = RE_GID.search(name)
    return m.group("gid") if m else None

def list_generated_tests_for_gid(gid: str) -> list[Path]:
    """Return generated/refined test files for goal *gid*, best candidate first.

    A file qualifies when *gid* appears in its name as a whole token delimited
    by "_", "-", "." or the ends of the name. A plain substring test would also
    accept unrelated files whose name merely contains the digits (for example
    "0001" inside "10001"), which disagrees with how goal ids are parsed from
    file names elsewhere in this notebook.

    Ordering: refined copies (``*_rN.py``) come first, higher N first; remaining
    ties are broken by most recent modification time.
    """
    if not GEN_DIR.exists():
        return []
    token = re.compile(rf"(?:^|[_\-.]){re.escape(str(gid))}(?:[_\-.]|$)")
    cands = [p for p in GEN_DIR.glob("*.py") if token.search(p.name)]

    def rank(p: Path):
        m = re.search(r"_r(\d+)\.py$", p.name)
        r = int(m.group(1)) if m else -1
        # (refined before original, larger round number first, newest file first)
        return (0 if r >= 0 else 1, -r, -p.stat().st_mtime)
    return sorted(cands, key=rank)

def pick_latest_test_for_gid(gid: str) -> Path | None:
    """Return the top-ranked test file for goal *gid*, or None when no file matches."""
    candidates = list_generated_tests_for_gid(gid)
    if not candidates:
        return None
    return candidates[0]

# ==== Load input data ====
cov_base = load_json(COV_BASE_JSON, {"files": {}}) or {"files": {}}
# Keep the raw load result: the fallback dict below is always truthy, so it
# cannot be used to tell whether coverage_gen.json was actually available.
_cov_gen_loaded = load_json(COV_GEN_JSON)
cov_gen  = _cov_gen_loaded if isinstance(_cov_gen_loaded, dict) and _cov_gen_loaded else {"files": {}}
goals    = load_json(GOALS_FILE,    []) or []
uncovered_map = load_json(UNCV_MAP_JSON, {}) or {}

# Abort when the combined coverage report is missing/unreadable/empty or there are no goals.
if not _cov_gen_loaded or not isinstance(_cov_gen_loaded, dict) or not goals:
    raise SystemExit("필수 산출물(coverage_gen.json 또는 goals_ranked.json)이 없습니다. 3-2, 3-5/3-8 후 실행하세요.")

gen_files = cov_gen.get("files", {}) or {}

def _info_for(file_rel: str):
    """Look up *file_rel* in gen_files, first as given and then as an absolute path under PROJ."""
    direct = gen_files.get(file_rel)
    if direct:
        return direct
    return gen_files.get(str((PROJ / file_rel).resolve()))

def line_hit_in_gen(file_rel: str, ln: int) -> bool:
    """Return True when line *ln* of *file_rel* is listed as executed in the combined coverage."""
    info = _info_for(file_rel)
    executed = (info.get("executed_lines", []) or []) if info else []
    return ln in set(executed)

def executed_set_in_gen(file_rel: str) -> set[int]:
    """Return the executed line numbers recorded for *file_rel* (empty set if the file is unknown)."""
    info = _info_for(file_rel)
    if not info:
        return set()
    return set(info.get("executed_lines", []) or [])

def missing_set_in_gen(file_rel: str) -> set[int]:
    """Return the missing line numbers recorded for *file_rel* (empty set if the file is unknown)."""
    info = _info_for(file_rel)
    lines = (info.get("missing_lines", []) or []) if info else []
    return set(lines)

# ==== A) Global distribution of uncovered lines, taken from coverage_gen.json ====
# Per-file sorted missing-line lists plus the grand total
global_uncovered_map = {}
total_missing = 0
for fkey, finfo in gen_files.items():
    miss = sorted(set((finfo or {}).get("missing_lines", []) or []))
    if miss:
        global_uncovered_map[fkey] = miss
        total_missing += len(miss)

# Global summary: the TOPK_FILES files with the most missing lines
global_uncovered_summary = []
for fkey, miss in sorted(global_uncovered_map.items(), key=lambda kv: len(kv[1]), reverse=True)[:TOPK_FILES]:
    global_uncovered_summary.append({
        "file": fkey,
        "missing_count": len(miss),
        "missing_lines": miss[:200],  # capped at 200 entries so the hint stays short
    })

# ==== 1) Decide which goals were missed (based on coverage_gen) ====
miss_goals = []  # candidates for refinement
for g in goals:
    file_rel = g["file"]
    tlines = g.get("target_lines", []) or []
    hits = sum(1 for ln in tlines if line_hit_in_gen(file_rel, ln))
    if hits == 0 and tlines:  # refine only goals where not a single target line was hit
        # Near-miss: some executed line lies within NEAR_MISS_WINDOW lines of a target line
        exed = executed_set_in_gen(file_rel)
        neigh = set()
        for t in tlines:
            for k in range(-NEAR_MISS_WINDOW, NEAR_MISS_WINDOW + 1):
                neigh.add(t + k)
        near = len(exed & neigh) > 0
        miss_goals.append({
            "goal_id": g["id"],
            "file": file_rel,
            "function": g.get("function", {}),
            "target_lines": tlines,
            "near_miss": near
        })

# ==== 2) Map each missed goal to its top-ranked test file and assemble the prompt input ====
selected = []
for item in miss_goals:
    gid = item["goal_id"]
    tfile = pick_latest_test_for_gid(gid)
    if not tfile:
        # No generated test for this goal: skip it (to be generated fresh in a later round)
        continue

    original_code = read_text_safe(tfile)
    # Logs are looked up under "<test file name>.out.txt" / ".err.txt" in LOG_DIR; missing logs read as ""
    out_log = read_text_safe(LOG_DIR / f"{tfile.name}.out.txt")
    err_log = read_text_safe(LOG_DIR / f"{tfile.name}.err.txt")

    # Target lines of this goal that are still not hit (per coverage_gen)
    still_missing = [ln for ln in item["target_lines"] if not line_hit_in_gen(item["file"], ln)]

    # All lines of the target file that are still uncovered in this round (per coverage_gen)
    file_uncovered_remaining_gen = sorted(missing_set_in_gen(item["file"]))

    # Baseline uncovered lines of the file (reference only); tried by absolute path, then by the goal's path
    file_abs = str((PROJ / item["file"]).resolve())
    base_uncovered = uncovered_map.get(file_abs, uncovered_map.get(item["file"], [])) or []

    selected.append({
        "id": f"{gid}::{tfile.name}",
        "goal_id": gid,
        "target_file": item["file"],
        "target_lines": item["target_lines"],
        "near_miss": item["near_miss"],
        "uncovered_diff": {
            "still_missing_target_lines": sorted(still_missing),
            "file_uncovered_lines_baseline": sorted(set(int(x) for x in base_uncovered)),
            "file_uncovered_remaining_gen": file_uncovered_remaining_gen,   # currently uncovered lines of this file
        },
        "run": {
            "stdout_head": first_lines(out_log, 1500),
            "stderr_head": first_lines(err_log, 1500),
        },
        "original_test_code": original_code,
    })

# Nothing selected: write an empty prompt file plus a summary, then stop the cell
if not selected:
    REFINE_PROMPTS.write_text("", encoding="utf-8")
    REFINE_SUMMARY.write_text(json.dumps({
        "round_dir": REFINE_DIR.name,
        "using_coverage_gen_only": True,
        "selected": 0,
        "reason": "모든 목표가 최소 1줄 이상 도달했거나, 해당 goal id의 테스트 파일이 없음",
        "global_uncovered": {
            "total_missing_lines": total_missing,
            "top_files": global_uncovered_summary
        }
    }, ensure_ascii=False, indent=2), encoding="utf-8")
    print("ℹ️ 미도달 목표가 없거나, 매칭 테스트가 없습니다. 보강 프롬프트를 만들지 않았습니다.")
    raise SystemExit(0)

# ==== 3) Build batches; a batch never holds two records for the same target file ====
batches, bucket, seen_files = [], [], set()
for rec in selected:
    f = rec["target_file"]
    # Close the current batch when it is full or already contains this target file
    if f in seen_files or len(bucket) >= BATCH_SIZE:
        if bucket:
            batches.append(bucket)
        bucket, seen_files = [], set()
    bucket.append(rec)
    seen_files.add(f)
# Flush the last, possibly partial batch
if bucket:
    batches.append(bucket)

# ==== 4) Write the LLM refinement prompts (JSONL, one record per batch) ====
# System prompt: describes the JSON-only reply schema {"edits": [{"id", "new_code"}]} and the editing rules.
SYSTEM_REFINE = (
    "당신은 기존 pytest 테스트를 보강하여 미커버 영역(Target Lines)에 도달하도록 수정하는 전문가입니다.\n"
    "출력은 마크다운 없이 **순수 JSON 객체** 하나로만 응답합니다. 스키마는 아래와 같습니다:\n"
    "{\n"
    '  "edits": [\n'
    '    {"id": "<goal_id::filename>", "new_code": "<보강된 pytest 테스트 파일 전체 문자열>"}\n'
    "  ]\n"
    "}\n"
    "지침:\n"
    "• 테스트의 구조를 유지하되, 입력/경계조건/호출 순서/예외 트리거를 조정해 `still_missing_target_lines`에 실제로 도달하게 하세요.\n"
    "• 외부 부작용 금지(파일/네트워크/시간/환경/Temporal). 필요한 경우 monkeypatch/더미를 사용하세요.\n"
    "• import는 importlib + getattr 경로를 유지하고, 속성이 없을 때만 가드형 조건으로 pytest.skip을 허용합니다.\n"
    "• 각 테스트는 최소 1줄 이상의 target_lines를 실제 실행해야 하며, 관련된 assert 또는 pytest.raises를 포함해야 합니다.\n"
    "• 테스트 이름에 타격 라인을 `hits_L<line>` 형태로 포함하는 것을 권장합니다.\n"
    "• 출력에는 코드 외 설명/주석/마크다운을 포함하지 마세요. **JSON만** 반환하세요.\n"
)

with REFINE_PROMPTS.open("w", encoding="utf-8") as outf:
    for i, batch in enumerate(batches, start=1):
        user_payload = {
            "schema_version": "refine-v1",
            "round_dir": REFINE_DIR.name,
            "selection_params": {
                "near_miss_window": NEAR_MISS_WINDOW,
                "batch_size": BATCH_SIZE,
                "using_coverage_gen_only": True,
                "max_rounds": MAX_ROUNDS
            },
            # Global uncovered summary, passed as a prioritization hint for the model
            "global_uncovered": {
                "total_missing_lines": total_missing,
                "top_files": global_uncovered_summary
            },
            "batch_index": i,
            "tests": batch,
        }
        # "meta" carries the batch's record ids alongside the chat messages.
        record = {
            "meta": {
                "batch_index": i,
                "num_tests": len(batch),
                "ids": [t["id"] for t in batch],
                "round_dir": REFINE_DIR.name,
                "using_coverage_gen_only": True,
            },
            "messages": [
                {"role": "system", "content": SYSTEM_REFINE},
                {"role": "user", "content": json.dumps(user_payload, ensure_ascii=False, indent=2)},
            ],
        }
        outf.write(json.dumps(record, ensure_ascii=False) + "\n")

# ==== 5) Summary / dump ====
summary = {
    "round_dir": REFINE_DIR.name,
    "using_coverage_gen_only": True,
    "params": {
        "near_miss_window": NEAR_MISS_WINDOW,
        "batch_size": BATCH_SIZE
    },
    "selected": len(selected),
    "batches": [{"batch_index": i+1, "num_tests": len(b)} for i, b in enumerate(batches)],
    "global_uncovered": {
        "total_missing_lines": total_missing,
        "top_files": global_uncovered_summary
    }
}
REFINE_SUMMARY.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
# Full dump of the selected records (including the original test code) for later inspection
REFINE_TEST_EXPORT.write_text(json.dumps(selected, ensure_ascii=False, indent=2), encoding="utf-8")

print("✅ 보강 대상 선별 완료 (coverage_gen 기반 + 전역 분포 주입)")
print(" - 라운드 폴더:", REFINE_DIR)
print(" - 선별된 테스트 수:", len(selected))
print(" - 배치 수:", len(batches))
print(" - 프롬프트 JSONL:", REFINE_PROMPTS)
print(" - 선별 요약:", REFINE_SUMMARY)


✅ 보강 대상 선별 완료 (coverage_gen 기반 + 전역 분포 주입)
 - 라운드 폴더: /content/money-transfer-project-template-python/run_artifacts/run1/refine_round5
 - 선별된 테스트 수: 8
 - 배치 수: 4
 - 프롬프트 JSONL: /content/money-transfer-project-template-python/run_artifacts/run1/refine_round5/llm_refine_prompts.jsonl
 - 선별 요약: /content/money-transfer-project-template-python/run_artifacts/run1/refine_round5/refine_selection.json


In [21]:
#@title 3-7) 보강 적용기 – LLM Edits 수신 → (교체형) 원본 백업 후 in-place 교체 + 활성 매니페스트 갱신
import os, re, json, ast, shutil, time
from pathlib import Path
import httpx
import backoff
from openai import OpenAI, APIError, RateLimitError, APIConnectionError

# ==== Settings ====
REPLACE_IN_PLACE = True   # replace mode; when False, edits are saved side by side as *_rN files
MODEL = "gpt-4o"

# ==== Paths / constants ====
# PROJ must already exist as a notebook global (the assert message points at step 3-0).
assert 'PROJ' in globals(), "3-0 단계를 먼저 실행하세요."
PROJ = Path(PROJ).resolve()
ART_DIR = PROJ / "run_artifacts" / "run1"
GEN_DIR = PROJ / "generated_tests"

# Locate the latest refine_roundN directory.
# Order by the numeric round suffix: a plain name sort is lexicographic and
# would rank "refine_round10" before "refine_round9", selecting a stale round.
def _refine_round_key(p: Path):
    m = re.fullmatch(r"refine_round(\d+)", p.name)
    # Names without a clean numeric suffix sort first so they are never picked as "latest".
    return (int(m.group(1)) if m else -1, p.name)

refine_rounds = sorted(
    [p for p in ART_DIR.iterdir() if p.is_dir() and p.name.startswith("refine_round")],
    key=_refine_round_key,
)
if not refine_rounds:
    raise SystemExit("refine_roundN 폴더가 없습니다. 3-6을 먼저 실행하세요.")
REFINE_DIR = refine_rounds[-1]
PROMPTS_PATH = REFINE_DIR / "llm_refine_prompts.jsonl"

if not PROMPTS_PATH.exists():
    raise SystemExit(f"프롬프트 파일이 없습니다: {PROMPTS_PATH}")

RAW_DIR = REFINE_DIR / "_raw_edits"       # raw model replies, one file per batch
ERR_DIR = REFINE_DIR / "_errors"          # JSON error reports
ARCHIVE_DIR = REFINE_DIR / "_archive"     # backups of replaced test files
LOG_PATH = REFINE_DIR / "apply_log.jsonl"
MANIFEST = GEN_DIR / "ACTIVE_MANIFEST.json"

RAW_DIR.mkdir(parents=True, exist_ok=True)
ERR_DIR.mkdir(parents=True, exist_ok=True)
ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
GEN_DIR.mkdir(parents=True, exist_ok=True)

# ==== OpenAI client ====
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY가 설정되지 않았습니다. 3-0 단계에서 .env를 로드했는지 확인하세요.")
# Single-connection HTTP client with a 180 s timeout and transport-level retries.
http_client = httpx.Client(
    timeout=180.0, follow_redirects=True,
    limits=httpx.Limits(max_connections=1, max_keepalive_connections=0),
    transport=httpx.HTTPTransport(retries=5),
)
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url="https://api.openai.com/v1", http_client=http_client)

# ==== 유틸 ====
def strip_fences(s: str) -> str:
    """Strip surrounding whitespace and one leading/trailing Markdown code fence from *s*.

    A falsy *s* (None or "") yields an empty string.
    """
    text = (s or "").strip()
    without_opening = re.sub(r"^```[a-zA-Z0-9]*\s*", "", text)
    return re.sub(r"\s*```$", "", without_opening)

def write_error(tag: str, payload: dict):
    """Write *payload* as indented JSON to ``ERR_DIR/<tag>.json``."""
    target = ERR_DIR / f"{tag}.json"
    body = json.dumps(payload, ensure_ascii=False, indent=2)
    target.write_text(body, encoding="utf-8")

def ensure_unique_path(base: Path) -> Path:
    """Return *base* if it is free, otherwise the first free ``<stem>_<i><suffix>`` sibling (i = 2, 3, ...)."""
    if not base.exists():
        return base
    counter = 2
    while True:
        candidate = base.with_name(f"{base.stem}_{counter}{base.suffix}")
        if not candidate.exists():
            return candidate
        counter += 1

def now_ts():
    """Return the current local time formatted as ``YYYYMMDD_HHMMSS``."""
    stamp_format = "%Y%m%d_%H%M%S"
    return time.strftime(stamp_format, time.localtime())

# ---- Validation (unconditional skips are forbidden; only guarded skips are allowed) ----
import re as _re, ast
# Textual detectors for importlib.import_module( and pytest.raises( calls
RE_IMPORTLIB = _re.compile(r"\bimportlib\.import_module\s*\(")
RE_PYTEST_RAISES = _re.compile(r"\bpytest\.raises\s*\(")

def parse_ast_or_error(code: str):
    """Parse *code* and return ``(tree, None)``, or ``(None, "syntax_error:<msg>@L<line>")`` on a syntax error."""
    try:
        tree = ast.parse(code)
    except SyntaxError as exc:
        return None, f"syntax_error:{exc.msg}@L{exc.lineno}"
    return tree, None

def extract_test_funcs(tree: ast.AST) -> list[ast.FunctionDef]:
    return [n for n in ast.walk(tree) if isinstance(n, ast.FunctionDef) and n.name.startswith("test_")]

def has_assert_or_raises(tree: ast.AST, code: str) -> bool:
    """Return True when *tree* holds an ``assert`` statement or *code* textually contains ``pytest.raises(``."""
    for node in ast.walk(tree):
        if isinstance(node, ast.Assert):
            return True
    return RE_PYTEST_RAISES.search(code) is not None

def uses_importlib(code: str) -> bool:
    """Return True when *code* textually contains an ``importlib.import_module(`` call."""
    return RE_IMPORTLIB.search(code) is not None

def _parent_map(tree: ast.AST):
    parent = {}
    for node in ast.walk(tree):
        for child in ast.iter_child_nodes(node):
            parent[child] = node
    return parent

def _is_guarded_skip(call: ast.Call, parent_map) -> bool:
    cur = call
    while cur in parent_map:
        cur = parent_map[cur]
        if isinstance(cur, ast.If):
            test = cur.test
            if (isinstance(test, ast.Compare) and len(test.ops)==1 and isinstance(test.ops[0], ast.Is)
                and len(test.comparators)==1 and isinstance(test.comparators[0], ast.Constant)
                and test.comparators[0].value is None):
                return True
        if isinstance(cur, ast.ExceptHandler):
            t = cur.type
            if isinstance(t, ast.Name) and t.id in {"ImportError","NameError"}: return True
            if isinstance(t, ast.Tuple) and any(isinstance(e, ast.Name) and e.id in {"ImportError","NameError"} for e in t.elts):
                return True
    return False

def has_unconditional_skip(code: str) -> tuple[bool, list[int]]:
    """Find ``pytest.skip(...)`` calls in *code* that are not under a recognized guard.

    Returns ``(found, line_numbers)``. Code that does not parse yields
    ``(False, [])``. Only calls spelled as attribute access on the name
    ``pytest`` are inspected.
    """
    try:
        tree = ast.parse(code)
    except SyntaxError:
        return (False, [])
    parents = _parent_map(tree)
    offending = [
        getattr(node, "lineno", -1)
        for node in ast.walk(tree)
        if isinstance(node, ast.Call)
        and isinstance(node.func, ast.Attribute)
        and isinstance(node.func.value, ast.Name)
        and node.func.value.id == "pytest"
        and node.func.attr == "skip"
        and not _is_guarded_skip(node, parents)
    ]
    return bool(offending), offending

def minimal_viability_checks(code: str) -> tuple[bool, list[str], dict]:
    """Run the acceptance checks on a candidate test file.

    Returns ``(ok, reasons, meta)``: *reasons* lists every failed check and
    ``meta["warnings"]`` holds non-fatal notes. A syntax error stops the
    checks immediately, because the remaining ones need a parsed tree.
    """
    reasons: list[str] = []
    meta: dict = {"warnings": []}

    if len((code or "").strip()) < 60:
        reasons.append("too_short")

    tree, synerr = parse_ast_or_error(code)
    if synerr:
        reasons.append(synerr)
        return False, reasons, meta

    tests = extract_test_funcs(tree)
    if not tests:
        reasons.append("no_test_functions")
    if not has_assert_or_raises(tree, code):
        reasons.append("no_assert_or_raises")
    if not uses_importlib(code):
        reasons.append("no_importlib_import_module")

    # A missing hits_L<line> marker in every test name is only a warning
    if tests and not any("hits_L" in t.name for t in tests):
        meta["warnings"].append("missing_hits_L_in_test_name")

    bad_skip, lines = has_unconditional_skip(code)
    if bad_skip:
        reasons.append(f"unconditional_skip_detected@{lines}")

    return (len(reasons) == 0), reasons, meta

# ==== OpenAI call ====
def _is_non_retryable_api_error(exc: Exception) -> bool:
    """Return True for client-side HTTP errors that a retry cannot fix.

    APIError is the base class of the other listed exceptions, so without this
    filter every failure (bad request, invalid key, ...) would be retried for
    up to 8 attempts / 300 s. Errors without an HTTP status (connection
    problems) and 408/409/429 remain retryable.
    """
    status = getattr(exc, "status_code", None)
    return isinstance(status, int) and 400 <= status < 500 and status not in (408, 409, 429)

@backoff.on_exception(
    backoff.expo,
    (APIConnectionError, APIError, RateLimitError),
    max_tries=8,
    max_time=300,
    giveup=_is_non_retryable_api_error,
)
def call_openai_with_retry(messages):
    """Send *messages* to MODEL in JSON-object mode and return the reply text.

    Returns an empty string when the reply carries no content, so callers can
    always treat the result as ``str`` (it is written to disk and JSON-parsed).
    """
    resp = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        response_format={"type": "json_object"},
        timeout=180.0,
    )
    return resp.choices[0].message.content or ""

# ==== Active manifest ====
def load_manifest():
    """Return the parsed MANIFEST file, or a fresh empty manifest when it cannot be read or parsed."""
    try:
        raw = MANIFEST.read_text(encoding="utf-8")
        return json.loads(raw)
    except Exception:
        return {"active": {}, "history": {}}
def save_manifest(m):
    """Write *m* to MANIFEST as indented UTF-8 JSON."""
    serialized = json.dumps(m, ensure_ascii=False, indent=2)
    MANIFEST.write_text(serialized, encoding="utf-8")

manifest = load_manifest()
# A parseable but malformed manifest (e.g. a JSON list) would break the setdefault() calls below.
if not isinstance(manifest, dict):
    manifest = {"active": {}, "history": {}}

# ==== Main loop ====
# For every prompt record: call the model, parse its JSON reply, validate each
# edit, then write accepted edits into GEN_DIR and update the active manifest.
ok_batches = ok_edits = fail_batches = fail_edits = 0

with PROMPTS_PATH.open("r", encoding="utf-8") as f_in, open(LOG_PATH, "w", encoding="utf-8") as f_log:
    for line in f_in:
        if not line.strip():
            continue  # tolerate blank lines in the JSONL file
        rec = json.loads(line)
        meta = rec.get("meta", {})
        messages = rec.get("messages", [])
        bidx = meta.get("batch_index")
        # Ids this batch asked the model to edit; anything else is rejected below.
        requested_ids = set(meta.get("ids") or [])

        print(f"\n🚀 Refinement Batch #{bidx} 요청…")
        try:
            # Guard against a None reply so the raw dump below cannot fail
            out_text = call_openai_with_retry(messages) or ""
        except Exception as e:
            fail_batches += 1
            write_error(f"batch_{bidx}_request_error", {"error": str(e)})
            print(f"❌ 배치 {bidx} 요청 실패: {e}")
            continue

        # Keep the raw reply
        (RAW_DIR / f"batch_{bidx}_raw.json").write_text(out_text, encoding="utf-8")

        # Parse JSON
        try:
            cleaned = strip_fences(out_text)
            result = json.loads(cleaned)
        except Exception as e:
            fail_batches += 1
            write_error(f"batch_{bidx}_json_error", {"error": str(e), "raw_head": out_text[:2000]})
            print(f"❌ 배치 {bidx} JSON 파싱 실패: {e}")
            continue

        # The reply must be an object carrying a non-empty "edits" list
        edits = result.get("edits") if isinstance(result, dict) else None
        if not isinstance(edits, list) or not edits:
            fail_batches += 1
            write_error(f"batch_{bidx}_schema_error", {"result": result})
            print(f"❌ 배치 {bidx} 스키마 오류: edits 비어있음")
            continue

        saved = []
        for eidx, e in enumerate(edits, start=1):
            if not isinstance(e, dict):
                fail_edits += 1
                write_error(f"batch_{bidx}_edit_{eidx}_bad_id", {"id": repr(e)})
                print(f"⚠️ 편집 #{eidx} 제외: 잘못된 id 형식")
                continue
            eid = e.get("id", "")
            new_code = e.get("new_code", "")
            code = strip_fences(new_code if isinstance(new_code, str) else "")

            if not isinstance(eid, str) or "::" not in eid:
                fail_edits += 1
                write_error(f"batch_{bidx}_edit_{eidx}_bad_id", {"id": eid})
                print(f"⚠️ 편집 #{eidx} 제외: 잘못된 id 형식")
                continue
            # The file name comes from the model's reply; accept only ids that
            # were actually sent in this batch so a hallucinated id cannot
            # create or overwrite an unrelated file in GEN_DIR.
            if requested_ids and eid not in requested_ids:
                fail_edits += 1
                write_error(f"batch_{bidx}_edit_{eidx}_unknown_id", {"id": eid, "expected": sorted(requested_ids)})
                print(f"⚠️ 편집 #{eidx} 제외: 요청하지 않은 id ({eid})")
                continue
            gid, orig_name = eid.split("::", 1)
            orig_name = Path(orig_name).name  # drop any directory components

            ok_min, reasons, meta_w = minimal_viability_checks(code)
            if not ok_min:
                fail_edits += 1
                write_error(f"batch_{bidx}_edit_{eidx}_min_viability", {"id": eid, "reasons": reasons, **meta_w})
                print(f"⚠️ 편집 #{eidx} 제외: {reasons}")
                continue

            # === Storage policy ===
            round_tag = REFINE_DIR.name.rsplit("refine_round", 1)[-1]
            stem = Path(orig_name).stem
            suffix = Path(orig_name).suffix or ".py"

            if REPLACE_IN_PLACE:
                # 1) Back up the existing file
                dst_path = GEN_DIR / orig_name
                if dst_path.exists():
                    # now_ts() has one-second resolution; ensure_unique_path keeps
                    # a second backup of the same file from overwriting the first.
                    backup = ensure_unique_path(ARCHIVE_DIR / f"{now_ts()}__{orig_name}")
                    shutil.copy2(dst_path, backup)
                    # record in history
                    manifest.setdefault("history", {}).setdefault(gid, []).append(str(backup))

                # 2) Overwrite the original path with the new code
                dst_path.write_text(code, encoding="utf-8")
                saved.append(dst_path.name)

                # 3) Update the active mapping
                manifest.setdefault("active", {})[gid] = dst_path.name
            else:
                # Side-by-side storage (_rN)
                out_name = f"{stem}_r{round_tag}{suffix}"
                out_path = ensure_unique_path(GEN_DIR / out_name)
                out_path.write_text(code, encoding="utf-8")
                saved.append(out_path.name)
                # "active" only points at the newest copy
                manifest.setdefault("active", {})[gid] = out_path.name
                manifest.setdefault("history", {}).setdefault(gid, []).append(str(out_path))

            ok_edits += 1
            print(f"✅ 적용: {manifest['active'][gid]} (warnings: {','.join(meta_w.get('warnings', [])) or '없음'})")

        if saved:
            ok_batches += 1
            save_manifest(manifest)
            f_log.write(json.dumps({"batch_index": bidx, "saved_files": saved}, ensure_ascii=False) + "\n")
        else:
            fail_batches += 1
            write_error(f"batch_{bidx}_no_valid_edits", {"note": "모든 edits가 검증에서 제외됨"})

print(f"\n✅ 보강 적용 완료: 배치 성공 {ok_batches} / 실패 {fail_batches} | 편집 성공 {ok_edits} / 실패 {fail_edits}")
print(f"   • 라운드 폴더 : {REFINE_DIR}")
print(f"   • 원문       : {RAW_DIR}")
print(f"   • 에러       : {ERR_DIR}")
print(f"   • 로그       : {LOG_PATH}")
print(f"   • 활성 매니페스트 : {MANIFEST}")
print("이제 3-8을 실행해 커버리지/목표 달성률 향상을 평가하세요.")



🚀 Refinement Batch #1 요청…

🚀 Refinement Batch #2 요청…

🚀 Refinement Batch #3 요청…

🚀 Refinement Batch #4 요청…

✅ 보강 적용 완료: 배치 성공 4 / 실패 0 | 편집 성공 8 / 실패 0
   • 라운드 폴더 : /content/money-transfer-project-template-python/run_artifacts/run1/refine_round5
   • 원문       : /content/money-transfer-project-template-python/run_artifacts/run1/refine_round5/_raw_edits
   • 에러       : /content/money-transfer-project-template-python/run_artifacts/run1/refine_round5/_errors
   • 로그       : /content/money-transfer-project-template-python/run_artifacts/run1/refine_round5/apply_log.jsonl
   • 활성 매니페스트 : /content/money-transfer-project-template-python/generated_tests/ACTIVE_MANIFEST.json
이제 3-8을 실행해 커버리지/목표 달성률 향상을 평가하세요.


In [22]:
#@title 3-8) 기존 tests + 생성/보강 tests 격리 실행 · 로그 수집 · 샤드 결합 · 향상치 계산 (ACTIVE_MANIFEST 우선)
import os, sys, json, re, time, subprocess, shutil, shlex
from pathlib import Path
from datetime import datetime, timezone
from lxml import etree

# ==== Paths / constants ====
# PROJ must already be defined in the notebook namespace (step 3-0).
assert 'PROJ' in globals(), "3-0 단계를 먼저 실행하세요."
PROJ = Path(PROJ).resolve()
ART_DIR = PROJ / "run_artifacts" / "run1"
GEN_DIR = PROJ / "generated_tests"
TESTS_DIR = PROJ / "tests"
LOG_DIR = ART_DIR / "logs"
COV_SHARDS_DIR = ART_DIR / "cov_shards"
HTML_DIR_GEN = PROJ / "htmlcov_gen"
ACTIVE_MANIFEST = GEN_DIR / "ACTIVE_MANIFEST.json"

# Make sure every output directory exists before anything is written to it.
ART_DIR.mkdir(parents=True, exist_ok=True)
LOG_DIR.mkdir(parents=True, exist_ok=True)
COV_SHARDS_DIR.mkdir(parents=True, exist_ok=True)
HTML_DIR_GEN.mkdir(parents=True, exist_ok=True)

# Pass --rcfile to every coverage command only when the project has a .coveragerc.
# rc_opt starts with a space so it can be appended directly to a command string.
RCFILE = PROJ / ".coveragerc"
rc_opt = f" --rcfile {RCFILE}" if RCFILE.exists() else ""

# Execution parameters
PY_EXE = sys.executable
# Per-file timeout (seconds) for generated tests, and for the baseline suite.
TIMEOUT_SEC_GEN = 30
TIMEOUT_SEC_BASE = 120
PYTEST_FLAGS = "-q -s"
# Base environment for child processes; setdefault keeps values the user already set.
ENV_BASE = os.environ.copy()
ENV_BASE.setdefault("NO_PROXY", "*")
ENV_BASE.setdefault("PYTHONHASHSEED", "0")

# Selection policy flags (used only when no manifest is available)
PREFER_LATEST_REFINED = True
KEEP_ORIGINAL_ALONGSIDE = False

# goal_id / refine suffix
# RE_GOAL: a four-digit group delimited by start/end of string, "_" or "-".
# RE_REFINE_SUFFIX: a trailing "_r<N>.py" marking a refined test file.
RE_GOAL = re.compile(r"(?:^|[_-])(?P<gid>\d{4})(?:[_-]|$)")
RE_REFINE_SUFFIX = re.compile(r"_r(\d+)\.py$")

def goal_id_from_name(name: str) -> str | None:
    """Return the goal id captured by RE_GOAL in *name*, or None if there is no match."""
    match = RE_GOAL.search(name)
    if match is None:
        return None
    return match.group("gid")

def sh(cmd: str, cwd: Path|None=None, timeout: int|None=None, env: dict|None=None):
    try:
        p = subprocess.run(cmd, cwd=str(cwd or PROJ), env=env or ENV_BASE,
                           shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                           timeout=timeout, text=True)
        return p.returncode, p.stdout, p.stderr, False
    except subprocess.TimeoutExpired as e:
        return 124, e.stdout or "", e.stderr or "", True

def rel_to_proj(p: Path) -> str:
    """Return *p* relative to PROJ when possible, otherwise its resolved absolute path."""
    try:
        relative = p.resolve().relative_to(PROJ)
    except Exception:
        return str(p.resolve())
    return str(relative)

def latest_refine_round() -> int|None:
    """Return the highest N among ``refine_round<N>`` directories in ART_DIR.

    Returns None when no such directory exists. Directories whose suffix is
    not an integer are ignored.
    """
    prefix = "refine_round"
    rounds = []
    for d in ART_DIR.iterdir():
        if not (d.is_dir() and d.name.startswith(prefix)):
            continue
        try:
            rounds.append(int(d.name[len(prefix):]))
        except ValueError:
            # Suffix is not a number, so this is not a round directory.
            # Only ValueError is swallowed; anything else (e.g. Ctrl-C) propagates.
            continue
    return max(rounds, default=None)

def list_from_active_manifest() -> list[Path]:
    """List the test files named under "active" in ACTIVE_MANIFEST.

    Only entries that exist inside GEN_DIR and end in ``.py`` are kept. The
    result is sorted. Any failure (missing/invalid manifest, unexpected entry
    type) yields an empty list.
    """
    try:
        manifest = json.loads(ACTIVE_MANIFEST.read_text(encoding="utf-8"))
        candidates = [GEN_DIR / fname for fname in manifest.get("active", {}).values()]
        existing = []
        for candidate in candidates:
            if candidate.exists() and candidate.suffix == ".py":
                existing.append(candidate)
        if existing:
            print(f"📒 ACTIVE_MANIFEST 사용: {len(existing)}개 활성 테스트 실행")
        return sorted(existing)
    except Exception:
        return []

def list_generated_test_files_selected() -> list[Path]:
    """Choose which generated/refined test files to execute.

    The active manifest wins when it yields any file. Otherwise the files in
    GEN_DIR are grouped by their name without the ``_r<N>.py`` suffix, and for
    each group the refined file of the latest refine round is preferred
    (subject to PREFER_LATEST_REFINED / KEEP_ORIGINAL_ALONGSIDE). Groups with
    no such refined file are kept whole. Returns a sorted list.
    """
    # The manifest takes precedence whenever it yields something.
    manifest_files = list_from_active_manifest()
    if manifest_files:
        return manifest_files

    # Fallback: scan the generated-tests directory.
    if not GEN_DIR.exists():
        return []
    candidates = sorted(p for p in GEN_DIR.glob("*.py") if p.is_file())
    if not candidates:
        return []

    latest = latest_refine_round()

    def base_stem(p: Path):
        # File name with the "_r<N>.py" refine suffix removed (plain stem otherwise).
        found = RE_REFINE_SUFFIX.search(p.name)
        if found:
            return p.name[:found.start()]
        return p.stem

    groups = {}
    for path in candidates:
        groups.setdefault(base_stem(path), []).append(path)

    selected = []
    refined_count = original_kept = original_dropped = 0
    for members in groups.values():
        originals = []
        best_refined, best_round = None, -1
        for path in members:
            found = RE_REFINE_SUFFIX.search(path.name)
            if not found:
                originals.append(path)
                continue
            round_no = int(found.group(1))
            # Only refined files of the latest round (or any, if unknown) compete.
            if (latest is None or round_no == latest) and round_no > best_round:
                best_round, best_refined = round_no, path

        if not (PREFER_LATEST_REFINED and best_refined is not None):
            selected.extend(members)
            continue

        selected.append(best_refined)
        refined_count += 1
        if KEEP_ORIGINAL_ALONGSIDE:
            selected.extend(originals)
            original_kept += len(originals)
        else:
            original_dropped += len(originals)

    print(f"📌 선택 요약: 보강본 {refined_count}개 선택"
          + (f", 원본 추가 {original_kept}개" if KEEP_ORIGINAL_ALONGSIDE else f", 원본 제외 {original_dropped}개"))
    return sorted(selected)

# ==== 0) Output artifact paths ====
results_jsonl = ART_DIR / "results.jsonl"
manifest_path = ART_DIR / "manifest.json"
coverage_json_path = ART_DIR / "coverage_gen.json"
coverage_xml_path  = ART_DIR / "coverage_gen.xml"

# Remove the outputs of a previous run so nothing stale is picked up below.
for old in [results_jsonl, coverage_json_path, coverage_xml_path]:
    if old.exists():
        old.unlink()

# Remove leftover coverage shards as well. The combine step later collects
# every ".coverage.*" file in COV_SHARDS_DIR, so a shard from a test that is
# no longer selected (or from an aborted run) would otherwise be counted.
for stale_shard in COV_SHARDS_DIR.glob(".coverage.*"):
    if stale_shard.is_file():
        stale_shard.unlink()

# Per-run records and pass / fail / timeout counters.
runs = []; ok=fail=to_cnt=0

# ==== 1) Run the existing tests/ suite once -> baseline coverage shard ====
if TESTS_DIR.exists() and any(TESTS_DIR.glob("test*.py")):
    shard_base = COV_SHARDS_DIR / ".coverage.__baseline_tests__"
    env = ENV_BASE.copy()
    env["PYTHONPATH"] = f"{PROJ}:{env.get('PYTHONPATH','')}"
    # COVERAGE_FILE sends this run's data to its own shard file.
    env["COVERAGE_FILE"] = str(shard_base)
    target = rel_to_proj(TESTS_DIR)
    cmd = f"{PY_EXE} -m coverage run{rc_opt} -m pytest {PYTEST_FLAGS} {shlex.quote(target)}"
    start = time.time()
    ts_start = datetime.now(timezone.utc).isoformat()
    rc, out, err, timed_out = sh(cmd, cwd=PROJ, timeout=TIMEOUT_SEC_BASE, env=env)
    dur = round(time.time()-start, 3)
    ts_end = datetime.now(timezone.utc).isoformat()
    (LOG_DIR / "__baseline_tests__.out.txt").write_text(out, encoding="utf-8")
    (LOG_DIR / "__baseline_tests__.err.txt").write_text(err, encoding="utf-8")
    runs.append({"test_file":"__BASELINE_SUITE__","goal_id":None,
                 "start_utc":ts_start,"end_utc":ts_end,"duration_sec":dur,
                 "returncode":rc,"timed_out":timed_out,
                 "stdout_len":len(out),"stderr_len":len(err),
                 "shard_path":str(shard_base),"invoked_path":target})
    if timed_out:
        to_cnt += 1
        print(f"⏱️ TIMEOUT __BASELINE_SUITE__ ({dur}s)")
    elif rc == 0:
        ok += 1
        print(f"✅ PASS   __BASELINE_SUITE__ ({dur}s)")
    else:
        fail += 1
        first_err = (err.strip().splitlines() or [""])[0]
        if not first_err:
            # pytest reports failures on stdout, so stderr is usually empty;
            # fall back to the last stdout line (pytest's summary line).
            first_err = (out.strip().splitlines() or [""])[-1]
        print(f"❌ FAIL   __BASELINE_SUITE__ (rc={rc}, {dur}s) :: {first_err}")
else:
    print("ℹ️ tests/ 폴더 또는 파일이 없어 베이스라인 개별 실행 건너뜀.")

# ==== 2) Run each generated/refined test file in isolation ====
test_files = list_generated_test_files_selected()
print(f"🧪 생성/보강 테스트 파일(실행 대상): {len(test_files)}개")
if not test_files:
    print("⚠️ 실행할 생성/보강 테스트가 없습니다.")

for tf in test_files:
    name = tf.name
    gid = goal_id_from_name(name) or "----"
    # One coverage shard per test file, combined later.
    shard = COV_SHARDS_DIR / f".coverage.{name}"

    env = ENV_BASE.copy()
    env["PYTHONPATH"] = f"{PROJ}:{env.get('PYTHONPATH','')}"
    env["COVERAGE_FILE"] = str(shard)

    target = rel_to_proj(tf)
    cmd = f"{PY_EXE} -m coverage run{rc_opt} -m pytest {PYTEST_FLAGS} {shlex.quote(target)}"
    start = time.time()
    ts_start = datetime.now(timezone.utc).isoformat()
    rc, out, err, timed_out = sh(cmd, cwd=PROJ, timeout=TIMEOUT_SEC_GEN, env=env)
    dur = round(time.time()-start, 3)
    ts_end = datetime.now(timezone.utc).isoformat()

    (LOG_DIR / f"{name}.out.txt").write_text(out, encoding="utf-8")
    (LOG_DIR / f"{name}.err.txt").write_text(err, encoding="utf-8")

    runs.append({"test_file":name,"goal_id":gid,
                 "start_utc":ts_start,"end_utc":ts_end,"duration_sec":dur,
                 "returncode":rc,"timed_out":timed_out,
                 "stdout_len":len(out),"stderr_len":len(err),
                 "shard_path":str(shard),"invoked_path":target})
    if timed_out:
        to_cnt += 1
        print(f"⏱️ TIMEOUT {name} ({dur}s)")
    elif rc == 0:
        ok += 1
        print(f"✅ PASS   {name} ({dur}s)")
    else:
        fail += 1
        first_err = (err.strip().splitlines() or [""])[0]
        if not first_err:
            # pytest reports failures on stdout, so stderr is usually empty;
            # fall back to the last stdout line (pytest's summary line) so the
            # FAIL message is not blank.
            first_err = (out.strip().splitlines() or [""])[-1]
        print(f"❌ FAIL   {name} (rc={rc}, {dur}s) :: {first_err}")

# Persist one JSON object per executed run (JSON Lines).
with results_jsonl.open("w", encoding="utf-8") as f:
    for r in runs:
        f.write(json.dumps(r, ensure_ascii=False) + "\n")

# Run manifest: counters plus the locations of logs and coverage shards.
# The baseline suite counts as one extra "test" when it was executed.
manifest = {
    "generated_at_utc": datetime.now(timezone.utc).isoformat(),
    "project": str(PROJ),
    "run_dir": str(ART_DIR),
    "tests_total": len(test_files) + int(any(x['test_file']=="__BASELINE_SUITE__" for x in runs)),
    "pass": ok,
    "fail": fail,
    "timeout": to_cnt,
    "logs_dir": str(LOG_DIR),
    "cov_shards_dir": str(COV_SHARDS_DIR),
    "active_manifest_used": ACTIVE_MANIFEST.exists(),
}
manifest_path.write_text(
    json.dumps(manifest, ensure_ascii=False, indent=2),
    encoding="utf-8",
)
print(
    "\n📦 실행 요약:",
    json.dumps({"pass": ok, "fail": fail, "timeout": to_cnt, "total": manifest['tests_total']}, ensure_ascii=False),
)

# ==== 3) Combine coverage shards (JSON/XML/HTML) ====
shards = sorted([p for p in COV_SHARDS_DIR.iterdir() if p.name.startswith(".coverage.")])
if not shards:
    print("⚠️ 커버리지 샤드가 없습니다. 테스트가 즉시 실패했을 가능성이 있습니다.")
else:
    # The coverage CLI runs with cwd=PROJ, so reports land in PROJ first and
    # are then archived under ART_DIR.
    src_json = PROJ / coverage_json_path.name
    src_xml  = PROJ / coverage_xml_path.name
    # Remove reports an earlier run left in PROJ. Otherwise a failing
    # `coverage json/xml` below would leave the old file in place and it
    # would be archived as if it were this run's result.
    for stale in (src_json, src_xml):
        if stale.exists():
            stale.unlink()

    subprocess.call(f"coverage erase{rc_opt}", shell=True, cwd=str(PROJ))
    combine_cmd = "coverage combine" + rc_opt + " " + " ".join(shlex.quote(str(p)) for p in shards)
    print("> ", combine_cmd)
    rc_combine = subprocess.call(combine_cmd, shell=True, cwd=str(PROJ))
    subprocess.call(f"coverage json -o {coverage_json_path.name}{rc_opt}", shell=True, cwd=str(PROJ))
    subprocess.call(f"coverage xml  -o {coverage_xml_path.name}{rc_opt}",  shell=True, cwd=str(PROJ))
    subprocess.call(f"coverage html -d {HTML_DIR_GEN.name}{rc_opt}",       shell=True, cwd=str(PROJ))
    # Archive
    if src_json.exists(): shutil.copy2(src_json, coverage_json_path)
    if src_xml.exists():  shutil.copy2(src_xml,  coverage_xml_path)
    if rc_combine == 0:
        print("✅ 커버리지 결합 완료")
    else:
        # Do not report success when the combine step itself failed.
        print(f"⚠️ coverage combine 실패 (rc={rc_combine}) — 아래 산출물이 없거나 불완전할 수 있습니다.")
    print(" - JSON :", coverage_json_path)
    print(" - XML  :", coverage_xml_path)
    print(" - HTML :", HTML_DIR_GEN / "index.html")

# ==== 4) Observed branch outcomes per line ====
# observed_outcomes_gen maps absolute file path -> line number ->
# {"covered", "total", "ratio"}, taken from the "condition-coverage"
# attribute of branch lines in the coverage XML.
observed_outcomes_gen = {}; branch_points=full_hit=half_hit=zero_hit=0
if coverage_xml_path.exists():
    try:
        xml_root = etree.parse(str(coverage_xml_path)).getroot()
        for cls in xml_root.findall(".//class"):
            filename = cls.get("filename") or ""
            if not filename: continue
            abs_path = (PROJ / filename).resolve() if not Path(filename).is_absolute() else Path(filename)
            for line in cls.findall("./lines/line"):
                if line.get("branch") != "true": continue
                # A missing attribute gives int(None) -> TypeError; a malformed
                # one gives ValueError. Skip only those instead of using a
                # bare except, which would also swallow KeyboardInterrupt.
                try: num = int(line.get("number"))
                except (TypeError, ValueError): continue
                cond = line.get("condition-coverage")
                covered = total = 0
                if cond:
                    # The value embeds "(covered/total)".
                    m = re.search(r"\((\d+)\s*/\s*(\d+)\)", cond)
                    if m: covered, total = int(m.group(1)), int(m.group(2))
                if total == 0: continue
                observed_outcomes_gen.setdefault(str(abs_path), {})[num] = {
                    "covered": covered, "total": total, "ratio": round(covered/total, 3)
                }
                branch_points += 1
                if covered == 0: zero_hit += 1
                elif covered == total: full_hit += 1
                else: half_hit += 1
    except Exception as e:
        print("⚠️ coverage_xml 파싱 실패:", e)

(ART_DIR / "observed_outcomes_gen.json").write_text(
    json.dumps(observed_outcomes_gen, ensure_ascii=False, indent=2), encoding="utf-8"
)
print(f"🧮 분기 관측 요약 → total:{branch_points}, full:{full_hit}, half:{half_hit}, zero:{zero_hit}")

# ==== 5) Goal achievement rate ====
# Runs only when both the ranked goals file and the combined coverage JSON exist.
GOALS_FILE = ART_DIR / "goals_ranked.json"
if GOALS_FILE.exists() and coverage_json_path.exists():
    cov_json = json.loads((coverage_json_path).read_text(encoding="utf-8"))
    files_map = cov_json.get("files", {})
    def line_hit(fpath: str, ln: int) -> bool:
        # Look the file up under the given key first, then under its resolved path.
        finfo = files_map.get(fpath) or files_map.get(str(Path(fpath).resolve()))
        if not finfo: return False
        executed = set(finfo.get("executed_lines", []) or [])
        return ln in executed
    goals = json.loads(GOALS_FILE.read_text(encoding="utf-8"))
    goal_stats = []
    for g in goals:
        # Try both the project-absolute path and the path as stored in the goal.
        f = g["file"]; abs1 = str((PROJ / f).resolve()); abs2 = f
        hit = sum(1 for ln in g.get("target_lines", []) if line_hit(abs1, ln) or line_hit(abs2, ln))
        # "or 1" avoids a division by zero for goals without target lines.
        total = len(g.get("target_lines", [])) or 1
        goal_stats.append({"id": g["id"], "hit": hit, "total": total, "rate": round(hit/total, 3)})
    (ART_DIR / "goal_achievements.json").write_text(
        json.dumps(goal_stats, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    # A goal counts as reached when at least one of its target lines was executed.
    hit_goals = sum(1 for s in goal_stats if s["hit"] > 0)
    print(f"🎯 목표 달성률: {hit_goals}/{len(goal_stats)} 목표가 ≥1 라인 도달")

# ==== 6) 라인/브랜치 커버리지 요약 + 베이스 대비 델타 ====
def load_json(p: Path, default=None):
    """Parse the JSON file at *p*; return *default* if it cannot be read or decoded."""
    try:
        raw = p.read_text(encoding="utf-8")
        return json.loads(raw)
    except Exception:
        return default

# Load the baseline and the combined (generated) coverage JSON; a missing or
# unreadable file falls back to an empty "files" mapping.
base = load_json(ART_DIR / "coverage_base.json", {"files": {}}) or {"files": {}}
gen  = load_json(coverage_json_path, {"files": {}}) or {"files": {}}
base_files = base.get("files", {}); gen_files = gen.get("files", {})

def _sum_len(key, d):
    return sum(len((d.get(f, {}) or {}).get(key, []) or []) for f in d.keys())

# Executed / missing line counts summed over all files, for baseline and generated runs.
base_exec = _sum_len("executed_lines", base_files)
base_miss = _sum_len("missing_lines",  base_files)
gen_exec  = _sum_len("executed_lines", gen_files)
gen_miss  = _sum_len("missing_lines",  gen_files)

def pct(a, b):
    """Return a/b as a percentage, or 0.0 when *b* is zero (or otherwise falsy)."""
    if not b:
        return 0.0
    return 100.0 * a / b
# Measured lines = executed + missing; used as the denominator of the line percentages.
line_total_base = base_exec + base_miss
line_total_gen  = gen_exec + gen_miss

# 브랜치 합계는 XML에서 집계
def compute_branch_totals(xml_path: Path):
    """Sum branch coverage over every branch line of a coverage XML report.

    Reads the "(covered/total)" pair from each line's ``condition-coverage``
    attribute.

    Returns:
        ``(covered, total)``; ``(0, 0)`` when the file does not exist or
        cannot be parsed (a warning is printed in the latter case).
    """
    if not xml_path.exists():
        return (0, 0)
    total = covered = 0
    try:
        root = etree.parse(str(xml_path)).getroot()
        for line in root.findall(".//line[@branch='true']"):
            cond = line.get("condition-coverage")
            if not cond:
                continue
            m = re.search(r"\((\d+)\s*/\s*(\d+)\)", cond)
            if not m:
                continue
            covered += int(m.group(1))
            total += int(m.group(2))
    except Exception as e:
        # Still best-effort, but say so instead of hiding the failure and
        # handing back partial numbers.
        print("⚠️ coverage_xml 파싱 실패:", e)
        return (0, 0)
    return covered, total

br_cov, br_tot = compute_branch_totals(coverage_xml_path)

# Baseline-vs-generated line figures plus branch totals of the generated run.
delta = {
    "executed_lines_delta": gen_exec - base_exec,
    # Positive value means fewer missing lines than the baseline.
    "missing_lines_delta": base_miss - gen_miss,
    "base_executed": base_exec,
    "gen_executed": gen_exec,
    "base_missing": base_miss,
    "gen_missing": gen_miss,
    "line_pct_base": round(pct(base_exec, line_total_base), 2),
    "line_pct_gen": round(pct(gen_exec, line_total_gen), 2),
    # pct() already yields 0.0 when there are no branches at all.
    "branch_cov_gen": round(pct(br_cov, br_tot), 2),
    "branch_hits_gen": br_cov,
    "branch_total_gen": br_tot,
}
# The same payload is stored under both file names.
(ART_DIR / "coverage_delta.json").write_text(
    json.dumps(delta, ensure_ascii=False, indent=2), encoding="utf-8"
)
(ART_DIR / "coverage_summary.json").write_text(
    json.dumps(delta, ensure_ascii=False, indent=2), encoding="utf-8"
)
print("📈 베이스라인 대비 향상치/요약:", json.dumps(delta, ensure_ascii=False))

print("✅ 3-8 완료: (기존 + 활성 생성/보강) 격리 실행/샤드 결합/분기·목표·라인·브랜치 요약 산출")


✅ PASS   __BASELINE_SUITE__ (14.416s)
📒 ACTIVE_MANIFEST 사용: 9개 활성 테스트 실행
🧪 생성/보강 테스트 파일(실행 대상): 9개
✅ PASS   test_0001_test_gen_0001_banking_service_deposit_py_2.py (13.883s)
❌ FAIL   test_0002_test_gen_0002_run_worker_module_py_2.py (rc=1, 13.862s) :: 
✅ PASS   test_0004_test_gen_0004_activities_refund_py_2.py (12.992s)
❌ FAIL   test_0005_test_gen_0005_run_workflow_main_py_2.py (rc=1, 13.532s) :: 
✅ PASS   test_0007_test_gen_0007_activities_deposit_py_2.py (13.031s)
❌ FAIL   test_0008_test_gen_0008_activities_deposit_py_2.py (rc=5, 13.083s) :: 
❌ FAIL   test_0009_test_gen_0009_activities_refund_py_2.py (rc=2, 13.24s) :: 
❌ FAIL   test_gen_0006_banking_service_deposit_that_fails.py (rc=5, 12.633s) :: 
❌ FAIL   test_gen_0010_run_workflow_main.py (rc=1, 13.672s) :: 

📦 실행 요약: {"pass": 4, "fail": 6, "timeout": 0, "total": 10}
>  coverage combine --rcfile /content/money-transfer-project-template-python/.coveragerc /content/money-transfer-project-template-python/run_artifacts/run1/cov_shards

In [26]:
#@title 3-9) 최종 비교 – PyTest(베이스라인) vs 본 연구(3-8 최종 산출 직접 사용) + 모듈별 리포트
import os, sys, json, re, subprocess, shlex, shutil
from pathlib import Path
from datetime import datetime, timezone
from lxml import etree

# ===== Paths / constants =====
# PROJ must already be defined in the notebook namespace (step 3-0).
assert 'PROJ' in globals(), "3-0 단계를 먼저 실행하세요."
PROJ = Path(PROJ).resolve()
ART_DIR = PROJ / "run_artifacts" / "run1"
GEN_DIR = PROJ / "generated_tests"
TESTS_DIR = PROJ / "tests"
LOG_DIR = ART_DIR / "logs"

HTML_DIR_BASE = PROJ / "htmlcov_pytest"         # HTML report for pytest alone
HTML_DIR_OUR  = PROJ / "htmlcov_our"            # HTML report for this study (replay mode)
HTML_DIR_38   = PROJ / "htmlcov_gen"            # HTML directory produced by 3-8 (direct-use mode)

# Pass --rcfile to coverage commands only when the project has a .coveragerc.
RCFILE = PROJ / ".coveragerc"
rc_opt = f" --rcfile {RCFILE}" if RCFILE.exists() else ""

# ===== Mode selection =====
# - "from_38_artifacts": use the final 3-8 outputs (coverage_gen.json/xml, htmlcov_gen) as they are -> same result as 3-8
# - "replay_stable_k"  : run the set that passed stably over the last 5 turns on top of the baseline (previous 3-9 behaviour)
OUR_SOURCE_MODE = "from_38_artifacts"   # recommended default
STABLE_K = 5

# ===== Input / output paths =====
GOALS_FILE   = ART_DIR / "goals_ranked.json"        # 3-2
RESULTS_JL   = ART_DIR / "results.jsonl"            # 3-8/3-5
HISTORY_JL   = ART_DIR / "stable_history.jsonl"     # cumulative record from 3-8 (if present)

# Final outputs of 3-8
COV_JSON_38 = ART_DIR / "coverage_gen.json"
COV_XML_38  = ART_DIR / "coverage_gen.xml"

# Outputs of this step
COV_JSON_BASE = ART_DIR / "coverage_pytest.json"
COV_XML_BASE  = ART_DIR / "coverage_pytest.xml"
COV_JSON_OUR  = ART_DIR / "coverage_our.json"
COV_XML_OUR   = ART_DIR / "coverage_our.xml"

COMPARE_JSON        = ART_DIR / "comparison_report.json"
FINAL_MANIFEST      = ART_DIR / "final_comparison_manifest.json"
MODULE_BREAKDOWN_JS = ART_DIR / "module_breakdown.json"
MODULE_BREAKDOWN_CSV= ART_DIR / "module_breakdown.csv"

# ===== Execution parameters (used in replay mode) =====
PY_EXE = sys.executable
TIMEOUT_BASE = 180
TIMEOUT_EACH = 40
PYTEST_FLAGS = "-q -s"
# Base environment for child processes: the project root is put first on PYTHONPATH.
ENV_BASE = os.environ.copy()
ENV_BASE["PYTHONPATH"] = f"{PROJ}:{ENV_BASE.get('PYTHONPATH','')}"
ENV_BASE.setdefault("NO_PROXY", "*")
ENV_BASE.setdefault("PYTHONHASHSEED", "0")

# ===== 유틸 =====
def load_json(p: Path, default=None):
    """Read and decode the JSON file at *p*, returning *default* on any failure."""
    try:
        text = p.read_text(encoding="utf-8")
        return json.loads(text)
    except Exception:
        return default

def rel_to_proj(p: Path) -> str:
    """Express *p* relative to PROJ; fall back to the resolved absolute path."""
    try:
        inside_project = p.resolve().relative_to(PROJ)
    except Exception:
        return str(p.resolve())
    return str(inside_project)

def sh_run(cmd: str, timeout: int | None = None, cwd: Path | None = None, env: dict | None = None):
    try:
        p = subprocess.run(cmd, cwd=str(cwd or PROJ), env=env or ENV_BASE,
                           shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                           timeout=timeout, text=True)
        return p.returncode, p.stdout, p.stderr, False
    except subprocess.TimeoutExpired as e:
        return 124, e.stdout or "", e.stderr or "", True

def parse_branch_coverage(xml_path: Path) -> tuple[int,int]:
    """Return ``(covered, total)`` branch counts summed over a coverage XML report.

    Counts come from the "(covered/total)" pair in each branch line's
    ``condition-coverage`` attribute. A missing file or any parse error
    yields ``(0, 0)``.
    """
    if not xml_path.exists():
        return (0, 0)
    covered_sum = total_sum = 0
    try:
        tree_root = etree.parse(str(xml_path)).getroot()
        for node in tree_root.findall(".//line[@branch='true']"):
            found = re.search(r"\((\d+)\s*/\s*(\d+)\)", node.get("condition-coverage",""))
            if not found:
                continue
            covered_sum += int(found.group(1))
            total_sum += int(found.group(2))
    except Exception:
        return (0, 0)
    return (covered_sum, total_sum)

def per_file_branch_map(xml_path: Path) -> dict[str, dict]:
    """Map each file of a coverage XML report to its branch coverage.

    Keys are resolved absolute paths (relative names are anchored at PROJ).
    Values are ``{"covered", "total", "pct"}``. Files without branches are
    omitted. A missing file gives ``{}``; on a parse error whatever was
    collected so far is returned.
    """
    result = {}
    if not xml_path.exists():
        return result
    try:
        tree_root = etree.parse(str(xml_path)).getroot()
        for cls_node in tree_root.findall(".//class"):
            filename = cls_node.get("filename") or ""
            if not filename:
                continue
            if Path(filename).is_absolute():
                resolved = Path(filename).resolve()
            else:
                resolved = (PROJ / filename).resolve()
            covered_sum = total_sum = 0
            for node in cls_node.findall("./lines/line[@branch='true']"):
                found = re.search(r"\((\d+)\s*/\s*(\d+)\)", node.get("condition-coverage",""))
                if not found:
                    continue
                covered_sum += int(found.group(1))
                total_sum += int(found.group(2))
            if total_sum > 0:
                result[str(resolved)] = {
                    "covered": covered_sum,
                    "total": total_sum,
                    "pct": round(covered_sum/total_sum*100.0,2),
                }
    except Exception:
        pass
    return result

def line_stats_from_json(cov_json_path: Path) -> tuple[int,int,int,float]:
    """Summarise line coverage of a coverage JSON report.

    Returns ``(executed, missing, total, pct)`` where ``total`` is
    executed + missing and ``pct`` is rounded to two decimals (0.0 when no
    lines were measured). An unreadable report counts as empty.
    """
    report = load_json(cov_json_path, {"files": {}}) or {"files": {}}
    records = list((report.get("files", {}) or {}).values())
    executed = sum(len((rec or {}).get("executed_lines", []) or []) for rec in records)
    missing = sum(len((rec or {}).get("missing_lines",  []) or []) for rec in records)
    measured = executed + missing
    if measured:
        percent = round(executed/measured*100.0, 2)
    else:
        percent = 0.0
    return executed, missing, measured, percent

def per_file_line_map(cov_json_path: Path) -> dict[str, dict]:
    """Map each file of a coverage JSON report to its line coverage.

    Keys are resolved absolute paths (relative names are anchored at PROJ).
    Values are ``{"executed", "missing", "total", "pct"}``.
    """
    report = load_json(cov_json_path, {"files": {}}) or {"files": {}}
    per_file = {}
    for fname, record in (report.get("files", {}) or {}).items():
        record = record or {}
        executed = len(record.get("executed_lines", []) or [])
        missing = len(record.get("missing_lines",  []) or [])
        measured = executed + missing
        percent = round(executed/measured*100.0, 2) if measured else 0.0
        if Path(fname).is_absolute():
            resolved = Path(fname).resolve()
        else:
            resolved = (PROJ / fname).resolve()
        per_file[str(resolved)] = {
            "executed": executed,
            "missing": missing,
            "total": measured,
            "pct": percent,
        }
    return per_file

def file_to_module(abs_path_str: str) -> str:
    """Derive a module label for a file path.

    Files directly under PROJ are labelled by their stem, deeper files by
    their first directory below PROJ, and files outside PROJ by their name.
    """
    resolved = Path(abs_path_str).resolve()
    try:
        relative = resolved.relative_to(PROJ)
    except Exception:
        return resolved.name
    if len(relative.parts) == 1:
        return relative.stem
    return relative.parts[0]

def aggregate_by_module(line_map: dict[str, dict], branch_map: dict[str, dict]) -> dict[str, dict]:
    """Roll per-file line and branch statistics up to module level.

    Files are grouped with ``file_to_module``. Each module maps to its summed
    counts plus ``line_pct`` / ``branch_pct`` (two decimals, 0.0 when the
    denominator is zero).
    """
    totals = {}

    def bucket(path):
        # Fetch (or create) the accumulator of the module that owns *path*.
        return totals.setdefault(
            file_to_module(path),
            {"line_exec":0, "line_total":0, "branch_cov":0, "branch_tot":0},
        )

    for path, stats in line_map.items():
        acc = bucket(path)
        acc["line_exec"] += stats.get("executed",0)
        acc["line_total"] += stats.get("total",0)
    for path, stats in branch_map.items():
        acc = bucket(path)
        acc["branch_cov"] += stats.get("covered",0)
        acc["branch_tot"] += stats.get("total",0)

    summary = {}
    for module, acc in totals.items():
        if acc["line_total"] > 0:
            line_pct = round(acc["line_exec"]/acc["line_total"]*100.0,2)
        else:
            line_pct = 0.0
        if acc["branch_tot"] > 0:
            branch_pct = round(acc["branch_cov"]/acc["branch_tot"]*100.0,2)
        else:
            branch_pct = 0.0
        summary[module] = {
            "line_executed": acc["line_exec"],
            "line_total": acc["line_total"],
            "line_pct": line_pct,
            "branch_covered": acc["branch_cov"],
            "branch_total": acc["branch_tot"],
            "branch_pct": branch_pct,
        }
    return summary

def goal_hits_from_json(goals, cov_json_path: Path) -> tuple[int,int]:
    """Count how many goals have at least one target line executed.

    Each goal's file is looked up in the coverage JSON both by its
    PROJ-anchored resolved path and by the path as stored in the goal.

    Returns ``(reached_goals, total_goals)``.
    """
    report = load_json(cov_json_path, {"files": {}}) or {"files": {}}
    files_map = report.get("files", {}) or {}

    def hit(fpath: str, ln: int) -> bool:
        record = files_map.get(fpath) or files_map.get(str(Path(fpath).resolve()))
        if not record:
            return False
        return ln in set(record.get("executed_lines", []) or [])

    goal_list = goals or []
    reached = 0
    for goal in goal_list:
        stored_path = goal["file"]
        anchored_path = str((PROJ / stored_path).resolve())
        target_lines = goal.get("target_lines", []) or []
        for ln in target_lines:
            if hit(anchored_path, ln) or hit(stored_path, ln):
                reached += 1
                break
    return reached, len(goal_list)

# ===== 1) Run the PyTest baseline and produce its coverage reports =====
# Start from a clean slate inside PROJ (coverage data file and report files).
for nm in [".coverage", COV_JSON_BASE.name, COV_XML_BASE.name]:
    p = PROJ / nm
    if p.exists(): p.unlink()
# Also drop the archived copies of an earlier run. Otherwise a failing report
# step would leave them in place and the comparison below would silently use
# stale baseline numbers.
for stale in (COV_JSON_BASE, COV_XML_BASE):
    if stale.exists(): stale.unlink()
# The log directory is normally created by step 3-8; make sure it exists.
LOG_DIR.mkdir(parents=True, exist_ok=True)

if TESTS_DIR.exists() and any(TESTS_DIR.glob("test*.py")):
    rc, out, err, to = sh_run(f"{PY_EXE} -m coverage run{rc_opt} -m pytest {PYTEST_FLAGS} {shlex.quote(rel_to_proj(TESTS_DIR))}",
                              timeout=TIMEOUT_BASE)
    (LOG_DIR / "__cmp_pytest.out.txt").write_text(out or "", encoding="utf-8")
    (LOG_DIR / "__cmp_pytest.err.txt").write_text(err or "", encoding="utf-8")
    print("✅ PyTest baseline 실행 PASS" if rc==0 else f"⚠️ PyTest rc={rc} (그래도 커버리지 산출)")
else:
    print("ℹ️ tests/ 비어있음 → baseline 실행 건너뜀.")

# Reports are written into PROJ (cwd of the coverage CLI) ...
subprocess.call(f"coverage json -o {COV_JSON_BASE.name}{rc_opt}", shell=True, cwd=str(PROJ))
subprocess.call(f"coverage xml  -o {COV_XML_BASE.name}{rc_opt}",  shell=True, cwd=str(PROJ))
subprocess.call(f"coverage html -d {HTML_DIR_BASE.name}{rc_opt}",  shell=True, cwd=str(PROJ))
# ... and archived under ART_DIR only when they were actually produced.
if (PROJ / COV_JSON_BASE.name).exists():
    shutil.copy2(PROJ / COV_JSON_BASE.name, COV_JSON_BASE)
if (PROJ / COV_XML_BASE.name).exists():
    shutil.copy2(PROJ / COV_XML_BASE.name, COV_XML_BASE)

# ===== 2) Decide where "our method" coverage comes from =====
if OUR_SOURCE_MODE != "from_38_artifacts":
    # (Optional) replay mode over the stable set of the last 5 turns.
    # If needed, pick_stable_tests_last_k etc. from the previous 3-9 code can be pasted back in.
    raise SystemExit("OUR_SOURCE_MODE='replay_stable_k' 지원 분기는 생략했습니다. 필요 시 알려주세요.")

# Use coverage_gen.* / htmlcov_gen from 3-8 as they are, so the numbers match 3-8.
if not (COV_JSON_38.exists() and COV_XML_38.exists()):
    raise SystemExit("coverage_gen.json/xml(3-8 산출)이 없습니다. 3-8을 먼저 완료하세요.")
shutil.copy2(COV_JSON_38, COV_JSON_OUR)
shutil.copy2(COV_XML_38,  COV_XML_OUR)
our_html_index = (HTML_DIR_38 / "index.html").resolve()
print("📦 본 연구 커버리지: 3-8 최종 산출 그대로 사용")


# ===== 3) Compute metrics (line / branch / goal) =====
# "base" is the pytest-only run, "our" is the coverage copied from step 3-8.
goals = load_json(GOALS_FILE, []) or []
base_exec, base_miss, base_total, base_line_pct = line_stats_from_json(COV_JSON_BASE)
our_exec,  our_miss,  our_total,  our_line_pct  = line_stats_from_json(COV_JSON_OUR)

# Branch percentages are 0.0 when a report contains no branches.
base_bcov, base_btot = parse_branch_coverage(COV_XML_BASE)
our_bcov,  our_btot  = parse_branch_coverage(COV_XML_OUR)
base_branch_pct = round(base_bcov/base_btot*100.0, 2) if base_btot else 0.0
our_branch_pct  = round(our_bcov/our_btot*100.0,  2) if our_btot  else 0.0

# Number of goals with at least one executed target line, per coverage source.
reached_base, total_goals = goal_hits_from_json(goals, COV_JSON_BASE)
reached_our,  _           = goal_hits_from_json(goals, COV_JSON_OUR)

# ===== 4) Per-module report =====
base_line_map   = per_file_line_map(COV_JSON_BASE)
our_line_map    = per_file_line_map(COV_JSON_OUR)
base_branch_map = per_file_branch_map(COV_XML_BASE)
our_branch_map  = per_file_branch_map(COV_XML_OUR)

def aggregate_by_module(line_map, branch_map):
    """Roll per-file line and branch counts up to module level.

    Args:
        line_map: Mapping of file path to a dict with "executed" and "total".
        branch_map: Mapping of file path to a dict with "covered" and "total".

    Returns:
        Dict keyed by module name (as returned by ``file_to_module``). Each
        value holds the summed counts plus ``line_pct`` / ``branch_pct``,
        rounded to two decimals and 0.0 when the corresponding total is 0.
    """
    totals = {}

    def bucket_for(path):
        # One accumulator per module, created on first sight of the module.
        return totals.setdefault(
            file_to_module(path),
            {"line_exec": 0, "line_total": 0, "branch_cov": 0, "branch_tot": 0},
        )

    def percent(part, whole):
        return round(part/whole*100.0, 2) if whole > 0 else 0.0

    for path, stats in line_map.items():
        acc = bucket_for(path)
        acc["line_exec"] += stats["executed"]
        acc["line_total"] += stats["total"]

    for path, stats in branch_map.items():
        acc = bucket_for(path)
        acc["branch_cov"] += stats["covered"]
        acc["branch_tot"] += stats["total"]

    return {
        name: {
            "line_executed": acc["line_exec"],
            "line_total": acc["line_total"],
            "line_pct": percent(acc["line_exec"], acc["line_total"]),
            "branch_covered": acc["branch_cov"],
            "branch_total": acc["branch_tot"],
            "branch_pct": percent(acc["branch_cov"], acc["branch_tot"]),
        }
        for name, acc in totals.items()
    }

import csv

# Module-level aggregates for the PyTest baseline and for our method.
base_by_module = aggregate_by_module(base_line_map, base_branch_map)
our_by_module  = aggregate_by_module(our_line_map,  our_branch_map)

# Column order of the module breakdown; the single source of truth for both
# the row dicts below and the CSV header.
_MODULE_BREAKDOWN_COLUMNS = [
    "module",
    "pytest_line_pct", "our_line_pct", "delta_line_pct",
    "pytest_branch_pct", "our_branch_pct", "delta_branch_pct",
    "pytest_line_measured", "our_line_measured",
    "pytest_branch_total", "our_branch_total",
]

# Stand-in stats for a module that appears in only one of the two reports.
_EMPTY_MODULE_STATS = {"line_total": 0, "line_pct": 0.0, "branch_total": 0, "branch_pct": 0.0}

modules = sorted(set(base_by_module) | set(our_by_module))
rows = []
for m in modules:
    b = base_by_module.get(m, _EMPTY_MODULE_STATS)
    o = our_by_module.get(m, _EMPTY_MODULE_STATS)
    rows.append({
        "module": m,
        "pytest_line_pct": b["line_pct"], "our_line_pct": o["line_pct"],
        "delta_line_pct": round(o["line_pct"] - b["line_pct"], 2),
        "pytest_branch_pct": b["branch_pct"], "our_branch_pct": o["branch_pct"],
        "delta_branch_pct": round(o["branch_pct"] - b["branch_pct"], 2),
        "pytest_line_measured": b["line_total"], "our_line_measured": o["line_total"],
        "pytest_branch_total": b["branch_total"], "our_branch_total": o["branch_total"],
    })

MODULE_BREAKDOWN_JS.write_text(json.dumps(rows, ensure_ascii=False, indent=2), encoding="utf-8")

# Use the csv module so a module name containing a comma or quote is escaped
# instead of corrupting the row. lineterminator="\n" keeps the "\n" line
# endings the hand-written version produced.
with MODULE_BREAKDOWN_CSV.open("w", encoding="utf-8", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=_MODULE_BREAKDOWN_COLUMNS, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)

# ===== 5) Comparison / delta summary =====
def _goal_pct(reached):
    """Return reached/total_goals as a percentage (2 decimals); 0.0 when there are no goals."""
    return round((reached/total_goals*100.0) if total_goals>0 else 0.0, 2)


comparison = {
    "pytest_baseline": {
        "lines": {"executed": base_exec, "missing": base_miss, "measured": base_total, "line_coverage_pct": base_line_pct},
        "branches": {"covered": base_bcov, "total": base_btot, "branch_coverage_pct": base_branch_pct},
        "goals": {"reached_any": reached_base, "total": total_goals,
                  "goal_success_pct": _goal_pct(reached_base)},
        "html_index": str((HTML_DIR_BASE / "index.html").resolve())
    },
    "our_method": {
        "lines": {"executed": our_exec, "missing": our_miss, "measured": our_total, "line_coverage_pct": our_line_pct},
        "branches": {"covered": our_bcov, "total": our_btot, "branch_coverage_pct": our_branch_pct},
        "goals": {"reached_any": reached_our, "total": total_goals,
                  "goal_success_pct": _goal_pct(reached_our)},
        # Step 2 already resolved the HTML index for the active mode (and exits
        # for every other mode), so reuse it instead of re-deriving it here.
        "html_index": str(our_html_index)
    },
    "deltas(our_minus_pytest)": {
        "lines": {
            "executed_delta": our_exec - base_exec,
            # Computed as baseline minus ours, unlike the other deltas: the
            # value is positive when our method leaves fewer lines uncovered.
            "missing_delta":  base_miss - our_miss,
            "line_coverage_pct_delta": round(our_line_pct - base_line_pct, 2)
        },
        "branches": {
            "covered_delta": our_bcov - base_bcov,
            "total_delta":   our_btot - base_btot,
            "branch_coverage_pct_delta": round(our_branch_pct - base_branch_pct, 2)
        },
        "goals": {
            "reached_delta": reached_our - reached_base,
            "goal_success_pct_delta": round(((reached_our - reached_base) / total_goals * 100.0), 2) if total_goals>0 else 0.0
        }
    },
    "module_breakdown_paths": {"json": str(MODULE_BREAKDOWN_JS), "csv": str(MODULE_BREAKDOWN_CSV)},
    "mode": OUR_SOURCE_MODE
}

# Write to COMPARE_JSON, the path the final manifest records as
# "comparison_json", so the manifest always points at the file written here.
Path(COMPARE_JSON).write_text(json.dumps(comparison, ensure_ascii=False, indent=2), encoding="utf-8")

# Console summary: one line each for line coverage, branch coverage and goal
# success, read back from the comparison dict built above.
_cmp_base = comparison["pytest_baseline"]
_cmp_our = comparison["our_method"]
_cmp_delta = comparison["deltas(our_minus_pytest)"]

print("\n📊 최종 비교 요약 (PyTest vs 본 연구; mode =", OUR_SOURCE_MODE, ")")
print(
    f"- 라인 커버리지:   PyTest {_cmp_base['lines']['line_coverage_pct']}%"
    f"  |  본 연구 {_cmp_our['lines']['line_coverage_pct']}%"
    f"  |  Δ {_cmp_delta['lines']['line_coverage_pct_delta']}%p"
)
print(
    f"- 분기 커버리지:   PyTest {_cmp_base['branches']['branch_coverage_pct']}%"
    f" |  본 연구 {_cmp_our['branches']['branch_coverage_pct']}%"
    f" |  Δ {_cmp_delta['branches']['branch_coverage_pct_delta']}%p"
)
print(
    f"- 목표 달성률:     PyTest {_cmp_base['goals']['goal_success_pct']}%"
    f" |  본 연구 {_cmp_our['goals']['goal_success_pct']}%"
    f" |  Δ {_cmp_delta['goals']['goal_success_pct_delta']}%p"
)
print(f"- 모듈별 리포트: {MODULE_BREAKDOWN_JS.name}, {MODULE_BREAKDOWN_CSV.name}")

# Final manifest: a JSON index of every artifact path produced by this comparison.
FINAL_MANIFEST = ART_DIR / "final_comparison_manifest.json"

_pytest_artifacts = {
    "coverage_json": str(COV_JSON_BASE),
    "coverage_xml": str(COV_XML_BASE),
    "coverage_html_index": str((HTML_DIR_BASE / "index.html").resolve()),
    "log_out": str((LOG_DIR / "__cmp_pytest.out.txt").resolve()),
    "log_err": str((LOG_DIR / "__cmp_pytest.err.txt").resolve()),
}

# The HTML report directory depends on where our coverage was sourced from.
_our_html_dir = HTML_DIR_38 if OUR_SOURCE_MODE=="from_38_artifacts" else HTML_DIR_OUR
_our_artifacts = {
    "coverage_json": str(COV_JSON_OUR),
    "coverage_xml": str(COV_XML_OUR),
    "coverage_html_index": str((_our_html_dir / "index.html").resolve()),
}

_manifest = {
    "generated_at_utc": datetime.now(timezone.utc).isoformat(),
    "project": str(PROJ),
    "run_dir": str(ART_DIR),
    "artifacts": {
        "pytest": _pytest_artifacts,
        "our_method": _our_artifacts,
        "comparison_json": str(COMPARE_JSON),
        "module_breakdown_json": str(MODULE_BREAKDOWN_JS),
        "module_breakdown_csv":  str(MODULE_BREAKDOWN_CSV),
        "mode": OUR_SOURCE_MODE,
    },
}
FINAL_MANIFEST.write_text(
    json.dumps(_manifest, ensure_ascii=False, indent=2),
    encoding="utf-8",
)
print("✅ 최종 비교 산출 완료")


✅ PyTest baseline 실행 PASS
📦 본 연구 커버리지: 3-8 최종 산출 그대로 사용

📊 최종 비교 요약 (PyTest vs 본 연구; mode = from_38_artifacts )
- 라인 커버리지:   PyTest 65.38%  |  본 연구 75.0%  |  Δ 9.62%p
- 분기 커버리지:   PyTest 60.0% |  본 연구 80.0% |  Δ 20.0%p
- 목표 달성률:     PyTest 0.0% |  본 연구 20.0% |  Δ 20.0%p
- 모듈별 리포트: module_breakdown.json, module_breakdown.csv
✅ 최종 비교 산출 완료
