In [None]:
!unzip proofs.zip

In [None]:
!pip install bitarray

In [None]:
# Step 1: Install elan (Lean toolchain manager)
!curl https://raw.githubusercontent.com/leanprover/elan/master/elan-init.sh -sSf | sh -s -- -y

# Step 2: Update Python process PATH so subprocess.run() can find `lean`
import os
elan_bin_path = os.path.expanduser("~/.elan/bin")
os.environ["PATH"] = elan_bin_path + ":" + os.environ["PATH"]

# Verify the installation by checking the version
!lean --version

import os
import subprocess

def setup_lean_project(project_dir="/tmp/lean_project"):
    """
    Prepare a Lean project directory that depends on Mathlib.

    Steps performed:
      1. Create `project_dir` if it does not exist yet.
      2. Write a `lakefile.lean` that requires mathlib4 from its git repository.
      3. Run `lake exe cache get` inside the project and capture its output.

    Args:
        project_dir: Directory in which the project is created.

    Returns:
        The same `project_dir` that was passed in.

    Raises:
        subprocess.CalledProcessError: re-raised after printing the captured
            stdout/stderr when `lake exe cache get` exits with a non-zero status.
    """
    print(f"--- Setting up Lean project in: {project_dir} ---")
    os.makedirs(project_dir, exist_ok=True)

    # Text of lakefile.lean; written to disk exactly as spelled here.
    lakefile_text = """
    import Lake
    open Lake DSL

    package «lean_project»

    require mathlib from git
      "https://github.com/leanprover-community/mathlib4.git"

    @[default_target]
    lean_lib «lean_project»
    """
    lakefile_path = os.path.join(project_dir, "lakefile.lean")
    with open(lakefile_path, "w") as handle:
        handle.write(lakefile_text)

    # Fetch Mathlib's pre-built cache instead of compiling it from source.
    print("--- Downloading Mathlib cache (this may take a few minutes)... ---")
    cache_command = ["lake", "exe", "cache", "get"]
    try:
        subprocess.run(
            cache_command,
            cwd=project_dir,
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as failure:
        print("❌ Error setting up Mathlib cache.")
        print(f"--- STDOUT ---\n{failure.stdout}")
        print(f"--- STDERR ---\n{failure.stderr}")
        raise  # setup failure is fatal for everything that follows
    else:
        print("--- Mathlib cache downloaded successfully. ---")

    return project_dir

# --- Call this function once at the start of your script ---
# The returned directory is stored in `lean_project_path`, which later cells pass as 'project_dir'.
lean_project_path = setup_lean_project()
# Bare last expression: the notebook displays the project path.
lean_project_path

In [None]:
!mkdir corrected_proofs

In [None]:
import subprocess
import os
import re
from concurrent.futures import ProcessPoolExecutor, as_completed
from utils import get_proof_variants
from typing import Dict

def check_lean_proof(proof_and_context: Dict, name: str = "temp_proof") -> bool:
    """
    Check a Lean 4 proof inside a Lean project by running `lake env lean` on it.

    `get_proof_variants(proof)` supplies the candidate proof texts. Each candidate
    is appended to the formal statement, written to `<project_dir>/<name>_<i>.lean`
    and compiled. The first candidate that compiles cleanly is saved to
        corrected_proofs/<problem_id>/<proof_solver>/<attempt_id>.txt

    A candidate counts as successful only if Lean exits with status 0, its stdout
    contains no "error:" and it does not report "declaration uses 'sorry'"
    (Lean only warns about `sorry`, so the exit status alone would accept an
    unfinished proof).

    Args:
        proof_and_context: dict with the keys 'proof', 'formal_statement',
            'project_dir' and 'metadata'; 'metadata' must contain 'attempt_id',
            'problem_id' and 'proof_solver'.
        name: prefix for the temporary .lean files. Concurrent callers must use
            distinct prefixes because the files share `project_dir`.

    Returns:
        True if a candidate succeeded and was saved, otherwise False.

    Raises:
        AssertionError: if a required key is missing.
    """
    # Explicit raises (not `assert`) so validation still runs under `python -O`.
    required_top_level = {
        "proof": "Missing 'proof' key – you need a proof string to test.",
        "formal_statement": "Missing 'formal_statement' key – you have to give the theorem statement.",
        "project_dir": "Missing 'project_dir' key – cannot locate the Lean project.",
        "metadata": "Missing 'metadata' key – you’ll need context such as attempt_id.",
    }
    for key, message in required_top_level.items():
        if key not in proof_and_context:
            raise AssertionError(message)

    required_metadata = {
        "attempt_id": "Metadata lacks 'attempt_id' – needed to name the output file.",
        "problem_id": "Metadata lacks 'problem_id' – needed for the directory structure.",
        "proof_solver": "Metadata lacks 'proof_solver' – you need to know which solver produced this.",
    }
    metadata = proof_and_context["metadata"]
    for key, message in required_metadata.items():
        if key not in metadata:
            raise AssertionError(message)

    proof_string = proof_and_context["proof"]
    statement = proof_and_context["formal_statement"]
    project_dir = proof_and_context["project_dir"]
    attempt_id = metadata["attempt_id"]

    # str() lets integer ids be used as path components too.
    save_dir = os.path.join(
        "corrected_proofs", str(metadata["problem_id"]), str(metadata["proof_solver"])
    )
    os.makedirs(save_dir, exist_ok=True)

    for idx, variant in enumerate(get_proof_variants(proof_string)):
        # Each variant becomes a tiny Lean file: statement + proof.
        code = f"{statement}\n{variant}"
        temp_filename = f"{name}_{idx}.lean"
        temp_path = os.path.join(project_dir, temp_filename)

        try:
            with open(temp_path, "w", encoding="utf-8") as f:
                f.write(code)

            result = subprocess.run(
                ["lake", "env", "lean", temp_filename],
                cwd=project_dir,
                capture_output=True,
                text=True,
                timeout=120,  # seconds; a timeout counts as a failed variant
            )

            compiled = result.returncode == 0 and "error:" not in result.stdout
            # `sorry` only produces a warning, so it has to be rejected explicitly.
            uses_sorry = "declaration uses 'sorry'" in result.stdout
            if compiled and not uses_sorry:
                out_path = os.path.join(save_dir, f"{attempt_id}.txt")
                with open(out_path, "w", encoding="utf-8") as out_f:
                    out_f.write(code)
                return True
        except (OSError, subprocess.SubprocessError, UnicodeError):
            # File-system problems, a missing `lake`, a timeout or undecodable
            # output make this variant a failure; other exceptions are
            # programming errors and are allowed to propagate.
            continue
        finally:
            # Single cleanup point for the temp file, on success and on failure.
            try:
                os.remove(temp_path)
            except OSError:
                pass

    # No variant succeeded.
    return False



def check_proofs_in_parallel(proof_contexts: list[dict], parallel_workers: int = None) -> bool:
    """
    Runs multiple proof checks in parallel, numbered 1..len(proof_contexts).
    Returns True on first success.
    """
    if not proof_contexts:
        return False

    with ProcessPoolExecutor(max_workers=parallel_workers) as executor:
        futures = {
            executor.submit(check_lean_proof, ctx, idx): idx
            for idx, ctx in enumerate(proof_contexts, start=1)
        }
        for future in as_completed(futures):
            try:
                if future.result():
                    return True  # early exit on first successful attempt
            except Exception:
                pass  # optionally log
    return False


In [None]:
# Smoke test: a trivial theorem, with the Mathlib import placed in the statement header.
example_metadata = dict(proof_solver='example_solver', problem_id='example_id', attempt_id='1')
correct_proof_dict = dict(
    formal_statement='import Mathlib.Tactic\ntheorem two_plus_two_is_four : 2 + 2 = 4',
    proof=':= by rfl',
    project_dir=lean_project_path,
    metadata=example_metadata,
)

# Last expression of the cell, so the boolean result is displayed.
check_lean_proof(correct_proof_dict)


True

# Assumptions

In [6]:
# assert that the folder structure is as expected
import os
from typing import Tuple

def check_structure(path: str) -> None:
    """
    Validate the folder layout under `path`.

    Checks, in order:
      * `path` is a non-empty directory whose entries are all directories.
      * Those directories are named "0", "1", ... and form a consecutive range
        starting at 0.
      * Every numbered directory contains the same set of sub-directory names
        as `path/0`.
      * Every entry of a numbered directory is a directory, and all of those
        directories contain the same set of file names.

    Returns:
        None when every check passes.

    Raises:
        AssertionError: on the first violated check.
    """
    import re  # local import so this cell does not depend on another cell's imports

    def _require(condition, message: str) -> None:
        """Raise AssertionError(message) unless `condition` is truthy (not stripped by -O, unlike `assert`)."""
        if not condition:
            raise AssertionError(message)

    def _subdirectory_names(folder: str) -> set:
        """Names of the immediate children of `folder` that are directories."""
        return {name for name in os.listdir(folder) if os.path.isdir(os.path.join(folder, name))}

    def check_numeric_children_consecutive(path: str, require_start_zero: bool = True) -> Tuple[int, int]:
        """
        Validate that:
        * `path` exists and is a directory with at least one child.
        * Immediate children are directories named in canonical natural-number form:
            "0", "1", "2", ... (no leading zeros except "0").
        * Their integer values form a consecutive range from min to max.
        * If require_start_zero is True, the range must start at 0.

        Returns:
            (min_value, max_value)

        Raises:
            AssertionError on any violation.
        """
        _require(os.path.isdir(path), f"{path!r} is not a directory")
        entries = os.listdir(path)
        _require(entries, f"{path!r} is empty")

        # Used with fullmatch, so no anchors are needed.
        nat_canonical = re.compile(r"0|[1-9][0-9]*")
        nums = []
        for name in entries:
            full = os.path.join(path, name)
            _require(os.path.isdir(full), f"{full!r} is not a directory")
            _require(
                nat_canonical.fullmatch(name),
                f"{name!r} is not a canonical natural number ('0', '1', '2', ... without leading zeros)",
            )
            nums.append(int(name))

        min_n, max_n = min(nums), max(nums)
        if require_start_zero:
            _require(min_n == 0, f"Sequence must start at 0 but starts at {min_n}")

        # Every value lies in [min_n, max_n], so the only possible defect is a gap.
        missing = sorted(set(range(min_n, max_n + 1)) - set(nums))
        _require(
            not missing,
            f"Immediate numeric directory names {sorted(entries)} do not form a consecutive range "
            f"{min_n}..{max_n}; missing {missing}",
        )
        return min_n, max_n

    # Top level: a non-empty directory that contains only directories.
    _require(os.path.isdir(path), f"{path} is not a directory")
    top_level = os.listdir(path)
    _require(top_level, f"{path} is empty")
    _require(
        all(os.path.isdir(os.path.join(path, subdir)) for subdir in top_level),
        f"Not all items in {path} are directories",
    )

    # Top-level names must be 0..N without gaps.
    check_numeric_children_consecutive(path)

    # path/0 is the reference for the set of second-level folder names.
    zero_folder = os.path.join(path, "0")
    _require(os.path.isdir(zero_folder), f"{zero_folder} is not a directory")
    folder_names = _subdirectory_names(zero_folder)

    for subdir in top_level:
        if subdir == "0":
            continue
        subdir_path = os.path.join(path, subdir)
        _require(
            _subdirectory_names(subdir_path) == folder_names,
            f"Folder names in {subdir_path} do not match those in {zero_folder}",
        )

    # All second-level folders must hold the same file names,
    # e.g. path/0/subfolder/[1..8].txt matches path/1/subfolder/[1..8].txt.
    file_set = None
    for folder in top_level:
        folder_path = os.path.join(path, folder)
        for subdir in os.listdir(folder_path):
            leaf_path = os.path.join(folder_path, subdir)
            _require(os.path.isdir(leaf_path), f"{subdir} is not a directory")
            file_names = {
                name for name in os.listdir(leaf_path)
                if os.path.isfile(os.path.join(leaf_path, name))
            }
            if file_set is None:
                file_set = file_names
            else:
                _require(
                    file_set == file_names,
                    f"File names in {subdir} are not consistent with other subdirectories",
                )

# Assumes you have a folder called proofs in the current working directory with the structure enforced by the check_structure function
# Raises AssertionError if the layout is violated; displays nothing on success.
check_structure("proofs")


In [8]:
from datasets import load_dataset
# The problems are read from the split named "train" (244 examples according to the cell output below);
# only the first 40 rows are kept in the DataFrame.
miniF2F_test_df = load_dataset("AI-MO/minif2f_test", split="train").to_pandas().head(40)

  from .autonotebook import tqdm as notebook_tqdm
Generating train split: 100%|██████████| 244/244 [00:00<00:00, 64007.14 examples/s]


In [9]:
import os
import json
from typing import Any, Union

def _read_text_file(path: str) -> str:
    try:
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    except UnicodeDecodeError:
        with open(path, "r", encoding="latin-1") as f:
            return f.read()

def build_structure(path: str) -> Union[dict[str, Any], str, None]:
    """
    Mirror the file tree rooted at `path` as nested Python data.

    * Directory -> dict built from its entries in sorted order:
        - a sub-directory is stored under its name (recursively);
        - a file ending in ".txt" (case-insensitive) is stored under its name
          without the extension, with its text as the value;
        - every other entry is left out.
    * ".txt" file -> its text content.
    * Anything else (other file type, missing path) -> None.
    """
    if not os.path.isdir(path):
        is_text_file = os.path.isfile(path) and path.lower().endswith(".txt")
        return _read_text_file(path) if is_text_file else None

    tree: dict[str, Any] = {}
    for child in sorted(os.listdir(path)):
        child_path = os.path.join(path, child)
        if os.path.isdir(child_path):
            tree[child] = build_structure(child_path)
            continue
        if os.path.isfile(child_path) and child.lower().endswith(".txt"):
            stem, _ext = os.path.splitext(child)
            tree[stem] = _read_text_file(child_path)
    return tree

# Load the whole proofs tree into memory as nested dicts.
structure = build_structure("proofs")
# Display the attempts stored for problem "0" under one solver folder.
structure['0']['AI-MO_Kimina-Prover-Preview-Distill-7B']

{'1': '-- The area of the base of a cone is 30 square units, and its height is 6.5 units. What is the number of cubic units in its volume? Show that it is 65.\n  rw [h₁, h₂, h₃]\n  -- The volume of a cone is given by the formula $V = \\frac{1}{3}Bh$, where $B$ is the area of the base and $h$ is the height.\n  norm_num\n  -- The area of the base of a cone is 30 square units, and its height is 6.5 units. What is the number of cubic units in its volume? Show that it is 65.\n```',
 '2': '-- The area of the base of a cone is 30 square units, and its height is 6.5 units. What is the number of cubic units in its volume? Show that it is 65.\n  calc\n    -- The volume of a cone is given by the formula $V = \\frac{1}{3}Bh$, where $B$ is the area of the base and $h$ is the height.\n    v = 1 / 3 * (b * h) := by rw [h₁]\n    _ = 1 / 3 * (30 * 13 / 2) := by rw [h₂, h₃]\n    _ = 65 := by norm_num\n```',
 '3': "-- The area of the base of a cone is 30 square units, and its height is 6.5 units. What is

In [10]:
# Preview the first five rows (columns: name, informal_prefix, formal_statement).
miniF2F_test_df.head(5)

Unnamed: 0,name,informal_prefix,formal_statement
0,mathd_algebra_478,/-- The volume of a cone is given by the formu...,import Mathlib\nimport Aesop\n\nset_option max...
1,numbertheory_4x3m7y3neq2003,/-- Show that there are no integers $x$ and $y...,import Mathlib\nimport Aesop\n\nset_option max...
2,aime_1983_p1,"/-- Let $x$, $y$ and $z$ all exceed $1$ and le...",import Mathlib\nimport Aesop\n\nset_option max...
3,amc12_2001_p5,/-- What is the product of all positive odd in...,import Mathlib\nimport Aesop\n\nset_option max...
4,mathd_algebra_141,/-- A rectangular patio has an area of $180$ s...,import Mathlib\nimport Aesop\n\nset_option max...


In [None]:
# Check every stored attempt of one solver on problem "0" (dataset row 0).
row1 = miniF2F_test_df.loc[0]
solver_name = 'AI-MO_Kimina-Prover-Preview-Distill-7B'
attempts = structure['0'][solver_name]

# check_lean_proof requires a 'metadata' entry; without it every check raises
# AssertionError, which check_proofs_in_parallel swallows, so the result was always False.
correct_proof_dict = [{
    'formal_statement': row1['formal_statement'],
    'proof': attempts[attempt_id],
    'project_dir': lean_project_path,
    'metadata': {'proof_solver': solver_name, 'problem_id': '0', 'attempt_id': attempt_id},
} for attempt_id in sorted(attempts, key=int)]  # all stored attempts, in numeric order

check_proofs_in_parallel(correct_proof_dict)

In [None]:
import os
import json
from bitarray import bitarray
from typing import Dict
from tqdm.auto import tqdm  # or: from tqdm import tqdm

def _load_checkpoint(path: str, num_problems: int) -> Dict[str, bitarray]:
    """
    Read per-model bit strings from a JSON checkpoint file.

    The file is expected to hold an object whose "model_dict" entry maps a model
    name to a value accepted by `bitarray(...)`. Each bitarray is adjusted to
    exactly `num_problems` bits: shorter ones are padded with False, longer ones
    are truncated.

    Returns:
        {model name: bitarray}; an empty dict if `path` does not exist or the
        file has no "model_dict" entry.
    """
    if not os.path.exists(path):
        return {}

    with open(path, "r", encoding="utf-8") as handle:
        checkpoint = json.load(handle)

    def _fit(bitstr) -> bitarray:
        """Build a bitarray from `bitstr` and pad/truncate it to num_problems bits."""
        bits = bitarray(bitstr)
        shortfall = num_problems - len(bits)
        if shortfall > 0:
            bits.extend([False] * shortfall)
        elif shortfall < 0:
            bits = bits[:num_problems]
        return bits

    return {
        model: _fit(bitstr)
        for model, bitstr in checkpoint.get("model_dict", {}).items()
    }

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional, Callable

def check_proofs_in_parallel_output_arr(
    proof_contexts: List[dict],
    parallel_workers: int | None = None,
    on_progress: Optional[Callable[[int], None]] = None,
) -> List[int]:
    """
    Runs multiple proof checks in parallel (threaded).
    Returns a list of 1/0 aligned with the input order.
    Calls on_progress(1) each time an attempt completes.
    """
    assert isinstance(parallel_workers, int) and parallel_workers > 1, "parallel_workers must be > 1"
    if not proof_contexts:
        return []

    results: List[int] = [0] * len(proof_contexts)

    def _run(idx: int, ctx: dict) -> Tuple[int, int]:
        name = f"proof_{idx}"
        ok = check_lean_proof(ctx, name=name)
        return idx, 1 if ok else 0

    with ThreadPoolExecutor(max_workers=parallel_workers) as executor:
        futures = {executor.submit(_run, i, ctx): i for i, ctx in enumerate(proof_contexts, start=1)}
        for future in as_completed(futures):
            idx = futures[future]
            try:
                i, val = future.result()
            except Exception:
                i, val = idx, 0
            results[i - 1] = val
            if on_progress:
                on_progress(1)

    return results

# Evaluation Loop

In [None]:
import os
import numpy as np
from tqdm.auto import tqdm  # progress bar

# If structure_json isn't built yet, do it now:
try:
    structure_json  # type: ignore[name-defined]
except NameError:
    structure_json = build_structure("proofs")

# Checkpoint location: the first existing candidate, falling back to "checkpoint.json".
checkpoint_path_candidates = [
    "checkpoint.json",
]
checkpoint_path = next((p for p in checkpoint_path_candidates if os.path.exists(p)), "checkpoint.json")

# Derive problem/model/attempt axes from the verified folder structure
problem_ids = sorted(structure_json.keys(), key=int)
first_problem = problem_ids[0]
model_names = sorted(structure_json[first_problem].keys())
# assume consistent attempt ids across models/problems (enforced by check_structure)
attempt_ids = sorted(structure_json[first_problem][model_names[0]].keys(), key=int)

num_problems = len(problem_ids)
num_models = len(model_names)
num_attempts = len(attempt_ids)

# Per-model success bits from the checkpoint, padded/truncated to num_problems.
model_bits = _load_checkpoint(checkpoint_path, num_problems)
if not model_bits:
    # _load_checkpoint returns {} for a missing file; every model is then skipped below.
    print(f"WARNING: no checkpoint data loaded from {checkpoint_path!r}; all results will stay 0.")

# 3D result tensor (problems × models × attempts), initialised to zeros.
results = np.zeros((num_problems, num_models, num_attempts), dtype=np.uint8)

# Number of worker threads handed to the helper.
parallel_workers = max(2, (os.cpu_count() or 2))

# Global progress bar over all (problem, model) pairs
with tqdm(total=num_models * num_problems, desc="Evaluating (problem, model) pairs", unit="pair") as pbar:
    for m_idx, model in enumerate(model_names):
        coarse = model_bits.get(model, None)
        if coarse is None:
            # Model missing from checkpoint → count all problems as processed (left zeros)
            pbar.update(num_problems)
            continue

        for p_axis_idx, p_str in enumerate(problem_ids):
            if not bool(coarse[p_axis_idx]):
                # Fast path: checkpoint says no success → keep zeros
                pbar.update(1)
                continue

            # Folder "<k>" holds the proofs for dataset row k (the earlier demo cell pairs
            # structure['0'] with row 0), so look the row up by problem id. The frame is
            # named miniF2F_test_df; the previous `df` was never defined.
            row_idx = int(p_str)
            if row_idx >= len(miniF2F_test_df):
                raise IndexError(
                    f"problem {p_str!r} has no row in miniF2F_test_df "
                    f"({len(miniF2F_test_df)} rows loaded)"
                )
            formal_statement = miniF2F_test_df.iloc[row_idx]["formal_statement"]

            # One proof context per attempt; a missing attempt is checked as an empty proof.
            attempts_dict = structure_json[p_str][model]  # keys like "1","2",...
            proof_contexts = [
                {
                    "formal_statement": formal_statement,
                    "proof": attempts_dict.get(a_str, ""),
                    "project_dir": lean_project_path,
                    "metadata": {"proof_solver": model, "problem_id": p_str, "attempt_id": a_str},
                }
                for a_str in attempt_ids
            ]

            # Run attempt checks via the threaded helper and fill the slice
            outcomes = check_proofs_in_parallel_output_arr(
                proof_contexts,
                parallel_workers=parallel_workers,
            )
            results[p_axis_idx, m_idx, :] = np.fromiter(outcomes, dtype=np.uint8, count=num_attempts)

            pbar.update(1)

print("results shape:", results.shape)  # (num_problems, num_models, num_attempts)

In [48]:
import json
from typing import List

def results_to_nested_dict(
    results,                      # numpy array, shape: (num_problems, num_models, num_attempts)
    model_names: List[str],
    problem_ids: List[str],
) -> Dict[str, Dict[str, list]]:
    """
    Convert the 3-D outcome array into {model_id: {problem_id: [0/1, ...]}}.

    `results[p, m, :]` becomes the list stored under model `m`, problem `p`;
    values are converted to Python ints and keys to str.

    Raises:
        AssertionError: if the name lists do not match the array's first two axes.
    """
    n_problems, n_models, _ = results.shape
    assert len(model_names) == n_models
    assert len(problem_ids) == n_problems

    return {
        str(model): {
            str(prob): results[p_pos, m_pos, :].astype(int).tolist()
            for p_pos, prob in enumerate(problem_ids)
        }
        for m_pos, model in enumerate(model_names)
    }

def save_results_nested_json(
    results,
    model_names: List[str],
    problem_ids: List[str],
    out_path: str = "proof_outcomes_by_model.json",
) -> str:
    """
    Write the model → problem → outcomes mapping to `out_path` as indented JSON.

    Returns:
        `out_path`, unchanged.
    """
    serialized = json.dumps(
        results_to_nested_dict(results, model_names, problem_ids),
        indent=2,
        ensure_ascii=False,
    )
    with open(out_path, "w", encoding="utf-8") as handle:
        handle.write(serialized)
    return out_path

# Example:
# save_results_nested_json(results, model_names, problem_ids, "proof_outcomes_by_model.json")


In [52]:
# Write the per-model outcomes to disk; the returned path is displayed as the cell output.
save_results_nested_json(results, model_names, problem_ids, "proof_outcomes_by_model.json")

'proof_outcomes_by_model.json'

In [50]:
def results_to_problem_nested_dict(
    results,                      # numpy array, shape: (num_problems, num_models, num_attempts)
    model_names: List[str],
    problem_ids: List[str],
) -> Dict[str, Dict[str, list]]:
    """
    Convert the 3-D outcome array into a dict keyed by problem, then by model:

        {"<problem_id>": {"<model_id>": [0/1, 0/1, ...], ...}, ...}

    `results[p, m, :]` becomes the list stored under problem `p`, model `m`;
    values are converted to Python ints and keys to str.

    Raises:
        AssertionError: if the name lists do not match the array's first two axes.
    """
    n_problems, n_models, _ = results.shape
    assert len(model_names) == n_models
    assert len(problem_ids) == n_problems

    def _row(p_pos: int) -> Dict[str, list]:
        """Outcomes of every model for the problem at axis position p_pos."""
        return {
            str(model): results[p_pos, m_pos, :].astype(int).tolist()
            for m_pos, model in enumerate(model_names)
        }

    return {str(prob): _row(p_pos) for p_pos, prob in enumerate(problem_ids)}


def save_results_nested_by_problem_json(
    results,
    model_names: List[str],
    problem_ids: List[str],
    out_path: str = "proof_outcomes_by_problem.json",
) -> str:
    """
    Write the problem → model → outcomes mapping to `out_path` as indented JSON.

    Returns:
        `out_path`, unchanged.
    """
    serialized = json.dumps(
        results_to_problem_nested_dict(results, model_names, problem_ids),
        indent=2,
        ensure_ascii=False,
    )
    with open(out_path, "w", encoding="utf-8") as handle:
        handle.write(serialized)
    return out_path


In [53]:
# Write the per-problem outcomes to disk; the returned path is displayed as the cell output.
save_results_nested_by_problem_json(results, model_names, problem_ids, "proof_outcomes_by_problem.json")


'proof_outcomes_by_problem.json'