In [15]:
import ast
import json
import re
import uuid
from pathlib import Path
from typing import List

import pandas as pd

In [18]:
# Configuration
# With the values below, the input file resolves to:
#   /home/larcanio/AIMO3_v2/data/datasets/splits/bucked_2_3_splitt/test_difficulty_level.jsonl

# Base directory whose sub-directories are treated as run directories.
OUTPUT_BASE_DIR = Path('/home/larcanio/AIMO3_v2/data/datasets/splits/')
# Run directory to read from and write to. When set to None, the next cell picks
# the sub-directory of OUTPUT_BASE_DIR that sorts last.
RUN_DIR = OUTPUT_BASE_DIR / 'bucked_2_3_splitt/'
# Fallback dataset name used when a record carries no "dataset" value of its own.
DATASET_NAME = 'Numina1.5'
# License string written to every normalized record.
LICENSE = 'Apache 2.0'
# File names are relative to RUN_DIR.
INPUT_FILENAME = 'test_difficulty_level.jsonl'
# NOTE(review): the input is a "test_" file but the output is named "train_..." -
# confirm this naming is intended before reusing the output as a training split.
OUTPUT_FILENAME = 'train_dataset_normalized.jsonl'

In [19]:
# Resolve the run directory, then locate the input JSONL file inside it
if RUN_DIR is not None:
    print(f"Using specified run directory: {RUN_DIR.name}")
else:
    # No run directory configured: use the sub-directory that sorts last by path
    run_dirs = [entry for entry in OUTPUT_BASE_DIR.iterdir() if entry.is_dir()]
    run_dirs.sort(reverse=True)
    if len(run_dirs) == 0:
        raise FileNotFoundError(f"No run directories found in {OUTPUT_BASE_DIR}")
    RUN_DIR = run_dirs[0]
    print(f"Using most recent run directory: {RUN_DIR.name}")

jsonl_file = RUN_DIR / INPUT_FILENAME

# Fail early with a clear message rather than at open() time
if jsonl_file.exists():
    print(f"Reading from: {jsonl_file}")
else:
    raise FileNotFoundError(f"JSONL file not found: {jsonl_file}")

Using specified run directory: bucked_2_3_splitt
Reading from: /home/larcanio/AIMO3_v2/data/datasets/splits/bucked_2_3_splitt/test_difficulty_level.jsonl


In [20]:
# Load every non-blank line of the JSONL file as one datapoint
datapoints = []
with open(jsonl_file, 'r', encoding='utf-8') as f:
    for line_num, line in enumerate(f, start=1):
        if not line.strip():
            continue  # blank line: nothing to parse
        try:
            datapoints.append(json.loads(line))
        except json.JSONDecodeError as e:
            # Report the malformed line and carry on with the rest of the file
            print(f"Error parsing line {line_num}: {e}")
            print(f"Line content: {line[:200]}...")

NUM_ITEMS_TO_DISPLAY = 10
print(f"\nTotal datapoints loaded: {len(datapoints)}")
print(f"Displaying first {min(NUM_ITEMS_TO_DISPLAY, len(datapoints))} items:\n")


Total datapoints loaded: 1271
Displaying first 10 items:



In [21]:
# Identity & provenance
# id -> freshly generated UUID4 for each output record
# dataset -> record.dataset, fallback to DATASET_NAME (one of [Numina1.5, OpenMath, GSM8k])
# problem_id -> record.problem_id, fallback to the first short numeric part of record.id
# license -> LICENSE [Numina1.5: Apache 2.0, OpenMath: CC-BY-SA 4.0, GSM8k: MIT]

# Problem & answers
# text -> problem.text
# answer_expected -> problem.expected_answer
# answer_predicted -> attempts[where it succeeds].result.predicted (outcome.answer is not in the source)
# is_correct -> attempts[where it succeeds].result.correct

# Program (the actual training payload)
# code -> attempts[where it succeeds].code
# code_runtime_ms -> attempts[where it succeeds].exec.duration  (convert from seconds to ms)
# code_generated_tokens -> (compute using tokenizer)

# Curriculum labels
# domain -> audit.2nd_stage_domain, fallback to audit.domain
# reasoning_tier -> derived from difficulty_buckets (bucket_1=1 .. bucket_5=5)

# Quality + filtering knobs
# tier -> audit.tier (rename: keep to core, keep_with_flags to extended, else drop)
# code_score -> audit.code_score (fallback: audit.scores.overall)
# risk_flags -> audit.risk_flags or audit.flags; brute_force -> enumerative_strategy

# Model provenance
# generation_model -> models.generating
# audit_model -> record.audit_model or audit.audit_model

In [22]:
# Audit tier -> output tier. Both the legacy names (keep / keep_with_flags) and the
# already-normalized names (core / extended) are accepted; any tier missing from
# this table is treated as "drop" by the code that looks it up.
TIER_MAP = {
    "keep": "core",
    "keep_with_flags": "extended",
    "core": "core",
    "extended": "extended",
}

def _successful_attempt(record):
    """First attempt with exec.status == 'success'."""
    for a in record.get("attempts", []):
        if (a.get("exec") or {}).get("status") == "success":
            return a
    return None

def _problem_id_from_record(record):
    """Source problem id: record['problem_id'] or parse from record['id']."""
    if record.get("problem_id") is not None:
        return str(record["problem_id"])
    raw_id = record.get("id") or ""
    for part in raw_id.split("_"):
        if part.isdigit() and len(part) < 10:
            return part
    return raw_id or None

def _risk_flags(audit):
    """audit.risk_flags or audit.flags; 'brute_force' renamed to 'enumerative_strategy'."""
    a = audit or {}
    flags = list(a.get("risk_flags") or a.get("flags") or [])
    return ["enumerative_strategy" if f == "brute_force" else f for f in flags]

def _normalize_code_comment_prefixes(code):
    """Replace # Goal: -> # Objective: and # Plan: -> # Approach: in code (for stored code property)."""
    if not code or not isinstance(code, str):
        return code or ""
    return code.replace("# Goal:", "# Objective:").replace("# Plan:", "# Approach:")

def _strip_comment_value(text):
    """Remove leading ': ' or ':' from extracted objective/approach (e.g. '# Approach: : Enumerate...')."""
    if not text:
        return text
    t = text.strip()
    if t.startswith(": "):
        return t[2:].strip()
    if t.startswith(":"):
        return t[1:].strip()
    return t

def _extract_objective_and_approach(code):
    """
    Pull the '# Objective:' and '# Approach:' comment values out of code.

    Lines are scanned top to bottom after stripping surrounding whitespace. For
    each prefix, the first line that yields a non-empty value wins; a matching
    line whose value is empty leaves the slot open for a later line. Values are
    cleaned with _strip_comment_value. Returns (objective, approach), using ""
    for anything not found or when code is empty / not a string.
    """
    if not isinstance(code, str) or not code:
        return "", ""

    prefixes = {"objective": "# Objective:", "approach": "# Approach:"}
    found = {"objective": "", "approach": ""}
    for raw_line in code.splitlines():
        stripped = raw_line.strip()
        for key, prefix in prefixes.items():
            if not stripped.startswith(prefix):
                continue
            if found[key] == "":
                found[key] = _strip_comment_value(stripped[len(prefix):])
            break  # a line can only carry one of the two prefixes
    return found["objective"], found["approach"]


def _reasoning_tier(record):
    """
    Derive reasoning_tier (1-5) from difficulty_buckets.
    Returns the bucket number for the first True bucket, or None if unclassified.
    """
    buckets = record.get("difficulty_buckets") or {}
    for tier in range(1, 6):
        if buckets.get(f"bucket_{tier}") is True:
            return tier
    return None


# Text-level heuristic for floating-point usage. It matches either
#   * a decimal literal of the form <digits>.<digits> whose first digit is not
#     directly preceded by a word character, or
#   * the text "float", optional whitespace, then "(".
# Caveats:
#   * "|" has the lowest precedence, so the (?<!\w) lookbehind guards only the
#     first alternative; the second also matches inside longer names such as
#     "myfloat(".
#   * Being a plain-text pattern it cannot tell code from comments or string
#     literals, so a decimal inside a comment or a format spec like "10.2f"
#     matches too.
#   * Literals without digits on both sides of the dot ("1.", ".5") and
#     exponent-only forms ("1e-9") do not match.
FLOAT_REGEX = re.compile(
    r"""
    (?<!\w)          # not part of identifier
    \d+\.\d+         # decimal literal
    |                # or
    float\s*\(       # float(...)
    """,
    re.VERBOSE,
)


class CodeFeatureExtractor(ast.NodeVisitor):
    """
    AST visitor that collects feature tags for one piece of Python code.

    After visit(tree):
      * features       - set of tags: uses_sympy / uses_itertools / uses_fractions /
                         uses_numpy (imports), uses_functions (def), has_loops
                         (for / while), has_asserts (assert), uses_floats (float
                         literal or float(...) call).
      * max_loop_depth - deepest nesting of for/while statements encountered.
    loop_depth is the running nesting level during traversal.
    """

    # Top-level package name -> tag recorded when that package is imported.
    _LIBRARY_TAGS = {
        "sympy": "uses_sympy",
        "itertools": "uses_itertools",
        "fractions": "uses_fractions",
        "numpy": "uses_numpy",
    }

    def __init__(self):
        self.features = set()
        self.loop_depth = 0
        self.max_loop_depth = 0

    def _tag_library(self, dotted_name):
        """Record the tag for the root package of dotted_name, if it is a tracked one."""
        tag = self._LIBRARY_TAGS.get(dotted_name.split(".")[0])
        if tag is not None:
            self.features.add(tag)

    def visit_Import(self, node):
        for alias in node.names:
            self._tag_library(alias.name)
        self.generic_visit(node)

    def visit_ImportFrom(self, node):
        # node.module is None for purely relative imports ("from . import x").
        if node.module:
            self._tag_library(node.module)
        self.generic_visit(node)

    def visit_FunctionDef(self, node):
        self.features.add("uses_functions")
        self.generic_visit(node)

    def visit_For(self, node):
        self._enter_loop()
        self.generic_visit(node)
        self._exit_loop()

    def visit_While(self, node):
        self._enter_loop()
        self.generic_visit(node)
        self._exit_loop()

    def _enter_loop(self):
        self.features.add("has_loops")
        self.loop_depth += 1
        self.max_loop_depth = max(self.max_loop_depth, self.loop_depth)

    def _exit_loop(self):
        self.loop_depth -= 1

    def visit_Assert(self, node):
        self.features.add("has_asserts")
        self.generic_visit(node)

    def visit_Constant(self, node):
        # Float literals as the parser sees them: covers 0.5, 1., .5 and 1e-9, and
        # ignores decimals that only appear inside comments or string literals.
        if isinstance(node.value, float):
            self.features.add("uses_floats")
        self.generic_visit(node)

    def visit_Call(self, node):
        # Explicit conversion through the builtin name: float(...)
        if isinstance(node.func, ast.Name) and node.func.id == "float":
            self.features.add("uses_floats")
        self.generic_visit(node)


def extract_code_features(code: str) -> List[str]:
    """
    Extract deterministic code feature tags from Python code.

    The code is parsed once and every tag is derived from the syntax tree, so
    text inside comments and string literals never produces a tag. In addition
    to the tags collected by CodeFeatureExtractor, "uses_deep_loops" is added
    when for/while statements are nested at least two levels deep.

    Args:
        code: Python source text.

    Returns:
        A sorted list of feature strings; [] when the code cannot be analysed
        (syntax error, null bytes in the text, or nesting too deep to traverse).
    """
    extractor = CodeFeatureExtractor()
    try:
        extractor.visit(ast.parse(code))
    except (SyntaxError, ValueError, RecursionError):
        # ValueError: some Python versions reject source containing null bytes
        # this way; RecursionError: pathologically nested code. Either way the
        # record simply gets no features instead of aborting the whole run.
        return []
    features = set(extractor.features)
    if extractor.max_loop_depth >= 2:
        features.add("uses_deep_loops")
    return sorted(features)


def record_to_flat(record, dataset_name, license_, tokenizer_fn=None):
    """
    Convert one source record to the normalized flat format.

    Args:
        record: source record (dict) with optional "problem", "audit", "models",
            "attempts", "dataset", "original_solution" and "difficulty_buckets".
        dataset_name: dataset label used when the record has no "dataset" value.
        license_: license string stored on the output record.
        tokenizer_fn: optional callable, tokenizer_fn(text: str) -> int | None,
            used to count tokens of the code and of the problem text.

    Returns:
        The flat dict, or None only if the record has no successful attempt.
        All tiers (core / extended / drop) are kept in the output.
    """
    attempt = _successful_attempt(record)
    if attempt is None:
        return None

    problem = record.get("problem") or {}
    audit = record.get("audit") or {}
    models = record.get("models") or {}
    exec_ = attempt.get("exec") or {}
    result = attempt.get("result") or {}

    # tier: keep/core, keep_with_flags/extended, anything else (missing, empty,
    # unmapped or non-string) -> "drop"
    raw_tier = audit.get("tier")
    decision = raw_tier.strip().lower() if isinstance(raw_tier, str) else ""
    tier = TIER_MAP.get(decision, "drop")

    # domain: audit.2nd_stage_domain with fallback to audit.domain
    domain = audit.get("2nd_stage_domain") or audit.get("domain")
    domain = domain.strip() if isinstance(domain, str) else domain

    # answer_predicted: from successful attempt's result.predicted (outcome.answer not in source)
    answer_predicted = result.get("predicted")

    # code_runtime_ms: exec.duration is in seconds. Round to the nearest
    # millisecond: a binary float product can land just below the intended
    # integer, and truncation would then lose a whole millisecond. Booleans and
    # non-finite values (NaN / inf) are treated as "no measurement".
    duration_sec = exec_.get("duration")
    code_runtime_ms = None
    if isinstance(duration_sec, (int, float)) and not isinstance(duration_sec, bool):
        try:
            code_runtime_ms = round(duration_sec * 1000)
        except (ValueError, OverflowError):
            code_runtime_ms = None

    # code: normalize comment prefixes (# Goal: -> # Objective:, # Plan: -> # Approach:)
    # first, then use the normalized text for storage, token count and extraction
    code = _normalize_code_comment_prefixes(attempt.get("code") or "")
    code_generated_tokens = tokenizer_fn(code) if tokenizer_fn else None

    # objective and approach from # Objective: / # Approach: comments in code
    objective, approach = _extract_objective_and_approach(code)

    # code_features: deterministic list of features present in the code
    code_features = extract_code_features(code)

    # Preserve original dataset from record when present; otherwise use dataset_name
    dataset_value = record.get("dataset") or dataset_name

    # text_tokens: count tokens in problem text
    problem_text = problem.get("text")
    text_tokens = tokenizer_fn(problem_text) if tokenizer_fn and problem_text else None

    # solution: original_solution from source record (if present)
    solution = record.get("original_solution")

    # reasoning_tier: derived from difficulty_buckets (1-5)
    reasoning_tier = _reasoning_tier(record)

    # code_score: audit.code_score, falling back to audit.scores.overall when it
    # is missing or null. The `is None` test keeps a legitimate score of 0, and
    # `or {}` tolerates an explicit `"scores": null`.
    code_score = audit.get("code_score")
    if code_score is None:
        code_score = (audit.get("scores") or {}).get("overall")

    flat = {
        # ─────────────────────────────────────────────
        # 1. Identity & provenance
        # ─────────────────────────────────────────────
        "id": str(uuid.uuid4()),
        "dataset": dataset_value,
        "problem_id": _problem_id_from_record(record),
        "license": license_,

        # ─────────────────────────────────────────────
        # 2. Problem definition
        # ─────────────────────────────────────────────
        "domain": domain,
        "reasoning_tier": reasoning_tier,
        "tier": tier,
        "text": problem_text,
        "text_tokens": text_tokens,
        "solution": solution,
        "answer_expected": problem.get("expected_answer"),

        # ─────────────────────────────────────────────
        # 3. Generated solution (semantic)
        # ─────────────────────────────────────────────
        "objective": objective,
        "approach": approach,
        "answer_predicted": answer_predicted,

        # ─────────────────────────────────────────────
        # 4. Verification & correctness
        # ─────────────────────────────────────────────
        "is_correct": result.get("correct"),

        # ─────────────────────────────────────────────
        # 5. Code artifact
        # ─────────────────────────────────────────────
        "code": code or None,
        "code_features": code_features,

        # ─────────────────────────────────────────────
        # 6. Code execution telemetry
        # ─────────────────────────────────────────────
        "code_runtime_ms": code_runtime_ms,
        "code_generated_tokens": code_generated_tokens,

        # ─────────────────────────────────────────────
        # 7. Quality, audit & risk
        # ─────────────────────────────────────────────
        "code_score": code_score,
        "risk_flags": _risk_flags(audit),

        # ─────────────────────────────────────────────
        # 8. Model lineage
        # ─────────────────────────────────────────────
        "generation_model": models.get("generating"),
        "audit_model": record.get("audit_model") or audit.get("audit_model"),
    }
    return flat

In [23]:
import tiktoken

# Stays None if loading the encoding below raises.
_tokenizer_fn = None
enc = tiktoken.get_encoding("cl100k_base")


def _tokenizer_fn(code):
    """Return the cl100k_base token count of code; falsy input counts as 0 tokens."""
    if not code:
        return 0
    # disallowed_special=() lets special-token text be encoded like ordinary text
    # instead of raising.
    return len(enc.encode(code, disallowed_special=()))

In [24]:
# Convert and write normalized output (read-only: original dataset is not modified)
out_path = RUN_DIR / OUTPUT_FILENAME
normalized = []
for rec in datapoints:
    flat = record_to_flat(rec, DATASET_NAME, LICENSE, tokenizer_fn=_tokenizer_fn)
    if flat is not None:
        normalized.append(flat)

with open(out_path, "w", encoding="utf-8") as f:
    for rec in normalized:
        f.write(json.dumps(rec, ensure_ascii=False) + "\n")

print(f"Read: {len(datapoints)} records from {jsonl_file}")
print(f"Written: {len(normalized)} records to {out_path}")
print(f"Skipped (no successful attempt): {len(datapoints) - len(normalized)}")

Read: 1271 records from /home/larcanio/AIMO3_v2/data/datasets/splits/bucked_2_3_splitt/test_difficulty_level.jsonl
Written: 1271 records to /home/larcanio/AIMO3_v2/data/datasets/splits/bucked_2_3_splitt/train_dataset_normalized.jsonl
Skipped (no successful attempt): 0


In [25]:
# Preview first normalized record
if normalized:
    print(json.dumps(normalized[1], indent=2, ensure_ascii=False))

{
  "id": "94ab8f03-a8e3-482e-a24c-3a7cea9a93ec",
  "dataset": "numina1.5",
  "problem_id": "1855",
  "license": "Apache 2.0",
  "domain": "algebra",
  "reasoning_tier": null,
  "tier": "core",
  "text": "I am thinking of a three-digit natural number less than 200. If I round its triple to the hundreds, it increases by 36. Which number am I thinking of?\n\n(M. Dillingerová)",
  "text_tokens": 44,
  "solution": null,
  "answer_expected": "188",
  "objective": "Find the unique three‑digit natural number x (<200) such that its triple 3x, rounded to the nearest hundred, equals 3x + 36.",
  "approach": "Translate the rounding condition into a congruence modulo 100 and enumerate the possible x in the given range. Verify the result with an assertion that the rounding rule holds.",
  "answer_predicted": "188",
  "is_correct": true,
  "code": "# Objective: Find the unique three‑digit natural number x (<200) such that its triple 3x, rounded to the nearest hundred, equals 3x + 36.\n# Approach: Tr