In [2]:
import os
import json
import re
import glob
from inspect_ai import Task, eval as inspect_eval, score
from inspect_ai.dataset import Dataset, Sample
from inspect_ai.model import get_model
from inspect_ai.solver import generate
from inspect_ai.scorer import scorer, Score, accuracy, stderr

In [5]:
def parse_bool(val):
    """Coerce a loosely-typed flag into a strict ``bool``.

    Args:
        val: A bool, a string, a number, or anything else (e.g. ``None``).

    Returns:
        * bools are returned unchanged;
        * strings are True when, after stripping and lower-casing, they equal
          ``"true"``, ``"yes"`` or ``"1"``;
        * ints/floats are True only when equal to 1, mirroring the ``"1"``
          string rule so a JSON ``1`` and a JSON ``"1"`` agree;
        * every other value (including ``None``) is False.
    """
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(val, bool):
        return val
    if isinstance(val, str):
        return val.strip().lower() in ("true", "yes", "1")
    if isinstance(val, (int, float)):
        return val == 1
    return False

@scorer(metrics=[accuracy(), stderr()])
def spot_custom_scorer():
    """Build a scorer that compares three boolean flags between target and model output.

    The target is expected to be a JSON object (or something ``dict()`` can
    consume) and the model completion is expected to contain a JSON object.
    The flags ``errors_detected``, ``specific_error_found`` and
    ``matches_human_annotation`` are read from both sides via ``parse_bool``;
    a flag that is missing on either side is therefore treated as False.

    Returns:
        An async ``score(state, target)`` callable producing a ``Score`` whose
        value is 1.0 / 0.66 / 0.33 / 0.0 for 3 / 2 / 1 / 0 agreeing flags and
        whose explanation lists the per-flag agreement plus the model's
        reported confidence and a truncated error description.
    """
    # Partial-credit table keyed by the number of agreeing flags.
    value_by_matches = {3: 1.0, 2: 0.66, 1: 0.33, 0: 0.0}

    def _as_dict(obj):
        # Valid JSON is not necessarily an object (e.g. "null" or a list);
        # anything else would break the .get() lookups below.
        return obj if isinstance(obj, dict) else {}

    async def score(state, target):
        output = state.output.completion
        # Best-effort parse of the ground truth: any failure means "no flags".
        try:
            target_dict = json.loads(target.text) if hasattr(target, 'text') else dict(target)
        except Exception:
            target_dict = {}
        target_dict = _as_dict(target_dict)
        gt_errors_detected = parse_bool(target_dict.get("errors_detected", None))
        gt_specific_error_found = parse_bool(target_dict.get("specific_error_found", None))
        gt_matches_human = parse_bool(target_dict.get("matches_human_annotation", None))

        # Best-effort parse of the completion: take the widest {...} span
        # (greedy, DOTALL) so text around the JSON object is ignored.
        try:
            json_match = re.search(r'\{.*\}', output, re.DOTALL)
            if json_match:
                model_out = json.loads(json_match.group())
            else:
                model_out = {}
        except Exception:
            model_out = {}
        model_out = _as_dict(model_out)
        mdl_errors_detected = parse_bool(model_out.get("errors_detected", None))
        mdl_specific_error_found = parse_bool(model_out.get("specific_error_found", None))
        mdl_matches_human = parse_bool(model_out.get("matches_human_annotation", None))
        confidence = model_out.get("confidence", None)

        # The model may emit null or a non-string here; slicing/len() on such
        # a value would raise and abort scoring of the sample.
        error_description = model_out.get("error_description", "")
        if error_description is None:
            error_description = ""
        elif not isinstance(error_description, str):
            error_description = str(error_description)

        matches = [
            gt_errors_detected == mdl_errors_detected,
            gt_specific_error_found == mdl_specific_error_found,
            gt_matches_human == mdl_matches_human
        ]
        value = value_by_matches[sum(matches)]

        explanation = (
            f"errors_detected: {'✅' if matches[0] else '❌'} | "
            f"specific_error_found: {'✅' if matches[1] else '❌'} | "
            f"matches_human_annotation: {'✅' if matches[2] else '❌'}\n"
            f"Model confidence: {confidence}\n"
            f"Model error_description: {error_description[:200]}{'...' if len(error_description) > 200 else ''}"
        )
        return Score(value=value, explanation=explanation)
    return score

In [None]:

# Credentials: never paste the key into the notebook. A key that is already
# exported in the environment is left untouched; otherwise it is requested
# interactively so it does not end up in the saved file.
if not os.environ.get('ANTHROPIC_API_KEY'):
    from getpass import getpass
    os.environ['ANTHROPIC_API_KEY'] = getpass("ANTHROPIC_API_KEY: ")

# Tunables for this run.
N_SAMPLES = 10          # number of dataset rows turned into samples
MAX_PAPER_CHARS = 2000  # paper text is truncated to this many characters in the prompt

# Load dataset
with open("spot_dataset.json", encoding="utf-8") as f:
    df = json.load(f)

# Create dataset for inspect_ai
samples = []
for i, row in enumerate(df[:N_SAMPLES]):  # Change N_SAMPLES above for more samples
    paper_content = row["paper_content"]
    if isinstance(paper_content, list):
        # Chunked content: keep only dict chunks that carry non-empty 'text'.
        text_parts = [
            chunk['text']
            for chunk in paper_content
            if isinstance(chunk, dict) and chunk.get('text')
        ]
        paper_text = '\n\n'.join(text_parts)
    else:
        paper_text = str(paper_content)
    prompt = f"""
You are a scientific rigor auditor specializing in academic paper verification. 

PAPER CONTENT:
{paper_text[:MAX_PAPER_CHARS]}

KNOWN ERROR INFORMATION:
- Error Category: {row['error_category']}
- Error Location: {row['error_location']}
- Human Annotation: {row['error_annotation']}

Your task is to analyze this paper and determine:
1. Whether you can detect the specific error mentioned in the human annotation
2. If you can identify any other scientific errors
3. How well your analysis matches the human annotation

Please respond in this exact JSON format:
{{
    \"errors_detected\": true/false,
    \"specific_error_found\": true/false,
    \"error_description\": \"detailed description of errors found\",
    \"matches_human_annotation\": true/false,
    \"confidence\": 0.0-1.0
}}

Respond only with the JSON object, no additional text.
"""
    # Target for scoring
    # NOTE(review): these keys are read with .get(), so a dataset row without
    # them yields None targets — confirm the dataset actually carries them.
    target = {
        "errors_detected": row.get("errors_detected", None),
        "specific_error_found": row.get("specific_error_found", None),
        "matches_human_annotation": row.get("matches_human_annotation", None)
    }
    target_str = json.dumps(target)
    samples.append(Sample(input=prompt, target=target_str, id=f"paper_{i}"))

class SimpleDataset(Dataset):
    """Minimal in-memory ``Dataset`` backed by a plain list of samples.

    Args:
        samples: Iterable of samples; copied so that ``sort``/``shuffle`` never
            reorder the caller's own list.
        name: Value reported by the ``name`` property.
        shuffled: Initial value reported by the ``shuffled`` property.
    """

    def __init__(self, samples, name="Simple Dataset", shuffled=False):
        self._samples = list(samples)
        self._name = name
        self._shuffled = shuffled

    def __getitem__(self, index):
        # A slice yields another dataset (keeping name/shuffled state) rather
        # than a bare list; an integer index yields the sample itself.
        if isinstance(index, slice):
            return SimpleDataset(self._samples[index], name=self._name, shuffled=self._shuffled)
        return self._samples[index]

    def __len__(self):
        return len(self._samples)

    def filter(self, predicate, name=None):
        """Return a new dataset with the samples for which ``predicate`` is true.

        ``name`` overrides the new dataset's name when given.
        """
        filtered_samples = [s for s in self._samples if predicate(s)]
        return SimpleDataset(
            filtered_samples,
            name=name if name is not None else self._name,
            shuffled=self._shuffled,
        )

    def shuffle(self, seed=None):
        """Shuffle the samples in place and mark the dataset as shuffled."""
        import random
        if seed is not None:
            # A private generator gives the same permutation as seeding the
            # module-level one, without resetting global random state.
            random.Random(seed).shuffle(self._samples)
        else:
            random.shuffle(self._samples)
        self._shuffled = True

    def shuffle_choices(self, seed=None):
        """No-op: this dataset does not reorder per-sample choices."""
        pass

    def sort(self, reverse=False, key=None):
        """Sort the samples in place; the default key is the length of ``str(sample.input)``."""
        if key is None:
            def default_key(sample):
                return len(str(sample.input))
            key = default_key
        self._samples.sort(key=key, reverse=reverse)
        # A sorted dataset is no longer in shuffled order.
        self._shuffled = False

    @property
    def location(self):
        return "simple_dataset"

    @property
    def name(self):
        return self._name

    @property
    def shuffled(self):
        return self._shuffled

# Wrap the prepared samples and resolve the model to evaluate.
dataset = SimpleDataset(samples)
model = get_model("anthropic/claude-3-5-haiku-20241022")

# The task only generates completions; no scorer is attached at this stage.
task = Task(dataset=dataset, solver=generate(), name="spot_inference_task", version=1)

# Each run writes its JSON log into a fresh temporary directory.
import tempfile
log_dir = tempfile.mkdtemp(prefix="inspect_ai_logs_")

eval_options = {
    "limit": len(samples),
    "display": "log",
    "log_dir": log_dir,
    "log_format": "json",
    "log_level": "info",
}
results = inspect_eval(tasks=task, model=model, **eval_options)
print(f"Log directory: {log_dir}")

Log directory: /var/folders/qy/nwfw_b_s0qq095_lq9ykgdcr0000gn/T/inspect_ai_logs_evp4iwn0


In [7]:
# Sanity check: confirm the run produced a log and that it holds every sample.
print(f"Number of samples created: {len(samples)}")
print("Log directory:", log_dir)
# Sorted so the file picked below is deterministic when several logs exist.
log_files = sorted(glob.glob(os.path.join(log_dir, "*.json")))
print("Log files:", log_files)
if not log_files:
    # Fail with a clear message instead of an IndexError on log_files[0].
    raise FileNotFoundError(f"No JSON eval logs found in {log_dir}")
with open(log_files[0], encoding="utf-8") as f:
    data = json.load(f)
# 'samples' may be absent or null; report 0 in both cases.
print("Samples in log:", len(data.get('samples') or []))

Number of samples created: 10
Log directory: /var/folders/qy/nwfw_b_s0qq095_lq9ykgdcr0000gn/T/inspect_ai_logs_evp4iwn0
Log files: ['/var/folders/qy/nwfw_b_s0qq095_lq9ykgdcr0000gn/T/inspect_ai_logs_evp4iwn0/2025-07-21T17-49-21-04-00_spot-inference-task_idrtshiV8tTF6dBShrEBZk.json']
Samples in log: 10


In [16]:
import glob
import os
from inspect_ai.log import read_eval_log, write_eval_log
from inspect_ai import score
from spot_custom_scorer import spot_custom_scorer  # adjust if your scorer is elsewhere

def find_latest_log(log_root=None):
    """Locate the newest unscored JSON log in the newest ``inspect_ai_logs_*`` directory.

    Args:
        log_root: Directory that contains the ``inspect_ai_logs_*`` run
            directories. Defaults to the system temporary directory, so the
            lookup is not tied to one machine's absolute temp path.

    Returns:
        Path of the most recently changed ``*.json`` file (by
        ``os.path.getctime``) that does not end in ``_scored.json``, or
        ``None`` when no run directory or no raw log is found (a message is
        printed in both cases).
    """
    import tempfile

    if log_root is None:
        log_root = tempfile.gettempdir()
    # The trailing separator restricts the glob to directories.
    log_pattern = os.path.join(log_root, "inspect_ai_logs_*", "")
    log_dirs = glob.glob(log_pattern)
    if not log_dirs:
        print("❌ No log directories found")
        return None
    latest_dir = max(log_dirs, key=os.path.getctime)
    # Only get raw logs (not already scored)
    json_files = [f for f in glob.glob(os.path.join(latest_dir, "*.json")) if not f.endswith("_scored.json")]
    if not json_files:
        print("❌ No raw JSON files found in log directory")
        return None
    return max(json_files, key=os.path.getctime)

# Pick the raw log to score; stop here when nothing was found.
log_file = find_latest_log()
if log_file is None:
    raise FileNotFoundError("No log file found.")

# Read the log and apply the custom scorer to it.
log = read_eval_log(log_file)
scored_log = score(log, spot_custom_scorer())

# The scored copy is written next to the raw log with a "_scored" suffix
# inserted before the extension.
base, ext = os.path.splitext(log_file)
scored_file = f"{base}_scored{ext}"
write_eval_log(scored_log, scored_file)
print(f"✅ Scored log written to: {scored_file}")

✅ Scored log written to: /var/folders/qy/nwfw_b_s0qq095_lq9ykgdcr0000gn/T/inspect_ai_logs_evp4iwn0/2025-07-21T17-49-21-04-00_spot-inference-task_idrtshiV8tTF6dBShrEBZk_scored.json


In [17]:
import json
# Read back the scored log written by the scoring cell. Using `scored_file`
# instead of a pasted absolute path keeps this cell valid for every run and
# on every machine.
with open(scored_file, encoding="utf-8") as f:
    data = json.load(f)

In [18]:
def _first_score(scores):
    """Return ``(value, explanation)`` of the first score entry, or ``(None, None)``.

    ``scores`` may be a list of score dicts or a dict of score dicts; in both
    cases only the first entry is read. Empty containers, other types and
    non-dict entries yield ``(None, None)``.
    """
    if isinstance(scores, list) and scores:
        first_score = scores[0]
    elif isinstance(scores, dict) and scores:
        first_score = next(iter(scores.values()))
    else:
        return None, None
    if not isinstance(first_score, dict):
        return None, None
    return first_score.get('value'), first_score.get('explanation')


# One flat record per sample: id, timing, score value and explanation.
results = []
# 'samples' may be absent or null in the log; treat both as "no samples".
for sample in data.get('samples') or []:
    score_val, explanation = _first_score(sample.get('scores'))
    results.append({
        'id': sample.get('id'),
        'timing': {
            'total_time': sample.get('total_time', 0),
            'working_time': sample.get('working_time', 0)
        },
        'score': score_val,
        'explanation': explanation
    })

with open("spot_results.json", "w") as f:
    json.dump(results, f, indent=2)
print("✅ Results saved to spot_results.json")

✅ Results saved to spot_results.json


In [19]:
import pandas as pd

# Bound to its own name: `df` holds the raw dataset list in the sample-building
# cell, and rebinding it to a DataFrame here would break a re-run of that cell.
results_df = pd.read_json("spot_results.json")
print(results_df.head())
print("Mean score:", results_df['score'].mean())

        id                                        timing  score  \
0  paper_0    {'total_time': 3.384, 'working_time': 3.2}   0.66   
1  paper_1  {'total_time': 3.569, 'working_time': 3.404}   0.00   
2  paper_2  {'total_time': 2.411, 'working_time': 2.254}   1.00   
3  paper_3  {'total_time': 3.301, 'working_time': 3.171}   0.00   
4  paper_4  {'total_time': 3.492, 'working_time': 3.382}   0.66   

                                         explanation  
0  errors_detected: ❌ | specific_error_found: ✅ |...  
1  errors_detected: ❌ | specific_error_found: ❌ |...  
2  errors_detected: ✅ | specific_error_found: ✅ |...  
3  errors_detected: ❌ | specific_error_found: ❌ |...  
4  errors_detected: ❌ | specific_error_found: ✅ |...  
Mean score: 0.664
