In [None]:
import os
os.environ['OPENAI_API_KEY'] = 'key'

In [None]:
import os
import csv
import random
import time
import json
from tqdm import tqdm
from openai import OpenAI
from collections import defaultdict

# ============================================
# 0. Paths & Globals
# ============================================

# 프로젝트 경로 설정 (실제 경로에 맞게 수정 필요)
ROOT = "/content/drive/MyDrive/project_release"
DATA_BASE = f"{ROOT}/Amazon_products"

TRAIN_CORPUS = f"{DATA_BASE}/train/train_corpus.txt"
CLASS_PATH   = f"{DATA_BASE}/classes.txt"

# 생성된 Strong Silver Label이 저장될 경로
# 3k 샘플을 위해 'train_strong_silver_labels_3k.csv'로 변경을 고려할 수 있습니다.
# 여기서는 2k 샘플을 3개씩 처리하는 것으로 가정하고 경로를 유지합니다.
STRONG_SILVER_PATH = "/content/train_strong_silver_labels_2k_s3.csv" # 파일명 변경으로 3 samples/call임을 명시

# LLM 설정
try:
    # 환경 변수에서 API 키를 가져오도록 수정하지 않고,
    # 실제 환경에서 설정된 client를 사용한다고 가정합니다.
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
    client = OpenAI(api_key=OPENAI_API_KEY)
    GPT_MODEL = "gpt-4o-mini" # 모델명은 환경에 따라 변경 가능
    if not OPENAI_API_KEY:
        client = None
        print("[경고] OPENAI_API_KEY 환경 변수가 설정되지 않아 API 호출이 작동하지 않습니다.")
except Exception:
    client = None

# ============================================
# 1. Loaders (기존과 동일)
# ============================================

def load_class_names(path):
    """ 클래스 이름과 ID 매핑을 로드합니다. """
    name2id = {}
    if not os.path.exists(path):
        print(f"Error: Class file not found at {path}")
        return name2id
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) >= 2:
                cid = parts[0]
                cname = " ".join(parts[1:])
                name2id[cname] = int(cid)
    return name2id

def load_corpus(path):
    """ 코퍼스 전체를 로드합니다. (pid -> text) """
    pid2text = {}
    if not os.path.exists(path):
        print(f"Error: Corpus file not found at {path}")
        return pid2text
    with open(path, "r", encoding="utf-8") as f:
        # tqdm 대신 빠른 줄 수 세기
        num_lines = sum(1 for _ in open(path, "r", encoding="utf-8"))
        f.seek(0)
        for line in tqdm(f, total=num_lines, desc="Loading Train Corpus"):
            parts = line.strip().split("\t", 1)
            if len(parts) == 2:
                pid, txt = parts
                pid2text[pid] = txt
    return pid2text

# ============================================
# 2. LLM API 호출 및 라벨 생성 함수 (samples_per_call = 3으로 실행되도록 메인 블록 수정)
# ============================================

def generate_strong_labels_gpt(
    pid2text, name2id, max_samples=2000, max_labels_per_doc=3, samples_per_call=3
):
    """
    GPT API를 호출하여 Strong Silver Label을 생성하고 결과를 반환합니다.
    samples_per_call 만큼의 문서를 1 콜에 처리합니다.

    :param max_samples: 총 라벨링할 문서의 수
    :param samples_per_call: 1번의 API 호출로 처리할 문서의 수 (요청에 따라 3으로 설정됨)
    """
    if client is None:
        print("[ERROR] GPT 클라이언트가 유효하지 않습니다. 라벨 생성을 건너뜁니다.")
        return {}

    all_pids = list(pid2text.keys())
    random.seed(42)
    random.shuffle(all_pids)

    # 처리할 PID 리스트 (2000개)
    sample_pids = all_pids[:min(max_samples, len(all_pids))]
    num_classes = max(name2id.values()) + 1 if name2id else 1

    id_to_name = {v: k for k, v in name2id.items()}
    class_names = {str(k): v for k, v in id_to_name.items()}

    llm_labels = {}

    # 프롬프트 정의
    SYSTEM_PROMPT = (
        "You are an Amazon product classification expert. Your task is to assign the top 3 most relevant "
        "class IDs to the provided product reviews. "
        "The class IDs and names are: " + json.dumps(class_names) + ". "
        f"Respond ONLY with a single JSON object. The keys must be the product PIDs, and the values must be a list of "
        f"the predicted class IDs (integers, maximum {max_labels_per_doc}). "
        "Example: {\"pid_12345\": [101, 5, 20], \"pid_67890\": [40, 5, 12]}"
    )

    # 샘플을 samples_per_call 개씩 그룹화
    pid_groups = [
        sample_pids[i:i + samples_per_call]
        for i in range(0, len(sample_pids), samples_per_call)
    ]

    for group_pids in tqdm(pid_groups, desc=f"Calling GPT ({samples_per_call} samples/call) for {len(sample_pids)} documents"):

        prompt_data = {pid: pid2text[pid] for pid in group_pids}
        USER_PROMPT = "Classify the following product descriptions: \n" + json.dumps(prompt_data)

        try:
            response = client.chat.completions.create(
                model=GPT_MODEL,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": USER_PROMPT},
                ],
                #temperature=0.0
            )

            content = response.choices[0].message.content.strip()

            try:
                predicted_labels_batch = json.loads(content)
                # 그룹 내의 모든 PID에 대해 결과 처리
                for pid in group_pids:
                    if pid in predicted_labels_batch and isinstance(predicted_labels_batch[pid], list):
                        raw_labels = predicted_labels_batch[pid]
                        # 정수로 변환 가능하고, 클래스 범위 내에 있는 유효한 라벨만 필터링
                        valid_labels = [
                            int(l) for l in raw_labels
                            if (isinstance(l, (int, str)) and str(l).isdigit() and 0 <= int(l) < num_classes)
                        ]
                        # 상위 max_labels_per_doc 개만 유지
                        llm_labels[pid] = valid_labels[:max_labels_per_doc] if valid_labels else [0]
                    else:
                        # 응답 JSON에 특정 PID가 없거나 형식이 잘못된 경우
                        llm_labels[pid] = [0]
                        # print(f"\n[Warning] PID {pid}에 대한 라벨을 찾을 수 없거나 형식이 잘못되었습니다.")

            except json.JSONDecodeError:
                # JSON 파싱 실패 시 그룹 내 모든 PID를 라벨 0으로 처리
                for pid in group_pids:
                    llm_labels[pid] = [0]
                # print(f"\n[Warning] JSON 파싱 오류. Raw response: {content[:50]}...")

        except Exception as e:
            # API 호출 자체 실패 시 그룹 내 모든 PID를 라벨 0으로 처리
            print(f"\n[Error] API 호출 실패: {e}")
            for pid in group_pids:
                llm_labels[pid] = [0]
            time.sleep(3) # API 제한을 피하기 위해 잠시 대기

    return llm_labels

# ============================================
# 3. Main Execution Block for Cell 1 (수정된 파라미터)
# ============================================

print("=== CELL 1: GPT Strong Silver Label Generation ===")

# 1. 데이터 로드
name2id = load_class_names(CLASS_PATH)
pid2text = load_corpus(TRAIN_CORPUS)

# 2. GPT 라벨링 실행
# max_samples=2000: 총 2000개의 문서를 라벨링
# max_labels_per_doc=3: 문서당 최대 3개의 라벨
# samples_per_call=3: 1번의 API 호출당 3개의 문서 처리 (총 호출 횟수: 2000 / 3 약 667회)
strong_labels_map = generate_strong_labels_gpt(
    pid2text, name2id, max_samples=2000, max_labels_per_doc=3, samples_per_call=3
)

# 3. CSV 파일로 저장
print(f"\n[Save] Saving Strong Silver Labels to {STRONG_SILVER_PATH}...")
with open(STRONG_SILVER_PATH, "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["pid", "labels"])
    for pid, labs in tqdm(strong_labels_map.items(), desc="Writing CSV"):
        w.writerow([pid, ",".join(map(str, labs))])

print(f"\n✅ Cell 1 완료. Strong Silver Labels ({len(strong_labels_map)}개) 저장 완료. (파일: {STRONG_SILVER_PATH})")

In [None]:
import os
import csv
import random
import time
import json
from tqdm import tqdm
from openai import OpenAI
from collections import defaultdict
###description
# ============================================
# 0. Paths & Globals
# ============================================

# 프로젝트 경로 설정 (실제 경로에 맞게 수정 필요)
ROOT = "/content/drive/MyDrive/project_release"
DATA_BASE = f"{ROOT}/Amazon_products"

CLASS_PATH   = f"{DATA_BASE}/classes.txt"

# 생성된 클래스 설명이 저장될 경로 (새로운 경로)
CLASS_DESCRIPTION_PATH = "/content/gpt_class_descriptions_100calls.json"

# LLM 설정
try:
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
    client = OpenAI(api_key=OPENAI_API_KEY)
    GPT_MODEL = "gpt-4o-mini"
    if not OPENAI_API_KEY:
        client = None
        print("[경고] OPENAI_API_KEY 환경 변수가 설정되지 않아 API 호출이 작동하지 않습니다.")
except Exception:
    client = None

# LLM 호출 제한 설정
MAX_CALLS = 100
CLASSES_PER_CALL = 6 # 1회 호출당 처리할 클래스 수 (531 클래스를 100회 내에 처리하기 위해)

# ============================================
# 1. Loaders (기존과 동일)
# ============================================

def load_class_names(path):
    """ 클래스 이름과 ID 매핑을 로드합니다. (ID -> Name 반환) """
    id_to_name = {}
    if not os.path.exists(path):
        print(f"Error: Class file not found at {path}")
        return id_to_name
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) >= 2:
                cid = parts[0]
                cname = " ".join(parts[1:])
                # ID (정수) -> Name (문자열)으로 저장
                id_to_name[int(cid)] = cname
    return id_to_name

# 코퍼스 로딩 함수는 이 태스크에서는 필요 없으므로 제거했습니다.

# ============================================
# 2. LLM API 호출 및 클래스 설명 생성 함수
# ============================================

def generate_class_descriptions_gpt(id_to_name, classes_per_call):
    """
    GPT API를 호출하여 클래스 설명을 생성합니다.
    """
    if client is None:
        print("[ERROR] GPT 클라이언트가 유효하지 않습니다. 설명 생성을 건너뜁니다.")
        return {}

    all_cids = sorted(id_to_name.keys())
    num_classes = len(all_cids)

    # 100회 호출 제한을 위해 실제 호출할 그룹만 계산
    num_calls_needed = (num_classes + classes_per_call - 1) // classes_per_call
    num_calls_to_make = min(num_calls_needed, MAX_CALLS)

    print(f"[INFO] 총 클래스 수: {num_classes}. 호출당 클래스: {classes_per_call}. 예상 호출 횟수: {num_calls_to_make}회.")

    # 처리할 클래스 ID 리스트
    sample_cids = all_cids

    # 생성된 설명을 저장할 딕셔너리
    generated_descriptions = {}

    # 프롬프트 정의
    SYSTEM_PROMPT = (
        "You are an expert in product taxonomy. Your task is to provide a concise, distinct, and objective definition "
        "or description (under 15 words) for each class ID provided. The description should highlight the core purpose or characteristics "
        "of the product category, making it easily distinguishable from others in the Amazon product hierarchy. "
        "Respond ONLY with a single JSON object. The keys must be the class IDs (integers), and the values must be the generated description string. "
        "Example: {\"101\": \"Small, handheld electronic devices for timekeeping.\", \"5\": \"Sweetened carbonated water, often flavored and colored.\"}"
    )

    # 샘플을 classes_per_call 개씩 그룹화
    cid_groups = [
        sample_cids[i:i + classes_per_call]
        for i in range(0, len(sample_cids), classes_per_call)
    ][:num_calls_to_make] # 최대 100회 호출까지만 사용

    for group_cids in tqdm(cid_groups, desc=f"Calling GPT ({classes_per_call} classes/call)"):

        # 그룹 내 클래스 ID와 이름 준비
        prompt_data = {
            cid: id_to_name[cid]
            for cid in group_cids
        }

        USER_PROMPT = "Generate descriptions for the following classes (ID: Class Name): \n" + json.dumps(prompt_data)

        try:
            response = client.chat.completions.create(
                model=GPT_MODEL,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": USER_PROMPT},
                ],
                temperature=0.0 # 객관적인 정의를 위해 낮은 온도 사용
            )

            content = response.choices[0].message.content.strip()

            try:
                # 응답 JSON 파싱
                predicted_descriptions = json.loads(content)

                for cid in group_cids:
                    cid_str = str(cid)
                    if cid_str in predicted_descriptions and isinstance(predicted_descriptions[cid_str], str):
                        # 생성된 설명을 저장
                        generated_descriptions[cid] = predicted_descriptions[cid_str]
                    else:
                        # 응답 JSON에 특정 CID가 없거나 형식이 잘못된 경우, 클래스 이름으로 대체
                        generated_descriptions[cid] = id_to_name[cid]

            except json.JSONDecodeError:
                # JSON 파싱 실패 시, 그룹 내 모든 클래스는 클래스 이름으로 대체
                for cid in group_cids:
                    generated_descriptions[cid] = id_to_name[cid]
                # print(f"\n[Warning] JSON 파싱 오류. Raw response: {content[:50]}...")

            time.sleep(0.5)

        except Exception as e:
            # API 호출 자체 실패 시, 그룹 내 모든 클래스는 클래스 이름으로 대체
            print(f"\n[Error] API 호출 실패: {e}. 클래스 ID {group_cids[0]}부터 대체.")
            for cid in group_cids:
                generated_descriptions[cid] = id_to_name[cid]
            time.sleep(3)

    return generated_descriptions

# ============================================
# 3. Main Execution Block
# ============================================

print("=== CELL 1: GPT Class Description Generation (100 Call Limit) ===")

# 1. 데이터 로드 (ID -> Name 매핑)
id_to_name = load_class_names(CLASS_PATH)

# 2. GPT 설명 생성 실행
generated_descriptions_map = generate_class_descriptions_gpt(
    id_to_name,
    classes_per_call=CLASSES_PER_CALL
)

# 3. JSON 파일로 저장
print(f"\n[Save] Saving Class Descriptions to {CLASS_DESCRIPTION_PATH}...")

# {ID (int): Description (str)} 형태로 저장
with open(CLASS_DESCRIPTION_PATH, "w", encoding="utf-8") as f:
    json.dump(generated_descriptions_map, f, indent=4, ensure_ascii=False)

# 누락된 클래스 확인
num_total = len(id_to_name)
num_generated = len(generated_descriptions_map)
num_missing = num_total - num_generated

print(f"\n✅ Cell 1 완료. 총 클래스 수: {num_total}, 설명 생성 클래스 수: {num_generated}, 누락 수: {num_missing}")
print(f"(파일: {CLASS_DESCRIPTION_PATH})")