This notebook evaluates our backtranslation method, which has two stages:
1. An LLM extracts an intermediate representation (IR) from each TikZ source.
2. Rule-based evaluation checks are run on the extracted IR.

In [1]:
%load_ext autoreload
%autoreload 2

import sys, pathlib, os

# Move the working directory to the notebook's parent directory so that the
# relative imports and data paths used below resolve from there.
# The notebook directory is captured only on the first run in this kernel:
# os.chdir changes process-wide state, so recomputing it from Path.cwd() on a
# re-run of this cell would climb one directory higher every time.
if "notebook_dir" not in globals():
    notebook_dir = pathlib.Path.cwd()
target = (notebook_dir / "..").resolve()
os.chdir(target)

import pandas as pd
import json
import asyncio
import time
from datetime import datetime
import aiofiles
from pathlib import Path
from tqdm.asyncio import tqdm
import numpy as np

from backtranslation import tikz_to_ir
from evaluator import Evaluator
from utils.ir_schema import TikzIR

# Model name, read from the environment so runs can switch models without code
# edits. It is used below for cache keys, cost lookup and output file names.
MODEL = os.environ.get("OPENAI_MODEL")
if not MODEL:
    # Surface the misconfiguration here rather than letting a None model leak
    # into cache file names and cost lookups further down.
    print("WARNING: OPENAI_MODEL is not set; MODEL is None.")
print(f"Using model: {MODEL}")

  5. For rectangle_primitives array, set "is_right_angle_symbol": true when the rectangle is drawn as a right-angle marker (e.g., tiny square sharing corners with two incident edges or comments mentioning a right angle). Otherwise set it to false. If a \draw explicitly passes the 'right angle symbol' option, do not add it as a rectangle_primitive.


Using model: gpt-4.1-mini


In [2]:
# Output locations: per-example cache files live in a sub-folder of the
# results directory. Both folders are created up front if they are missing.
RESULTS_DIR = Path("results/backtranslation")
CACHE_DIR = RESULTS_DIR / "cache"
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
CACHE_DIR.mkdir(exist_ok=True)

# Input data
TEST_SET_PATH = "data/geometric_shapes_test_set.csv"

# Throttling knobs
MAX_CONCURRENT = 6  # size of the semaphore that caps examples processed at once
REQUEST_DELAY = 0.075  # seconds slept after each processed example

print(f"Results will be saved to: {RESULTS_DIR}")
print(f"Cache directory: {CACHE_DIR}")

Results will be saved to: results/backtranslation
Cache directory: results/backtranslation/cache


In [3]:
# Read the test set and print a quick structural summary before using it.
print("Loading test dataset...")
df = pd.read_csv(TEST_SET_PATH)

n_examples = len(df)
column_names = list(df.columns)
category_counts = df['main_category'].value_counts()

print(f"Loaded {n_examples} examples")
print(f"Columns: {column_names}")
print(f"Categories: {category_counts}")
df.head(2)

Loading test dataset...
Loaded 398 examples
Columns: ['prompt', 'tikz', 'image', 'main_category', 'subcategory', 'diagram_id', 'assignment_type', 'assigned_to', 'image_png_path']
Categories: main_category
2d shapes    208
3d shapes    190
Name: count, dtype: int64


Unnamed: 0,prompt,tikz,image,main_category,subcategory,diagram_id,assignment_type,assigned_to,image_png_path
0,triangle with side length 8 horizontal at bott...,\documentclass{IM}\n\usepackage{tikz}\n\begin{...,https://2xavun1dsa0sayar.public.blob.vercel-st...,2d shapes,triangle,1,individual,Shubhra,data/judge_pngs/diagram_1.png
1,Two triangles showing scaled copy relationship...,\documentclass{IM}\n\usepackage{tikz}\n\begin{...,https://2xavun1dsa0sayar.public.blob.vercel-st...,2d shapes,triangle,2,individual,Rebecca,data/judge_pngs/diagram_2.png


In [4]:
# Build the evaluator once; it is shared by every example processed below.
evaluator = Evaluator()
print("Evaluator initialized")

# List the names of the global check functions the evaluator exposes.
global_check_names = [check_fn.__name__ for check_fn in evaluator.global_evals]
print(f"Global checks: {global_check_names}")

Evaluator initialized
Global checks: ['angle_labels_matches_arcs', 'diagram_fully_in_canvas', 'labels_associated_with_elements', 'diagram_elements_dont_problematically_overlap', 'diagram_elements_are_readable_size', 'shape_outlines_are_closed', 'core_mathematical_properties_of_shapes_correct', 'labeled_lengths_areas_match_proportions']


#### Experiment functions

In [5]:
from typing import Optional
import warnings

async def get_cached_result(diagram_id: int, model: Optional[str] = None) -> Optional[dict]:
    """Return the cached record for a diagram/model pair, or None on a cache miss.

    Args:
        diagram_id: Identifier used to build the cache file name.
        model: Model name used to build the cache file name. Falls back to the
            global MODEL when None.

    Returns:
        The parsed cache record, with its cost fields refreshed through
        apply_cost_estimate, or None when no usable cache file exists.
    """
    if model is None:
        model = MODEL  # Use the global MODEL variable
    cache_file = CACHE_DIR / f"diagram_{diagram_id}_{model}.json"
    if not cache_file.exists():
        return None

    async with aiofiles.open(cache_file, 'r') as f:
        content = await f.read()

    try:
        record = json.loads(content)
    except json.JSONDecodeError as exc:
        # An unreadable file (for example one left behind by an interrupted
        # write) must not abort the whole batch. Report it as a miss so the
        # caller recomputes the example instead.
        warnings.warn(
            f"Ignoring unreadable cache file {cache_file}: {exc}",
            RuntimeWarning,
        )
        return None

    apply_cost_estimate(record, model)
    return record

async def save_result(result: dict):
    """Write one result record to its cache file immediately.

    The cache file name is built from result['diagram_id'] and the record's
    'model' entry (global MODEL when the record has none). The record's cost
    fields are refreshed in place through apply_cost_estimate before writing.

    Args:
        result: Result record to persist; must contain 'diagram_id'.

    Raises:
        KeyError: If 'diagram_id' is missing from the record.
        TypeError: If the record contains values json.dumps cannot serialize.
    """
    model = result.get('model', MODEL)  # Get model from result or use global
    cache_file = CACHE_DIR / f"diagram_{result['diagram_id']}_{model}.json"
    apply_cost_estimate(result, model)
    # Serialize before opening: mode 'w' truncates the file, so a
    # serialization failure after opening would wipe an existing cache entry.
    payload = json.dumps(result, indent=2)
    async with aiofiles.open(cache_file, 'w') as f:
        await f.write(payload)


# Per-model token pricing as (input cost, output cost) per 1M tokens.
# NOTE(review): the figures look like USD list prices; provider pricing changes
# over time, so confirm them before relying on the reported totals.
MODEL_COSTS = {
    "gpt-5":        (1.25, 10.00),
    "gpt-5-mini":   (0.25, 2.00),
    "gpt-5-nano":   (0.05, 0.40),
    "gpt-4.1":      (2.00, 8.00),
    "gpt-4.1-mini": (0.40, 1.60),
    "gpt-4.1-nano": (0.10, 0.40),
    "gpt-4o":       (5.00, 15.00),
    "gpt-4o-mini":  (0.15, 0.60),
}

def estimate_cost(prompt_tokens: int, completion_tokens: int, model: str) -> float:
    """Estimate API cost using per-1M-token pricing.

    Args:
        prompt_tokens: Number of input tokens; None is treated as 0.
        completion_tokens: Number of output tokens; None is treated as 0.
        model: Key into MODEL_COSTS. An unknown model triggers a RuntimeWarning
            and is priced at zero.

    Returns:
        Estimated cost in the same currency unit as MODEL_COSTS.
    """
    if model not in MODEL_COSTS:
        warnings.warn(
            f"Unknown model '{model}' in MODEL_COSTS; assuming zero token cost.",
            RuntimeWarning,
        )
        input_cost, output_cost = (0.0, 0.0)
    else:
        input_cost, output_cost = MODEL_COSTS[model]
    # Normalize per 1M tokens
    input_cost /= 1_000_000
    output_cost /= 1_000_000
    # Usage payloads can carry explicit None counts; price them as zero, the
    # same way apply_cost_estimate does, instead of raising TypeError.
    prompt_tokens = prompt_tokens or 0
    completion_tokens = completion_tokens or 0
    return (prompt_tokens * input_cost) + (completion_tokens * output_cost)

def apply_cost_estimate(cache_entry: dict, model_override: Optional[str] = None) -> float:
    """Recompute a record's cost estimate and store it on the record.

    The cost is derived from the token counts in the record's "ir" block (an
    empty block is created when the record has none) and written to both
    ir["estimated_cost"] and the top-level "extraction_cost".

    Args:
        cache_entry: Result record to update in place. An empty or None entry
            is left alone.
        model_override: Model to price against. When falsy, the record's
            "model" entry is used, then the global MODEL.

    Returns:
        The computed cost, or 0.0 for an empty entry.
    """
    if not cache_entry:
        return 0.0

    ir_block = cache_entry.setdefault("ir", {})
    model_name = model_override if model_override else cache_entry.get("model", MODEL)

    # Missing keys and explicit None values both count as zero tokens.
    token_counts = [
        ir_block.get(field, 0) or 0
        for field in ("prompt_tokens", "completion_tokens")
    ]
    cost = estimate_cost(*token_counts, model_name)

    ir_block["estimated_cost"] = cost
    cache_entry["extraction_cost"] = cost
    return cost

# If this cell is re-run after an experiment, refresh the cost fields on any
# results already held in memory. On a fresh kernel there is nothing to do.
for entry in globals().get('all_results', ()):
    apply_cost_estimate(entry, entry.get("model"))



In [6]:
async def process_single_example(row: pd.Series, semaphore: asyncio.Semaphore) -> dict:
    """Extract the IR for one TikZ example, evaluate it, and cache the outcome.

    A cached record for (diagram_id, MODEL) is returned as-is without taking
    the semaphore. Otherwise the example is processed while holding the
    semaphore: tikz_to_ir produces the raw IR and token usage, the IR is
    validated against TikzIR (a validation failure is recorded but evaluation
    still runs on the raw IR), and the evaluator's global checks are stored.

    NOTE: failed examples are written to the cache as well, so they are not
    retried on later runs unless their cache file is removed.

    Args:
        row: One dataset row; must provide 'tikz', 'diagram_id',
            'main_category' and 'subcategory'.
        semaphore: Limits how many examples are processed concurrently.

    Returns:
        The result record (also written to the cache via save_result).
    """
    cached = await get_cached_result(row['diagram_id'], MODEL)
    if cached:
        return cached

    async with semaphore:
        result = {
            "prompt": row.get('prompt', ''),
            "tikz_code": row['tikz'],
            "diagram_id": row['diagram_id'],
            "model": MODEL,
            "main_category": row['main_category'],
            "subcategory": row['subcategory'],
            # Carried through so export_results_to_csv can report it instead
            # of always falling back to 'unknown'.
            "assignment_type": row.get('assignment_type', 'unknown'),
            "timestamp": datetime.now().isoformat(),
            "ir": {},
            "evaluation_results": {},
            "evaluation_time_ms": 0,
            "overall_score": 0.0,
            "overall_pass": False
        }

        token_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
        raw_ir = None
        tikz_time = 0.0

        try:
            tikz_start = time.time()
            raw_ir, token_usage = await tikz_to_ir(row['tikz'])
            tikz_time = (time.time() - tikz_start) * 1000

            # Usage may come back as None or with None counts; normalize to
            # zeros so cost estimation cannot fail on it.
            token_usage = token_usage or {}
            prompt_tokens = token_usage.get('prompt_tokens', 0) or 0
            completion_tokens = token_usage.get('completion_tokens', 0) or 0
            estimated_cost = estimate_cost(prompt_tokens, completion_tokens, MODEL)
            result["extraction_cost"] = estimated_cost

            result["ir"] = {
                "success": True,
                "time_ms": tikz_time,
                "estimated_cost": estimated_cost,
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "error": None,
                "validation_error": None,
                "generated_ir": raw_ir
            }

            # Validate IR before evaluation, but continue even if strict validation fails
            ir_for_evaluation = raw_ir
            try:
                ir_for_evaluation = TikzIR.model_validate(raw_ir)
            except Exception as e:
                validation_error = f"IR validation failed: {str(e)}"
                result["ir"]["success"] = False
                result["ir"]["error"] = validation_error
                result["ir"]["validation_error"] = validation_error

            eval_start = time.time()
            eval_results = evaluator.evaluate(ir_for_evaluation)
            eval_time = (time.time() - eval_start) * 1000

            for check in eval_results['global']:
                result["evaluation_results"][check['check']] = {
                    "passed": check['passed'],
                    "message": check['message']
                }

            result["evaluation_time_ms"] = eval_time
            result["overall_score"] = eval_results['score']
            result["overall_pass"] = eval_results['overall_pass']

        except Exception as e:
            # Guard every read of token_usage here: an exception raised inside
            # this handler would escape the task and fail the whole gather.
            usage = token_usage or {}
            prompt_tokens = usage.get('prompt_tokens', 0) or 0
            completion_tokens = usage.get('completion_tokens', 0) or 0
            fallback_cost = estimate_cost(prompt_tokens, completion_tokens, MODEL)
            result["extraction_cost"] = fallback_cost
            result["ir"] = {
                "success": False,
                "time_ms": tikz_time,
                "estimated_cost": fallback_cost,
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "error": str(e),
                "validation_error": str(e),
                "generated_ir": raw_ir
            }
            result["evaluation_results"] = {}
            result["evaluation_time_ms"] = 0
            result["overall_score"] = 0.0
            result["overall_pass"] = False

        await save_result(result)
        # Sleep while still holding the semaphore slot so the delay actually
        # spaces out requests.
        await asyncio.sleep(REQUEST_DELAY)

        return result

In [7]:
async def run_evaluation_experiment(df: pd.DataFrame, max_examples: int = None):
    """Process the dataset's examples concurrently and return their results.

    Args:
        df: Dataset with one example per row.
        max_examples: When truthy, only the first max_examples rows are
            processed; otherwise every row is.

    Returns:
        The result records gathered from process_single_example, one per row.
    """
    examples = df
    if max_examples:
        examples = df.head(max_examples)
        print(f"Running experiment on first {max_examples} examples")

    print(f"Processing {len(examples)} examples with max {MAX_CONCURRENT} concurrent requests")

    # One shared semaphore caps how many examples are in flight at once.
    limiter = asyncio.Semaphore(MAX_CONCURRENT)

    pending = []
    for _, example in examples.iterrows():
        pending.append(process_single_example(example, limiter))

    # tqdm's gather awaits every coroutine and renders a progress bar.
    return await tqdm.gather(*pending, desc="Processing examples")

In [8]:
# Run experiment on a small subset first to test
print("Running test with 10 examples...")
test_results = await run_evaluation_experiment(df, max_examples=10)
print(f"\nCompleted {len(test_results)} examples")
print(f"Success rate: {sum(1 for r in test_results if r['ir']['success']) / len(test_results):.2%}")

Running test with 10 examples...
Running experiment on first 10 examples
Processing 10 examples with max 6 concurrent requests


  return lib.buffer(
Processing examples: 100%|██████████| 10/10 [00:13<00:00,  1.38s/it]


Completed 10 examples
Success rate: 90.00%





In [None]:
# Run on full dataset 
print("Running full experiment...")
all_results = await run_evaluation_experiment(df, max_examples=None)  # Remove limit for full run

print(f"\nExperiment complete! Processed {len(all_results)} examples")

#### Save to CSV

In [None]:
# Export detailed results to CSV
def export_results_to_csv(results: list[dict], filepath: str):
    """Flatten result records into one row each and write them to a CSV file.

    Every record contributes a fixed set of summary columns plus a
    '<check>_passed' and '<check>_message' column for each entry in its
    'evaluation_results'.

    Args:
        results: Result records as produced by the experiment run.
        filepath: Destination of the CSV file.

    Returns:
        The DataFrame that was written.
    """
    flattened = []
    for record in results:
        flat = {
            'diagram_id': record['diagram_id'],
            'main_category': record['main_category'],
            'subcategory': record['subcategory'],
            'assignment_type': record.get('assignment_type', 'unknown'),
            'extraction_success': record['ir']['success'],
            'extraction_time_ms': record['ir']['time_ms'],
            # Prefer the top-level cost; fall back to the one in the IR block.
            'extraction_cost': record.get('extraction_cost', record['ir'].get('estimated_cost', 0)),
            'evaluation_time_ms': record['evaluation_time_ms'],
            'overall_score': record['overall_score'],
            'overall_pass': record['overall_pass']
        }

        # Two columns per evaluation check, in the order the checks appear.
        for check_name, outcome in record['evaluation_results'].items():
            flat.update({
                f'{check_name}_passed': outcome['passed'],
                f'{check_name}_message': outcome['message'],
            })

        flattened.append(flat)

    results_df = pd.DataFrame(flattened)
    results_df.to_csv(filepath, index=False)
    print(f"Results exported to {filepath}")
    return results_df


# --- Updated naming scheme ---
# The output file name encodes the model and the run date.
date_str = datetime.now().strftime("%Y%m%d")
# MODEL is None when OPENAI_MODEL is unset; use a placeholder so building the
# file name cannot fail. ":" and "/" are replaced to keep the name file-safe.
model_str = (MODEL or "unknown-model").replace(":", "_").replace("/", "_")

results_csv_path = RESULTS_DIR / f"evaluation_results_{model_str}_{date_str}.csv"
results_df = export_results_to_csv(all_results, results_csv_path)

# export_results_to_csv already prints the destination path. The former
# "analysis summary" message referenced an undefined name (analysis_path) and
# raised NameError, so it is dropped.
print(f"Results CSV shape: {results_df.shape}")
results_df.head()

