In [None]:
# Cell 1: imports & config

import os
import json
import copy
import math
import queue

import torch
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity as cos

# -----------------------
# Data path settings (edit only this section to match your environment)
# -----------------------
DATA_DIR = "Amazon_products"      # e.g. "/content/data"
DATASET = "Amazon_products/train/train_corpus.txt"          # corpus file (one document per line)
GPU = 0                         # CUDA device index





DATA_DIR: /content/drive/MyDrive/BDA/Amazon_products
DATASET: /content/drive/MyDrive/BDA/Amazon_products/train/train_corpus.txt
GPU: 0
OUT_PATH: /content/drive/MyDrive/BDA/Amazon_products/train/init_core_classes_s_test.json


In [None]:
# Cell: Node 클래스 + 그래프 생성 함수

class Node:
    """A taxonomy class with links to its parent and child nodes."""

    def __init__(self, node_id, name):
        # Identity: the id is always stored as a string ("0", "1", ...),
        # the name is the raw label (e.g. "grocery_gourmet_food").
        self.node_id, self.name = str(node_id), name
        # Graph links, filled in by whoever builds the graph.
        self.childs = list()      # List[Node]
        self.parents = list()     # List[Node]
        # Scores start at zero.
        self.path_score = 0.0
        self.similarity_score = 0.0

    def __repr__(self):
        return "Node(id={}, name={})".format(self.node_id, self.name)


def build_graph_from_files(label_file, edge_file):
    """
    Build the taxonomy graph from two tab-separated files.

    label_file: one "id<tab>label_name" pair per line
    edge_file : one "parent_id<tab>child_id" pair per line

    Blank lines and lines that do not split into exactly two tab-separated
    fields are ignored. Edges that mention an id missing from label_file are
    ignored as well.

    Returns (root, id2label, label2id, id2node). When exactly one node has no
    parent it is the root; otherwise a synthetic Node("-1", "ROOT") is created
    and every parentless node is attached beneath it.
    """

    def _two_column_rows(path):
        # Yield (first, second) for every well-formed line of a TSV file.
        with open(path, encoding="utf-8") as handle:
            for raw_line in handle:
                fields = raw_line.strip().split("\t")
                if len(fields) == 2:
                    yield fields[0], fields[1]

    id2label, label2id, id2node = {}, {}, {}

    # 1) Label file: register every class id and create its node.
    for raw_id, raw_name in _two_column_rows(label_file):
        class_id = str(raw_id)
        class_name = raw_name.strip()
        id2label[class_id] = class_name
        label2id[class_name] = class_id
        id2node[class_id] = Node(class_id, class_name)

    # 2) Edge file: wire parent/child links between known nodes.
    for parent_id, child_id in _two_column_rows(edge_file):
        parent_id, child_id = str(parent_id), str(child_id)
        if parent_id in id2node and child_id in id2node:
            upper, lower = id2node[parent_id], id2node[child_id]
            upper.childs.append(lower)
            lower.parents.append(upper)

    # 3) Nodes without any parent are the top-level roots.
    roots = [node for node in id2node.values() if not node.parents]

    if len(roots) != 1:
        # Several (or zero) top-level nodes: hang them all under one super-root.
        root = Node("-1", "ROOT")
        for top in roots:
            root.childs.append(top)
            top.parents.append(root)
    else:
        root = roots[0]

    print(f"#labels: {len(id2label)}, #roots(before super-root): {len(roots)}")
    return root, id2label, label2id, id2node


In [None]:
import mmap
def get_num_lines(file_path):
    """
    Count the lines in a file by scanning a read-only memory map.

    A final line without a trailing newline is counted. An empty file yields 0
    (mmap cannot map zero bytes, so that case is handled before mapping).

    Args:
        file_path: path of the file to scan.

    Returns:
        Number of lines as an int.
    """
    # Open read-only: counting lines must not require write permission, and
    # both the file and the mapping are released when the blocks exit.
    with open(file_path, "rb") as fp:
        fp.seek(0, 2)  # jump to end of file to learn its size
        if fp.tell() == 0:
            return 0
        with mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) as buf:
            lines = 0
            while buf.readline():
                lines += 1
            return lines

In [None]:
# Cell 2: prepare graph, corpus, and embedding inputs

# 1) Load per-label keywords from class_related_keywords.txt.
#    Each non-empty line is split on ":"; the first field is the label name and
#    the second field is a comma-separated keyword list.
enriched_file = os.path.join(DATA_DIR, "class_related_keywords.txt")
label_keyterm_dict = {}

with open(enriched_file, encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        components = line.split(":")
        node = components[0]          # label_name (with underscore)
        # NOTE(review): a line without ":" raises IndexError here, and anything
        # after a second ":" is dropped.
        keywords = components[1]
        keyword_list = [k for k in keywords.split(",") if k]  # drop empty entries
        label_keyterm_dict[node] = keyword_list

print("num labels with keywords:", len(label_keyterm_dict))


# 2) Load the taxonomy graph
LABEL_FILE = os.path.join(DATA_DIR, "classes.txt")      # example file name
EDGE_FILE  = os.path.join(DATA_DIR, "class_hierarchy.txt") # example file name

root, id2label, label2id, id2node = build_graph_from_files(LABEL_FILE, EDGE_FILE)

num_class = len(id2label)
print("num_class:", num_class)
# 3) Load the corpus file: each line is "doc_id \t text"
corpus_path = os.path.join(DATASET)
num_line = get_num_lines(corpus_path)  # only used as the progress-bar total

all_docs = []
all_docs_id = []

with open(corpus_path, encoding="utf-8") as f:
    for i, line in tqdm(enumerate(f), total=num_line):
        line = line.rstrip("\n")
        if not line:
            continue
        # Split on the first tab only, so tabs inside the text are preserved.
        doc_id, doc = line.split("\t", 1)
        all_docs.append(doc)
        all_docs_id.append(doc_id)

print("num_docs:", len(all_docs))





num labels with keywords: 531


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/229 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/54.0 [00:00<?, ?B/s]

config.json: 0.00B [00:00, ?B/s]

configuration.py: 0.00B [00:00, ?B/s]

A new version of the following files was downloaded from https://huggingface.co/Alibaba-NLP/new-impl:
- configuration.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


modeling.py: 0.00B [00:00, ?B/s]

A new version of the following files was downloaded from https://huggingface.co/Alibaba-NLP/new-impl:
- modeling.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


model.safetensors:   0%|          | 0.00/547M [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/695 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/297 [00:00<?, ?B/s]

Loaded SentenceTransformer: Alibaba-NLP/gte-base-en-v1.5
#labels: 531, #roots(before super-root): 6
num_class: 531


100%|██████████| 29487/29487 [00:00<00:00, 663324.10it/s]


num_docs: 29487


Batches:   0%|          | 0/231 [00:00<?, ?it/s]

100%|██████████| 531/531 [00:06<00:00, 83.22it/s]

built key_term_emb_dict for all labels





In [None]:
!pip install -U openai pydantic


Collecting openai
  Downloading openai-2.13.0-py3-none-any.whl.metadata (29 kB)
Collecting pydantic
  Downloading pydantic-2.12.5-py3-none-any.whl.metadata (90 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.6/90.6 kB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m
Collecting pydantic-core==2.41.5 (from pydantic)
  Downloading pydantic_core-2.41.5-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.3 kB)
Downloading openai-2.13.0-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m35.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pydantic-2.12.5-py3-none-any.whl (463 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m463.6/463.6 kB[0m [31m43.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pydantic_core-2.41.5-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m90.1 MB/s[0m et

In [None]:
import os
from getpass import getpass

from openai import OpenAI

# Never hardcode the API key in the notebook. Keep a key that is already
# configured in the environment; otherwise prompt for it without echoing it
# into the cell output.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("OPENAI_API_KEY: ")


In [None]:
# Environment sanity check: report interpreter and SDK versions, and whether an
# API key is present (only a boolean is printed, never the key itself).
import openai, sys, os
print("python:", sys.version)
print("openai:", openai.__version__)
print("OPENAI_API_KEY set:", bool(os.environ.get("OPENAI_API_KEY")))


python: 3.12.12 (main, Oct 10 2025, 08:52:57) [GCC 11.4.0]
openai: 2.13.0
OPENAI_API_KEY set: True


In [None]:
# Cell 4: adjacency matrix + parents/siblings
import json
import numpy as np
# adj_upper[p, c] == 1 means class p is a direct parent of class c.
adj_upper = np.zeros((num_class, num_class), dtype=np.int32)

with open(EDGE_FILE, encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        # Each edge line is "parent_id<tab>child_id".
        # NOTE(review): assumes exactly two integer fields whose values are
        # valid indices into adj_upper; a malformed line raises here.
        p_str, c_str = line.split("\t")
        p = int(p_str)
        c = int(c_str)
        adj_upper[p, c] = 1


print("adj_upper shape:", adj_upper.shape)

# ----- Reuse the helper functions exactly as defined earlier -----
import numpy as np

def build_parents_children(adj: np.ndarray):
    """
    Derive per-class parent and child index arrays from an adjacency matrix.

    adj[p, c] != 0 is read as "p is a parent of c". For each class index j in
    range(adj.shape[0]) the result holds an int64 array of the row indices that
    are non-zero in column j (parents) and an int64 array of the column indices
    that are non-zero in row j (children).

    Returns (parents, children), two lists of int64 arrays.
    """
    n_classes = adj.shape[0]
    parents, children = [], []
    for j in range(n_classes):
        column_hits = np.flatnonzero(adj[:, j])
        row_hits = np.flatnonzero(adj[j])
        parents.append(column_hits.astype(np.int64))
        children.append(row_hits.astype(np.int64))
    return parents, children

def build_siblings(parents, children):
    """
    Compute, for every class, the other classes that share at least one parent.

    parents[c] and children[p] are iterables of class indices. The result is a
    list with one sorted int64 array per class; a class is never listed as its
    own sibling.
    """
    result = []
    for c in range(len(parents)):
        # Union of the children of every parent of c, minus c itself.
        shared = {
            int(ch)
            for p in parents[c]
            for ch in children[p]
            if ch != c
        }
        result.append(np.array(sorted(shared), dtype=np.int64))
    return result

# Derive the lookup tables used by the prompt-context helpers.
parents, children = build_parents_children(adj_upper)
siblings = build_siblings(parents, children)
# Top-level classes are those with no parent in the adjacency matrix.
roots = [i for i, ps in enumerate(parents) if len(ps) == 0]

print("built parents & siblings")
print(roots)


adj_upper shape: (531, 531)
built parents & siblings
[0, 3, 10, 23, 40, 169]


In [None]:
import os, json, csv, time, random
from datetime import datetime, timezone
from typing import List, Optional, Dict, Any

from openai import OpenAI

MODEL = "gpt-4o-mini"

# While debugging, a batch size of 1 is recommended (narrows down failures).
BATCH_SIZE = 1

# Format checks (can be switched off if desired).
CHECK_WORDS = True
MIN_WORDS, MAX_WORDS = 80, 200
# NOTE: despite the name, is_valid_doc compares the sentence count against 5.
CHECK_4_SENTENCES = True

# Maximum number of API calls for this execution (kept small for notebook use).
MAX_CALLS_THIS_RUN = 20

# Output location (a separate folder per run).
# datetime.utcnow() is deprecated; an aware UTC timestamp formats identically.
RUN_ID = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
RUN_DIR = f"Amazon_products/run_{RUN_ID}"
os.makedirs(RUN_DIR, exist_ok=True)

OUT_CSV = os.path.join(RUN_DIR, "label_docs.csv")            # label, doc, valid
RAW_JSONL = os.path.join(RUN_DIR, "raw_generations.jsonl")   # every raw generation / parse result
ERR_JSONL = os.path.join(RUN_DIR, "errors.jsonl")            # exceptions / failures
PROG_JSON = os.path.join(RUN_DIR, "progress.json")           # summary counters

client = OpenAI()

progress = {
    "attempted_calls": 0,
    "api_ok_calls": 0,        # the API call itself succeeded
    "parsed_ok_calls": 0,     # docs were parsed successfully
    "valid_docs": 0,          # docs that passed the format checks
    "invalid_docs": 0,        # docs that failed the format checks
    "exceptions": 0,
}
with open(PROG_JSON, "w", encoding="utf-8") as f:
    json.dump(progress, f, ensure_ascii=False, indent=2)

print("RUN_DIR:", RUN_DIR)


RUN_DIR: /content/drive/MyDrive/BDA/Amazon_products/run_20251218T130946


  RUN_ID = datetime.utcnow().strftime("%Y%m%dT%H%M%S")


In [None]:
# Precondition check: the variables below must already be loaded by the
# earlier cells:
# num_class, id2label, parents, children, label_keyterm_dict
missing = []
for name in ["num_class", "id2label", "parents", "children", "label_keyterm_dict"]:
    if name not in globals():
        missing.append(name)

# Fail fast with the list of missing names instead of a later NameError.
if missing:
    raise RuntimeError(f"Missing globals: {missing}. 먼저 taxonomy 데이터 로드 셀을 실행해줘.")
else:
    print("OK. num_class =", num_class)


OK. num_class = 531


In [None]:
import re

MAX_CHILDREN = 6
MAX_SIBLINGS = 6

def get_parent(cid: int) -> Optional[int]:
    """Return the first parent id recorded for class `cid`, or None if it has none."""
    candidates = parents[cid]
    if candidates is None or len(candidates) == 0:
        return None
    # Only the first listed parent is used, even if several exist.
    return int(candidates[0])

def get_children(cid: int) -> List[int]:
    """Return the child ids of class `cid` as plain Python ints (empty list if none recorded)."""
    kids = children[cid]
    if kids is None:
        return []
    return list(map(int, kids))

def get_siblings(cid: int) -> List[int]:
    """Return the other children of `cid`'s first parent; empty when `cid` has no parent."""
    parent_id = get_parent(cid)
    if parent_id is not None:
        return [other for other in get_children(parent_id) if other != cid]
    return []

def build_ctx(cid: int) -> dict:
    """
    Assemble the prompt context for one class id.

    The result carries the class id, its label (raw and with underscores turned
    into spaces), the parent label (or None), up to MAX_CHILDREN child labels,
    up to MAX_SIBLINGS sibling labels, and up to 10 keywords looked up by label.
    """
    label = id2label[str(cid)]
    parent_id = get_parent(cid)
    child_ids = get_children(cid)[:MAX_CHILDREN]
    sibling_ids = get_siblings(cid)[:MAX_SIBLINGS]
    keywords = label_keyterm_dict.get(label, [])[:10]

    ctx = {"cid": cid, "label": label}
    ctx["label_text"] = " ".join(label.split("_"))
    ctx["parent"] = None if parent_id is None else id2label[str(parent_id)]
    ctx["children"] = [id2label[str(child)] for child in child_ids]
    ctx["siblings"] = [id2label[str(sibling)] for sibling in sibling_ids]
    ctx["given_keywords"] = keywords
    return ctx

def normalize_doc(doc: str) -> str:
    """Collapse every whitespace run (including newlines) to one space, giving a single paragraph."""
    tokens = str(doc).split()
    return " ".join(tokens)

def word_count(doc: str) -> int:
    """Count whitespace-separated tokens in `doc`."""
    return sum(1 for _ in doc.split())

def sentence_count_rough(doc: str) -> int:
    """
    Roughly count sentences: split wherever whitespace follows '.', '!' or '?'
    and count the non-blank pieces.
    """
    pieces = re.split(r'(?<=[.!?])\s+', doc.strip())
    return sum(1 for piece in pieces if piece.strip())

def is_valid_doc(doc: str) -> tuple[bool, dict]:
    """
    Check a generated document against the format rules.

    The document is whitespace-normalized first. Two checks may apply:
    - if CHECK_WORDS is set, the word count must lie in [MIN_WORDS, MAX_WORDS];
    - if CHECK_4_SENTENCES is set, the rough sentence count must equal 5
      (the flag name says 4, but the comparison below uses 5).

    Returns:
        (ok, meta) where meta has "word_count", "sentence_count" and
        "reasons" (one entry per failed check).
    """
    doc2 = normalize_doc(doc)
    wc = word_count(doc2)
    sc = sentence_count_rough(doc2)
    ok = True
    reasons = []
    if CHECK_WORDS and not (MIN_WORDS <= wc <= MAX_WORDS):
        ok = False
        reasons.append(f"word_count={wc}")
    if CHECK_4_SENTENCES and sc != 5:
        ok = False
        reasons.append(f"sentence_count={sc}")
    return ok, {"word_count": wc, "sentence_count": sc, "reasons": reasons}

def build_request_payload(ctx_list: List[dict]) -> dict:
    """
    Build the keyword arguments for a Responses API request that asks for one
    description per entry of `ctx_list`.

    The payload holds the model name, a system prompt with the writing rules, a
    user message containing {"labels": ctx_list} serialized as JSON, and a
    strict JSON schema requiring a "docs" string array whose length equals
    len(ctx_list).

    Returns:
        A dict with keys "model", "input", "max_output_tokens", "text", "store".
    """
    system = (
        "You write precise Amazon marketplace product-category descriptions for taxonomy labels.\n"
        "ENGLISH ONLY.\n"
        "Return JSON that matches the provided schema.\n"
        "You will receive an array 'labels' with contexts. Produce docs in the SAME ORDER.\n"
        "\n"
        "Core rule:\n"
        "- Write a standalone description of the label itself only.\n"
        "- Do NOT reference or mention parent, children, siblings, hierarchy, or any provided keywords.\n"
        "- Even if those fields are present in the input, ignore them completely.\n"
        "\n"
        "Content guidance:\n"
        "- Describe what products sold on Amazon belong in this category, including typical item types, common variants, and typical use cases.\n"
        "- Include practical details that matter in customer reviews, such as fit, sizing, compatibility, durability, performance, materials, comfort, ease of use, and value.\n"
        "- Define clear boundaries by stating what is included and what is excluded, without naming related categories or using comparisons to parent or sibling labels.\n"
        "\n"
        "Hard formatting constraints for each doc:\n"
        "- Exactly 5 sentences.\n"
        "- Single paragraph with no newline characters.\n"
        "- Plain text only: no markdown, no bullets, no numbering.\n"
        "- 120 to 170 words total.\n"
        "- Use exactly five period characters '.' total: one at the end of each sentence.\n"
        "- Do NOT use any other periods anywhere (no abbreviations, no initials, no decimals).\n"
        "- Do NOT use semicolons or colons.\n"
    )

    user_obj = {"labels": ctx_list}

    # JSON Schema output format (Responses API); the array length is pinned to
    # the number of requested labels.
    schema = {
        "type": "object",
        "properties": {
            "docs": {
                "type": "array",
                "items": {"type": "string"},
                "minItems": len(ctx_list),
                "maxItems": len(ctx_list),
            }
        },
        "required": ["docs"],
        "additionalProperties": False,
    }

    return {
        "model": MODEL,
        "input": [
            {"role": "system", "content": system},
            {"role": "user", "content": json.dumps(user_obj, ensure_ascii=False)},
        ],
        "max_output_tokens": 7000,
        "text": {
            "format": {
                "type": "json_schema",
                "name": "label_docs_v1",
                "strict": True,
                "schema": schema,
            }
        },
        "store": False,
    }

def response_to_dict(resp) -> Dict[str, Any]:
    """
    Convert a response object to a plain dict.

    Tries resp.model_dump(), then resp.to_dict(), then dict(resp); if all of
    those are unavailable or dict() fails, returns {"_raw": str(resp)}.
    """
    for converter_name in ("model_dump", "to_dict"):
        if hasattr(resp, converter_name):
            return getattr(resp, converter_name)()
    try:
        converted = dict(resp)
    except Exception:
        converted = {"_raw": str(resp)}
    return converted

def extract_raw_text(resp) -> str:
    """
    Pull the generated text out of a response.

    Prefers the SDK convenience attribute `output_text` when it is a non-blank
    string. Otherwise walks the dict form of the response and joins every
    non-blank "text" value found under output[*].content[*] with newlines.
    The result is stripped; it is "" when no text is found.
    """
    convenience = getattr(resp, "output_text", None)
    if isinstance(convenience, str):
        trimmed = convenience.strip()
        if trimmed:
            return trimmed

    # Fallback: collect text parts from the serialized response body.
    payload = response_to_dict(resp)
    fragments = []
    for item in payload.get("output", []) or []:
        for part in item.get("content", []) or []:
            text = part.get("text")
            if not isinstance(text, str):
                continue
            if text.strip():
                fragments.append(text)
    joined = "\n".join(fragments)
    return joined.strip()

def parse_docs_from_response(resp) -> tuple[List[str], str]:
    """
    Parse the "docs" list out of a response's JSON text.

    Markdown code-fence markers are removed before parsing, so a fenced JSON
    answer is accepted as well.

    Returns:
        (docs, raw_json): the parsed list and the exact text that was parsed.

    Raises:
        json.JSONDecodeError: if the text is not valid JSON.
        RuntimeError: if the JSON is not an object with a list field "docs".
    """
    raw = extract_raw_text(resp)
    # Strip code fences; surrounding whitespace/newlines must not break parsing.
    raw2 = raw.replace("```json", "").replace("```", "").strip()
    parsed = json.loads(raw2)
    # A top-level array/string/number has no .get(); report it with the same
    # error as a missing field instead of an AttributeError.
    docs = parsed.get("docs", None) if isinstance(parsed, dict) else None
    if not isinstance(docs, list):
        raise RuntimeError("parsed JSON has no list field 'docs'")
    return docs, raw2

def write_jsonl(path: str, obj: dict):
    """Append `obj` to `path` as one JSON line (UTF-8, non-ASCII characters kept as-is)."""
    with open(path, mode="a", encoding="utf-8") as sink:
        serialized = json.dumps(obj, ensure_ascii=False)
        sink.write(f"{serialized}\n")

def save_progress():
    """Overwrite PROG_JSON with the current `progress` counters as indented JSON."""
    with open(PROG_JSON, mode="w", encoding="utf-8") as sink:
        sink.write(json.dumps(progress, ensure_ascii=False, indent=2))

def call_with_retry(payload: dict, max_retries: int = 5):
    """
    Send `payload` through client.responses.create, retrying on any exception.

    Up to max_retries + 1 attempts are made. Between attempts the function
    sleeps with exponential backoff plus jitter, capped at 30 seconds. The
    exception from the final attempt is re-raised.
    """
    attempt = 0
    while attempt <= max_retries:
        try:
            return client.responses.create(**payload)  # Responses API
        except Exception:
            if attempt >= max_retries:
                raise
            # Exponential backoff with a little random jitter.
            delay = min(30.0, (2 ** attempt) * 1.0 + random.random() * 0.5)
            time.sleep(delay)
        attempt += 1

from pydantic import create_model, conlist
import openai, pydantic
print("openai:", openai.__version__)
print("pydantic:", pydantic.__version__)

from pydantic import BaseModel
try:
    # pydantic v2
    from pydantic import ConfigDict, conlist
    PYDANTIC_V2 = True
except Exception:
    # pydantic v1
    from pydantic import conlist
    PYDANTIC_V2 = False

from datetime import datetime, timezone

def ts_utc():
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    now = datetime.now(tz=timezone.utc)
    return now.isoformat()


def make_docs_text_format(n_docs: int):
    """
    Return a pydantic model class `LabelDocs` with a single field
    `docs: List[str]` that forbids extra fields.

    `n_docs` is accepted but not used: the list length is deliberately not
    constrained in the model.
    """
    # The fixed-length constraint (conlist) was removed so that parsing itself
    # succeeds; length and format are checked/corrected in post-processing.
    if PYDANTIC_V2:
        from pydantic import ConfigDict
        class LabelDocs(BaseModel):
            model_config = ConfigDict(extra="forbid")
            docs: List[str]
        return LabelDocs
    else:
        # pydantic v1 spelling of the same "forbid extra fields" setting.
        class LabelDocs(BaseModel):
            docs: List[str]
            class Config:
                extra = "forbid"
        return LabelDocs
def safe_response_dump(resp):
    """
    Convert a response object to a plain dict for logging.

    Delegates to response_to_dict, which tries resp.model_dump(), then
    resp.to_dict(), then dict(resp), and falls back to {"_raw": str(resp)}.
    This function previously duplicated that logic line for line; keeping a
    single implementation prevents the two from drifting apart.
    """
    return response_to_dict(resp)

def summarize_response(resp):
    """
    Print a short diagnostic summary of a response and return its dict form.

    Printed: status, incomplete_details, error, usage, the number of output
    items, the type and content-part types of the first three output items,
    and the first 200 characters of `output_text` when it is a string.
    """
    dumped = safe_response_dump(resp)
    for field in ("status", "incomplete_details", "error"):
        print(f"{field}:", dumped.get(field))

    # If usage shows output_tokens_details.reasoning_tokens, that confirms the
    # budget was spent on reasoning.
    print("usage:", dumped.get("usage") or {})

    items = dumped.get("output") or []
    print("output_items:", len(items))
    for idx, item in enumerate(items[:3]):
        print(f"  output[{idx}].type =", item.get("type"))
        parts = item.get("content") or []
        part_types = [part.get("type") for part in parts[:5]]
        print(f"    content_items={len(parts)}", part_types)

    head_source = getattr(resp, "output_text", None)
    if isinstance(head_source, str):
        print("output_text(head):", head_source[:200])
    return dumped




openai: 2.13.0
pydantic: 2.12.5


In [None]:
def reset_run_state(reset_logs: bool = True):
    """
    Zero the global `progress` counters and rewrite PROG_JSON.

    When `reset_logs` is true, RAW_JSONL, ERR_JSONL and OUT_CSV are also
    truncated to empty files (existing records are lost).
    """
    global progress
    counter_names = (
        "attempted_calls",
        "api_ok_calls",
        "parsed_ok_calls",
        "valid_docs",
        "invalid_docs",
        "exceptions",
    )
    progress = {counter: 0 for counter in counter_names}
    with open(PROG_JSON, "w", encoding="utf-8") as sink:
        sink.write(json.dumps(progress, ensure_ascii=False, indent=2))

    if not reset_logs:
        return
    # Empty the log files (warning: this deletes earlier records).
    for log_path in (RAW_JSONL, ERR_JSONL, OUT_CSV):
        with open(log_path, "w", encoding="utf-8") as sink:
            sink.write("")
reset_run_state(reset_logs=True)



In [None]:
import os, json, time, random
from typing import Any, Dict, List, Optional

def write_jsonl(path: str, obj: dict):
    """
    Append `obj` to `path` as one JSON line and force it to disk.

    The file is flushed and fsync'ed before it is closed so the record is not
    left in a buffer if the session stops.
    """
    with open(path, mode="a", encoding="utf-8") as sink:
        serialized = json.dumps(obj, ensure_ascii=False)
        sink.write(serialized + "\n")
        sink.flush()
        descriptor = sink.fileno()
        os.fsync(descriptor)

def compact_resp_meta(resp) -> Dict[str, Any]:
    """
    Extract the fields of a response that are worth logging.

    Returns a dict with "status", "incomplete_details", "error" and "usage"
    taken from the dumped response, the `output_text` attribute (or None), and
    the list of output item types.
    """
    dumped = safe_response_dump(resp)
    meta = {
        field: dumped.get(field)
        for field in ("status", "incomplete_details", "error", "usage")
    }
    meta["output_text"] = getattr(resp, "output_text", None)
    meta["output_types"] = [item.get("type") for item in (dumped.get("output") or [])]
    return meta

def try_manual_parse(TextFmt, resp):
    """
    Fallback parser: validate the response's raw text against `TextFmt`.

    Returns (model, raw_json) on success, (None, raw_json) when validation
    fails, and (None, None) when the response carries no text at all.
    Markdown code-fence markers are removed before validation.
    """
    raw = extract_raw_text(resp)
    if not raw:
        return None, None

    raw2 = raw.replace("```json", "").replace("```", "").strip()
    try:
        # Pick the validator matching the installed pydantic major version.
        validate = TextFmt.model_validate_json if PYDANTIC_V2 else TextFmt.parse_raw
        model = validate(raw2)
    except Exception:
        return None, raw2
    return model, raw2

def reset_run_state(reset_logs: bool = True):
    """
    Zero the global `progress` counters and persist them via save_progress().

    When `reset_logs` is true, RAW_JSONL, ERR_JSONL and OUT_CSV are truncated
    to empty files.
    """
    global progress
    progress = dict.fromkeys(
        [
            "attempted_calls",
            "api_ok_calls",
            "parsed_ok_calls",
            "valid_docs",
            "invalid_docs",
            "exceptions",
        ],
        0,
    )
    save_progress()
    if not reset_logs:
        return
    for log_path in (RAW_JSONL, ERR_JSONL, OUT_CSV):
        with open(log_path, "w", encoding="utf-8") as sink:
            sink.write("")

def generate_and_log_docs(
    ctx_list: List[dict],
    labels: List[str],
    cids: List[int],
    *,
    base_max_output_tokens: int = 2048,
    max_retries: int = 4,
    reasoning_effort: str = "low",
):
    """
    Generate one description per context via client.responses.parse, logging
    every attempt.

    Each attempt doubles the output-token budget
    (base_max_output_tokens * 2 ** attempt). A successful API response is
    parsed through `output_parsed`, falling back to try_manual_parse. The doc
    list is padded with "__MISSING__" or truncated to len(ctx_list), each doc
    is validated with is_valid_doc, and the attempt record is appended to
    RAW_JSONL. Exceptions are appended to ERR_JSONL and the loop moves on to
    the next attempt. The global `progress` counters are updated and saved
    along the way.

    Args:
        ctx_list: contexts sent to the model as {"labels": ctx_list}.
        labels: label names, stored in the log records only.
        cids: class ids, stored in the log records only.
        base_max_output_tokens: token budget for the first attempt.
        max_retries: number of attempts made (the loop runs range(max_retries)).
        reasoning_effort: accepted but not used anywhere in this function.

    Returns:
        (docs, rec) from the first attempt whose output could be parsed. Docs
        that fail validation are still returned; see rec["all_valid"] and
        rec["validation"].

    Raises:
        RuntimeError: when no attempt produced parseable output.
    """
    TextFmt = make_docs_text_format(len(ctx_list))

    system = (
        "You write precise Amazon marketplace product-category descriptions for taxonomy labels.\n"
        "ENGLISH ONLY.\n"
        "Return JSON that matches the provided schema.\n"
        "You will receive an array 'labels' with contexts. Produce docs in the SAME ORDER.\n"
        "\n"
        "Core rule:\n"
        "- Write a standalone description of the label itself only.\n"
        "- Do NOT reference or mention parent, children, siblings, hierarchy, or any provided keywords.\n"
        "- Even if those fields are present in the input, ignore them completely.\n"
        "\n"
        "Content guidance:\n"
        "- Describe what products sold on Amazon belong in this category, including typical item types, common variants, and typical use cases.\n"
        "- Include practical details that matter in customer reviews, such as fit, sizing, compatibility, durability, performance, materials, comfort, ease of use, and value.\n"
        "- Define clear boundaries by stating what is included and what is excluded, without naming related categories or using comparisons to parent or sibling labels.\n"
        "\n"
        "Hard formatting constraints for each doc:\n"
        "- Exactly 5 sentences.\n"
        "- Single paragraph with no newline characters.\n"
        "- Plain text only: no markdown, no bullets, no numbering.\n"
        "- 120 to 170 words total.\n"
        "- Use exactly five period characters '.' total: one at the end of each sentence.\n"
        "- Do NOT use any other periods anywhere (no abbreviations, no initials, no decimals).\n"
        "- Do NOT use semicolons or colons.\n"
    )

    user_obj = {"labels": ctx_list}

    # Last failure seen; used for the final error message.
    last_err: Optional[Exception] = None

    for attempt in range(max_retries):
        # Double the output-token budget on every attempt.
        mot = base_max_output_tokens * (2 ** attempt)

        progress["attempted_calls"] += 1
        save_progress()

        resp = None
        try:
            resp = client.responses.parse(
                model=MODEL,
                input=[
                    {"role": "system", "content": system},
                    {"role": "user", "content": json.dumps(user_obj, ensure_ascii=False)},
                ],
                text_format=TextFmt,
                max_output_tokens=mot,
                store=False,
                temperature=0.0,  # optional (helps keep the format stable)
            )
            progress["api_ok_calls"] += 1

            rec = {
                "ts": ts_utc(),
                "stage": "attempt",
                "attempt": attempt,
                "max_output_tokens": mot,
                "cids": cids,
                "labels": labels,
                "n_docs_expected": len(ctx_list),
            }
            rec.update(compact_resp_meta(resp))

            # 1) parse (structured)
            parsed = getattr(resp, "output_parsed", None)
            manual_parsed, manual_raw_json = (None, None)
            if parsed is None:
                manual_parsed, manual_raw_json = try_manual_parse(TextFmt, resp)
                parsed = manual_parsed
                if manual_raw_json is not None:
                    rec["raw_json"] = manual_raw_json  # also store the exact text that was parsed, not just output_text

            if parsed is None:
                # Nothing parseable: log the attempt and retry with a larger budget.
                rec["parse_ok"] = False
                rec["parse_error"] = "output_parsed is None"
                write_jsonl(RAW_JSONL, rec)
                last_err = RuntimeError(rec["parse_error"])
                save_progress()
                continue

            docs = list(parsed.docs)

            n_expected = len(ctx_list)
            docs = [str(x) for x in docs]  # defensive: force every entry to str

            rec["docs_len_expected"] = n_expected
            rec["docs_len_got"] = len(docs)

            # Length correction: pad with a placeholder if too short, truncate if too long.
            if len(docs) < n_expected:
                docs = docs + ["__MISSING__"] * (n_expected - len(docs))
            elif len(docs) > n_expected:
                docs = docs[:n_expected]

            progress["parsed_ok_calls"] += 1

            # 2) validate
            val_list = []
            v_ok = 0
            v_bad = 0
            for d in docs:
                d2 = normalize_doc(d)
                ok, meta = is_valid_doc(d2)
                val_list.append({"valid": ok, **meta})
                if ok:
                    v_ok += 1
                else:
                    v_bad += 1
            progress["valid_docs"] += v_ok
            progress["invalid_docs"] += v_bad

            rec["parse_ok"] = True
            rec["docs"] = docs
            rec["validation"] = val_list
            rec["all_valid"] = (v_bad == 0)

            write_jsonl(RAW_JSONL, rec)
            save_progress()
            return docs, rec  # done: return on the first parsed attempt

        except Exception as e:
            progress["exceptions"] += 1
            save_progress()
            last_err = e

            err = {
                "ts": ts_utc(),
                "where": "generate_and_log_docs",
                "stage": "exception",
                "attempt": attempt,
                "max_output_tokens": mot,
                "cids": cids,
                "labels": labels,
                "error_type": type(e).__name__,
                "error": str(e),
            }
            # If the failure happened after the API answered, keep its metadata too.
            if resp is not None:
                err.update(compact_resp_meta(resp))
            write_jsonl(ERR_JSONL, err)
            continue

    raise RuntimeError(f"failed after retries: {type(last_err).__name__}: {last_err}")


In [None]:
# (Optional) fully reset the state inside the current RUN_DIR
reset_run_state(reset_logs=True)

# Smoke test: generate a description for a single class id.
test_cid = 0
ctx_list = [build_ctx(test_cid)]
docs, rec = generate_and_log_docs(
    ctx_list=ctx_list,
    labels=[id2label[str(test_cid)]],
    cids=[test_cid],
    base_max_output_tokens=2048,
    max_retries=4,
    reasoning_effort="low",   # once things are stable, "minimal" can be tried too
)

print("docs_len:", len(docs))
print("saved to:", RAW_JSONL)


docs_len: 1
saved to: /content/drive/MyDrive/BDA/Amazon_products/run_20251218T130946/raw_generations.jsonl


In [None]:
from typing import List, Dict, Any, Optional, Tuple


CSV_HEADER = [
    "cid", "label", "doc", "final_valid",
    "word_count", "sentence_count", "reasons",
    "from_attempt", "max_output_tokens", "reasoning_effort",
    "usage_total_tokens", "usage_output_tokens", "usage_reasoning_tokens",
    "status"
]

def ensure_csv_header(path: str):
    """Write the CSV_HEADER row to `path` unless the file already exists with content."""
    already_populated = os.path.exists(path) and os.path.getsize(path) != 0
    if already_populated:
        return
    with open(path, "w", encoding="utf-8", newline="") as sink:
        csv.writer(sink).writerow(CSV_HEADER)

def append_csv_rows(path: str, rows: List[List[Any]]):
    """Append `rows` to the CSV file at `path` (UTF-8; the csv module controls line endings)."""
    with open(path, mode="a", encoding="utf-8", newline="") as sink:
        csv.writer(sink).writerows(rows)

ensure_csv_header(OUT_CSV)

# =========================
# 1) RAW_JSONL에서 완료 cid 로드 (resume)
# =========================
def load_done_cids_from_jsonl(path: str) -> set:
    """
    Collect the class ids that are already finished, for resuming a run.

    Reads `path` line by line as JSON. A record counts as finished only when
    it has stage == "attempt", parse_ok is True and all_valid is True, and its
    "cids" and "docs" are lists of equal length. Blank lines, lines that are
    not valid JSON and cids that cannot be converted to int are skipped.

    Returns an empty set when the file does not exist.
    """
    finished = set()
    if not os.path.exists(path):
        return finished

    with open(path, "r", encoding="utf-8") as source:
        for raw_line in source:
            text = raw_line.strip()
            if not text:
                continue
            try:
                entry = json.loads(text)
            except Exception:
                continue

            # Only merged attempt records that parsed and fully validated count.
            if entry.get("stage") != "attempt":
                continue
            if entry.get("parse_ok") is not True:
                continue
            if entry.get("all_valid") is not True:
                continue

            cids = entry.get("cids") or []
            docs = entry.get("docs") or []
            consistent = (
                isinstance(cids, list)
                and isinstance(docs, list)
                and len(cids) == len(docs)
            )
            if not consistent:
                continue
            for cid in cids:
                try:
                    finished.add(int(cid))
                except Exception:
                    pass
    return finished

# =========================
# 2) 배치 유틸
# =========================
def chunks(lst: List[int], n: int):
    """Yield consecutive slices of `lst` holding at most `n` items each."""
    yield from (lst[start:start + n] for start in range(0, len(lst), n))

# =========================
# 3) 라벨 1개 단독 보정(규격 실패시)
# =========================
def regen_single(cid: int,
                 base_max_output_tokens: int,
                 max_retries: int,
                 reasoning_effort: str) -> Tuple[str, bool, Dict[str, Any], Dict[str, Any]]:
    """
    Regenerate the description for a single class id (used when a doc failed
    the format checks).

    Returns (doc, ok, meta, rec): the whitespace-normalized document, whether
    it passed is_valid_doc, the validation metadata, and the log record
    returned by generate_and_log_docs.
    """
    single_ctx = [build_ctx(cid)]
    label_name = id2label[str(cid)]
    generated, rec = generate_and_log_docs(
        ctx_list=single_ctx,
        labels=[label_name],
        cids=[cid],
        base_max_output_tokens=base_max_output_tokens,
        max_retries=max_retries,
        reasoning_effort=reasoning_effort,
    )
    doc = normalize_doc(generated[0])
    verdict = is_valid_doc(doc)
    return doc, verdict[0], verdict[1], rec

In [None]:
import os, json
from datetime import datetime, timezone

# 호출 예산도 같이 초기화
CALL_BUDGET = 444

# progress 초기화 템플릿
def _fresh_progress():
    return {
        "attempted_calls": 0,   # 실제 API 호출 횟수(너 코드 기준)
        "api_ok_calls": 0,
        "parsed_ok_calls": 0,
        "valid_docs": 0,
        "invalid_docs": 0,
        "exceptions": 0,
    }

def init_new_run(run_root="/content/drive/MyDrive/BDA/Amazon_products", reset_logs=True):
    """
    Create a fresh RUN_DIR and re-initialize the path and progress globals.

    A directory run_<UTC timestamp> is created under `run_root`; OUT_CSV,
    RAW_JSONL, ERR_JSONL and PROG_JSON are pointed into it, `progress` is
    zeroed and written to PROG_JSON, and the CSV header is ensured. When
    `reset_logs` is true, RAW_JSONL and ERR_JSONL are truncated to empty files.
    The resulting paths and CALL_BUDGET are printed.
    """
    global RUN_ID, RUN_DIR, OUT_CSV, RAW_JSONL, ERR_JSONL, PROG_JSON, progress

    RUN_ID = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    run_dir = os.path.join(run_root, f"run_{RUN_ID}")
    RUN_DIR = run_dir
    os.makedirs(run_dir, exist_ok=True)

    artifact_names = ("label_docs.csv", "raw_generations.jsonl", "errors.jsonl", "progress.json")
    OUT_CSV, RAW_JSONL, ERR_JSONL, PROG_JSON = [
        os.path.join(run_dir, artifact) for artifact in artifact_names
    ]

    progress = _fresh_progress()
    with open(PROG_JSON, "w", encoding="utf-8") as sink:
        json.dump(progress, sink, ensure_ascii=False, indent=2)

    # Guarantee the CSV header (ensure_csv_header is defined in an earlier cell).
    ensure_csv_header(OUT_CSV)

    # Empty the log files (normally unnecessary in a new folder; kept as a safeguard).
    if reset_logs:
        for log_path in (RAW_JSONL, ERR_JSONL):
            with open(log_path, "w", encoding="utf-8") as sink:
                sink.write("")

    report = (
        ("RUN_DIR", RUN_DIR),
        ("OUT_CSV", OUT_CSV),
        ("RAW_JSONL", RAW_JSONL),
        ("ERR_JSONL", ERR_JSONL),
        ("PROG_JSON", PROG_JSON),
        ("CALL_BUDGET", CALL_BUDGET),
    )
    for tag, value in report:
        print(f"{tag}:", value)

# 실행
init_new_run(reset_logs=True)


RUN_DIR: /content/drive/MyDrive/BDA/Amazon_products/run_20251218T131003Z
OUT_CSV: /content/drive/MyDrive/BDA/Amazon_products/run_20251218T131003Z/label_docs.csv
RAW_JSONL: /content/drive/MyDrive/BDA/Amazon_products/run_20251218T131003Z/raw_generations.jsonl
ERR_JSONL: /content/drive/MyDrive/BDA/Amazon_products/run_20251218T131003Z/errors.jsonl
PROG_JSON: /content/drive/MyDrive/BDA/Amazon_products/run_20251218T131003Z/progress.json
CALL_BUDGET: 444


In [None]:

def run_all_labels(
    *,
    start_cid: int = 0,
    end_cid: Optional[int] = None,
    batch_size: int = 1,
    resume: bool = True,
    base_max_output_tokens: int = 12000,
    max_retries: int = 4,
    reasoning_effort: str = "minimal",
    fix_reasoning_effort: str = "low",
    fix_base_max_output_tokens: int = 15000,
    fix_max_retries: int = 4,
    max_batches_this_run: Optional[int] = None,
    call_budget: int = CALL_BUDGET,   # NOTE: default is bound when this cell runs, not at call time
):
    """Generate one document per class id in batches and append them to OUT_CSV.

    For every batch a first generation pass is made with
    ``generate_and_log_docs``; each document is normalised and validated, and
    every invalid one is regenerated individually with ``regen_single`` as long
    as the call budget allows. The final state of every class id in the batch
    (valid or not) is appended to OUT_CSV.

    The budget is tracked through the global ``progress["attempted_calls"]``
    counter; this function only reads it (presumably the generation helpers
    increment it — confirm against their definitions).

    Args:
        start_cid: First class id (inclusive).
        end_cid: Last class id (exclusive); ``None`` means ``num_class``.
        batch_size: Number of class ids passed to one first-pass generation.
        resume: When true, class ids returned by
            ``load_done_cids_from_jsonl(RAW_JSONL)`` are skipped.
        base_max_output_tokens: Forwarded to ``generate_and_log_docs``.
        max_retries: Forwarded to ``generate_and_log_docs``.
        reasoning_effort: Forwarded to ``generate_and_log_docs`` and recorded
            in the CSV row of first-pass documents.
        fix_reasoning_effort: Forwarded to ``regen_single`` and recorded in the
            CSV row of regenerated documents.
        fix_base_max_output_tokens: Forwarded to ``regen_single``.
        fix_max_retries: Forwarded to ``regen_single``.
        max_batches_this_run: Stop after this many batches (failed batches
            count too); ``None`` means no limit.
        call_budget: Stop as soon as ``progress["attempted_calls"]`` reaches
            this value.

    Returns:
        A summary dict with the keys ``batches_done``, ``final_ok``,
        ``final_bad``, ``calls_used_this_run``, ``calls_total``,
        ``calls_remaining``, ``call_budget``, ``stopped_reason``, ``OUT_CSV``,
        ``RAW_JSONL`` and ``ERR_JSONL``. The same keys are returned when there
        is nothing to do.
    """
    if end_cid is None:
        end_cid = num_class

    # Resume handling: skip the class ids that were already generated.
    done = load_done_cids_from_jsonl(RAW_JSONL) if resume else set()
    pending = [cid for cid in range(start_cid, end_cid) if cid not in done]

    def _calls_made() -> int:
        # Read the global counter on every call so the latest value is used.
        return progress.get("attempted_calls", 0)

    def _budget_spent() -> bool:
        return _calls_made() >= call_budget

    def _as_dict(value: Any) -> Dict[str, Any]:
        # Generation records / usage blobs may be None or a non-dict object.
        return value if isinstance(value, dict) else {}

    calls_start = _calls_made()  # number of calls already used when this run starts

    def _make_summary(batches: int, ok: int, bad: int, reason: str) -> Dict[str, Any]:
        # Single place that defines the summary shape, so every exit path agrees.
        calls_end = _calls_made()
        return {
            "batches_done": batches,
            "final_ok": ok,
            "final_bad": bad,
            "calls_used_this_run": calls_end - calls_start,
            "calls_total": calls_end,
            "calls_remaining": max(0, call_budget - calls_end),
            "call_budget": call_budget,
            "stopped_reason": reason,
            "OUT_CSV": OUT_CSV,
            "RAW_JSONL": RAW_JSONL,
            "ERR_JSONL": ERR_JSONL,
        }

    print(f"[RUN] total={end_cid-start_cid} done={len(done)} pending={len(pending)} batch_size={batch_size}")
    print(f"[BUDGET] used={calls_start}/{call_budget} remaining={max(0, call_budget-calls_start)}")

    if not pending:
        return _make_summary(0, 0, 0, "nothing_to_do")

    total_final_ok = 0
    total_final_bad = 0
    batches_done = 0
    stopped_reason = "completed_all"

    for batch in chunks(pending, batch_size):
        if max_batches_this_run is not None and batches_done >= max_batches_this_run:
            stopped_reason = f"reached max_batches_this_run={max_batches_this_run}"
            break

        # Budget check before starting the batch.
        if _budget_spent():
            stopped_reason = "call_budget_exhausted_before_batch"
            break

        ctx_list = [build_ctx(cid) for cid in batch]
        labels = [id2label[str(cid)] for cid in batch]

        # First-pass generation for the whole batch.
        try:
            docs, rec = generate_and_log_docs(
                ctx_list=ctx_list,
                labels=labels,
                cids=batch,
                base_max_output_tokens=base_max_output_tokens,
                max_retries=max_retries,
                reasoning_effort=reasoning_effort,
            )
        except Exception as e:
            print(f"[BATCH FAIL] cids={batch[:5]}.. ({len(batch)}) err={type(e).__name__}: {str(e)[:120]}")
            time.sleep(min(10.0, 1.0 + random.random() * 2.0))
            batches_done += 1
            # Budget check: the failed attempt may have consumed calls.
            if _budget_spent():
                stopped_reason = "call_budget_exhausted_after_batch_fail"
                break
            continue

        rec = _as_dict(rec)
        final_docs: Dict[int, Dict[str, Any]] = {}
        for i, cid in enumerate(batch):
            doc = normalize_doc(docs[i])
            ok, meta = is_valid_doc(doc)
            final_docs[cid] = {
                "cid": cid,
                "label": labels[i],
                "doc": doc,
                "valid": ok,
                "meta": meta,
                "rec": rec,
                "from_attempt": rec.get("attempt"),
                "max_output_tokens": rec.get("max_output_tokens"),
                "reasoning_effort": reasoning_effort,
            }

        # Regenerate only the cids that failed validation, one at a time, and
        # only while budget remains. If the budget runs out here we remember
        # why, but still fall through to the CSV save below: the first-pass
        # documents of this batch already exist and must not be dropped.
        budget_stop: Optional[str] = None
        bad_cids = [cid for cid in batch if not final_docs[cid]["valid"]]
        if bad_cids:
            cid0 = bad_cids[0]
            print("[SAMPLE INVALID]", cid0, final_docs[cid0]["meta"])
            print(final_docs[cid0]["doc"][:240])

            if _budget_spent():
                budget_stop = "call_budget_exhausted_before_fix"
            else:
                print(f"[FIX] invalid={len(bad_cids)}/{len(batch)} -> single regen")
                for cid in bad_cids:
                    if _budget_spent():
                        budget_stop = "call_budget_exhausted_mid_fix"
                        break
                    try:
                        doc2, ok2, meta2, rec2 = regen_single(
                            cid,
                            base_max_output_tokens=fix_base_max_output_tokens,
                            max_retries=fix_max_retries,
                            reasoning_effort=fix_reasoning_effort,
                        )
                        rec2 = _as_dict(rec2)
                        final_docs[cid].update({
                            "doc": doc2,
                            "valid": ok2,
                            "meta": meta2,
                            "rec": rec2,
                            "from_attempt": rec2.get("attempt"),
                            "max_output_tokens": rec2.get("max_output_tokens"),
                            "reasoning_effort": fix_reasoning_effort,
                        })
                    except Exception as e:
                        # Best effort: keep the invalid first-pass doc for this cid.
                        print(f"[FIX FAIL] cid={cid} err={type(e).__name__}: {str(e)[:120]}")
                        time.sleep(min(10.0, 1.0 + random.random() * 2.0))

        # Save the batch to CSV (one row per cid, valid or not).
        rows = []
        for cid in batch:
            item = final_docs[cid]
            meta = item["meta"] or {}
            rec_used = _as_dict(item["rec"])
            usage = _as_dict(rec_used.get("usage"))
            output_details = _as_dict(usage.get("output_tokens_details"))

            rows.append([
                cid,
                item["label"],
                item["doc"],
                item["valid"],
                meta.get("word_count"),
                meta.get("sentence_count"),
                "|".join(meta.get("reasons") or []),
                item.get("from_attempt"),
                item.get("max_output_tokens"),
                item.get("reasoning_effort"),
                usage.get("total_tokens"),
                usage.get("output_tokens"),
                output_details.get("reasoning_tokens"),
                rec_used.get("status"),
            ])

            if item["valid"]:
                total_final_ok += 1
            else:
                total_final_bad += 1

        append_csv_rows(OUT_CSV, rows)

        batches_done += 1
        if batches_done % 10 == 0:
            used_now = _calls_made()
            print(f"[PROGRESS] batches={batches_done} ok={total_final_ok} bad={total_final_bad} calls_used={used_now}/{call_budget}")

        # Budget ran out before/during the fix pass: stop now that the batch is
        # saved, keeping the more specific reason.
        if budget_stop is not None:
            stopped_reason = budget_stop
            break

        # Budget check at the end of the batch as well.
        if _budget_spent():
            stopped_reason = "call_budget_exhausted_after_batch"
            break

    summary = _make_summary(batches_done, total_final_ok, total_final_bad, stopped_reason)
    print("[SUMMARY]", summary)
    return summary


In [None]:
# Make sure the output CSV has its header before any rows are appended.
# NOTE(review): init_new_run() already calls ensure_csv_header(OUT_CSV), so this
# cell looks like a safeguard for sessions where OUT_CSV was set another way;
# presumably it does nothing when the header already exists — TODO confirm.
ensure_csv_header(OUT_CSV)

In [None]:
# reset_run_state(reset_logs=True)  # use when starting over from scratch


# Launch generation for every class id and report how the call budget was spent.
summary = run_all_labels(
    start_cid=0,
    end_cid=num_class,
    batch_size=3,      # with a 444-call limit, 4 is recommended (spare calls can be used for fixes); NOTE(review): 3 is passed here
    resume=False,      # False for a new run
    call_budget=444,
)

print("API calls used this run:", summary["calls_used_this_run"])
print("API calls total:", summary["calls_total"])
print("API calls remaining:", summary["calls_remaining"])
print("Stopped reason:", summary["stopped_reason"])


[RUN] total=531 done=0 pending=531 batch_size=3
[BUDGET] used=0/444 remaining=444
[SAMPLE INVALID] 11 {'word_count': 79, 'sentence_count': 5, 'reasons': ['word_count=79']}
Makeup products are designed to enhance facial features and create various looks, ranging from natural to dramatic. This category includes items such as foundation, lipstick, eyeshadow, and mascara, catering to diverse preferences and skin 
[FIX] invalid=1/3 -> single regen
[SAMPLE INVALID] 15 {'word_count': 77, 'sentence_count': 5, 'reasons': ['word_count=77']}
Action toy figures are dynamic collectibles that often represent characters from movies, comics, or video games. These figures typically feature articulated joints for posing and display, allowing for creative play and customization. Common
[FIX] invalid=1/3 -> single regen
[SAMPLE INVALID] 25 {'word_count': 72, 'sentence_count': 5, 'reasons': ['word_count=72']}
This category features products specifically designed to control body odor and manage perspiration