Note: code originally ran on Google Colab

To switch the dataset used, modify the df variable

You may also need to change the PROMPT_BUILDER variable to match your dataset columns

# Setup

In [None]:
!pip install --upgrade transformers
!pip install tqdm sentencepiece datasets

In [2]:
from typing import List, Tuple

In [None]:
from datasets import load_dataset

# Define the path to the folder you want to save to 
folder_path = '/content/drive/MyDrive/proofs_miniF2F-test'

# huggingface IDs
solver_model_ids = [
    "Goedel-LM/Goedel-Prover-SFT",
    "AI-MO/Kimina-Prover-Preview-Distill-7B",
    "deepseek-ai/DeepSeek-Prover-V2-7B",
]

## Use this code here for HaimingW/miniF2F-lean4, comment out for script-jpg/imo-1969-p2-lemmas
df = load_dataset("HaimingW/miniF2F-lean4", split="test").to_pandas()
PROMPT_BUILDER = lambda problem_row : f"{problem_row['header']}\n\n{problem_row['informal_prefix']}\n{problem_row['formal_statement']}\n"

# Save Setup

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os

# Create the folder path if it doesn't exist
if not os.path.exists(folder_path):
    os.makedirs(folder_path)
    print(f"Folder '{folder_path}' created successfully.")
else:
    print(f"Folder '{folder_path}' already exists.")

# Helper functions for loading models and text generation

In [7]:
from transformers import pipeline, AutoConfig, AutoTokenizer, AutoModelForCausalLM
import logging
def _load_model(model_id):
    """
    Loads a single model and tokenizer to the GPU, with a fix for rope_scaling issues.
    """
    print(f"Attempting to load model: {model_id}")
    try:
        # 1. Load configuration first
        config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)

        # 3. Load model and tokenizer with the (potentially corrected) config
        tok = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
        model = AutoModelForCausalLM.from_pretrained(
            model_id,
            config=config, # Pass the corrected config
            torch_dtype="auto",
            trust_remote_code=True
        ).to("cuda")
        print(f"Successfully loaded {model_id}")
        return model, tok
    except Exception as e:
        # Provide a more informative error message
        logging.error(f"❌ Failed to load model '{model_id}'. Error: {e}")
        # Return None to be handled by the calling function, preventing the crash
        return None, None

In [8]:
def generate_proof(pipe, prompt, max_new_tokens, num_return_sequences: int = 1):

    # Prepare arguments for the pipeline
    generation_args = {
        'do_sample': True,
        'eos_token_id': pipe.tokenizer.eos_token_id,
        'num_return_sequences': num_return_sequences
    }

    # Only add max_new_tokens if a value is provided
    # If it remains None, the pipeline will use its own default
    if max_new_tokens is not None:
        generation_args['max_new_tokens'] = max_new_tokens

    # Call the pipeline with the arguments
    out = pipe(prompt, **generation_args)
    proofs = [result['generated_text'][len(prompt):].strip() for result in out]
    return proofs

In [9]:
from pathlib import Path
import gc
import torch
from tqdm import tqdm
from transformers import pipeline

In [10]:
def _sanitize_dir_name(name: str) -> str:
    return str(name).replace("/", "_").replace("\\", "_")

In [11]:
def _next_index(out_dir: Path) -> int:
    out_dir.mkdir(parents=True, exist_ok=True)
    nums = []
    for p in out_dir.glob("*.txt"):
        stem = p.stem
        if stem.isdigit():
            nums.append(int(stem))
    return (max(nums) + 1) if nums else 1

In [12]:
def generate_proofs_memory_safe(
    model_ids,
    problem_row,
    problem_key,
    max_attempts: int,
    base_output_dir: str,
    gpu_batch_size: int = 8,
    clear = True,
    prompt_builder = None
):
    """
    Generate proofs and write to base_output_dir/<problem_key>/<model_id>/1.txt, 2.txt, ...
    No proof checking; purely generation + IO. Memory-safe (loads one model at a time).
    """
    for model_id in tqdm(model_ids, desc="Models"):
        model = tok = pipe = None
        attempt_bar = None
        try:
            model, tok = _load_model(model_id)
            pipe = pipeline("text-generation", model=model, tokenizer=tok, device=0)
            attempts_left = max_attempts
            attempt_bar = tqdm(total=max_attempts, desc=f"Generating {model_id}", leave=False)

            # Prepare output directory and next index (continues numbering if rerun)
            out_dir = Path(base_output_dir) / _sanitize_dir_name(problem_key) / _sanitize_dir_name(model_id) # Modified out_dir
            next_idx = _next_index(out_dir)
            print(next_idx)

            assert prompt_builder is not None, "prompt_builder must be provided"
            PROMPT = prompt_builder(problem_row)

            while attempts_left > 0:
                current_batch_size = min(gpu_batch_size, attempts_left)
                with torch.no_grad():
                    proof_snippets = generate_proof(
                        pipe,
                        prompt=PROMPT,
                        max_new_tokens=4096,
                        num_return_sequences=current_batch_size
                    )
                # Write each snippet to numbered files 1.txt, 2.txt, ...
                for snippet in proof_snippets:
                    out_path = out_dir / f"{next_idx}.txt"
                    with open(out_path, "w", encoding="utf-8") as f:
                        f.write(snippet)
                    next_idx += 1
                attempts_left -= current_batch_size
                attempt_bar.update(current_batch_size)
        finally:
          if clear:
            if attempt_bar is not None:
                attempt_bar.close()
            if model: del model
            if tok:   del tok
            if pipe:  del pipe
            gc.collect()
            torch.cuda.empty_cache()

In [13]:
def write_proofs_for_model(
    model_id: str,
    dataframe,
    base_output_dir: str, # Added base_output_dir
    max_attempts: int = 8,
    gpu_batch_size: int = 8,
    clear = True,
    prompt_builder = None
):
    """
    For each problem in `dataframe`, generate `max_attempts` proofs for `model_id`
    and write them to base_output_dir/<problem_key>/<model_id>/*.txt.
    `problem_key` defaults to the DataFrame index value.
    """
    print(f"--- Generating proofs for model: {model_id} ---")
    for idx, problem_row in tqdm(dataframe.iterrows(), total=len(dataframe), desc=f"Problems for {model_id}"):
        problem_key = problem_row.get('problem_id', idx)
        generate_proofs_memory_safe(
            model_ids=[model_id],
            problem_row=problem_row,
            problem_key=problem_key,
            max_attempts=max_attempts,
            base_output_dir=base_output_dir,
            gpu_batch_size=gpu_batch_size,
            clear = clear,
            prompt_builder = prompt_builder
        )
    print(f"--- Done for {model_id}. ---")

# Proof Generation

In [None]:
for i, mid in enumerate(solver_model_ids):
    try:
      write_proofs_for_model(mid, df, base_output_dir=folder_path, max_attempts=24, gpu_batch_size=4, clear=True, prompt_builder=PROMPT_BUILDER) # Passed folder_path
    except Exception as e:
      print(f"Error for {mid}: {e}")