In [None]:
# Run the nvidia-smi shell command to show the GPU status before loading any model.
!nvidia-smi

In [None]:
from unsloth import FastLanguageModel
import torch
import pandas as pd
import numpy as np
import random
import datasets
import re
import gc 
import argparse
import os
from vllm import LLM, SamplingParams
import evaluate
from statistics import mean
from evaluate import load
from unsloth.chat_templates import get_chat_template
from vllm import LLM, SamplingParams
from transformers import AutoModel, AutoTokenizer
pd.set_option('display.max_colwidth', None)  # None means unlimited width
# Load the BERTScore evaluation metric
# NOTE(review): `bertscore` is not referenced by any cell visible in this notebook section.
bertscore = evaluate.load("bertscore")

# Passed to generate_steps_batch as max_seq_len, where it becomes SamplingParams(max_tokens=...),
# i.e. it caps the number of generated tokens per prompt.
MAX_SEQ_LENGTH = 1024
# NOTE(review): DTYPE and LOAD_IN_4BIT are not read by any visible cell; they look like
# leftovers from a model-loading template -- confirm before relying on them.
DTYPE = None # None for auto detection. Float16 for Tesla T4, V100, Bfloat16 for Ampere+
LOAD_IN_4BIT = False # Use 4bit quantization to reduce memory usage. Can be False.
# CSV read by prepare_test_data (its default `path`).
DATA_PATH = "/cluster/home/pgoyal/main/test/cot_test.csv"
# NOTE(review): OUTPUT_PATH is not used by any visible cell.
OUTPUT_PATH = "/cluster/project/sachan/piyushi/predictions_COT"
# Checkpoint used both for the tokenizer (format_prompts_batch) and the vLLM model (generate_steps_batch).
checkpoint_path = "/cluster/project/sachan/piyushi/merged_models_COT/checkpoint-360"
# Seed for Python's `random` here; set_seed(SEED) later also seeds NumPy and PyTorch.
SEED = 42
random.seed(SEED)

### Preprocess data

In [None]:
def prepare_test_data(path=DATA_PATH, remove_test_duplicates=True):
    """Load the test CSV and wrap it in a DatasetDict with a single 'test' split.

    Args:
        path (str): CSV file to read. It is expected to provide 'id',
            'question' and 'answer' columns; 'answer' is renamed to 'response'.
        remove_test_duplicates (bool): When True, keep only the first row per
            'id' (after sorting by 'id') and print the resulting shape and head.

    Returns:
        datasets.DatasetDict: {"test": Dataset} holding the columns
        'id', 'question' and 'response'.
    """
    # Build the frame without in-place mutation so re-running the cell is idempotent.
    df_test = (
        pd.read_csv(path)
        .rename(columns={'answer': 'response'})
        .sort_values(by=['id'])
    )

    # Remove Duplicates for Test DF if necessary
    if remove_test_duplicates:
        print("Removing Test Duplicates")
        df_test = df_test.drop_duplicates(subset=['id'])

    # Always restore a clean 0..n-1 index. Previously this only happened in the
    # dedupe branch, so with remove_test_duplicates=False the shuffled index left
    # by sort_values could be carried into the Dataset as an extra column.
    df_test = df_test.reset_index(drop=True)

    if remove_test_duplicates:
        print(df_test.shape)
        print(df_test.head())

    # Convert to Dataset; preserve_index=False guarantees no index column is added.
    dataset_test = datasets.Dataset.from_pandas(
        df_test[['id', 'question', 'response']].copy(),
        preserve_index=False,
    )

    # Create DatasetDict with only 'test'
    ds = datasets.DatasetDict({"test": dataset_test})

    print(ds)
    return ds

In [None]:
# Build the test split using the default DATA_PATH and duplicate removal.
ds = prepare_test_data()

In [None]:
# Inspect the first test example (id / question / response).
ds['test'][0]

### VLLM

In [None]:
def set_seed(seed):
    """Seed the Python, NumPy and PyTorch random number generators.

    Args:
        seed (int): Value handed to every generator. CUDA generators are
            seeded as well, but only when CUDA is available.
    """
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    # Nothing more to do on CPU-only machines.
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

# Seed Python, NumPy and PyTorch once with the notebook-wide SEED.
set_seed(SEED)

In [None]:
def extract_ans(step):
    """Extract the numeric result of a single solution step.

    The calculator annotation ``<<expression=value>>`` is preferred. When the
    step has no such annotation, the last number appearing in the text is used.

    Args:
        step (str): Text of one reasoning step.

    Returns:
        float | None: The parsed value, or None when the step contains no number.
    """
    # One number: optional sign, either comma-grouped thousands or plain digits,
    # then an optional decimal part (e.g. "24", "-3", "2.5", "1,234.56").
    number = r'[-+]?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?'

    # [^<>] keeps the match inside a single <<...>> annotation, and the value may
    # be signed or decimal (the old integer-only pattern skipped "<<5/2=2.5>>").
    annotated = re.search(r'<<[^<>]*?=\s*(' + number + r')\s*>>', step)
    if annotated:
        return float(annotated.group(1).replace(",", ""))

    # Fallback: last free-standing number in the text. Currency symbols are not
    # part of the match, so only thousands separators need stripping.
    matches = re.findall(number, step)
    if matches:
        return float(matches[-1].replace(",", ""))

    return None

In [None]:
def split_into_steps(text):
    """Return the non-blank lines of `text`, skipping lines that begin with '####'.

    Lines are returned exactly as they appear (surrounding whitespace is kept);
    stripping is only used to decide whether a line is blank or a '####' line.
    """
    kept = []
    for line in text.split('\n'):
        stripped = line.strip()
        # Drop blank lines and the '####' final-answer marker line.
        if not stripped or stripped.startswith('####'):
            continue
        kept.append(line)
    return kept

In [None]:
# Parallel per-example lists, filled in ds['test'] order:
#   questions       - problem text
#   true_test_step  - first non-blank line of the reference response
#   first_step_ans  - numeric value extracted from that first line (None if absent)
#   answers         - full reference response
questions, true_test_step, first_step_ans, answers = [], [], [], []

for item in ds['test']:
    steps = split_into_steps(item['response'])
    questions.append(item['question'])
    true_test_step.append(steps[0])
    first_step_ans.append(extract_ans(steps[0]))
    answers.append(item['response'])

In [None]:
# One row per test example. 'answer' (the full reference response) is kept because
# the first/final-answer evaluation cell later in the notebook reads row['answer'];
# with the column commented out that cell fails with a KeyError.
df_test = pd.DataFrame({
    'question': questions,
    'answer': answers,
    'correct_first_step': true_test_step,
    'gt_ans': first_step_ans
})

df_test.head()

In [None]:
def format_prompts_batch(checkpoint_path, questions, first_steps, flag):
    """
    Build tokenized ChatML prompts for a batch of math word problems.

    Args:
        checkpoint_path (str): Checkpoint whose tokenizer is loaded.
        questions (list of str): List of math word problems.
        first_steps (list of str): One first step per question. Only read when
            `flag` is falsy; it is zipped with `questions`, so it must have the
            same length.
        flag (bool): Truthy builds a single-turn prompt asking for the first
            step only. Falsy builds a three-turn prompt: the same question, the
            given first step as the "gemma" turn, and an instruction to continue
            from the next step.

    Returns:
        tuple: (formatted_questions, tokenizer). formatted_questions holds one
        apply_chat_template(..., return_tensors="pt") result per question, moved
        to "cuda"; tokenizer is the ChatML-wrapped tokenizer used to build them.
    """
    tokenizer = get_chat_template(
        AutoTokenizer.from_pretrained(checkpoint_path),
        chat_template="chatml",
        mapping={"role": "from", "content": "value", "user": "human", "assistant": "gemma"},
        map_eos_token=True
    )

    formatted_questions = []

    for question, first_step in zip(questions, first_steps):
        # Every conversation opens with the first-step instruction.
        conversation = [
            {"from": "human", "value": f"### Instruction:\nCalculate only the first step for the following Math Word Problem\n\n### Input:\n{question}"}
        ]
        if not flag:
            # Continuation mode: replay the first step as the model turn, then ask for the rest.
            conversation.append({"from": "gemma", "value": f"{first_step}"})
            conversation.append({"from": "human", "value": f"### Instruction:\nContinue generating the entire answer from the next step\n\n"})

        prompt_ids = tokenizer.apply_chat_template(
            conversation,
            tokenize=True,
            add_generation_prompt=True,
            return_tensors="pt"
        ).to("cuda")
        formatted_questions.append(prompt_ids)

    return formatted_questions, tokenizer

In [None]:
def generate_steps_batch(checkpoint_path, max_seq_len, formatted_questions, tokenizer):
    """
    Generates one completion per prompt with a vLLM model loaded from a checkpoint.

    Args:
        checkpoint_path (str): Path to the model checkpoint.
        max_seq_len (int): Maximum number of tokens to generate per prompt
            (passed as SamplingParams.max_tokens).
        formatted_questions (list of torch.Tensor): Tokenized prompts, one per question.
        tokenizer (AutoTokenizer): The tokenizer used to decode the prompts back to text.

    Returns:
        list of str: Generated texts, one per entry of `formatted_questions`
        and in the same order. Empty input returns an empty list without
        loading the model.
    """
    if not formatted_questions:
        return []

    # vLLM is fed text prompts here, so turn each token-id tensor back into a string.
    # reshape(-1) flattens the tensor to a 1-D id list whatever its batch dimension.
    # NOTE(review): skip_special_tokens=True may drop the chat-template marker tokens
    # from the prompt -- confirm this matches the format used during fine-tuning.
    decoded_questions = [
        tokenizer.decode(formatted_question.reshape(-1).tolist(), skip_special_tokens=True)
        for formatted_question in formatted_questions
    ]

    # Load the model and sampling parameters (temperature=0 -> greedy decoding)
    llm = LLM(model=checkpoint_path)
    sampling_params = SamplingParams(temperature=0, max_tokens=max_seq_len, stop=["### Instruction", "### Input"])

    try:
        # Pass the whole list: the previous code passed only the last decoded
        # prompt, so a single prediction came back for the entire batch.
        outputs = llm.generate(decoded_questions, sampling_params)
        predictions = [output.outputs[0].text for output in outputs]
    finally:
        # Drop the model reference first, then collect and release cached GPU memory;
        # emptying the cache while the model is still referenced frees nothing.
        del llm
        gc.collect()
        torch.cuda.empty_cache()

    return predictions

In [None]:
# Stage 1: flag=True makes format_prompts_batch build the single-turn
# "Calculate only the first step" prompt for every question.
flag = True

# The first_steps values are not read when flag is True; the list only has to
# match len(questions) because it is zipped with the questions.
first_steps = [""] * len(questions)
first_step_prompts, tokenizer = format_prompts_batch(checkpoint_path, questions, first_steps, flag)
first_step_predictions = generate_steps_batch(checkpoint_path, MAX_SEQ_LENGTH, first_step_prompts, tokenizer)
# Requires exactly one prediction per df_test row.
df_test['pred_first_step'] = first_step_predictions
df_test.head()

In [None]:
# Keep only the examples whose predicted first-step value equals the reference value.
true_rows = []

for idx, first_step in enumerate(df_test['pred_first_step']):
    ans = extract_ans(first_step)
    ans_right = df_test.loc[idx, 'gt_ans']
    # Skip every row whose extracted value differs from the ground truth.
    if not (ans == ans_right):
        continue
    # idx also addresses the matching example in ds['test'].
    true_rows.append({
        'question': df_test.loc[idx, 'question'],
        'true_answer': ds['test'][idx]['response'],
        'pred_first_step': first_step,
        # Text following the '####' marker of the reference response.
        'final_true_answer': ds['test'][idx]['response'].split("####")[1].strip()
    })

# Build the frame once from the collected records.
df_correct = pd.DataFrame(true_rows)
df_correct.head()

In [None]:
# Stage 2: flag=False selects the three-turn prompt that replays the predicted
# first step as the model turn and asks to continue from the next step.
flag = False
# Only the rows kept in df_correct (first step judged correct) are continued.
remaining_step_prompts, tokenizer = format_prompts_batch(checkpoint_path, df_correct['question'].tolist(), df_correct['pred_first_step'].tolist(), flag)
remaining_step_predictions = generate_steps_batch(checkpoint_path, MAX_SEQ_LENGTH, remaining_step_prompts, tokenizer)
# Requires exactly one prediction per df_correct row.
df_correct['remaining_steps'] = remaining_step_predictions
df_correct.head()

In [None]:
def get_final_ans(step):
    """Return the value of the last ``<<expression=value>>`` annotation in `step`.

    Args:
        step (str): Text that may contain one or more calculator annotations.

    Returns:
        float | None: Value of the last annotation, or None when there is none.
    """
    # [^<>] keeps each match inside one annotation, and the optional '-' accepts
    # negative results; previously "<<2-5=-3>>" was skipped, so an earlier
    # annotation was reported as the final one.
    pattern = r'<<[^<>]*?=\s*(-?\d+(?:\.\d+)?)\s*>>'
    matches = re.findall(pattern, step)

    if matches:
        # Return the last match (the answer from the last step)
        return float(matches[-1])

    return None

In [None]:
# Final-answer accuracy over the continuations in df_correct.
#   total - continuations that contain a "## Final Answer: " marker
#   count - those whose final answer agrees with the reference
count = 0
total = 0
pattern = r'## Final Answer: (\d+(?:\.\d+)?)'
for index, row in df_correct.iterrows():
    steps = row['remaining_steps']
    if "## Final Answer: " not in steps:
        continue
    total += 1
    pred_final = steps.split("## Final Answer:")[1].strip()
    gt_final = row['final_true_answer']

    if pred_final == gt_final:
        count += 1
        continue

    # Exact text differs: compare numerically so that "18.0" or "18 apples"
    # still matches a reference of "18".
    match = re.search(pattern, steps)
    if match:
        try:
            numeric_match = float(match.group(1)) == float(gt_final.replace(",", ""))
        except ValueError:
            # The reference answer is not a plain number; treat as a mismatch.
            numeric_match = False
        if numeric_match:
            count += 1

# Avoid ZeroDivisionError when no continuation contained a final-answer marker.
print(total, count / total if total else float('nan'))

In [None]:
# Persist df_test (including the pred_first_step column) to dftest.pkl in the working directory.
df_test.to_pickle('dftest.pkl')

In [None]:
# Reload the saved predictions so the evaluation cells below can run without regenerating.
# NOTE: unpickling can execute arbitrary code; only load pickle files you created yourself.
df_test = pd.read_pickle('dftest.pkl')

In [None]:
# Quick check that the reloaded frame has the expected columns.
df_test.head()

In [None]:
# First-step accuracy over all of df_test: tot rows seen, cnt rows whose
# extracted predicted value equals the stored ground-truth value.
tot = 0
cnt = 0
for idx, first_step in enumerate(df_test['pred_first_step']):
    tot += 1
    ans = extract_ans(first_step)
    ans_right = df_test.loc[idx, 'gt_ans']
    if not (ans == ans_right):
        continue
    cnt += 1
print(tot, cnt/tot)

In [None]:
def first_and_final(index, text):
    """Split a solution into its first line and its '####' final answer.

    Args:
        index: Row identifier, only used in the diagnostic print.
        text (str): Full solution text.

    Returns:
        tuple: (first, final). `first` is the stripped first line of `text`.
        `final` is the stripped text between the first and second '####'
        markers, or "" when `text` has no marker (in which case `index` and
        `text` are printed).
    """
    first = text.split("\n")[0].strip()
    if "####" not in text:
        print(index, text)
        return first, ""
    return first, text.split("####")[1].strip()

# First-step / final-answer accuracy for predictions that contain an "### Answer:\n" section.
#   tot_first   - predictions containing the "### Answer:\n" marker
#   count_first - of those, first-step values equal to the reference
#   tot_final   - first-step-correct predictions that also give a '####' final answer
#   count_final - of those, final answers equal to the reference
count_first = 0
tot_first = 0
count_final = 0
tot_final = 0

# Fail fast with an actionable message instead of a bare KeyError inside the loop.
if 'answer' not in df_test.columns:
    raise KeyError(
        "df_test has no 'answer' column; add the full reference solution "
        "(e.g. 'answer': answers) when building df_test before running this cell"
    )

for index, row in df_test.iterrows():
    ans = row['pred_first_step']
    if "### Answer:\n" not in ans:
        continue
    tot_first += 1
    only_ans = ans.split("### Answer:\n")[1].strip()
    pred_first, pred_final = first_and_final(index, only_ans)
    gt_ans = row['answer']
    gt_first, gt_final = first_and_final(index, gt_ans)
    pred_first_ans = get_final_ans(pred_first)
    gt_first_ans = get_final_ans(gt_first)
    # Two failed extractions (None == None) must not count as agreement.
    if gt_first_ans is not None and gt_first_ans == pred_first_ans:
        count_first += 1
        if pred_final != "":
            tot_final += 1
            if gt_final == pred_final:
                count_final += 1

# Report NaN rather than raising ZeroDivisionError when a denominator is zero.
first_accuracy = count_first / tot_first if tot_first else float('nan')
final_accuracy = count_final / tot_final if tot_final else float('nan')

print(tot_first, tot_final)
print(f"First answer accuracy = {first_accuracy}")
print(f"Final answer accuracy = {final_accuracy}")