In [2]:
import os
from getpass import getpass

# Ask for the API key interactively only when the environment does not already
# hold a non-empty value; getpass keeps the typed key off the screen.
if os.environ.get("OPENAI_API_KEY") in (None, ""):
    os.environ["OPENAI_API_KEY"] = getpass("Please enter your OpenAI API Key: ")

Please enter your OpenAI API Key:  ········


In [3]:
import os, json, time, uuid, random
from pathlib import Path
from openai import OpenAI

# Single client instance reused by every request in this notebook; the key is
# taken from the OPENAI_API_KEY environment variable.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def get_response(prompt: str, temperature: float = 1.0, model: str = "gpt-4o", max_tokens: int = 256) -> str:
    """Send ``prompt`` as a single user message and return the reply text.

    Args:
        prompt: Text placed in the user message.
        temperature: Sampling temperature forwarded to the API.
        model: Name of the chat model to query.
        max_tokens: Upper bound on the number of generated tokens.

    Returns:
        The first choice's message content with surrounding whitespace
        removed, or an empty string when the reply carries no text content.
    """
    chat_completion = client.chat.completions.create(
        messages=[{"role":"system","content":""},{"role":"user","content":prompt}],
        model=model,
        temperature=temperature,
        max_tokens=max_tokens,
    )
    # The message content can be None; calling .strip() on it would raise
    # AttributeError, so a missing body is reported as an empty string.
    content = chat_completion.choices[0].message.content
    return content.strip() if content else ""

In [4]:
def load_json(path):
    """Read the UTF-8 file at ``path`` and return its decoded JSON content."""
    with open(path, encoding="utf-8") as handle:
        raw = handle.read()
    return json.loads(raw)

def save_jsonl(path, rows):
    """Write ``rows`` to ``path`` as JSON Lines (one JSON document per line).

    The file is opened in write mode, so existing content is replaced.
    Non-ASCII characters are written as-is in UTF-8 rather than escaped.
    """
    with open(path, "w", encoding="utf-8") as sink:
        sink.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in rows
        )

def load_template(path, default_text):
    """Return the UTF-8 text stored at ``path``, or ``default_text`` if no such path exists."""
    candidate = Path(path)
    return candidate.read_text(encoding="utf-8") if candidate.exists() else default_text

def parse_winner(text):
    """Extract the judge's verdict from ``text``.

    Only the characters ``1`` and ``2`` are considered. Returns 1 when the
    text contains at least one ``1`` and no ``2``, 2 in the mirrored case,
    and None when neither or both digits occur.
    """
    digits = "".join(filter("12".__contains__, text))
    saw_one = "1" in digits
    saw_two = "2" in digits
    if saw_one == saw_two:
        # Either no verdict digit at all, or an ambiguous mix of both.
        return None
    return 1 if saw_one else 2

def backoff_call(fn, *args, retries=5, base=1.0, **kwargs):
    """Call ``fn(*args, **kwargs)``, retrying with exponential backoff on failure.

    Args:
        fn: Callable to invoke.
        *args: Positional arguments forwarded to ``fn``.
        retries: Total number of attempts. Values below 1 are treated as 1 so
            that ``fn`` is always called at least once instead of silently
            returning None without doing any work.
        base: Base delay in seconds. After the i-th failed attempt (0-based)
            the wait is ``base * 2**i`` plus a random jitter in [0, 1) seconds.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        Whatever ``fn`` returns on its first successful attempt.

    Raises:
        Exception: The exception raised by ``fn`` on the final attempt.
    """
    attempts = max(1, retries)
    for attempt in range(attempts):
        try:
            return fn(*args, **kwargs)
        except Exception:
            if attempt == attempts - 1:
                # Bare raise re-raises the active exception with its
                # original traceback intact.
                raise
            time.sleep(base * (2 ** attempt) + random.random())

In [5]:
def make_round(scores, seen, rng):
    """Build one round of pairings, preferring opponents with equal scores.

    Ids are grouped by their current score. Groups are processed from the
    highest score to the lowest; each group is shuffled with ``rng`` and then
    walked in adjacent pairs. When an adjacent pairing is already in ``seen``,
    the first later member of the group that the first id has not met is
    swapped in; if there is none, the first id is deferred. Ids still unpaired
    after all groups are processed are matched greedily with each other,
    again skipping pairings already in ``seen``.

    Args:
        scores: Mapping of id -> current score. Ids must be mutually
            orderable because pairings are stored as sorted tuples.
        seen: Set of sorted ``(id, id)`` tuples for pairings already made.
            Updated in place with every pairing this call returns.
        rng: Object providing ``shuffle(list)``, e.g. ``random.Random``.

    Returns:
        List of ``(a, b)`` id tuples. Each id appears in at most one tuple.
    """
    ids = list(scores.keys())
    bucket = {}
    for i in ids:
        bucket.setdefault(scores[i], []).append(i)
    pairs = []
    used = set()
    for s in sorted(bucket.keys(), reverse=True):
        group = bucket[s][:]
        rng.shuffle(group)
        j = 0
        while j < len(group) - 1:
            a, b = group[j], group[j+1]
            key = tuple(sorted((a, b)))
            if key in seen:
                # Rematch: look further down the group for someone `a` has
                # not played yet and swap them into the partner slot.
                k = j + 2
                swapped = False
                while k < len(group):
                    c = group[k]
                    key2 = tuple(sorted((a, c)))
                    if key2 not in seen and c not in used:
                        group[j+1], group[k] = group[k], group[j+1]
                        b = group[j+1]
                        swapped = True
                        break
                    k += 1
                if not swapped:
                    # No fresh partner in this group: leave `a` for the
                    # leftover pass and retry starting from the old partner.
                    j += 1
                    continue
            if a not in used and b not in used:
                used.add(a); used.add(b)
                pairs.append((a, b))
                seen.add(tuple(sorted((a, b))))
            j += 2
    # Leftover pass: pair remaining ids across score groups. For each id take
    # the first remaining id it has not met yet. Popping two ids and dropping
    # both on a rematch would waste pairings that are still possible.
    leftovers = [i for i in ids if i not in used]
    while len(leftovers) >= 2:
        a = leftovers.pop(0)
        for idx, b in enumerate(leftovers):
            key = tuple(sorted((a, b)))
            if key not in seen:
                del leftovers[idx]
                pairs.append((a, b))
                seen.add(key)
                break
        # If no unseen partner exists, `a` simply sits this round out.
    return pairs

In [7]:
# --- Filesystem layout (relative paths) -------------------------------------
DATA_DIR = Path("../data")
RESPONSES_PATH = DATA_DIR / "responses.json"
OUT_PATH = DATA_DIR / "pairwise.jsonl"
TEMPLATE_PATH = Path("../templates/comparison_prompt.txt")

# --- Run settings, each overridable through an environment variable ---------
MODEL = os.getenv("MODEL", "gpt-4o")
TEMPERATURE = float(os.getenv("TEMPERATURE", "1.0"))
ROUNDS = int(os.getenv("ROUNDS", "3"))
SEED = int(os.getenv("SEED", "42"))
SLEEP = float(os.getenv("SLEEP", "0"))

# Attribute the judge is asked to compare the two responses on.
ATTRIBUTE = "Formality"

# Dedicated generator seeded from SEED, independent of the global `random` state.
rng = random.Random(SEED)

# Fallback prompt, used only when TEMPLATE_PATH does not exist.
default_template = (
    "You are an expert judge. Your task is to compare two text responses based on the attribute of '{attribute}'.\n\n"
    "Response 1: \"{response_1}\"\n"
    "Response 2: \"{response_2}\"\n\n"
    "Which response expresses a higher degree of '{attribute}'? Respond with only 1 or 2."
)
template = load_template(TEMPLATE_PATH, default_template)

items = load_json(RESPONSES_PATH)
# Create the output directory up front so both branches below can write.
OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
if len(items) < 2:
    # Nothing to compare: still produce an (empty) output file.
    save_jsonl(OUT_PATH, [])
    print(str(OUT_PATH))
else:
    # Index items by id once instead of scanning the list for every pair.
    # Iterating in reverse makes the FIRST item win for a duplicated id,
    # which matches what a linear first-match scan would return.
    items_by_id = {it["id"]: it for it in reversed(items)}
    scores = {it["id"]: 0 for it in items}
    seen = set()
    rows = []
    for r in range(ROUNDS):
        pairs = make_round(scores, seen, rng)
        if not pairs:
            # No new pairings can be formed; further rounds would be empty too.
            break
        for a, b in pairs:
            left = items_by_id[a]
            right = items_by_id[b]
            prompt = template.format(attribute=ATTRIBUTE, response_1=left["text"], response_2=right["text"])
            txt = backoff_call(get_response, prompt, temperature=TEMPERATURE, model=MODEL, max_tokens=8)
            winner = parse_winner(txt)
            if winner is not None:
                # Response 1 is always `a` (left) and response 2 is `b` (right).
                if winner == 1:
                    scores[a] += 1
                else:
                    scores[b] += 1
                rows.append({
                    "id": uuid.uuid4().hex,
                    "left_id": a,
                    "right_id": b,
                    "winner": int(winner),
                    "model": MODEL,
                    "temperature": TEMPERATURE,
                    "attribute": ATTRIBUTE,
                    "raw": txt,
                    "ts": int(time.time())
                })
            # Throttle after every API call, including ones whose verdict
            # could not be parsed and therefore produced no row.
            if SLEEP > 0:
                time.sleep(SLEEP)
    save_jsonl(OUT_PATH, rows)
    print(str(OUT_PATH))

../data/pairwise.jsonl
