TPLAWTON AIMO COMPETITION

Forked from: https://www.kaggle.com/code/suzchin/solution-baseline-deepseekmath-7b and fine-tuned

Deepseek-Math: https://github.com/deepseek-ai/DeepSeek-Math

DATASETS LOADED IN ENVIRONMENT (Some not used):  
AIMO  
deepseek-math  
accelerate  
bitsandbytes  
open-math-mistral  
gemma 7b-it  
gemma 2b-it  

This notebook uses the DeepSeekMath model to predict answers to complex math problems formatted in LaTeX. The model generates code to solve each problem; the notebook executes that code, then extracts and processes the numeric answers from the output. Key techniques include self-consistency (majority voting over several sampled solutions) for best result selection, custom stopping criteria for iterative generation, prompt engineering, and debugging and time logging to ensure reproducibility and efficient runtime.

In [1]:
# Record when the notebook started; predict() compares against this value to
# enforce an overall wall-clock budget.
import time
NOTEBOOK_START_TIME = time.time()

# Debug switch; not referenced by any code visible in this notebook.
DEBUG = False

# When True, generate() is asked for a dict-style result so that its
# past_key_values can be fed into the follow-up generate() calls in predict().
USE_PAST_KEY = True

In [None]:
# Imports and Initializations
import torch
import pandas as pd
from tqdm import tqdm
import gc
torch.backends.cuda.enable_mem_efficient_sdp(False)
import re
import sys
import subprocess
import math
import random
from collections import Counter
from numpy.random import choice
import numpy as np

from transformers import (
    AutoModelForCausalLM, 
    AutoTokenizer, 
    AutoConfig,
    StoppingCriteria,
    StoppingCriteriaList,
    set_seed
)
import transformers

# Fix the global random seed through the transformers helper.
set_seed(42)

# Maximum number of sampled solution attempts per problem; predict() returns
# the most common answer among them.
n_repetitions = 17
# Token budget for one attempt; passed to generate() as max_new_tokens, minus
# the tokens already produced in earlier rounds of the same attempt.
TOTAL_TOKENS = 2048


# Directory from which the model config, tokenizer and weights are loaded.
MODEL_PATH = "/kaggle/input/deepseek-math"

In [None]:
# Extract numerical substring from strings generated by model
def naive_parse(answer):
    """Return the last run of consecutive ASCII digits found in ``answer``.

    The text is scanned from the right: trailing non-digit characters are
    skipped, then digits are collected until the first non-digit is met.
    An empty string is returned when ``answer`` contains no digit.
    """
    digits = '0123456789'

    # Walk left past everything that follows the final digit.
    stop = len(answer)
    while stop > 0 and answer[stop - 1] not in digits:
        stop -= 1

    # Keep walking left while still inside that digit run.
    begin = stop
    while begin > 0 and answer[begin - 1] in digits:
        begin -= 1

    return ''.join(answer[begin:stop])

In [None]:
# Extract last line of output
def return_last_print(output, n):
    """Return line ``n`` of ``output`` once surrounding whitespace is stripped.

    Args:
        output: Captured text, typically the stdout of an executed script.
        n: Line index; negative values count from the end (``-1`` is the
            last line, ``-2`` the one before it).

    Returns:
        The selected line, or ``""`` when ``output`` has no line at index
        ``n`` (for example ``n=-2`` on single-line output).
    """
    lines = output.strip().split('\n')
    # str.split always yields at least one element, so an emptiness check can
    # never fire; guard the index itself instead.
    try:
        return lines[n]
    except IndexError:
        return ""

# take the code given by the model and process it to return its output
def process_code(code, return_shell_output=False):
    """Run generated Python code in a subprocess and report its last printed line.

    Every ``symbols(...)`` call in ``code`` that does not mention ``real``
    gets ``, real=True`` appended to its arguments. The script is written to
    ``code.py`` in the working directory and executed with the current
    interpreter under ``timeout 7``.

    Args:
        code: Python source to execute.
        return_shell_output: When True, the code is wrapped in a
            ``try/except`` that star-imports sympy and prints the exception
            followed by ``FAIL`` on error, and the raw last output line is
            returned together with a success flag. When False, the last
            output line is evaluated to a number and reduced modulo 1000.

    Returns:
        ``return_shell_output=False``: an int in ``0..999``, or ``-1`` when
        the run or the numeric conversion failed.
        ``return_shell_output=True``: ``(value, ok)`` where ``value`` is the
        last printed line (or the error line when the script printed
        ``FAIL``; ``-1`` when the subprocess itself failed) and ``ok`` is a
        bool.
    """
    def repl(match):
        call = match.group()
        if "real" not in call:
            return "{}{}".format(call[:-1], ', real=True)')
        return call

    code = re.sub(r"symbols\([^)]+\)", repl, code)

    if return_shell_output:
        # Indent the snippet so it sits inside the generated ``try:`` block.
        indented = code.replace('\n', '\n    ')
        # A snippet that does not start with a newline would otherwise leave
        # its first line unindented and make the wrapper a SyntaxError.
        if not indented.startswith('\n'):
            indented = '    ' + indented
        code = "\ntry:\n    from sympy import *\n{}\nexcept Exception as e:\n    print(e)\n    print('FAIL')\n".format(indented)
    else:
        print(code)

    with open('code.py', 'w') as fout:
        fout.write(code)

    # An argv list avoids shell parsing, so an interpreter path containing
    # spaces or shell metacharacters cannot break the command.
    command = ['timeout', '7', sys.executable, 'code.py']
    try:
        shell_output = subprocess.check_output(command).decode('utf8')
        return_value = return_last_print(shell_output, -1)
        print(shell_output)
        if return_shell_output:
            if return_value == 'FAIL':
                # The wrapper prints the exception text right before 'FAIL'.
                return_value = return_last_print(shell_output, -2)
                if "not defined" in return_value:
                    return_value += '\nTry checking the formatting and imports'
                return return_value, False
            return return_value, True
        # NOTE(review): eval() runs on text printed by model-generated code;
        # acceptable only because that code is already executed above.
        code_output = round(float(eval(return_value))) % 1000
    except Exception as e:
        # Covers non-zero exit status (including the timeout) and output
        # that cannot be turned into a number.
        print(e, 'shell_output')
        code_output = -1

    if return_shell_output:
        return code_output, code_output != -1

    return code_output

# process output to return just numeric answer modulo 1000
def process_text_output(output):
    """Extract the final integer answer from model text, reduced modulo 1000.

    The last ``\\boxed{<digits>}`` occurrence is preferred; when there is
    none, the last run of digits found by ``naive_parse`` is used.

    Args:
        output: Text generated by the model.

    Returns:
        An int in ``0..999``, or ``-1`` when no digits were found or parsing
        failed.
    """
    try:
        boxed = re.findall(r'\\boxed\{(\d+)\}', output)
        print('BOXED', boxed)

        digits = boxed[-1] if boxed else naive_parse(output)
        print('BOXED FINAL', digits)

        if not digits:
            return -1
        # int() handles leading zeros ("007") and keeps arbitrarily large
        # integers exact; eval() + float() rejected the former and rounded
        # the latter before the modulo.
        return int(digits) % 1000

    except Exception as e:
        print(e)
        print('ERROR PARSING TEXT')
        return -1


In [None]:
# Release cached CUDA allocations and run the garbage collector before the
# model is loaded in the next cell.
torch.cuda.empty_cache()
gc.collect()

In [None]:
# Load the model configuration and the tokenizer from MODEL_PATH.
config = AutoConfig.from_pretrained(MODEL_PATH)
# NOTE(review): gradient checkpointing is normally a training-time memory
# option; this notebook only calls generate(), so this flag presumably has no
# effect here — confirm before relying on it.
config.gradient_checkpointing = True

tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)

# Module-to-device placement: the embeddings and decoder layers 0-15 go to
# device 0; layers 16-29, the final norm and the LM head go to device 1.
device_map = {
    'model.embed_tokens': 0,
    **{f'model.layers.{layer}': int(layer >= 16) for layer in range(30)},
    'model.norm': 1,
    'lm_head': 1,
}
 
# Load the causal language model from MODEL_PATH, placing each module on the
# device given in device_map and passing the configuration loaded above.
# NOTE(review): torch_dtype="auto" presumably takes the dtype from the
# checkpoint/config — confirm against the transformers version in use.
model = AutoModelForCausalLM.from_pretrained(
    MODEL_PATH,
    device_map=device_map,
    torch_dtype="auto",
    trust_remote_code=True,
    config=config
)

# Helper function for custom stopping criteria in the text generation
class StoppingCriteriaSub(StoppingCriteria):
    """Stop generation once the sequence ends with one of the given token patterns.

    Args:
        stops: Iterable of 1-D token-id tensors; each is moved to ``"cuda"``.
            ``None`` (the default) means no stop patterns.
        encounters: Unused; kept so existing callers that pass it still work.
    """

    def __init__(self, stops=None, encounters=1):
        super().__init__()
        # ``None`` replaces the former mutable ``[]`` default argument.
        self.stops = [stop.to("cuda") for stop in (stops or [])]

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs):
        """Return True when the first sequence in ``input_ids`` ends with a stop pattern.

        Only row 0 of ``input_ids`` is inspected. ``scores`` and any extra
        keyword arguments supplied by the generation loop are ignored.
        """
        sequence = input_ids[0]
        for stop in self.stops:
            width = len(stop)
            # A pattern longer than the sequence cannot match; comparing
            # tensors of different length would raise or broadcast wrongly.
            if width > len(sequence):
                continue
            if torch.all(torch.eq(stop, sequence[-width:])):
                return True
        return False


# Text markers at which generation is paused; predict() checks for these
# suffixes to decide whether to run the generated code and continue.
stop_words = ["```output", "```python", "```\nOutput" , ")\n```" , "``````output"]   
# One token-id tensor per stop word, encoded without special tokens.
# NOTE(review): .squeeze() would yield a 0-d tensor for a stop word that
# encodes to a single token, which len() in StoppingCriteriaSub cannot
# handle — presumably each of these encodes to several tokens; confirm.
stop_words_ids = [tokenizer(stop_word, return_tensors='pt', add_special_tokens=False)['input_ids'].squeeze() for stop_word in stop_words]
stopping_criteria = StoppingCriteriaList([StoppingCriteriaSub(stops=stop_words_ids)])

# Notebook cell output: show the loaded dtype and the resulting device placement.
model.dtype, model.hf_device_map

In [None]:
# PROMPT ENGINEERING: one of two prompt templates is drawn for each repetition.
# Each template has two "{}" fields: predict() fills the first with the problem
# text and the second with a literal "{}", so the final prompt contains
# "\boxed{}".
#
# Code-first template: asks for a sympy-based plan followed by a full script.
# NOTE(review): "\print" in this template is not a valid escape sequence, so
# the prompt contains a literal backslash before "print" — this looks like a
# typo. Confirm before changing it, since any prompt edit changes model output.
code = """Below is a math problem you are to solve (positive numerical answer):
\"{}\"
To accomplish this, first determine a sympy-based approach for solving the problem by 
listing each step to take and what functions need to be called in each step. Be clear 
so even an idiot can follow your instructions, and remember, your final answer should 
be positive integer, not an algebraic expression!
Write the entire script covering all the steps (use comments and document it well) and 
\print the result. After solving the problem, output the final numerical answer within \\boxed{}.

Approach:"""


# Chain-of-thought template: asks for step-by-step reasoning with programs.
cot = """Below is a math problem you are to solve (positive numerical answer!):
\"{}\"
Analyze this problem and think step by step to come to a solution with programs. 
After solving the problem, output the final numerical answer within \\boxed{}.\n\n"""

# Templates sampled by predict(). The name is misspelled ("promplt") but is
# referenced under this spelling in predict(), so rename both together or not at all.
promplt_options = [code,cot]

In [None]:
# Seed every random number generator in use, for reproducibility.
def seed_everything(seed):
    """Seed the Python, NumPy, torch and transformers RNGs with ``seed``.

    Also switches cuDNN to deterministic mode with autotuning disabled.

    Args:
        seed: Integer seed applied to every generator.
    """
    import os
    import random

    import numpy as np
    import torch

    random.seed(seed)
    # Takes effect only in child processes: the hash seed of the running
    # interpreter is fixed at start-up.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # The model is spread over more than one GPU, so seed all of them.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode lets cuDNN pick algorithms by timing them, which can
    # differ between runs; it must be off for reproducible results.
    torch.backends.cudnn.benchmark = False
    set_seed(seed)


seed_everything(42)

In [None]:
# Initialize sampling parameters.
# NOTE: predict() assigns its own local temperature/top_p values and its own
# local bookkeeping containers under these same names, so the module-level
# values below are not the ones predict() uses.
temperature = 0.89645
top_p = 1.0
temperature_coding = 0.89645
top_p_coding = 1.0

# Module-level bookkeeping containers (shadowed by locals inside predict()).
total_results = {}
total_answers = {}
best_stats = {}
total_outputs = {}
question_type_counts = {}
starting_counts = (2,3)

In [None]:
# PREDICTION FUNCTION

# Seconds of unused per-problem budget banked from earlier problems.
extra_time = 0
# Extra seconds granted to the problem currently being solved (set by predict()).
allowed = 0
def predict(problem):
    """Solve one problem by sampling several attempts and returning the majority answer.

    Up to ``n_repetitions`` attempts are generated. Each attempt draws one of
    the prompt templates, generates until a stop word, runs any produced code
    through ``process_code``, feeds the output back into the prompt and
    continues generating until no stop word ends the text or the token budget
    is used. Every attempt can contribute a code-derived answer and a
    text-derived answer; the most frequent value so far is tracked.

    Time budget: each problem gets 720 seconds plus up to 600 seconds banked
    from earlier problems in the module-level ``extra_time``; the notebook as
    a whole stops solving after 32200 seconds.

    Args:
        problem: Problem statement text.

    Returns:
        The most common answer (an int modulo 1000), ``-1`` if no attempt
        produced an answer, or ``0`` when the notebook budget was already
        exhausted on entry.
    """
    global extra_time, allowed

    seed_everything(69)

    # Sampling parameters (local; they shadow the module-level values).
    temperature = 0.9
    top_p = 1.0
    temperature_coding = 0.9
    top_p_coding = 1.0

    # Per-problem bookkeeping, all keyed by the constant ``i``.
    total_results = {}
    total_answers = {}
    best_stats = {}
    total_outputs = {}
    question_type_counts = {}
    starting_counts = (2, 3)
    i = 0

    # If the notebook has been running too long, return 0 instead of solving.
    if time.time() - NOTEBOOK_START_TIME >= 32200:
        return 0

    PROBLEM_START_TIME = time.time()
    allowed = min(extra_time, 600)

    # MAIN LOOP
    for jj in tqdm(range(n_repetitions)):

        # Early exit conditions (confident answer found, over allowed runtime).
        best, best_count = best_stats.get(i, (-1, -1))
        if best_count > np.sqrt(jj):
            print("SKIPPING CAUSE ALREADY FOUND BEST")
            continue

        if time.time() - PROBLEM_START_TIME >= 720 + allowed:
            # Charge the overrun against the banked time.
            extra = (time.time() - PROBLEM_START_TIME) - 720
            extra_time -= max(0, extra)
            # ``best`` mirrors best_stats and is -1 when nothing is stored
            # yet, which avoids a KeyError on an empty best_stats.
            return best

        if time.time() - NOTEBOOK_START_TIME >= 32200:
            return best

        outputs = total_outputs.get(i, [])
        text_answers, code_answers = question_type_counts.get(i, starting_counts)
        results = total_results.get(i, [])
        answers = total_answers.get(i, [])

        for _ in range(5):
            torch.cuda.empty_cache()
            gc.collect()
            time.sleep(0.2)

        try:
            ALREADY_GEN = 0
            code_error = None
            code_error_count = 0
            code_output = -1
            counts = np.array([text_answers, code_answers])

            # Draw a prompt template with probabilities counts / counts.sum().
            # NOTE(review): promplt_options is [code, cot] while counts is
            # [text_answers, code_answers], so the code template is weighted
            # by the text-answer count and vice versa — confirm this is the
            # intended balancing and not a mix-up.
            draw = choice(promplt_options, 1,
                          p=counts / counts.sum())

            # Build the prompt; the second format argument keeps "\boxed{}" intact.
            initial_message = draw[0].format(problem, "{}")
            prompt = f"User: {initial_message}"

            current_printed = len(prompt)
            print(f"{jj}_{prompt}\n")

            model_inputs = tokenizer(prompt, return_tensors='pt').to(model.device)
            input_len = len(model_inputs['input_ids'][0])

            # First generation round, up to the first stop word.
            generation_output = model.generate(**model_inputs,
                                               max_new_tokens=TOTAL_TOKENS - ALREADY_GEN,
                                               return_dict_in_generate=USE_PAST_KEY,
                                               do_sample=True,
                                               temperature=temperature,
                                               top_p=top_p,
                                               num_return_sequences=1, stopping_criteria=stopping_criteria)

            if USE_PAST_KEY:
                output_ids = generation_output.sequences[0]
            else:
                output_ids = generation_output[0]

            decoded_output = tokenizer.decode(output_ids, skip_special_tokens=True)
            print(f"{decoded_output[current_printed:]}\n")
            current_printed += len(decoded_output[current_printed:])
            # Code from successful blocks so far; later blocks run on top of it.
            cumulative_code = ""

            stop_word_cond = decoded_output.endswith(tuple(stop_words))

            # Keep generating while the text ends on a stop word and tokens remain.
            while stop_word_cond and ALREADY_GEN < TOTAL_TOKENS:

                if decoded_output.endswith("```python"):
                    # A code block is just opening: continue generating code.
                    temperature_inner = temperature_coding
                    top_p_inner = top_p_coding
                    prompt = decoded_output
                else:
                    # A code block just closed: run it and feed back the output.
                    temperature_inner = temperature
                    top_p_inner = top_p
                    try:
                        if decoded_output.endswith("``````output"):
                            code_text = decoded_output.split('```python')[-1].split("``````")[0]
                        else:
                            code_text = decoded_output.split('```python')[-1].split("```")[0]

                        cumulative_code += code_text
                        code_output, CODE_STATUS = process_code(cumulative_code, return_shell_output=True)
                        print('CODE RESULTS', code_output)

                        # Track whether the same output repeats back to back.
                        if code_error == code_output:
                            code_error_count += 1
                        else:
                            code_error = code_output
                            code_error_count = 0

                        if not CODE_STATUS:
                            # Roll back the failing block. Slicing by explicit
                            # length keeps the earlier code when code_text is
                            # empty (``[:-0]`` would erase everything).
                            cumulative_code = cumulative_code[:len(cumulative_code) - len(code_text)]

                            if code_error_count >= 1:
                                print("REPEATED ERRORS")
                                break

                    except Exception as e:
                        print(e)
                        print('ERROR PARSING CODE')
                        code_output = -1

                    # Append the execution output to the prompt.
                    if code_output != -1:
                        if decoded_output.endswith(")\n```"):
                            prompt = decoded_output + '```output\n' + str(code_output) + '\n```\n'
                        else:
                            prompt = decoded_output + '\n' + str(code_output) + '\n```\n'
                    else:
                        prompt = decoded_output
                        cumulative_code = ""

                model_inputs = tokenizer(prompt, return_tensors='pt').to(model.device)
                ALREADY_GEN = len(model_inputs['input_ids'][0]) - input_len

                # Pass the previous round's key/value cache to generate().
                if USE_PAST_KEY:
                    old_values = generation_output.past_key_values
                else:
                    old_values = None

                generation_output = model.generate(**model_inputs,
                                                   max_new_tokens=TOTAL_TOKENS - ALREADY_GEN,
                                                   return_dict_in_generate=USE_PAST_KEY,
                                                   past_key_values=old_values,
                                                   do_sample=True,
                                                   temperature=temperature_inner,
                                                   top_p=top_p_inner,
                                                   num_return_sequences=1, stopping_criteria=stopping_criteria)
                if USE_PAST_KEY:
                    output_ids = generation_output.sequences[0]
                else:
                    output_ids = generation_output[0]

                decoded_output = tokenizer.decode(output_ids, skip_special_tokens=True)
                print(f"\nINTERMEDIATE OUT :\n{decoded_output[current_printed:]}\n")
                current_printed += len(decoded_output[current_printed:])

                # The loop ends once the text no longer finishes on a stop word.
                stop_word_cond = decoded_output.endswith(tuple(stop_words))

            if USE_PAST_KEY:
                output_ids = generation_output.sequences[0]
            else:
                output_ids = generation_output[0]

            # Decode only the generated part (prompt tokens skipped) and
            # reduce it to a numeric answer modulo 1000.
            raw_output = tokenizer.decode(output_ids[input_len:], skip_special_tokens=True)
            result_output = process_text_output(raw_output)

            try:
                # NOTE(review): eval() on the output of model-written code;
                # anything that is not a numeric expression ends up as -1.
                code_output = round(float(eval(code_output))) % 1000
            except Exception as e:
                print(e, 'final_eval')
                code_output = -1
        except Exception as e:
            print(e, "5")
            result_output, code_output = -1, -1

        # Record the code-derived answer.
        if code_output != -1:
            outputs.append(code_output)
            code_answers += 1

        # Record the text-derived answer.
        if result_output != -1:
            outputs.append(result_output)
            text_answers += 1

        # Best answer selection: keep the most common answer seen so far.
        if len(outputs) > 0:
            occurrences = Counter(outputs).most_common()
            print(occurrences)
            if occurrences[0][1] > best_count:
                print("GOOD ANSWER UPDATED!")
                best = occurrences[0][0]
                best_count = occurrences[0][1]
            if occurrences[0][1] > 5:
                print("ANSWER FOUND!")
                # Store before leaving the loop; otherwise the value returned
                # below is the previous iteration's, possibly different, best.
                best_stats[i] = (best, best_count)
                break

        results.append(result_output)
        answers.append(code_output)

        best_stats[i] = (best, best_count)
        question_type_counts[i] = (text_answers, code_answers)
        total_outputs[i] = outputs

        total_results[i] = results
        total_answers[i] = answers

        print("code_answers", code_answers - starting_counts[1], "text_answers", text_answers - starting_counts[0])

    # Bank whatever is left of this problem's 720 seconds for later problems.
    remaining = 720 - (time.time() - PROBLEM_START_TIME)
    remaining = max(0, remaining)
    extra_time += remaining
    return best_stats.get(i, (-1, -1))[0]

# Submission with the new API

In [None]:
# Create the competition environment and the iterator that the submission
# loop below consumes.
import aimo

env = aimo.make_env()
iter_test = env.iter_test()

In [None]:
# For each (test, sample_submission) pair from the API: run predict() on the
# first value of the 'problem' column, store the result in the 'answer'
# column, and submit it through env.predict().
for test, sample_submission in iter_test:
    sample_submission['answer'] = predict(test['problem'].values[0])
    env.predict(sample_submission)
    print(test)
    print(sample_submission)

In [None]:
# Overwrite the scratch script written by process_code() with a harmless one
# and run it once, so that no model-generated code is left in code.py.
with open('code.py', 'w') as fout:
    fout.write("print('done')")

# argv list instead of a shell string: no shell parsing of the interpreter path.
batcmd = ['timeout', '7', sys.executable, 'code.py']
try:
    shell_output = subprocess.check_output(batcmd).decode('utf8')
    print(shell_output)
except (subprocess.SubprocessError, OSError) as e:
    # Best effort only: report the failure instead of swallowing it silently,
    # without also trapping KeyboardInterrupt/SystemExit as a bare except did.
    print(e)