In [1]:
import ast
import csv
import os
import re
import time

import numpy as np
import pandas as pd
import torch
from datasets import Dataset
from transformers import AutoModelForCausalLM, AutoTokenizer, TextGenerationPipeline

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Checkpoint identifier passed to from_pretrained for both tokenizer and model.
MODEL_NAME = "DeepSeek-Prover-V2-7B"

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
# The pipeline is later called with batch_size > 1. Batched generation with a
# decoder-only model needs a pad token and left padding, otherwise pad tokens
# end up between the prompt and the generated continuation.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "left"

model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).cuda()
generator = TextGenerationPipeline(model=model, tokenizer=tokenizer, device=0)

`rope_scaling`'s factor field must be a float >= 1, got 16
`rope_scaling`'s beta_fast field must be a float, got 32
`rope_scaling`'s beta_slow field must be a float, got 1
Loading checkpoint shards: 100%|██████████| 2/2 [00:06<00:00,  3.27s/it]
Device set to use cuda:0


## Load dataset

In [16]:
# Number of leading rows of the CSV that are evaluated.
N_SAMPLES = 1000

df = pd.read_csv('data/correct_incorrect_df.csv')
# The dataset is constructed so that correct and incorrect proofs
# are equally represented.
# .copy() makes the subset an independent frame, so the column assignments
# below never write into a slice of the full table.
df = df.iloc[:N_SAMPLES].copy()
# Placeholder for the model verdicts; filled in after generation.
df['answer'] = np.nan
df.head()

Unnamed: 0,data_id,proof,is_correct,proof_len,proof_part,answer
0,10137740601197937323927000320,['p1 p2 p3 p4 p5 : Prop\n⊢ ((False ∨ ((((p4 ∧ ...,True,7,['p1 p2 p3 p4 p5 : Prop\n⊢ ((False ∨ ((((p4 ∧ ...,
1,101901011237190232879400816388,['p1 p2 p3 p4 p5 : Prop\n⊢ ((((p5 → p1) → (((p...,False,8,['p1 p2 p3 p4 p5 : Prop\n⊢ ((((p5 → p1) → (((p...,
2,10147402779818504157948999715,['p1 p2 p3 p4 p5 : Prop\n⊢ ((False ∨ (p4 → p2 ...,True,9,['p1 p2 p3 p4 p5 : Prop\n⊢ ((False ∨ (p4 → p2 ...,
3,102028860579639362150235400604,['p1 p2 p3 p4 p5 : Prop\n⊢ ((((((((True ∧ p1) ...,False,11,['p1 p2 p3 p4 p5 : Prop\n⊢ ((((((((True ∧ p1) ...,
4,101865771099929866782122861458,['p1 p2 p3 p4 p5 : Prop\n⊢ (((p2 ∧ ((False → F...,True,8,['p1 p2 p3 p4 p5 : Prop\n⊢ (((p2 ∧ ((False → F...,


In [17]:
# Instruction header that precedes the goal and tactic list in every prompt.
prompt_start = (
    "You are a Lean 4 proof assistant.\n"
    "Your task is to determine whether the given sequence of Lean 4 tactics "
    "represents a semantically correct beginning of a proof of the initial goal."
    "\n\n"
)

# Answer-format instructions appended after the tactic list.
# The two trailing spaces after the first example and after "or" are part of
# the prompt text and are spelled out explicitly so editors cannot strip them.
prompt_end = (
    "\n\n"
    "You must return a single JSON object, and nothing else, in the following format:\n"
    "\n"
    '{ "verdict": "yes" }  \n'
    "or  \n"
    '{ "verdict": "no" }\n'
    "\n"
    'Only use lowercase "yes" or "no" as the value of the field. '
    "Do not include explanations or any other fields. "
    "Do not explain your decision.\n"
)

In [18]:
def make_prompt(row):
    """Build the full model prompt for one dataset row.

    Args:
        row: Mapping-like row (e.g. a DataFrame row) whose "proof" field holds
            the string repr of a Python list of strings. Judging by the
            instruction text, the first item is the goal and the remaining
            items are tactics -- TODO confirm against the dataset.

    Returns:
        prompt_start, then the list items joined with newlines, then
        prompt_end, as a single string.
    """
    statement_list = ast.literal_eval(row["proof"])
    # Drop one trailing newline from the first item so that joining with "\n"
    # does not produce a blank line. Strip only when the newline is really
    # there: an unconditional [:-1] would eat the last character of the item,
    # and indexing an empty list would raise IndexError.
    if statement_list and statement_list[0].endswith("\n"):
        statement_list[0] = statement_list[0][:-1]
    input_text = prompt_start + "\n".join(statement_list) + prompt_end
    return input_text

In [19]:
# Build the prompt text for every row (make_prompt is applied row by row).
df['input_text'] = df.apply(make_prompt, axis='columns')

In [20]:
# Wrap only the prompt column in a Dataset for the generation step.
data = Dataset.from_pandas(df.loc[:, ['input_text']])

In [21]:
def extract_verdict_from_text(response: str, i: int) -> bool | float:
    """
    Parses a model response for a JSON object like {"verdict": "yes"} or {"verdict": "no"}.
    Returns:
        - True if verdict is "yes"
        - False if verdict is "no"
        - np.nan if no valid verdict is found
    """
    response = response[len(df.at[i, 'input_text']):]
    match = re.search(r'\{\s*"verdict"\s*:\s*"(?P<verdict>yes|no)"\s*\}', response.lower())
    if match:
        verdict_str = match.group("verdict").strip()
        return verdict_str == "yes"
    return np.nan

In [None]:
# Run the pipeline over all prompts: four prompts per batch, sampling
# disabled, at most 1000 newly generated tokens per prompt.
outputs = generator(
    data['input_text'],
    batch_size=4,
    max_new_tokens=1000,
    do_sample=False,
)

In [None]:
# Parse one verdict per prompt; `i` is the row number that the parser uses to
# look up the matching prompt in `df`.
df['answer'] = [
    extract_verdict_from_text(out[0]['generated_text'], i)
    for i, out in enumerate(outputs)
]

# Create the output directory if needed so the save cannot fail after the
# long generation run, and derive the file name from the actual number of
# rows instead of a hard-coded count.
os.makedirs('results', exist_ok=True)
df.to_csv(f'results/self_consistency_{len(df)}_df.csv', index=False)

In [6]:
# Disabled legacy cell: the whole loop is wrapped in a string literal, so none
# of it is executed. It called the generator one prompt at a time and saved
# partial results at index 200 and at every multiple of 500.
# NOTE(review): the code inside is stale -- `input_text` is never assigned in
# the loop, and extract_verdict_from_text is called with a single argument
# although its current signature is (response, i). Fix both before re-enabling.
"""answers_list = []
for index, row in df.iterrows():
    if index % 5 == 0:
        print(f"Processed {index} / {len(df)}, time: {time.time()}")
        
    out = generator(input_text, max_new_tokens=1000, do_sample=False)[0]["generated_text"]
    answers_list.append(extract_verdict_from_text(out[len(input_text):]))
    
    if index == 200:
        df_index = df[:index]
        df_index['answer'] = answers_list
        df_index.to_csv(f'results/self_consistency_200_df.csv', index=False)

    if index >= 500 and index % 500 == 0:
        df_index = df[:index]
        df_index['answer'] = answers_list
        df_index.to_csv(f'results/self_consistency_{index}_df.csv', index=False)"""

Processed 0 / 10000, time: 1747143831.3287163
Processed 5 / 10000, time: 1747143898.1814132


You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset


Processed 10 / 10000, time: 1747143967.4549475
Processed 15 / 10000, time: 1747144043.7339907


KeyboardInterrupt: 