In [None]:
# %% Minimal setup
# If needed (uncomment in a notebook):
# !pip install requests python-dotenv

import os, json, textwrap, re, time
import requests

# Connection settings; each one can be overridden through an environment variable.
# NOTE(review): the fallback values embed a literal key and a private-network
# address in the notebook -- consider requiring the environment variables instead.
API_KEY  = os.getenv("OPENAI_API_KEY", "cse476")
API_BASE = os.getenv("API_BASE", "http://10.4.58.53:41701/v1")  
MODEL    = os.getenv("MODEL_NAME", "bens_model")              

def call_model_chat_completions(prompt: str,
                                system: str = "You are a helpful assistant. Reply with only the final answer‚Äîno explanation.",
                                model: str = MODEL,
                                temperature: float = 0.3,
                                timeout: int = 60,
                                max_tokens: int = 128) -> dict:
    """
    Call an OpenAI-style /v1/chat/completions endpoint.

    Args:
        prompt: content of the user message.
        system: content of the system message.
        model: model name sent in the payload.
        temperature: sampling temperature sent in the payload.
        timeout: timeout in seconds handed to `requests.post`.
        max_tokens: completion token cap sent in the payload.

    Returns:
        A dict that always has the same keys:
        { 'ok': bool, 'text': str or None, 'raw': dict or None, 'status': int,
          'error': str or None, 'headers': dict, 'tokens_used': int or None }

        * HTTP 200: ok=True, `text` is choices[0].message.content and
          `tokens_used` is usage.completion_tokens (None when the server omits it).
        * Any other HTTP status: ok=False, `error` holds the JSON (or raw text) body.
        * Transport failure or an undecodable 200 body: ok=False, status=-1.
    """
    url = f"{API_BASE}/chat/completions"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type":  "application/json",
    }
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system},
            {"role": "user",   "content": prompt}
        ],
        "temperature": temperature,
        "max_tokens": max_tokens,
    }

    try:
        resp = requests.post(url, headers=headers, json=payload, timeout=timeout)
        status = resp.status_code
        hdrs   = dict(resp.headers)
        if status == 200:
            data = resp.json()
            # Fields read from the response body:
            #   data['choices'][0]['message']['content'] -> answer text
            #   data['usage']['completion_tokens']       -> generated token count
            # `or` fallbacks cover a missing key, an explicit null and an empty list alike.
            choices = data.get("choices") or [{}]
            message = choices[0].get("message") or {}
            text = message.get("content", "")
            usage = data.get("usage") or {}
            # None (not {}) when absent, so callers can test `is not None` before int().
            tokens_used = usage.get("completion_tokens")

            return {"ok": True, "text": text, "raw": data, "status": status, "error": None, "headers": hdrs, "tokens_used": tokens_used}
        else:
            # try best-effort to surface error text
            err_text = None
            try:
                err_text = resp.json()
            except Exception:
                err_text = resp.text
            return {"ok": False, "text": None, "raw": None, "status": status, "error": str(err_text), "headers": hdrs, "tokens_used": None}
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decode failures on requests versions whose
        # decode error is not a RequestException subclass.
        return {"ok": False, "text": None, "raw": None, "status": -1, "error": str(e), "headers": {}, "tokens_used": None}


In [107]:
from IPython.display import Markdown, display

In [None]:
# %% Direct call example
def direct_call(prompt="What is 17 + 28? Answer with just the number.", temperature=0.2, max_tokens=128):
    """Send a single prompt to the model and print the outcome.

    Prints the ok flag and HTTP status, the stripped reply text (empty when the
    call produced no text), and any of the known rate-limit / request-id
    headers that the response carried.
    """
    reply = call_model_chat_completions(prompt, temperature=temperature, max_tokens=max_tokens)
    print("OK:", reply["ok"], "HTTP:", reply["status"])
    print("MODEL SAYS:", (reply["text"] or "").strip())

    # Optional diagnostics: only headers actually present are shown.
    response_headers = reply["headers"]
    for header_name in ("x-ratelimit-remaining-requests", "x-ratelimit-limit-requests", "x-request-id"):
        if header_name not in response_headers:
            continue
        print(f"{header_name}: {response_headers[header_name]}")


In [90]:
# %% Define three tests: input + expected
# Column order shared by every hand-written test row below.
_TEST_KEYS = ("id", "type", "prompt", "expected")

_TEST_ROWS = [
    (
        "math_inequality",
        "numeric",  # grader will prefer numeric extraction
        "Solve for the smallest integer n such that 3n + 5 > 26. Answer with just the integer.",
        "8",  # 3n > 21 => n > 7, so the smallest integer is 8
    ),
    (
        "commonsense_ice",
        "text",
        "You place an ice cube in a glass of water and mark the water level. "
        "After the ice melts, does the water level rise, fall, or stay the same? "
        "Answer with exactly one of: 'rise', 'fall', 'stay the same'.",
        "stay the same",
    ),
    (
        "logic_race",
        "text",
        "In a race, you pass the person in second place. What position are you now in? "
        "Answer with a single word like 'first', 'second', 'third'.",
        "second",
    ),
]

# Three small tests, each a dict with id / type / prompt / expected.
my_tests = [dict(zip(_TEST_KEYS, row)) for row in _TEST_ROWS]


In [382]:
import json
from pprint import pprint
import random
from collections import Counter

# Domain labels; used as the default `test_type` filter of get_tests.
POSSIBLE_TYPES = ['math', 'common_sense', 'planning', 'coding', 'future_prediction']

def load_save_json(path_in="parsed_dev_data.json", path_out=None, data_in=None, clear=False):
    """Load a JSON list and optionally append one entry and write it back out.

    Args:
        path_in: JSON file to read (UTF-8). Ignored when `clear` is true.
        path_out: when not None, `data_in` is appended to the loaded list and
            the whole list is written to this path with 4-space indentation.
        data_in: the entry to append when `path_out` is given.
        clear: start from an empty list instead of reading `path_in`.

    Returns:
        The list that was loaded (or started empty), including `data_in` when
        a write was requested.

    Raises:
        OSError / json.JSONDecodeError from reading `path_in`, as before.
    """
    if clear:
        data = []
    else:
        # Context manager so the handle is closed even if parsing fails.
        with open(path_in, "r", encoding="utf-8") as f:
            data = json.load(f)

    if path_out is not None:
        data.append(data_in)
        # Same encoding as the read side, so the file round-trips on any platform.
        with open(path_out, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4)

    return data
            
all_tests = load_save_json()

# How many records each domain contributes.
type_counts = Counter(record['domain'] for record in all_tests)
print(type_counts)

# Re-key every raw record into the field names the rest of the notebook reads.
formatted_tests = [
    {
        "id": record['id'], # domain_domainIndex_domainTestIndex_testIndex
        "type": record['domain'],
        "prompt": record['input'],
        "expected": record['output'],
        "char_count": record['input_char_count'],
        "exp_word_count": record['exp_word_count'],
    }
    for record in all_tests
]

all_tests = formatted_tests

Counter({'common_sense': 400, 'math': 300, 'coding': 100, 'future_prediction': 100, 'planning': 100})


In [391]:
def print_json(test):
    """Print `test` as JSON with a two-space indent, leaving non-ASCII characters unescaped."""
    rendered = json.dumps(test, ensure_ascii=False, indent=2)
    print(rendered)

#pass test_type as a list of types
#generalized get test function
def get_tests(n=0, test_type=POSSIBLE_TYPES, start=0, end=None, lower_char=0, upper_char=float('inf'), lower_exp=0, upper_exp=float('inf'), seed=None, index=None):
    """Select tests from `all_tests` by index, by slice, or by random sampling.

    Args:
        n: selection mode.
            0  -> return the filtered tests sliced as [start:end];
            -1 -> return one randomly chosen test for each type present in the
                  filtered tests;
            otherwise -> return a random sample of min(n, filtered size) tests.
        test_type: collection of type labels to keep.
        start, end: slice bounds, used only when n == 0.
        lower_char, upper_char: inclusive bounds on each test's 'char_count'.
        lower_exp, upper_exp: inclusive bounds on each test's 'exp_word_count'.
        seed: when not None, seeds the global `random` generator before sampling.
        index: when not None, every filter is skipped and the single test
            `all_tests[index]` is returned (a dict, not a list).

    Returns:
        A list of test dicts, or one test dict when `index` is given.
    """
    if index is not None:
        return all_tests[index]

    filtered_tests = [
        t for t in all_tests
        if t['type'] in test_type
        and lower_char <= t['char_count'] <= upper_char
        and lower_exp <= t['exp_word_count'] <= upper_exp
    ]
    print('filtered size:', len(filtered_tests))

    if seed is not None:
        random.seed(seed)

    if n == 0:
        return filtered_tests[start:end]

    if n == -1:
        # Group explicitly instead of computing offsets into the flat list:
        # random.randint is inclusive at both ends, so the offset arithmetic
        # could pick a neighbouring type's test or index past the end, and it
        # only worked when tests of one type were stored contiguously.
        tests_by_type = {}
        for t in filtered_tests:
            tests_by_type.setdefault(t['type'], []).append(t)
        each_test = [random.choice(group) for group in tests_by_type.values()]

        print("sampled size:", len(each_test))
        return each_test

    # Cap the sample size so asking for more tests than exist does not raise.
    return random.sample(filtered_tests, min(n, len(filtered_tests)))
    
""" def get_test_type(test_type, start=0, end=None, lower=0, upper=float('inf')):
    tests = [t for t in all_tests if t['type'] in test_type and lower <= t['char_count'] <= upper]
    return tests[start:end]

def get_random_tests(n=5, lower=0, upper=float('inf'), test_type=POSSIBLE_TYPES):
    filtered_tests = get_test_type(test_type=test_type, lower=lower, upper=upper) #[t for t in all_tests if lower <= t['char_count'] <= upper]
    sample_size = min(n, len(filtered_tests)) #prevent error
    return random.sample(filtered_tests, sample_size) """

" def get_test_type(test_type, start=0, end=None, lower=0, upper=float('inf')):\n    tests = [t for t in all_tests if t['type'] in test_type and lower <= t['char_count'] <= upper]\n    return tests[start:end]\n\ndef get_random_tests(n=5, lower=0, upper=float('inf'), test_type=POSSIBLE_TYPES):\n    filtered_tests = get_test_type(test_type=test_type, lower=lower, upper=upper) #[t for t in all_tests if lower <= t['char_count'] <= upper]\n    sample_size = min(n, len(filtered_tests)) #prevent error\n    return random.sample(filtered_tests, sample_size) "

In [287]:
# Draw one random test whose 'char_count' is at most 300 and pretty-print it.
tests = get_tests(n=1, upper_char=300) #get_test_type('math', end=10, lower=0, upper=500)
pprint(tests)

filtered size: 440
['First',
 'figure',
 'out',
 'how',
 'many',
 'square',
 'feet',
 'the',
 'original',
 'bolt',
 'of',
 'fabric',
 'was:',
 '16',
 'feet',
 '*',
 '12',
 'feet',
 '=',
 '<<16*12=192>>192',
 'square',
 'feet',
 'Then',
 'figure',
 'out',
 'how',
 'much',
 'fabric',
 'Ann',
 'took',
 'for',
 'the',
 'living',
 'room',
 'curtains:',
 '4',
 'feet',
 '*',
 '6',
 'feet',
 '=',
 '<<4*6=24>>24',
 'square',
 'feet',
 'Then',
 'figure',
 'out',
 'how',
 'much',
 'fabric',
 'Ann',
 'took',
 'for',
 'the',
 'bathroom',
 'curtains:',
 '2',
 'feet',
 '*',
 '4',
 'feet',
 '=',
 '<<2*4=8>>8',
 'square',
 'feet',
 'Finally,',
 'subtract',
 'the',
 'square',
 'footage',
 'of',
 'both',
 'sets',
 'of',
 'curtains',
 'from',
 'the',
 'total',
 'square',
 'footage:',
 '192',
 '-',
 '24',
 '-',
 '8',
 '=',
 '<<192-24-8=160>>160',
 'square',
 'feet',
 '####',
 '160']


In [94]:
#simple hello world call to kick off the commits
#direct_call(prompt="how do I find the derivative of y=x^2 using python?")

In [95]:
def interactive_chat():
    """Run a console chat loop against the model until the user types 'exit' or 'quit'.

    The endpoint is called with a single prompt each turn, so earlier turns are
    replayed by embedding the running `messages` list in that prompt. Only
    successful exchanges are added to the history; a failed call prints its
    error and leaves the history untouched, so the next prompt never contains
    a "previous system response: None" entry.
    """
    messages = ["<Start of message history>"]
    count = 0
    while True:
        user_input = input("You: ")
        # strip() so "exit " / " quit" also end the session
        if user_input.strip().lower() in ['exit', 'quit']:
            print("Exiting chat.")
            break
        response = call_model_chat_completions(prompt=f"Old messages{messages}, CURRENT USER INPUT:{user_input} <--- ANSWER THIS QUESTION", temperature=0.7)
        if response["ok"]:
            # text may be None when the server returns null content
            text = response["text"] or ""
            count += 1
            messages.append(f"MESSAGE_{count}_[previous user input: {user_input}, previous system response: {text}]")
            print("Model:", text.strip())
        else:
            print("Error:", response["error"])
        print(messages)
#interactive_chat()

In [96]:
""" def execute_tests():
    rows = []
    for t in tests:
        r = call_model_chat_completions(
            prompt,
            system=system,
            model=model,
            temperature=0.3,
            max_tokens=128
        ) """

' def execute_tests():\n    rows = []\n    for t in tests:\n        r = call_model_chat_completions(\n            prompt,\n            system=system,\n            model=model,\n            temperature=0.3,\n            max_tokens=128\n        ) '

In [198]:
def self_evaluate(question, prediction, expected_answer, model=MODEL):
    """
    Use the model itself as a strict grader.

    Args:
        question: the original prompt given to the model under test.
        prediction: the answer being graded.
        expected_answer: the reference answer.
        model: model name used for the grading call.

    Returns:
        True if the grader's reply starts with "true", False if it starts with
        "false" (case-insensitive). When the reply is anything else -- including
        an empty reply after a failed API call -- falls back to a normalized
        string comparison of `prediction` and `expected_answer`, so the result
        is always a bool.
    """
    system = "You are a strict grader. Reply with exactly True or False. No punctuation. No explanation."
    prompt = f"""You are grading a question-answer pair.

Return exactly True if the PREDICTION would be accepted as correct for the EXPECTED_ANSWER.
Otherwise, return False.

QUESTION:
{question}

PREDICTION:
{prediction}

EXPECTED_ANSWER:
{expected_answer}

Answer with exactly: True or False
"""

    r = call_model_chat_completions(
        prompt,
        system=system,
        model=model,
        temperature=0.3,
    )

    reply = (r.get("text") or "").strip().lower()
    if reply.startswith("true"):
        return True
    if reply.startswith("false"):
        return False

    # Malformed or missing grader reply: compare the two answers directly,
    # ignoring case, surrounding/repeated whitespace and a trailing period.
    def _normalize(value):
        return " ".join(str(value).lower().split()).strip(".")

    return _normalize(prediction) == _normalize(expected_answer)

    # No Fallback yet


In [272]:
def self_evaluate2(question, model_output, prediction, expected_answer, model=MODEL):
    """
    Ask the model whether it agrees with an earlier grading verdict.

    Args:
        question: the original prompt given to the model under test.
        model_output: the answer that was graded.
        prediction: the earlier grader's verdict ("MODEL_1" in the prompt).
        expected_answer: the reference answer.
        model: model name used for this second grading call.

    Returns:
        True if the reply starts with "yes" or "true", False if it starts with
        "no" or "false" (case-insensitive). Any other reply -- including an
        empty one after a failed API call -- is treated as False; there is no
        string-compare fallback here.
    """
    system = "You are a strict grader. Reply with exactly Yes or No. No punctuation. No explanation."
    prompt = f"""MODEL_1 thinks this ANSWER is {prediction}, do you agree with MODEL_1 decision?

QUESTION:
{question}

ANSWER:
{model_output}

EXPECTED_ANSWER:
{expected_answer}

-----------------------
MODEL_1 OUTPUT:
{prediction}
-----------------------

Answer with exactly: Yes or No. Do you agree with MODEL_1?
"""

    r = call_model_chat_completions(
        prompt,
        system=system,
        model=model,
        temperature=0.3,
    )

    reply = (r.get("text") or "").strip().lower()
    if reply.startswith("true") or reply.startswith("yes"):
        return True
    if reply.startswith("false") or reply.startswith("no"):
        return False

    # Unrecognised reply: return an explicit bool rather than falling through to None.
    return False

    # No Fallback yet


In [317]:
tf_map = {'yes':'true', 'no':'false'}
def map_tf(output, exp):
    """Align a yes/no or true/false model answer with the vocabulary of the expected answer.

    Args:
        output: the model's answer text.
        exp: the expected answer (any type; converted with str()).

    Returns:
        * "yes" / "no" when the expected answer is yes/no and the model said
          true/false respectively;
        * `output` unchanged when the expected answer is yes/no and the model
          said anything else -- in particular a literal "yes"/"no" is kept, so
          it can still match the expected answer;
        * "true" / "false" when the expected answer is not yes/no and the model
          said yes/no;
        * `output` unchanged otherwise.
        Comparisons ignore case, surrounding whitespace and surrounding periods.
    """
    exp = str(exp).lower().strip().strip('.')
    out = output.lower().strip().strip('.')

    if exp in tf_map:
        # Expected answer is itself yes/no: translate true/false towards it,
        # and never rewrite a yes/no answer into true/false.
        if exp == "yes" and out == "true": return "yes"
        if exp == "no" and out == "false": return "no"
        return output

    return tf_map.get(out, output)

In [303]:

def basic_match_check(test, output):
    """Return True when the expected answer occurs literally (case-insensitively) in the model output.

    The output is first passed through map_tf, and the expected answer is
    regex-escaped so it is matched as plain text.
    """
    expected = test["expected"]
    candidate = map_tf(output, expected)
    literal_pattern = re.escape(str(expected))
    return re.search(literal_pattern, candidate, re.IGNORECASE) is not None

In [279]:
def seperator(text, tokens_used=None, max_tokens=400):
    """Print a section heading and report whether the output fit in the token budget.

    With `tokens_used` given, the heading carries a "(TOKENS USED: x/y)"
    suffix. When the used count equals `max_tokens`, a truncation notice is
    printed instead of the dashed rule and False is returned; in every other
    case the dashed rule is printed and True is returned.
    """
    heading = text if tokens_used is None else f'{text} (TOKENS USED: {tokens_used}/{max_tokens})'
    print(heading)

    if tokens_used is not None and int(tokens_used) == max_tokens:
        print('MAXED TOKENS REACHED - OUTPUT TRUNCATED')
        return False

    print('-'*32)
    return True

In [273]:
def check_correct(bool1, bool2):
    """Combine two verdicts, print the outcome, and return (correctness, agreement).

    correctness is `bool1 and bool2`; agreement is `bool1 == bool2`.
    """
    correctness = bool1 and bool2
    agreement = bool1 == bool2

    if correctness:
        print('‚úÖ CORRECT')
    else:
        print('‚ùå INCORRECT')

    if agreement:
        print('üÜó AGREED')
    else:
        print('üÜò DISAGREED')

    return correctness, agreement

In [373]:
import math

def create_matches(toCount, toMatch):
    """Count how often the distinct items of `toMatch` occur in `toCount`.

    Returns:
        (total occurrences in `toCount` of each distinct `toMatch` item, len(toCount)).
    """
    occurrences = Counter(toCount)
    # set(): a word repeated in toMatch contributes once
    total_matches = sum(occurrences[word] for word in set(toMatch))
    return total_matches, len(toCount)

def get_cosine(expected_counter, output_counter):
    """Cosine similarity between two word-count mappings.

    Prints the value when both vectors are non-zero; returns 0 (without
    printing) when either has zero magnitude.
    """
    def _magnitude(counts):
        return math.sqrt(sum(v**2 for v in counts.values()))

    shared_weight = sum(
        weight * output_counter.get(word, 0)
        for word, weight in expected_counter.items()
    )
    exp_mag = _magnitude(expected_counter)
    out_mag = _magnitude(output_counter)

    if not (exp_mag > 0 and out_mag > 0):
        return 0

    cosine_sim = shared_weight / (exp_mag * out_mag)
    print(f"\n[Cosine similarity: {cosine_sim}]")
    return cosine_sim

def get_start_end_matches(expected, output, exp_len, out_len):
    """Check whether the expected answer's first/last word appears in the output's first/last word.

    Args:
        expected: list of expected-answer words.
        output: list of model-output words.
        exp_len: number of words in `expected`.
        out_len: number of words in `output`.

    Returns:
        (start_matches, end_matches). Each comparison is a substring test, so
        an expected word "c" matches an output word "\\boxed{c}". When either
        list is empty there is nothing to compare and (False, False) is
        returned instead of raising IndexError.
    """
    if not expected or not output:
        return False, False

    start_matches = expected[0] in output[0]
    end_matches = expected[exp_len-1] in output[out_len-1]

    return start_matches, end_matches
    
def super_match(test, output):
    """Word-level comparison of the expected answer against the model output.

    Both texts have '$' removed, are lower-cased and split on whitespace.
    The cosine similarity is computed for its printed diagnostic only; the
    return value is the (start_matches, end_matches) pair from
    get_start_end_matches.
    """
    def _words(text):
        return text.replace('$', '').lower().split()

    expected_words = _words(str(test["expected"]))
    output_words = _words(output)

    #not very helpful in the long run...
    get_cosine(Counter(expected_words), Counter(output_words))

    # Only the length component of each result is used below.
    _, out_len = create_matches(output_words, expected_words)
    _, exp_len = create_matches(expected_words, output_words)

    return get_start_end_matches(expected_words, output_words, exp_len, out_len)
    #return match_counts

In [388]:
def _blank_sample(test_count, t):
    """Build the per-test record with every history field set to its 'not evaluated' value."""
    return {
        "test_count": test_count,
        "id": t["id"],
        "input": t['prompt'],
        "expected": t["expected"],
        "got": None,
        "history": {
            "check_correct1": {
                "match_check": None,
                "self_eval": None,
                "correctness": None,
                "agreement": None
            },
            "truncated": False,
            "check_correct2": {
                "self_eval": None,
                "self_eval2": None,
                "correctness": None,
                "agreement": None
            },
            "check_correct3": {
                "self_eval2": None,
                "sides_matching": None,
                "correctness": None,
                "agreement": None
            },
            "final_correctness": None
        }
    }


def self_evaluate_tests(tests, model=MODEL, grader_model=None, sleep_sec=0.2, verbose=True):
    """
    Run the tests by querying the model for each prompt, then grade each answer
    with up to three checks:

      1. literal match (basic_match_check) vs. LLM judge (self_evaluate);
      2. on disagreement, the first judge vs. a second judge call (self_evaluate2);
      3. on further disagreement, start/end word match (super_match) vs. the
         second judge.

    A reply that used the whole token budget is recorded as truncated and
    incorrect without being graded.

    Args:
        tests: list of dicts with keys: id, prompt, expected (and optionally type)
        model: model used to generate predictions
        grader_model: model used to judge correctness (defaults to `model` if None)
        sleep_sec: small delay after each test to be polite to the API
        verbose: accepted for backward compatibility; currently not consulted
            (progress is always printed)

    Returns:
        dict with keys:
            count   - number of tests received
            seed    - always None here; left for the caller to fill in
            samples - one record per test, in order, with fields
                      test_count, id, input, expected, got, history
    """
    judge_model = grader_model or model
    MAX_TOKENS = 400
    final_answers = []
    test_samples = {
        "count": len(tests),
        "seed": None,
        "samples": None
    }

    for count, t in enumerate(tests):
        sample = _blank_sample(count, t)
        history = sample["history"]

        # 1) Get model prediction
        print('\n','='*64)
        seperator('TEST_CASE')
        print_json(t)
        r = call_model_chat_completions(
            f"{t['prompt']}",
            system="Give a short answer to each prompt, don't explain.",
            model=model,
            temperature=0.3,
            max_tokens=MAX_TOKENS
        )
        got = (r.get("text") or "").strip()
        sample["got"] = got  # raw reply, before yes/no <-> true/false mapping
        tokens_used = r.get("tokens_used")

        got = map_tf(got, t["expected"])

        # seperator returns False when the reply consumed the whole token budget
        not_truncated = seperator('\nMODEL_OUTPUT', tokens_used, MAX_TOKENS)
        display(Markdown(f"\n{got}"))

        if not not_truncated:
            # Record the truncated test as a regular sample (inside "history",
            # where the field is declared) so "samples" stays a list of dicts.
            history["truncated"] = True
            history["final_correctness"] = f"‚ùå {False}"
            final_answers.append(sample)
            print("‚ùå INCORRECT | MAX TOKENS REACHED RETURNING FALSE")
            if sleep_sec:
                time.sleep(sleep_sec)
            continue

        match_check = bool(basic_match_check(t, got))
        history["check_correct1"]["match_check"] = match_check

        # 2) LLM-as-a-judge: strict True/False
        is_correct = bool(self_evaluate(
            question=t["prompt"],
            prediction=got,
            expected_answer=t["expected"],
            model=judge_model,
        ))
        history["check_correct1"]["self_eval"] = is_correct

        seperator('\nMODEL OUTPUT --> FIRST EVAL')
        print('match check:', match_check)
        print('self_eval:', is_correct)
        correctness, agreement = check_correct(match_check, is_correct)
        history["check_correct1"]['correctness'] = correctness
        history["check_correct1"]['agreement'] = agreement

        if not agreement:
            #starting and ending matches
            #CAN BE USED TO VALIDATE SECOND MODEL, OR AS LAST RESORT
            start_matches, end_matches = super_match(t, got)
            sides_matching = start_matches or end_matches

            #second model eval
            seperator('\nDISAGREEMENT --> SECOND EVAL')
            # NOTE(review): self_evaluate2 answers "do you agree with the first
            # verdict?", yet its result is combined below as if it were a
            # correctness verdict -- confirm this is the intended semantics.
            is_correct2 = bool(self_evaluate2(
                question=t["prompt"],
                model_output=got,
                expected_answer=t["expected"],
                prediction=is_correct,
                model=judge_model
            ))

            history["check_correct2"]["self_eval"] = is_correct
            history["check_correct2"]["self_eval2"] = is_correct2

            print('self_eval2:', is_correct2)
            correctness, agreement = check_correct(is_correct, is_correct2)
            history["check_correct2"]["correctness"] = correctness
            history["check_correct2"]["agreement"] = agreement

            if not agreement:
                # last resort: word-position heuristic vs. the second judge
                seperator('\nDISAGREEMENT --> THIRD EVAL')
                print('\nside matching:', sides_matching)

                history["check_correct3"]["self_eval2"] = is_correct2
                history["check_correct3"]["sides_matching"] = sides_matching
                correctness, agreement = check_correct(sides_matching, is_correct2)
                history["check_correct3"]["correctness"] = correctness
                history["check_correct3"]["agreement"] = agreement

        # The verdict of the last check that ran is the final one.
        history["final_correctness"] = f"‚úÖ {correctness}" if correctness else f"‚ùå {correctness}"
        final_answers.append(sample)

        if sleep_sec:
            time.sleep(sleep_sec)

    test_samples["samples"] = final_answers
    return test_samples

# Example:


In [120]:
import re

In [394]:
# Draw a value that is stored as "seed" in the results below.
# NOTE(review): `rng` is not passed to get_tests in the active call (which selects
# by index), so the recorded "seed" did not drive this run's test selection.
rng = random.randint(0,20000)
#seed=11789, n=30 for diverse samples
test_prompts = get_tests(index=504)#get_tests(n=30, seed=rng) #get_test_type(["math"],end=10, upper=300) get_random_tests(n=3, upper=300)
# get_tests(index=...) returns a single test dict, hence the list wrapper.
results_llm_judge = self_evaluate_tests([test_prompts], verbose=True, model=MODEL, grader_model=MODEL)
results_llm_judge["seed"] = rng
print_json(results_llm_judge)
print("\n","="*64)
#load_save_json(path_in="test_history.json", path_out="test_history.json", data_in=results_llm_judge, clear=False)



TEST_CASE
--------------------------------
{
  "id": "future_prediction_2_5_505",
  "type": "future_prediction",
  "prompt": "You are an agent that can predict future events. The event to be predicted: \"Volta Redonda vs. Novorizontino (around 2025-08-10T07:30:00Z). \nA.  the outcome be Volta Redonda\nB.  the outcome be Novorizontino\nC.  the outcome be Tie\"\n        IMPORTANT: Your final answer MUST end with this exact format:\n        listing all plausible options you have identified, separated by commas, within the box. For example: \\boxed{A} for a single option or \\boxed{B, C, D} for multiple options.\n        Do not use any other format. Do not refuse to make a prediction. Do not say \"I cannot predict the future.\" You must make a clear prediction based on the best data currently available, using the box format specified above.",
  "expected": "['C']",
  "char_count": 710,
  "exp_word_count": 1
}

MODEL_OUTPUT (TOKENS USED: 10/400)
--------------------------------



\boxed{A, B, C}


MODEL OUTPUT --> FIRST EVAL
--------------------------------
match check: False
self_eval: True
‚ùå INCORRECT
üÜò DISAGREED

[Cosine similarity: 0.0]

DISAGREEMENT --> SECOND EVAL
--------------------------------
self_eval2: False
‚ùå INCORRECT
üÜò DISAGREED

DISAGREEMENT --> THIRD EVAL
--------------------------------

side matching: False
‚ùå INCORRECT
üÜó AGREED
{
  "count": 1,
  "seed": 7941,
  "samples": [
    {
      "test_count": 0,
      "id": "future_prediction_2_5_505",
      "input": "You are an agent that can predict future events. The event to be predicted: \"Volta Redonda vs. Novorizontino (around 2025-08-10T07:30:00Z). \nA.  the outcome be Volta Redonda\nB.  the outcome be Novorizontino\nC.  the outcome be Tie\"\n        IMPORTANT: Your final answer MUST end with this exact format:\n        listing all plausible options you have identified, separated by commas, within the box. For example: \\boxed{A} for a single option or \\boxed{B, C, D} for multiple options.\n    