# Zero-shot MMOS-DeepSeekMath-7B with self-consistency and generated code reasoning evaluation

Self-consistency modifies the standard greedy decoding used in reasoning pipelines: several diverse answers are sampled and then aggregated, e.g., by taking the most common answer ([SC-CoT paper](https://arxiv.org/pdf/2203.11171.pdf)).

In this kernel we use the RL-tuned MMOS-DeepSeekMath-7B backbone. In my experiments this model produces more consistent code reasoning, and executing the generated code blocks lets us reduce arithmetic hallucinations.

## References

- https://www.kaggle.com/code/ironbar/autobots-roll-out/notebook
- https://www.kaggle.com/code/abdurrafae/improved-code-interpretation
- https://kaggle.com/code/xiaoz259/pure-rng/notebook
- https://www.kaggle.com/code/olyatsimboy/aimo-openmath-mistral-baseline
- https://www.kaggle.com/code/aatiffraz/prompt-prediction-w-mixtral-mistral7b-gemma-llama
- https://www.kaggle.com/code/thedrcat/aimo-mixtral-baseline

## Configuration

In [None]:
class CFG:
    # Notebook-wide configuration, read as class attributes (CFG.xxx) throughout the notebook.
    # Data parameters
    quick_save = False # If True (and submission_mode is True) is_quick_save_condition() skips inference on the placeholder problem '000aaa', so saving the notebook is quick
    submission_mode = False # If True it will use aimo.env otherwise a mock environment
    ## Data parameters only used when submission_mode is False
    # dataset = '/kaggle/input/ai-mathematical-olympiad-prize/train.csv'
    # dataset = '/kaggle/input/filtered-math/filtered_MATH_test_5.csv' # filepath, ignored if submission_mode is True
    dataset = '/mnt/hdd0/Kaggle/aimo/external_data/filtered_MATH_test_5.csv' # csv filepath with the problems, ignored if submission_mode is True
    problem_indices = None # list(range(580))[::11] # If not None will restrict the evaluation to the given positional problem indices (df.iloc) of the dataset
    # Model parameters
    # model_path = "/kaggle/input/deepseek-math"
    model_path = "/home/gbarbadillo/data/deepseekmath"
    use_4bit_quantization = False # If True the accelerate and bitsandbytes wheels are installed when the libraries are imported
    balanced_device_map = True
    cuda_visible_devices = 0 # If not None it is exported as CUDA_VISIBLE_DEVICES to limit the GPUs that can be used
    context_window_size = 4096 # max tokens for prompt + generation; max_new_tokens is reduced if needed to fit in it
    # Run parameters
    time_limit = 31500 # seconds, 31500 by default which is 8.75 hours
    verbose = True
    save_results = True
    result_priority = ['code_answer', 'text_answer'] #['code_answer', 'boxed_answer', 'text_answer'] # For each sampled solution, the first of these answers that is not None is used as its vote
    # few-shot parameters
    # few_shot_dataset = '/kaggle/input/filtered-math/AIMO_train_with_solutions.csv'
    few_shot_dataset = '/mnt/hdd0/Kaggle/aimo/data/AIMO_train_with_solutions.csv'
    few_shot_samples = 2 # number of solved examples included in each few-shot prompt
    max_sample_tokens = 512 # few-shot examples with this many tokens or more (total_tokens column) are filtered out
    max_prompt_tokens = 1024 # 3072 # few-shot prompts are resampled until they have fewer than this many tokens
    difficulty_levels = None # If not None, only few-shot examples whose level is f'Level {i}' for i in difficulty_levels are used
    # Inference parameters
    confidence_level = 0.95 # this will be used to stop sampling solutions if the difference between the first and second most voted options is statistically significant
    n_repetitions = 1 # maximum number of solutions sampled per problem (adjust_repetitions_to_meet_ending_time may temporarily lower it)
    random_seed = None # None or int, passed to transformers.set_seed
    max_new_tokens = 640 #2048 # generation token budget per solution, shared by all its text and code steps
    max_coding_errors = 2 # presumably the CodeRunner limit: stop a solution after this many failed code executions
    code_output_truncate_length = 125 # max number of characters kept from a code output; longer outputs are truncated and '...' is appended
    default_answer = 0 # this will be the response when the system does not have a valid answer
    stop_words = ["```output", "```python", "```\nOutput" , ")\n```" , "``````output", 'Problem:', 'User:'] # generation stops at any of these strings (sent as `stop` to the vLLM server)
    # https://community.openai.com/t/cheat-sheet-mastering-temperature-and-top-p-in-chatgpt-api/172683
    # temperature for text generation (0 means greedy decoding)
    temperature_text = 0.9
    top_p_text = 0.9
    # temperature for coding generation (0 means greedy decoding)
    temperature_code = 0.9
    top_p_code = 0.9

## Imports

In [None]:
import logging

# basicConfig() is a no-op when the root logger already has handlers, so first drop
# whatever handlers were installed before this cell ran.
root_logger = logging.getLogger()
for existing_handler in list(root_logger.handlers):
    root_logger.removeHandler(existing_handler)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

In [None]:
import time
NOTEBOOK_START_TIME = time.time()

if CFG.use_4bit_quantization:
    !pip install -U /kaggle/input/accelerate-wheelwhl/accelerate-0.29.1-py3-none-any.whl -qq
    !pip install -U /kaggle/input/bitsandbytes-0-42-0-py3-none-any-whl/bitsandbytes-0.42.0-py3-none-any.whl -qq

import os
if CFG.cuda_visible_devices is not None:
    os.environ["CUDA_VISIBLE_DEVICES"] = str(CFG.cuda_visible_devices)
    
import sys
import subprocess
from IPython.display import display, Markdown
import pandas as pd
from tqdm.auto import tqdm
import torch
import gc
import re
import math
import random
import json
from collections import Counter
import numpy as np
import tempfile
from pydantic import BaseModel
from typing import Optional
import datetime
from scipy.stats import norm

# https://pytorch.org/docs/stable/backends.html#torch.backends.cuda.enable_mem_efficient_sdp
# Enables or disables memory efficient scaled dot product attention.
# If set to True I get this error: RuntimeError: cutlassF: no kernel found to launch!
if any(gpu in torch.cuda.get_device_properties(0).name for gpu in ['P100', 'T4']): # 'NVIDIA GeForce RTX 3090', 'Tesla P100-PCIE-16GB', 'Tesla T4'
    logging.info('Disabling torch cuda mem efficient sdp')
    torch.backends.cuda.enable_mem_efficient_sdp(False)

from transformers import (
    AutoModelForCausalLM, 
    AutoTokenizer, 
    AutoConfig,
    StoppingCriteria,
    StoppingCriteriaList,
    set_seed
)

import transformers
print(f"Transformers Version: {transformers.__version__}")
if CFG.random_seed is not None:
    set_seed(CFG.random_seed)

import matplotlib.pyplot as plt
import matplotlib as mpl

from openai import OpenAI
import threading
import subprocess
import requests

plt.plot()
plt.close('all')
plt.rcParams["figure.figsize"] = (20, 5)
mpl.rcParams['lines.linewidth'] = 3
mpl.rcParams['font.size'] = 16
logging.info('Imported all libraries.')

## Code

### Load data

In [None]:
class MockEnvWithDataframe:
    """
    Offline stand-in for ``aimo.env`` that serves problems from a dataframe.

    It has the same ``iter_test``/``predict`` interface as aimo.env, so the same inference
    code can be reused for making submissions or for evaluating other datasets.
    """
    def __init__(self, df):
        """
        Initializes the mock environment with a dataframe containing problems.

        Args:
            df: one problem per row; it must contain an ``id`` column.
        """
        self.df = df
        self.submissions = []

    def iter_test(self):
        """
        Simulates aimo's iter_test by yielding ``(problem, sample_submission)`` per row.

        ``problem`` is a single-row dataframe with all the row's columns and
        ``sample_submission`` has columns ``id`` and ``answer`` (answer initialised to None).
        """
        for _, row in self.df.iterrows():
            problem = pd.DataFrame([row])
            sample_submission = pd.DataFrame({'id': problem['id'], 'answer': [None]})
            yield problem, sample_submission

    def predict(self, sample_submission):
        """Store a filled-in submission."""
        self.submissions.append(sample_submission)

    def get_all_submissions(self):
        """
        Return every stored submission concatenated in prediction order.

        ``pd.concat`` raises on an empty list, so when nothing has been predicted yet an
        empty dataframe with ``id``/``answer`` columns is returned instead.
        """
        if not self.submissions:
            return pd.DataFrame(columns=['id', 'answer'])
        return pd.concat(self.submissions)

In [None]:
if not CFG.submission_mode:
    # Local evaluation: problems come from a csv and are served by the mock environment.
    df = pd.read_csv(CFG.dataset)
    if CFG.problem_indices is not None:
        df = df.iloc[CFG.problem_indices].reset_index(drop=True)
    if 'answer' in df.columns:
        df['ground_truth'] = df['answer']
    elif CFG.dataset == '/kaggle/input/ai-mathematical-olympiad-prize/test.csv':
        df['ground_truth'] = 0
    N_PROBLEMS = len(df)
    display(df)
    env = MockEnvWithDataframe(df)
else:
    import aimo
    env = aimo.make_env()
    # Number of problems expected: 50 when KAGGLE_IS_COMPETITION_RERUN is set, otherwise 3.
    N_PROBLEMS = 50 if os.getenv('KAGGLE_IS_COMPETITION_RERUN') else 3
iter_test = env.iter_test()

### Model

In [None]:
def get_tokenizer(model_path):
    """Load the tokenizer stored at ``model_path`` and make it pad with its EOS token."""
    loaded_tokenizer = AutoTokenizer.from_pretrained(model_path)
    loaded_tokenizer.pad_token_id = loaded_tokenizer.eos_token_id
    return loaded_tokenizer


tokenizer = get_tokenizer(CFG.model_path)

In [None]:
class TextGenerator():
    """
    Abstraction that allows to generate text and code in different steps efficiently

    Keeps per-solution state across successive ``generate`` calls: the token count of the
    initial prompt, how many tokens were generated since then, and the key/value cache
    returned by the last ``model.generate`` call. Presumably ``reset()`` is called by the
    caller before starting a new solution.

    NOTE(review): relies on the module-level ``model``, ``tokenizer``, ``stopping_criteria``
    and ``clear_memory`` defined elsewhere in the notebook.
    """
    def __init__(self, cfg):
        # cfg: configuration object such as CFG (sampling, token budget and context window settings)
        self.cfg = cfg
        self.reset()

    def reset(self):
        """Clear token counters and cached key/values; restore text mode and cfg.max_new_tokens."""
        self.prompt_tokens = 0
        self.generated_tokens = 0
        self.past_key_values = None
        self.set_generation_mode('text')
        self.max_new_tokens = self.cfg.max_new_tokens

    def set_generation_mode(self, mode):
        """Select the cfg temperature/top_p for 'text' or 'code' mode; raises KeyError otherwise."""
        if mode == 'text':
            self.set_sampling_parameters(self.cfg.temperature_text, self.cfg.top_p_text)
        elif mode == 'code':
            self.set_sampling_parameters(self.cfg.temperature_code, self.cfg.top_p_code)
        else:
            raise KeyError(mode)

    def set_sampling_parameters(self, temperature, top_p):
        """Build the sampling kwargs for model.generate; temperature 0 means greedy decoding."""
        if temperature == 0:
            self.sampling_parameters = dict(do_sample=False)
        else:
            self.sampling_parameters = dict(do_sample=True, temperature=temperature, top_p=top_p)

    def are_generation_tokens_available(self):
        """True while fewer than max_new_tokens tokens have been generated for this solution."""
        return self.generated_tokens < self.max_new_tokens

    def verify_max_new_tokens(self):
        """Shrink max_new_tokens so that prompt_tokens + max_new_tokens fits in cfg.context_window_size."""
        if self.max_new_tokens > self.cfg.context_window_size - self.prompt_tokens:
            self.max_new_tokens = self.cfg.context_window_size - self.prompt_tokens
            logging.warning(f'Reducing max_new_tokens to {self.max_new_tokens} to avoid exceeding the context window of {self.cfg.context_window_size}')

    def generate(self, prompt, mode='text'):
        """
        Continue generation from ``prompt`` using the sampling parameters of ``mode``.

        ``prompt`` is expected to be the whole text so far (initial prompt plus earlier
        generations and appended code outputs): the tokens beyond the initial prompt length,
        measured on the first call, are counted as already generated.

        Returns the decoded output sequence (special tokens skipped), or ``prompt`` unchanged
        when the generation budget is already exhausted.
        """
        self.set_generation_mode(mode)
        model_inputs = tokenizer(prompt, return_tensors='pt').to(model.device)
        if self.prompt_tokens == 0:
            # First call for this solution: remember the initial prompt size and clamp the budget.
            self.prompt_tokens = len(model_inputs['input_ids'][0])
            logging.info(f'Prompt has {self.prompt_tokens} tokens.')
            self.verify_max_new_tokens()
        self.generated_tokens = len(model_inputs['input_ids'][0]) - self.prompt_tokens
        if not self.are_generation_tokens_available():
            logging.warning(f'Input text exceeded the available generation tokens. This is likely happening because a big code output.')
            return prompt

        t0 = time.time()
        clear_memory()
        # Reuse the cache from the previous call when there is one.
        # NOTE(review): assumes the new prompt extends the previously generated sequence — confirm.
        optional_past_key_values = dict(past_key_values=self.past_key_values) if self.past_key_values is not None else {}
        generation_output = model.generate(
            **model_inputs,
            max_new_tokens=self.max_new_tokens - self.generated_tokens,
            return_dict_in_generate=True,
            num_return_sequences=1,
            stopping_criteria=stopping_criteria,
            pad_token_id=tokenizer.eos_token_id,
            **self.sampling_parameters,
            **optional_past_key_values,
            )
        output_ids = generation_output.sequences[0]
        newly_generated_tokens = len(output_ids) - len(model_inputs['input_ids'][0])
        self.generated_tokens = len(output_ids) - self.prompt_tokens
        logging.info(f'Generating {mode} speed: {newly_generated_tokens/(time.time() - t0):.1f} tokens/s ({newly_generated_tokens}) ({self.generated_tokens}/{self.max_new_tokens})')
        self.past_key_values = generation_output.past_key_values
        decoded_output = tokenizer.decode(output_ids, skip_special_tokens=True)
        return decoded_output

    def __call__(self, prompt, mode):
        # Shorthand for generate(prompt, mode).
        return self.generate(prompt, mode)

In [None]:
class TextGeneratorVLLM(TextGenerator):
    """
    TextGenerator that sends completion requests to the local vLLM OpenAI-compatible server.

    Token accounting uses the ``usage`` field of each completion instead of a local
    tokenizer, and no key/value cache is kept on this side.
    """
    def __init__(self, cfg):
        # TextGenerator.__init__ already calls self.reset(), so it is not repeated here.
        super().__init__(cfg)
        self.client = OpenAI(api_key='EMPTY', base_url="http://localhost:8000/v1")

    def set_sampling_parameters(self, temperature, top_p):
        # Sent as-is to the completions endpoint (there is no do_sample flag there).
        self.sampling_parameters = dict(temperature=temperature, top_p=top_p)

    def generate(self, prompt, mode='text'):
        """
        Continue ``prompt`` (the whole text so far) in ``mode`` and return prompt + new text.

        ``echo=True`` makes the server return the prompt together with the completion. When
        generation ended on a stop string, that string is appended back so callers can see
        which stop word fired. Returns ``prompt`` unchanged when the token budget is exhausted.
        """
        self.set_generation_mode(mode)
        if not self.are_generation_tokens_available():
            logging.warning(f'Input text exceeded the available generation tokens. This is likely happening because a big code output.')
            return prompt

        t0 = time.time()
        clear_memory()
        # NOTE(review): max_new_tokens is only clamped to the context window after the first
        # response (when prompt_tokens becomes known), so the first request is not protected.
        completion = self.client.completions.create(
            model=self.cfg.model_path,
            prompt=prompt,
            max_tokens=self.max_new_tokens - self.generated_tokens,
            **self.sampling_parameters,
            echo=True,
            stop=self.cfg.stop_words,
        )
        choice = completion.choices[0]
        decoded_output = choice.text
        # vLLM reports the matched stop string in stop_reason, but it can also be a stop token
        # id (int); only strings can be appended to the text.
        if choice.finish_reason == 'stop' and isinstance(choice.stop_reason, str):
            decoded_output += choice.stop_reason

        if self.prompt_tokens == 0:
            # Store the prompt size before logging it so the message reports the real count.
            self.prompt_tokens = completion.usage.prompt_tokens
            logging.info(f'Prompt has {self.prompt_tokens} tokens.')
            self.verify_max_new_tokens()
        newly_generated_tokens = completion.usage.completion_tokens
        self.generated_tokens = completion.usage.total_tokens - self.prompt_tokens
        logging.info(f'Generating {mode} speed: {newly_generated_tokens/(time.time() - t0):.1f} tokens/s ({newly_generated_tokens}) ({self.generated_tokens}/{self.max_new_tokens})')
        return decoded_output

In [None]:
def log_gpu_memory():
    """Log the allocated and peak allocated memory (GB) of every visible CUDA device."""
    gib = 1024**3
    for device_idx in range(torch.cuda.device_count()):
        allocated = torch.cuda.memory_allocated(device_idx) / gib
        peak = torch.cuda.max_memory_allocated(device_idx) / gib
        logging.info(f'GPU {device_idx} memory allocated: {allocated:.1f} GB, max memory allocated: {peak:.1f} GB')

def empty_gpu_vram():
    """Drop the module-level tokenizer (if present), collect garbage and release cached CUDA memory."""
    logging.info('Emptying GPU VRAM...')
    module_globals = globals()
    for name in ('tokenizer',):
        module_globals.pop(name, None)
    for _ in range(2):
        gc.collect()
    torch.cuda.empty_cache()
    log_gpu_memory()

log_gpu_memory()

In [None]:
def run_vllm_server():
    """
    Run the vLLM OpenAI-compatible API server for CFG.model_path; blocks until it exits.

    Started in a background thread by create_model_and_inference_artifacts.
    """
    # https://docs.vllm.ai/en/latest/serving/openai_compatible_server.html
    # sys.executable is the notebook's own interpreter; a bare 'python' from PATH may be a
    # different environment that does not have vllm installed.
    subprocess.run([sys.executable, '-m', 'vllm.entrypoints.openai.api_server',
                    '--model', CFG.model_path,
                    '--uvicorn-log-level', 'error', # this is not working as intended
                    ])


def is_server_running(server_url, timeout=5):
    """
    Return True if a GET on ``server_url`` answers with HTTP 200.

    Args:
        server_url: endpoint to probe.
        timeout: seconds to wait for the server. requests has no default timeout, so without
            it a half-started server could make this call block indefinitely.
    """
    try:
        response = requests.get(server_url, timeout=timeout)
    except requests.exceptions.RequestException:
        # Refused connections, timeouts, etc. all mean "not ready yet".
        return False
    return response.status_code == 200


def wait_for_server(server_url, timeout=None, poll_interval=1):
    """
    Block until ``server_url`` answers with HTTP 200.

    Args:
        server_url: endpoint to poll.
        timeout: optional maximum number of seconds to wait; None waits indefinitely.
        poll_interval: seconds to sleep between probes.

    Raises:
        TimeoutError: if ``timeout`` is given and elapses before the server answers.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while not is_server_running(server_url):
        if deadline is not None and time.monotonic() > deadline:
            raise TimeoutError(f'Server at {server_url} did not start within {timeout} seconds')
        time.sleep(poll_interval)
    logging.info(f"Server is running at {server_url}!")


def create_model_and_inference_artifacts():
    """
    Start the vLLM server in a background thread (only once) and wait until it answers.

    The thread is stored in the module-level ``server_thread``; later calls return early.

    Raises:
        RuntimeError: if the server process exits before it starts answering, instead of
            polling forever. ``server_thread`` is then removed so a later call can retry.
    """
    global server_thread
    if 'server_thread' in globals():
        return

    server_url = "http://localhost:8000/v1/models"
    server_thread = threading.Thread(target=run_vllm_server)
    logging.info('Starting VLLM server...')
    server_thread.start()
    while not is_server_running(server_url):
        # run_vllm_server only returns once the server subprocess has exited, so a dead
        # thread means the server is gone and will never answer.
        if not server_thread.is_alive():
            del server_thread
            raise RuntimeError('The VLLM server exited before it started answering requests')
        time.sleep(1)
    logging.info(f"Server is running at {server_url}!")
    log_gpu_memory()

In [None]:
def is_gpu_memory_available(required_memory=14):
    """
    Return True if every visible CUDA device has at least ``required_memory`` GB free.

    The free memory of each checked device is logged; checking stops at the first device
    below the threshold. With no visible devices the answer is True.
    """
    def has_enough_free_memory(device_idx):
        free_gb = torch.cuda.mem_get_info(device_idx)[0]/1024**3
        logging.info(f'Available memory on GPU {device_idx} is {free_gb:.1f} GB')
        return free_gb >= required_memory

    return all(has_enough_free_memory(device_idx) for device_idx in range(torch.cuda.device_count()))

def wait_for_gpu_memory(wait_time=60, required_memory=14):
    """Poll every ``wait_time`` seconds until is_gpu_memory_available(required_memory) holds."""
    while True:
        if is_gpu_memory_available(required_memory):
            break
        logging.info(f'Waiting for GPU memory to be available...')
        time.sleep(wait_time)
    logging.info(f'GPU memory is available. Let\'s go training!')

is_gpu_memory_available()

### Prompts

#### Define problems

In [None]:
# Pool of solved problems used to build few-shot prompts; the cells below read its
# 'problem', 'solution', 'total_tokens' and 'level' columns.
prompts_df = pd.read_csv(CFG.few_shot_dataset)
prompts_df.head()

In [None]:
logging.info(f'The number of problems for few-shot prompting is {len(prompts_df)} previous to filtering')
logging.info(f'Filtering problems longer than {CFG.max_sample_tokens} tokens and outside levels {CFG.difficulty_levels}')
# Build a single boolean mask: short examples only, optionally restricted to some levels.
keep_mask = prompts_df.total_tokens < CFG.max_sample_tokens
if CFG.difficulty_levels is not None:
    allowed_levels = [f'Level {level}' for level in CFG.difficulty_levels]
    keep_mask &= prompts_df.level.isin(allowed_levels)
prompts_df = prompts_df[keep_mask].reset_index(drop=True)
logging.info(f'The number of problems for few-shot prompting is {len(prompts_df)} after filtering')

#### Create few shot prompts

In [None]:
# https://github.com/deepseek-ai/DeepSeek-Math/tree/main
few_shot_prompt_templates = [
"""
User: QUESTION_PLACEHOLDER
Please integrate natural language reasoning with programs to solve the problem above, and put your final answer within \\boxed{}. The answer is a non negative integer.

Assistant: Sure, we can solve the problem by writing a Python script.

ANSWER_PLACEHOLDER
""",
"""
User: QUESTION_PLACEHOLDER
Please reason step by step, and put your final answer within \\boxed{}. The answer is a non negative integer.

Assistant: Sure, we can solve the problem by writing a Python program.

ANSWER_PLACEHOLDER
""",
"""
Problem:

QUESTION_PLACEHOLDER

You are an expert mathematical programmer. Solve the above mathematical problem by writing a Python program.
Express your answer as a numeric type or a SymPy object. The answer must be an integer greater or equal to zero.
Please reason step by step, and always end with "The answer is $\\boxed{}$".

ANSWER_PLACEHOLDER
"""
]


def create_random_few_shot_prompt(n=CFG.few_shot_samples, template_idx=0):
    """
    Build a few-shot prompt from ``n`` randomly chosen solved examples of ``prompts_df``.

    ``template_idx`` selects the template (wrapping around the available ones). Each example
    fills QUESTION_PLACEHOLDER / ANSWER_PLACEHOLDER; the prompt ends with the template
    holding PROBLEM_PLACEHOLDER and an opened ```python block. Leading/trailing whitespace
    of the result is stripped.
    """
    template = few_shot_prompt_templates[template_idx % len(few_shot_prompt_templates)]

    def fill_template(question, answer):
        return template.replace('QUESTION_PLACEHOLDER', question).replace('ANSWER_PLACEHOLDER', answer)

    # Sampling without replacement: numpy raises ValueError if n exceeds the pool size.
    sampled_indices = np.random.choice(np.arange(len(prompts_df)), n, replace=False)
    sections = []
    for example_idx in sampled_indices:
        example = prompts_df.loc[example_idx]
        sections.append(fill_template(example['problem'], example['solution']))
    sections.append(fill_template('PROBLEM_PLACEHOLDER', '```python'))
    return ''.join(sections).strip()

def create_random_few_shot_prompt_with_token_limit(token_limit=CFG.max_prompt_tokens, template_idx=0, max_attempts=1000):
    """
    Sample few-shot prompts until one has fewer than ``token_limit`` tokens.

    Args:
        token_limit: exclusive upper bound on the prompt token count. The target problem is
            not substituted yet, so its tokens are not counted.
        template_idx: forwarded to create_random_few_shot_prompt.
        max_attempts: maximum number of prompts to sample. This bounds the search so it
            cannot loop forever when no combination of examples fits; in that case the
            shortest prompt seen is returned and a warning is logged.
    """
    shortest_prompt, shortest_length = None, None
    for _ in range(max(1, max_attempts)):
        prompt = create_random_few_shot_prompt(template_idx=template_idx)
        n_tokens = len(tokenizer.tokenize(prompt))
        if n_tokens < token_limit:
            return prompt
        if shortest_length is None or n_tokens < shortest_length:
            shortest_prompt, shortest_length = prompt, n_tokens
    logging.warning(f'Could not sample a few-shot prompt under {token_limit} tokens in {max_attempts} attempts, using the shortest one ({shortest_length} tokens)')
    return shortest_prompt


prompt = create_random_few_shot_prompt_with_token_limit()
print(f'Number of tokens in prompt: {len(tokenizer.tokenize(prompt))}')

In [None]:
%%time
# Sample 10 prompts and display their token lengths to inspect the distribution.
print('Create some random prompts to see token length distribution')
[len(tokenizer.tokenize(create_random_few_shot_prompt_with_token_limit())) for _ in range(10)]

#### Simple prompts

In [None]:
prompt_options = [
"""Below is a math problem you are to solve (non negative integer answer):

\"PROBLEM_PLACEHOLDER\"

To accomplish this, first determine a sympy-based approach for solving the problem by listing each step to take and what functions need to be called in each step. Be clear so even an idiot can follow your instructions, and remember, your final answer should be a non negative integer, not an algebraic expression!
Write the entire script covering all the steps (use comments and document it well) and print the result. After solving the problem, output the final numerical answer within \\boxed{}.

Approach:

```python""",
"""Below is a math problem you are to solve (non negative integer answer):

\"PROBLEM_PLACEHOLDER\"

Analyze this problem and think step by step to come to a solution with programs. After solving the problem, output the final numerical answer within \\boxed{}.

```python""",
"""User: PROBLEM_PLACEHOLDER
Please reason step by step, and put your final answer within \\boxed{}. The answer is a non negative integer.

Assistant: Sure, we can solve the problem by writing a Python script.

```python""",
"""User: PROBLEM_PLACEHOLDER
Please integrate natural language reasoning with programs to solve the problem above, and put your final answer within \\boxed{}. The answer is a non negative integer.

Assistant: Sure, we can solve the problem by writing a Python program.

```python""",
"""You are an expert mathematical programmer. Solve the mathematical problem below by writing a Python program.

- Express your answer as a numeric type or a sympy object. The answer must be an integer greater or equal to zero.
- Please reason step by step, and write clean and readable code.
- You can use python libraries such as sympy, math or numpy to solve the problem.
- Finally always end with "The answer is $\\boxed{}$".

PROBLEM_PLACEHOLDER

```python""",
"""
User: PROBLEM_PLACEHOLDER
Please reason step by step, and put your final answer within \\boxed{}. The answer is a non negative integer.
Use all the available information in the problem description, and be very careful with the assumptions and simplifications you make.
You might use python libraries such as sympy, math, scipy or numpy to solve the problem, use the right tool.
Use code even for the simpler calculations to avoid mistakes.

Assistant: Sure, we can solve the problem by writing a Python program.

```python""",
]

print(len(prompt_options))

#### Prompt examples

In [None]:
def get_formatted_prompt(problem, repetition_idx):
    """
    Return the prompt for ``problem`` on its ``repetition_idx``-th sampling attempt.

    The first len(prompt_options) slots are the zero-shot prompts; the remaining
    len(few_shot_prompt_templates) slots produce a freshly sampled few-shot prompt.
    The slot is offset by len(problem), so different problems start at different slots.
    """
    n_zero_shot = len(prompt_options)
    n_slots = n_zero_shot + len(few_shot_prompt_templates)
    slot = (repetition_idx + len(problem)) % n_slots
    template = (
        prompt_options[slot]
        if slot < n_zero_shot
        else create_random_few_shot_prompt_with_token_limit(template_idx=repetition_idx)
    )
    return template.replace('PROBLEM_PLACEHOLDER', problem)

In [None]:
# Render each prompt variant once for visual inspection: consecutive repetition indices
# cover every slot of get_formatted_prompt.
for idx in range(len(prompt_options) + len(few_shot_prompt_templates)):
    display(Markdown(get_formatted_prompt('What is $1 + 10$?', idx)))
    display(Markdown('---'))

### Utils

In [None]:
def clear_memory():
    """Release cached CUDA memory and run the garbage collector, twice, pausing 10 ms after each pass."""
    passes_left = 2
    while passes_left:
        torch.cuda.empty_cache()
        gc.collect()
        time.sleep(0.01)
        passes_left -= 1

In [None]:
def get_time_spent():
    """Seconds elapsed since NOTEBOOK_START_TIME."""
    return time.time() - NOTEBOOK_START_TIME

def is_ending_time(max_time=CFG.time_limit):
    """Return True, and log a warning, once more than ``max_time`` seconds have elapsed."""
    time_is_up = get_time_spent() > max_time
    if time_is_up:
        logging.warning('Reached limit time, inference will be skipped.')
    return time_is_up

# Sanity checks; the first assumes this cell runs within 100 s of NOTEBOOK_START_TIME.
assert not is_ending_time(100)
assert is_ending_time(0)

In [None]:
def is_quick_save_condition(idx, test):
    """
    Return True when inference can be skipped so the notebook saves quickly.

    Requires CFG.quick_save and CFG.submission_mode, the first problem (idx 0), and that
    this problem has id '000aaa' and text 'What is $1-1$?'.
    """
    if not (CFG.quick_save and idx == 0 and CFG.submission_mode):
        return False
    if test['id'].values[0] != '000aaa':
        return False
    if test['problem'].values[0] != 'What is $1-1$?':
        return False
    logging.info('Quick save condition reached. Skipping inference')
    return True

In [None]:
def get_timestamp():
    """Current local time as 'YYYY-MM-DD_HH:MM:SS'."""
    now = datetime.datetime.now()
    return f'{now:%Y-%m-%d_%H:%M:%S}'

print(get_timestamp())

In [None]:
N_REPETITIONS = CFG.n_repetitions
PROBLEM_REPETITIONS = []

def adjust_repetitions_to_meet_ending_time(answered_problems,
                                           max_time=CFG.time_limit,
                                           min_problem_threshold=20,
                                           hysteresis=0.975):
    """
    Append the current N_REPETITIONS to PROBLEM_REPETITIONS and adapt it to the time budget.

    Once ``answered_problems`` reaches ``min_problem_threshold``, the ending time is
    extrapolated from the mean time per repetition so far. N_REPETITIONS is decreased by one
    (not below 1) if the estimate exceeds ``max_time``, or increased by one (not above
    CFG.n_repetitions) if it is below ``hysteresis * max_time``.
    """
    global N_REPETITIONS, PROBLEM_REPETITIONS
    PROBLEM_REPETITIONS.append(N_REPETITIONS)
    if answered_problems >= min_problem_threshold:
        spent_time = get_time_spent()
        # Time per sampled solution, so it can be scaled by the current number of repetitions.
        mean_problem_time = spent_time/sum(PROBLEM_REPETITIONS)
        remaining_problems = N_PROBLEMS - answered_problems
        estimated_ending_time = spent_time + remaining_problems*mean_problem_time*N_REPETITIONS
        logging.info(f'Mean problem time: {mean_problem_time:.1f} seconds, estimated ending time {estimated_ending_time/3600:.1f} hours')
        if estimated_ending_time > max_time and N_REPETITIONS > 1:
            N_REPETITIONS -= 1
            logging.warning(f'Decreasing the number of repetitions to {N_REPETITIONS} to try to meet ending time')
        elif estimated_ending_time < max_time*hysteresis and N_REPETITIONS < CFG.n_repetitions:
            N_REPETITIONS += 1
            logging.warning(f'Increasing the number of repetitions to {N_REPETITIONS} because it seems to be enough time to meet the ending time')

### Response parsing

In [None]:
def text_to_int_answer(text):
    """
    Convert ``text`` to a non-negative integer answer, or return None.

    Accepts integer strings ('5') and integral floats ('5.0'). Integer strings are parsed
    with ``int`` first, so large answers stay exact instead of being rounded through a float
    (floats cannot represent every integer above 2**53).

    Returns None for negative or non-integral values, nan/inf and unparsable text.
    """
    if isinstance(text, str):
        try:
            exact = int(text)
        except ValueError:
            exact = None
        if exact is not None:
            return exact if exact >= 0 else None
    try:
        answer = float(text)
        if answer < 0 or not answer.is_integer():
            return None
        return int(answer)
    except (ValueError, OverflowError):
        return None

assert 5 == text_to_int_answer('5')
assert 5 == text_to_int_answer('5.0')
assert text_to_int_answer('-1') is None
assert text_to_int_answer('0.5') is None
assert text_to_int_answer('pi') is None
assert text_to_int_answer('12345678901234567891') == 12345678901234567891

In [None]:
def parse_boxed_answer(text):
    r"""
    Return the answer inside the last ``\boxed{...}`` of ``text`` as a non-negative int, or None.

    Whitespace inside the braces and integral decimals such as ``\boxed{ 5.0 }`` are
    accepted; the value is validated by text_to_int_answer, so ``\boxed{1.5}`` gives None.
    """
    matches = re.findall(r'\\boxed\{\s*(\d+(?:\.\d+)?)\s*\}', text)
    if matches:
        return text_to_int_answer(matches[-1])
    return None

test_text = """

blah blah \\boxed{5} 7
"""
assert parse_boxed_answer(test_text) == 5

test_text = """

blah blah {5} 7
"""
assert parse_boxed_answer(test_text) == None

assert parse_boxed_answer('\\boxed{ 12 }') == 12
assert parse_boxed_answer('\\boxed{5.0}') == 5
assert parse_boxed_answer('\\boxed{1.5}') is None

In [None]:
def parse_response_in_text(text):
    """Return the boxed answer of ``text`` if there is one, else the 'the answer is N' answer, else None."""
    response = parse_boxed_answer(text)
    if response is not None:
        return response
    return parse_last_answer(text)

def parse_last_answer(text):
    """
    Return the number after the last 'the (final) answer is' in ``text`` (case-insensitive).

    An optional colon and $...$ around the number are allowed. Thousands separators are
    understood ('1,000' -> 1000) instead of stopping at the first comma (which gave 1).
    Non-integral or negative numbers give None.
    """
    pattern = (r'(?:the answer is|the final answer is)\s*:?\s*\$?'
               r'(\d{1,3}(?:,\d{3})+(?!\d)(?:\.\d+)?|\d+(?:\.\d+)?)\$?')
    matches = re.findall(pattern, text, re.IGNORECASE)
    if matches:
        return text_to_int_answer(matches[-1].replace(',', ''))
    return None

test_cases = [
    ('The answer is: $651$', 651),
    ('The answer is: $5$.', 5),
    ('The answer is: 6.', 6),
    ('The final answer is 0.', 0),
    ('The final answer is 126.', 126),
    ('The final answer is: $2$.', 2),
    ('The answer is $\\boxed{3}$', 3),
    ('The answer is $\\boxed{-1}$', None),
    ('The answer is $\\boxed{1.5}$', None),
    ('The answer is: $-1$.', None),
    ('The answer is: $4.5$.', None),
    ('The final answer is 0.6', None),
    ('The answer is 1,000.', 1000),
]
for text, answer in test_cases:
    assert parse_response_in_text(text) == answer
    assert parse_response_in_text(text.lower()) == answer

In [None]:
def parse_response_in_code(code_output):
    """
    Convert the printed output of generated code into a non-negative int answer, or None.

    A value wrapped in square brackets (e.g. a sympy solve() result such as '[0]') is
    unwrapped first. Unexpected errors are printed and mapped to None.
    """
    if code_output is None:
        return None
    try:
        candidate = code_output.strip()
        is_bracketed = candidate.startswith('[') and candidate.endswith(']')
        return text_to_int_answer(candidate[1:-1] if is_bracketed else candidate)
    except Exception as e:
        print(f'Exception when trying to get a response from code: {e}')
        return None

assert parse_response_in_code('0') == 0
assert parse_response_in_code('[0]') == 0

### Code interpreter

In [None]:
def code_interpreter(code):
    """Preprocess ``code`` and run it in a subprocess; returns ``(output, run_success)``."""
    return execute_code(preprocess_code(code))

def preprocess_code(code):
    """
    Prepare model-written code for execution.

    In order: add ``real=True`` to ``symbols(...)`` calls, route printing through
    ``simplify_print``, and prepend ``from sympy import *``.
    """
    for transform in (ensure_symbols_are_real, add_simplify_to_print):
        code = transform(code)
    return 'from sympy import *\n' + code

def add_simplify_to_print(code):
    """
    Make every ``print(...)`` call in ``code`` print sympy-simplified values.

    Calls are redirected to a ``simplify_print`` helper that is prepended to the code. The
    helper expects ``simplify`` to be in scope (preprocess_code adds ``from sympy import *``).

    - Only bare ``print(`` calls are rewritten; names such as ``pprint(`` or method calls
      like ``obj.print(`` are left untouched.
    - Any number of positional arguments and print keywords (``sep``, ``end``, ...) are
      forwarded, so calls such as ``print("x =", x)`` work.
    - Values that ``simplify`` cannot handle (e.g. a plain sentence) are printed unchanged.
    """
    code = re.sub(r'(?<![\w.])print\(', 'simplify_print(', code)
    new_code = """
def simplify_print(*args, **kwargs):
    print(*(recursive_simplify(arg) for arg in args), **kwargs)

def recursive_simplify(x):
    if isinstance(x, list):
        return [recursive_simplify(y) for y in x]
    try:
        return simplify(x)
    except Exception:
        return x
"""
    code = new_code + '\n' + code
    return code

def ensure_symbols_are_real(code):
    """Append ``real=True`` to every ``symbols(...)`` call in ``code`` that does not mention 'real'."""
    def add_real_flag(match):
        call = match.group(0)
        return call if "real" in call else call[:-1] + ", real=True)"
    return re.sub(r"symbols\([^)]+\)", add_real_flag, code)

assert ensure_symbols_are_real("x, y, z = symbols('x y z')") == "x, y, z = symbols('x y z', real=True)"
assert ensure_symbols_are_real("x, y, z = symbols('x y z', real=True)") == "x, y, z = symbols('x y z', real=True)"

def execute_code(code, timeout_limit=7):
    """
    Run ``code`` as a standalone Python script in a subprocess.

    SECURITY NOTE: ``code`` is model-generated and is executed with no sandboxing other
    than the time limit; only run this in a disposable environment.

    Args:
        code: Python source to execute.
        timeout_limit: wall-clock limit in seconds, enforced with the coreutils ``timeout``.

    Returns:
        ``(output, run_success)``. On success, the truncated last line of stdout. On timeout,
        an explanatory message. Otherwise, the truncated last line of stderr (typically the
        exception message).
    """
    with tempfile.NamedTemporaryFile(mode='w+', encoding='utf-8', delete=False) as temp_file:
        temp_file.write(code)
        temp_filepath = temp_file.name
    try:
        # Argument list without a shell, so unusual temp paths cannot break the command.
        # errors='replace' keeps non-UTF-8 output from raising while decoding.
        ret = subprocess.run(
            ['timeout', str(timeout_limit), sys.executable, temp_filepath],
            capture_output=True, text=True, errors='replace',
        )
    finally:
        # Remove the script even if the subprocess could not be launched.
        os.remove(temp_filepath)
    if ret.returncode == 0:
        return truncate_text(get_last_line(ret.stdout)), True
    if ret.returncode == 124:  # exit status of `timeout` when the limit is reached
        return f'The execution of the code timeout. The code needs to run in less than {timeout_limit} seconds.', False
    #output = remove_references_to_temp_code_file(ret.stderr, temp_filepath)
    output = truncate_text(get_last_line(ret.stderr))
    return output, False
    
def remove_references_to_temp_code_file(output, filepath):
    """Strip every ``File "<filepath>", `` prefix (as found in tracebacks) from ``output``."""
    prefix = 'File "' + filepath + '", '
    return output.replace(prefix, '')

def get_last_line(text):
    """Return the last line of ``text`` after stripping surrounding whitespace ('' if nothing is left)."""
    stripped = text.strip()
    return stripped.splitlines()[-1] if stripped else stripped

def truncate_text(text, max_length=CFG.code_output_truncate_length):
    """Code output can be very long: keep at most ``max_length`` characters and append '...' when cut."""
    return text if len(text) <= max_length else f'{text[:max_length]}...'

test_code = """
print('Hello')
"""
print(code_interpreter('print(0)'))
print(code_interpreter('foo'))

In [None]:
test_code = """
from sympy import symbols, Eq, solve

def solve_equation():
    x = symbols('x')
    equation = Eq(4 + x, 4)
    solution = solve(equation, x)

    return solution

result = solve_equation()
print(result)
"""
print(code_interpreter(test_code))

test_code = """
from sympy import symbols, Eq, solve

def solve_equation():
    x = symbols('x', real=True)
    equation = Eq(4 + x, 4)
    solution = solve(equation, x)

    return solution

result = solve_equation()
print(result)
"""
print(code_interpreter(test_code))

In [None]:
def parse_last_python_code_block(text):
    """
    Return the body of the last ```python block in ``text``, up to the next ``` fence.

    If there is no ```python marker, the text before the first ``` fence is returned.
    """
    _, _, tail = text.rpartition('```python')
    body, _, _ = tail.partition("```")
    return body

test_text = """
```python
hello
``````output
"""
assert parse_last_python_code_block(test_text) == '\nhello\n'

test_text = """
```python
hello
```
"""
assert parse_last_python_code_block(test_text) == '\nhello\n'

In [None]:
def add_code_output_to_prompt(decoded_output, code_output):
    """
    Append the result of the last code execution to the generated text.

    When the text ends with a closed code block (a ')' line followed by the ``` fence) an
    ```output fence is opened first; otherwise the text is presumably already inside an
    output fence, so only the result and the closing fence are added.
    """
    closes_code_block = decoded_output.endswith(")\n```")
    header = '```output\n' if closes_code_block else '\n'
    return decoded_output + header + str(code_output) + '\n```\n'

In [None]:
class CodeRunner():
    """
    Runs model-written code snippets one after another.

    - Successful snippets are appended to ``accumulated_code`` so later snippets can reuse
      earlier definitions; the whole accumulated program is re-executed on every run.
    - Failed runs are discarded and counted in ``n_coding_errors``.
    """
    def __init__(self, max_coding_errors=2):
        self.max_coding_errors = max_coding_errors
        self.accumulated_code = ''
        self.code_interpreter_calls = 0
        self.n_coding_errors = 0
        self.successful_code_output = None

    def run_code(self, code):
        """Execute the accumulated code followed by ``code``; return the interpreter output."""
        self.code_interpreter_calls += 1
        candidate_program = self.accumulated_code + "\n" + code
        code_output, run_success = code_interpreter(candidate_program)
        if not run_success:
            self.n_coding_errors += 1
            self.successful_code_output = None
            return code_output
        self.accumulated_code = candidate_program
        self.successful_code_output = code_output
        return code_output

    def max_coding_errors_reached(self):
        """True, with a warning, once ``n_coding_errors`` reaches ``max_coding_errors``."""
        if self.n_coding_errors < self.max_coding_errors:
            return False
        logging.warning(f'Stopping solution generation because {self.n_coding_errors} coding errors were done.')
        return True

### Results

In [None]:
class InferenceResult(BaseModel):
    """Outcome of one sampled solution for a problem (several are stored per problem)."""
    # text
    prompt: str  # prompt used to generate this solution
    response: Optional[str] = None  # generated text, if any
    # answers (None when not available)
    boxed_answer: Optional[int] = None
    text_answer: Optional[int] = None
    code_answer: Optional[int] = None
    # output
    output_tokens: int = 0
    reached_max_tokens: bool = False
    # code
    coding_errors: int = 0
    code_interpreter_calls: int = 0

In [None]:
def is_difference_significative(n_first, n_second, n_tries, confidence_level=CFG.confidence_level):
    """
    Decide whether the most voted answer is significantly ahead of the runner-up.

    Computes a z statistic for the difference between the two vote shares (their binomial
    variances are added) and compares it with the upper bound of the two-sided normal
    interval for ``confidence_level``.

    Args:
        n_first: votes for the most common answer.
        n_second: votes for the second most common answer (0 if there is none).
        n_tries: total number of valid votes.
        confidence_level: confidence level of the test.

    When ``n_second`` is 0 a pseudo-count of one vote is used for it, and if every vote went
    to the first answer one extra try is added, so the statistic stays finite.

    Raises:
        ValueError: if ``n_second`` is 0 and ``n_first`` exceeds ``n_tries``.
    """
    if n_second == 0:
        if n_first > n_tries:
            raise ValueError()
        n_second = 1
        if n_first == n_tries:
            n_tries += 1
    p_first, p_second = n_first/n_tries, n_second/n_tries
    variance = p_first*(1-p_first)/n_tries + p_second*(1-p_second)/n_tries
    z = (p_first - p_second)/variance**0.5
    logging.info(f'p_first: {p_first*100:.1f}% p_second: {p_second*100:.1f}% Confidence level for the difference: {2*(norm.cdf(z) - 0.5)*100:.1f}%')
    return z > norm.interval(confidence_level)[1]

is_difference_significative(3, 0, 3)

In [None]:
def log_ground_truth(idx):
    """Log the ground-truth answer of problem ``idx`` if one is available.

    Does nothing unless ``env`` is a MockEnvWithDataframe and ``df`` has a
    'ground_truth' column.
    """
    if not isinstance(env, MockEnvWithDataframe):
        return
    if 'ground_truth' not in df.columns:
        return
    logging.info(f'Ground truth: {df["ground_truth"].loc[idx]}')

class Results():
    """Store the InferenceResult of every repetition, grouped by problem index.

    Also implements the self-consistency vote. Each run contributes its first non-None
    answer, following a priority list of InferenceResult field names. The most common
    value wins, and ties go to the smallest value.
    """

    def __init__(self):
        # problem idx -> list of InferenceResult, in insertion order
        self.results = dict()

    def initialize(self, idx):
        """Start (or reset) an empty list of results for problem ``idx``."""
        self.results[idx] = []

    def add_result(self, idx, result: 'InferenceResult'):
        """Append ``result`` to problem ``idx``, creating the list if it does not exist yet."""
        self.results.setdefault(idx, []).append(result)

    def log_results_distribution(self, idx):
        """Log the ground truth (when known) and the raw values of every answer field."""
        log_ground_truth(idx)
        keys = ['boxed_answer', 'text_answer', 'code_answer']
        for key in keys:
            values = self.get_result_distribution(idx, key)
            logging.info(f'{key} distribution: {values}')

    def get_valid_results(self, idx, result_priority):
        """Return one answer per run: the first non-None field named in ``result_priority``.

        Runs where every listed field is None are skipped.

        Raises:
            NoValidResults: if no run of problem ``idx`` produced any answer.
        """
        valid_results = []
        for result in self.results[idx]:
            for key in result_priority:
                # getattr avoids building a full dict per run (and pydantic's deprecated .dict()).
                value = getattr(result, key)
                if value is not None:
                    valid_results.append(value)
                    break
        if not valid_results:
            raise NoValidResults(idx)
        return valid_results

    def _count_valid_results(self, idx, result_priority):
        """Return the valid answers of ``idx`` and their logged ``Counter.most_common()`` list."""
        if result_priority is None:
            result_priority = CFG.result_priority
        valid_results = self.get_valid_results(idx, result_priority)
        counter_ret = Counter(valid_results).most_common()
        logging.info(f'Result counts for {idx}: {counter_ret}')
        return valid_results, counter_ret

    def get_most_frequent_result(self, idx, result_priority=None):
        """Return ``(answer, count)`` for the majority vote of problem ``idx``.

        ``result_priority`` defaults to ``CFG.result_priority``, read at call time.
        Ties go to the smallest answer.

        Raises:
            NoValidResults: if no run of problem ``idx`` produced any answer.
        """
        _, counter_ret = self._count_valid_results(idx, result_priority)
        return get_minimum_most_frequent_value(counter_ret)

    def is_best_solution_found(self, idx, result_priority=None):
        """Return True when the leading answer is significantly more voted than the runner-up.

        Used to stop sampling early. Returns False while there is no valid answer yet.
        """
        try:
            valid_results, counter_ret = self._count_valid_results(idx, result_priority)
        except NoValidResults:
            return False
        n_first = counter_ret[0][1]
        # A single distinct answer has no runner-up: compare it against 0 votes.
        n_second = counter_ret[1][1] if len(counter_ret) > 1 else 0
        return is_difference_significative(n_first, n_second, len(valid_results))

    def get_result_distribution(self, idx, key):
        """Return field ``key`` of every run of ``idx`` as a numpy array (None values kept)."""
        return np.array([getattr(result, key) for result in self.results[idx]])

    def save(self, filepath='results.json'):
        """Write every stored result to ``filepath`` as JSON (problem indices become string keys)."""
        logging.info(f'Saving results in {os.path.realpath(filepath)}')
        results = {idx: [result.dict() for result in results] for idx, results in self.results.items()}
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=4)

    def load(self, filepath):
        """Replace the stored results with those read from a file written by ``save``.

        JSON object keys are strings, so problem indices are converted back to ``int``.
        """
        logging.info(f'Loading results from {filepath}')
        with open(filepath, 'r', encoding='utf-8') as f:
            results = json.load(f)
        self.results = {int(idx): [InferenceResult(**result) for result in results] for idx, results in results.items()}

    def __repr__(self):
        return str(self.results)
    
def get_minimum_most_frequent_value(counter_ret):
    """Return ``(value, count)`` for the smallest value tied for the highest count.

    ``counter_ret`` is a list of ``(value, count)`` pairs sorted by decreasing count, as
    returned by ``Counter.most_common()``. Only the leading run of pairs that share the
    first count is considered.
    """
    top_count = counter_ret[0][1]
    tied_values = []
    position = 0
    while position < len(counter_ret) and counter_ret[position][1] == top_count:
        tied_values.append(counter_ret[position][0])
        position += 1
    return min(tied_values), top_count

class NoValidResults(Exception):
    """Raised when no run of a problem produced any answer; the problem index is the only argument."""

# Tie-breaking checks: among the values tied for the highest count the smallest one wins,
# and a strictly higher count wins regardless of the value.
assert get_minimum_most_frequent_value([(2, 1), (3, 1)]) == (2, 1)
assert get_minimum_most_frequent_value([(3, 1), (2, 1)]) == (2, 1)
assert get_minimum_most_frequent_value([(3, 2), (2, 1)]) == (3, 2)

### Inference

In [None]:
def _split_prompt_and_response(prompt, decoded_output, code_fence="```python"):
    """Return ``(prompt, response)``, where ``response`` is ``decoded_output`` without the prompt.

    If the prompt ends with an opened code fence (the model was forced to start with
    code), the fence is moved from the prompt to the start of the response. This keeps
    the code block in the response well formed. Only the first occurrence of the prompt
    is removed, so a response that repeats the prompt text is not truncated.
    """
    if prompt.endswith(code_fence):
        return prompt[:-len(code_fence)], decoded_output.replace(prompt, code_fence, 1)
    return prompt, decoded_output.replace(prompt, '', 1)


def solve_problem_with_code_interpreter(prompt):
    """Generate one solution for ``prompt``, executing python code blocks along the way.

    Text and code are generated alternately. When the output ends with an opened
    ```python fence, the next step runs in code mode. After a code-mode step, the last
    python block is executed with a CodeRunner and its output is added to the
    conversation. Generation stops when any of the following happens:
    - the model starts a new "Problem:"/"User:" turn;
    - generation did not stop on one of ``CFG.stop_words``;
    - the token budget is exhausted;
    - the coding-error limit is reached.

    Returns:
        An InferenceResult with the response (prompt removed) and token/coding statistics.
        The boxed/text/code answers are parsed only when generation finished within the
        token budget and without hitting the coding-error limit.
    """
    text_generator = TextGeneratorVLLM(cfg=CFG)
    clear_memory()
    code_runner = CodeRunner(CFG.max_coding_errors)
    decoded_output = prompt
    stop_word_cond = True
    generation_mode = 'text'
    while stop_word_cond and text_generator.are_generation_tokens_available():
        # The model opened a new turn on its own: the solution is over.
        if decoded_output.endswith(("Problem:", "User:")):
            break
        # After a code-mode step (unless the output still ends with the opening fence),
        # run the last python block and append its output to the conversation.
        is_code_block_finished = not decoded_output.endswith("```python") and generation_mode == 'code'
        if is_code_block_finished:
            code_text = parse_last_python_code_block(decoded_output)
            code_output = code_runner.run_code(code_text)
            if code_runner.max_coding_errors_reached():
                break
            decoded_output = add_code_output_to_prompt(decoded_output, code_output)

        # An opened fence switches the next generation step to code mode.
        if decoded_output.endswith("```python"):
            decoded_output += '\n'
            generation_mode = 'code'
        else:
            generation_mode = 'text'

        decoded_output = text_generator(decoded_output, mode=generation_mode)
        # Keep looping only while generation stops on one of the configured stop words.
        stop_word_cond = decoded_output.endswith(tuple(CFG.stop_words))

    log_gpu_memory()
    prompt, decoded_output = _split_prompt_and_response(prompt, decoded_output)
    result = InferenceResult(
        prompt=prompt,
        response=decoded_output,
        output_tokens=text_generator.generated_tokens,
        coding_errors=code_runner.n_coding_errors,
        code_interpreter_calls=code_runner.code_interpreter_calls
    )
    if not text_generator.are_generation_tokens_available():
        # The token budget ran out before the solution finished: there is nothing reliable to parse.
        logging.warning(f'Max number of new tokens {CFG.max_new_tokens} was reached. Solution not found.')
        result.reached_max_tokens = True
    else:
        logging.info(f'Total generated tokens: {text_generator.generated_tokens}')
        if not code_runner.max_coding_errors_reached():
            result.boxed_answer = parse_boxed_answer(decoded_output)
            result.text_answer = parse_response_in_text(decoded_output)
            result.code_answer = parse_response_in_code(code_runner.successful_code_output)
    return result

### Show

In [None]:
def display_decoded_output(idx, text):
    """Render problem ``idx`` and its generated text as Markdown, framed by horizontal rules."""
    def show(markdown_source):
        display(Markdown(markdown_source))

    rule = '---'
    show(rule)
    show(f'### Problem {idx}')
    # Insert a newline right after every 'Assistant: ' label.
    show(text.replace('Assistant: ', 'Assistant: \n'))
    show(rule)

### Results analysis

In [None]:
def show_inference_insights(results):
    """Summarise per-problem inference statistics as a DataFrame.

    Each problem row contains:
    - the number of runs;
    - the mean and median of coding errors, output tokens and code interpreter calls;
    - as a percentage of runs: runs that hit the token limit, runs that hit the
      coding-error limit, and runs that produced a boxed/text/code answer.

    A final 'all' row sums the counts and averages the mean/median columns across problems.

    Returns:
        The insights DataFrame, or an empty DataFrame when ``results`` holds no problems.
    """
    keys = ['coding_errors', 'output_tokens', 'code_interpreter_calls']
    answers = ['boxed_answer', 'text_answer', 'code_answer']
    # Count columns converted to percentages of n_runs at the end (named explicitly
    # instead of relying on their position among the columns).
    percentage_columns = ['unfinished_responses', 'max_coding_errors_reached'] + [f'{answer}s' for answer in answers]
    rows = []
    for idx in results.results:
        logging.info(f'Logging inference insights for problem {idx}')
        row = dict(n_runs=len(results.get_result_distribution(idx, keys[0])))
        for key in keys:
            values = results.get_result_distribution(idx, key)
            logging.info(f'{key} distribution: {values}')
            row[f'mean_{key}'] = round(np.mean(values), 1)
            row[f'median_{key}'] = round(np.median(values), 1)
        values = results.get_result_distribution(idx, 'reached_max_tokens')
        logging.info(f'reached_max_tokens distribution: {values}')
        row['unfinished_responses'] = np.sum(values)
        values = results.get_result_distribution(idx, 'coding_errors')
        row['max_coding_errors_reached'] = np.sum(values >= CFG.max_coding_errors)

        for answer in answers:
            values = results.get_result_distribution(idx, answer)
            logging.info(f'{answer} distribution: {values}')
            # Elementwise comparison on an object array: counts the non-None answers.
            row[f'{answer}s'] = np.sum(values != None)
        rows.append(row)
        logging.info('')
    if not rows:
        # Nothing to summarise; avoids adding an 'all' row to a frame without columns.
        return pd.DataFrame()
    insights = pd.DataFrame(rows)
    summary = insights.sum()
    for column in insights.columns:
        if 'mean' in column or 'median' in column:
            summary[column] = round(summary[column] / len(insights), 1)
    insights.loc['all'] = summary
    for column in percentage_columns:
        insights[column] = (insights[column]/insights['n_runs']*100).round(1)
    return insights

In [None]:
def get_accuracy_report(results, result_priority):
    """Build a per-problem accuracy table from the stored results and the global ``df``.

    Columns added next to ``df[['answer', 'ground_truth']]``:
        answer: majority-vote answer (smallest value on ties); None/NaN when no run
            produced a valid answer.
        n_runs: number of stored runs.
        correct_counts / wrong_counts: valid votes equal / not equal to the ground truth.
        highest_wrong_counts: votes of the most voted wrong answer.
        highest_correct_tokens: largest output token count among runs with a correct answer.
        is_correct: 1/0 for the voted answer, NaN when unanswered.
        pass: True when at least one vote was correct.

    NOTE(review): problems of ``df`` that have no entry in ``results`` keep answer 0, so
    they count as wrong (or as correct if their ground truth is 0).

    Returns:
        The report with a 'summary' row appended by ``add_summary_to_report``.
    """
    report = df[['answer', 'ground_truth']].copy()
    report['answer'] = 0
    report['n_runs'] = 0
    report['correct_counts'] = 0
    report['highest_wrong_counts'] = 0
    report['wrong_counts'] = 0
    report['highest_correct_tokens'] = None

    for idx in results.results:
        # Read the ground truth first so it always belongs to this problem.
        ground_truth = df.loc[idx, 'ground_truth']
        report.loc[idx, 'n_runs'] = get_n_runs(idx, results)
        try:
            values = results.get_valid_results(idx, result_priority)
        except NoValidResults:
            report.loc[idx, 'answer'] = None
            continue
        # Count the plain Python values: converting to a numpy array first could coerce
        # mixed very large ints to float and merge distinct answers.
        counter_ret = Counter(values).most_common()
        report.loc[idx, 'answer'] = get_minimum_most_frequent_value(counter_ret)[0]
        correct_counts = next((count for pred, count in counter_ret if pred == ground_truth), 0)
        report.loc[idx, 'correct_counts'] = correct_counts
        report.loc[idx, 'highest_wrong_counts'] = get_highest_wrong_count(counter_ret, ground_truth)
        report.loc[idx, 'wrong_counts'] = get_wrong_counts(counter_ret, ground_truth)
        if correct_counts > 0:
            report.loc[idx, 'highest_correct_tokens'] = get_highest_correct_tokens(idx, ground_truth, results)
    report['is_correct'] = (report['answer'] == report['ground_truth']).astype(int)
    report['pass'] = report['correct_counts'] > 0
    report.loc[report['answer'].isna(), 'is_correct'] = np.nan
    return add_summary_to_report(report)

def add_summary_to_report(report):
    """Append a 'summary' row with column totals to ``report`` and return it.

    The first two columns (answer and ground truth in the accuracy report) are shown as
    '-'. The 'highest_correct_tokens' entry uses the column maximum instead of the sum.
    """
    totals = report.sum()
    leading_columns = report.columns[:2]
    for column in leading_columns:
        totals[column] = '-'
    totals['highest_correct_tokens'] = report['highest_correct_tokens'].max()
    report.loc['summary'] = totals
    return report

def get_highest_wrong_count(counter_ret, ground_truth):
    """Return the count of the first pair in ``counter_ret`` whose value differs from ``ground_truth``.

    With a ``Counter.most_common()`` list this is the vote count of the most voted wrong
    answer. Returns 0 when every value is correct or the list is empty.
    """
    wrong_vote_counts = (count for pred, count in counter_ret if pred != ground_truth)
    return next(wrong_vote_counts, 0)

def get_wrong_counts(counter_ret, ground_truth):
    """Return the total number of votes in ``counter_ret`` for values different from ``ground_truth``."""
    return sum(count for pred, count in counter_ret if pred != ground_truth)


def get_n_runs(idx, results):
    """Return how many results are stored for problem ``idx`` in ``results.results``."""
    runs_for_problem = results.results[idx]
    return len(runs_for_problem)

def get_highest_correct_tokens(idx, ground_truth, results):
    """Return the largest ``output_tokens`` among runs of ``idx`` with an answer equal to ``ground_truth``.

    Every field listed in ``CFG.result_priority`` is checked independently, not only
    the field that would win the priority. Returns 0 when no run matches.
    """
    tokens = results.get_result_distribution(idx, 'output_tokens')
    best = 0
    for answer_key in CFG.result_priority:
        matches = results.get_result_distribution(idx, answer_key) == ground_truth
        matching_tokens = tokens[matches]
        if len(matching_tokens) == 0:
            continue
        best = max(best, max(matching_tokens))
    return best

In [None]:
def analyze_MATH_results(result_priority):
    """Print and plot accuracy metrics for a MATH-style evaluation.

    Reads the global ``results`` and ``df``; ``df`` must provide 'level' and 'type' columns.
    """
    logging.info(f'Analyzing MATH results for {result_priority} priorities')
    full_report = get_accuracy_report(results, result_priority)
    print_disaggregated_metrics(full_report)
    # Drop the trailing 'summary' row: the remaining metrics are computed per problem.
    per_problem = full_report.loc[full_report.index[:-1]]
    print_relevant_metrics(per_problem)
    for grouping_column in ('level', 'type'):
        per_problem[grouping_column] = df[grouping_column]
        plot_grouped_results(per_problem, grouping_column)


def print_relevant_metrics(accuracy_report):
    """Print majority-vote metrics (correct/unanswered/wrong) and the pass metric.

    Expects one row per problem (no summary row). ``is_correct`` is 1, 0 or NaN, and
    ``pass`` is True when any vote matched the ground truth. Error bars come from
    ``estimate_uncertainty``.
    """
    outcomes = accuracy_report['is_correct']
    tally = outcomes.value_counts()
    correct = tally.get(1, 0)
    unanswered = outcomes.isna().sum()
    wrong = tally.get(0, 0)
    total = correct + unanswered + wrong

    def describe(label, count):
        ratio = count / total
        print(f'{label}: {count}/{total} ({ratio:.2f} ± {estimate_uncertainty(ratio, total):.2f})')

    print('\tAggregated metrics majority vote')
    describe('Correct', correct)
    describe('Unanswered', unanswered)
    describe('Wrong', wrong)
    print('\tAggregated metrics pass')
    describe('Correct', accuracy_report['pass'].sum())


def estimate_uncertainty(proportion, n, z=1.96):
    """Return the half-width of the normal-approximation (Wald) interval of a proportion.

    Args:
        proportion: observed proportion in [0, 1].
        n: number of observations the proportion was computed from.
        z: standard-normal critical value. The default 1.96 gives a ~95% interval; pass
            another value (e.g. 2.576 for ~99%) for a different confidence level.
    """
    return z * np.sqrt(proportion * (1 - proportion) / n)


def print_disaggregated_metrics(accuracy_report):
    """Print per-run metrics from the totals stored in the report's 'summary' row.

    Every repetition counts individually. Runs that produced no valid answer
    (n_runs - correct - wrong) are reported as unanswered.
    """
    correct, wrong, total = (accuracy_report.loc['summary', column]
                             for column in ('correct_counts', 'wrong_counts', 'n_runs'))
    unanswered = total - correct - wrong
    print('\tDisaggregated metrics')
    for label, count in (('Correct', correct), ('Unanswered', unanswered), ('Wrong', wrong)):
        share = count / total
        print(f'{label}: {count}/{total} ({share:.2f} ± {estimate_uncertainty(share, total):.2f})')


def plot_grouped_results(df, group):
    """Plot stacked bars with the share of correct/unanswered/wrong problems per ``group`` value.

    ``df`` needs an ``is_correct`` column (1 correct, 0 wrong, NaN unanswered) and the
    ``group`` column. An extra 'overall' bar aggregates every category.
    """
    categories = sorted(df[group].unique().tolist())
    correct_counts, unanswered_counts, wrong_counts = [], [], []
    for category in categories:
        outcome = df[df[group] == category].is_correct
        tally = outcome.value_counts()
        correct_counts.append(tally.get(1, 0))
        unanswered_counts.append(outcome.isna().sum())
        wrong_counts.append(tally.get(0, 0))

    # The total is computed before it is appended, so it only sums the real categories.
    for counts in (correct_counts, unanswered_counts, wrong_counts):
        counts.append(np.sum(counts))
    categories.append('overall')

    totals = np.array(correct_counts) + np.array(unanswered_counts) + np.array(wrong_counts)
    correct = np.array(correct_counts)/totals
    unanswered = np.array(unanswered_counts)/totals
    wrong = np.array(wrong_counts)/totals
    plt.bar(categories, correct, label='Correct', color='tab:green')
    plt.bar(categories, unanswered, bottom=correct, label='Unanswered', color='tab:orange')
    plt.bar(categories, wrong, bottom=correct + unanswered, label='Wrong', color='tab:red')
    for position, category in enumerate(categories):
        share_correct = correct[position]
        share_unanswered = unanswered[position]
        share_wrong = wrong[position]
        plt.text(category, share_correct/2, f'{share_correct*100:.0f}%', ha='center', va='center')
        plt.text(category, share_correct + share_unanswered/2, f'{share_unanswered*100:.0f}%', ha='center', va='center')
        plt.text(category, share_correct + share_unanswered + share_wrong/2, f'{share_wrong*100:.0f}%', ha='center', va='center')
    # Legend is currently disabled (was: plt.legend(loc=0)).
    plt.ylim(0, 1)
    plt.grid(axis='y')
    plt.title(f'Results grouped by {group}')
    plt.show()

## Inference

In [None]:
# NOTE(review): presumably blocks until enough GPU memory is free before the model is loaded — confirm.
wait_for_gpu_memory()
# Creates the model/inference artifacts once up front; the inference loop also calls this per problem.
create_model_and_inference_artifacts() # TODO: remove this line

In [None]:
# Main loop: sample repetitions per problem, vote, and submit the answer modulo 1000.
results = Results()
for idx, (test, sample_submission) in tqdm(enumerate(iter_test), total=N_PROBLEMS, smoothing=0):
    if is_quick_save_condition(idx, test):
        break
    create_model_and_inference_artifacts()
    results.initialize(idx)
    # NOTE(review): presumably updates the global N_REPETITIONS to fit the remaining time — confirm.
    adjust_repetitions_to_meet_ending_time(idx)
    for repetition_idx in tqdm(range(N_REPETITIONS), desc='Repetitions', smoothing=0):
        # Stop sampling when out of time or when the majority answer is already significant.
        if is_ending_time() or results.is_best_solution_found(idx, CFG.result_priority):
            break
        # Report progress against N_REPETITIONS, the bound this loop actually uses, which can
        # differ from CFG.n_repetitions.
        logging.info(f"Problem {idx} - repetition {repetition_idx+1}/{N_REPETITIONS} - time spent : {get_time_spent():.0f} secs")
        try:
            prompt = get_formatted_prompt(test['problem'].values[0], repetition_idx)
            result = solve_problem_with_code_interpreter(prompt)
            results.add_result(idx, result)
            results.log_results_distribution(idx)
            if CFG.verbose:
                display_decoded_output(idx, test['problem'].values[0] + '\n\n' + result.response)
        except Exception as e:
            logging.warning(f'Exception when trying to generate response for {idx}: {e}')
            raise  # TODO: remove this line
    try:
        result, count = results.get_most_frequent_result(idx, CFG.result_priority)
        sample_submission['answer'] = result % 1000
    except NoValidResults:
        logging.warning(f'No valid results for problem {idx}. Using default answer: {CFG.default_answer}')
        sample_submission['answer'] = CFG.default_answer
    log_ground_truth(idx)
    logging.info(f'Predicted answer for problem {idx} is: {sample_submission["answer"].values[0]}')
    env.predict(sample_submission)
    print('\n'*4)

## Results

In [None]:
# Compute the timestamp unconditionally: the accuracy-report cell below uses it to name
# its CSV even when results are not saved (previously a NameError in that case).
timestamp = get_timestamp()
if CFG.save_results:
    results.save(f'{timestamp}_results.json')

In [None]:
# Per-problem statistics (token usage, coding errors, how often each answer type was parsed);
# skipped when the notebook is only being quick-saved.
if not CFG.quick_save:
    insights = show_inference_insights(results)
    display(insights)

In [None]:
# Outside submission mode the dataset has ground truth: build the accuracy report, export it
# as CSV (named with the global `timestamp` set by an earlier cell) and display it.
if not CFG.submission_mode:
    accuracy_report = get_accuracy_report(results, CFG.result_priority)
    accuracy_report.to_csv(f'{timestamp}_accuracy_report.csv', index=True)
    display(accuracy_report)

In [None]:
# Extra breakdown by 'level' and 'type', columns only present in MATH datasets.
if not CFG.submission_mode and 'MATH' in CFG.dataset:
    analyze_MATH_results(CFG.result_priority)

In [None]:
# Total wall-clock time; get_time_spent() is presumably in seconds (divided by 60 for minutes).
logging.info(f'The notebook run in {get_time_spent()/60:.1f} minutes')

In [None]:
# NOTE(review): presumably frees the GPU memory held by the model/server — confirm in the helper.
empty_gpu_vram()

In [None]:
# Drops the global name only; `del` by itself does not stop a running thread (see the TODO below).
del server_thread

## TODO:

- [x] Wait until the server is running
- [x] Better handling of stop words
- [ ] Better token handling using the completions object instead of the tokenizer
- [ ] Remove the server's verbose output
- [ ] Allow stopping the server: https://chatgpt.com/share/7f5d5cc3-db8c-42f0-ae07-f7fa00e493c4
- [ ] Update the inference loop to allow parallelization
- [ ] Use 2 GPUs instead of 1
- [ ] Clean code and remove old code
- [ ] Verify that we get similar results but much faster
- [ ] Move the code to Kaggle