# Credits:

- https://www.kaggle.com/code/olyatsimboy/aimo-openmath-mistral-baseline
- https://www.kaggle.com/code/aatiffraz/prompt-prediction-w-mixtral-mistral7b-gemma-llama
- https://www.kaggle.com/code/thedrcat/aimo-mixtral-baseline
- https://www.kaggle.com/code/awsaf49/aimo-kerasnlp-starter


In [None]:
%%writefile /tmp/requirements.txt

-U ../input/bitsandbytes-0-42-0-py3-none-any-whl/bitsandbytes-0.42.0-py3-none-any.whl


In [None]:
! pip install -r /tmp/requirements.txt -qq
print('done')

In [None]:
from aimo_log import logger
import postprocess
import random
from contextlib import contextmanager
from transformers import AutoTokenizer, GenerationConfig, AutoModelForCausalLM



In [None]:
deepseek_model_name = "deepseek-ai/deepseek-math-7b-instruct"
deepseek_tokenizer = AutoTokenizer.from_pretrained(deepseek_model_name)
mistral_model_name = "../input/mistral" # "/Mistral-7B-Instruct-v0.2"
mistral_tokenizer = AutoTokenizer.from_pretrained(mistral_model_name)

In [None]:
deepseek_generation_config = GenerationConfig.from_pretrained(deepseek_model_name)
mistral_generation_config = GenerationConfig.from_pretrained(mistral_model_name)

# `GenerationConfig` documentation



https://huggingface.co/docs/transformers/main/en/main_classes/text_generation#transformers.GenerationConfig 

Class that holds a configuration for a generation task. A `generate` call supports the following generation methods
for text-decoder, text-to-text, speech-to-text, and vision-to-text models:

- *greedy decoding* if `num_beams=1` and `do_sample=False`

- *contrastive search* if `penalty_alpha>0.` and `top_k>1`

- *multinomial sampling* if `num_beams=1` and `do_sample=True`

- *beam-search decoding* if `num_beams>1` and `do_sample=False`

- *beam-search multinomial sampling* if `num_beams>1` and `do_sample=True`

- *diverse beam-search decoding* if `num_beams>1` and `num_beam_groups>1`

- *constrained beam-search decoding* if `constraints!=None` or `force_words_ids!=None`

- *assisted decoding* if `assistant_model` or `prompt_lookup_num_tokens` is passed to `.generate()`

To learn more about decoding strategies refer to the [text generation strategies guide](https://huggingface.co/docs/transformers/main/en/generation_strategies).
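As a rough illustration of the selection rules listed above, the sketch below maps a set of flags to the name of the decoding method. The helper `decoding_strategy`, its `assisted` flag, and the precedence between overlapping rules are assumptions made for this example. It is not the dispatch logic inside `generate`.

```python
def decoding_strategy(num_beams=1, do_sample=False, penalty_alpha=None, top_k=50,
                      num_beam_groups=1, constraints=None, force_words_ids=None,
                      assisted=False):
    """Name the decoding method implied by the flags (illustrative precedence only)."""
    # `assisted` stands in for passing `assistant_model` or `prompt_lookup_num_tokens`
    if assisted:
        return "assisted decoding"
    if constraints is not None or force_words_ids is not None:
        return "constrained beam-search decoding"
    if num_beams > 1 and num_beam_groups > 1:
        return "diverse beam-search decoding"
    if num_beams > 1:
        return "beam-search multinomial sampling" if do_sample else "beam-search decoding"
    if penalty_alpha is not None and penalty_alpha > 0 and top_k > 1:
        return "contrastive search"
    return "multinomial sampling" if do_sample else "greedy decoding"


# The sampling setup used later in this notebook: do_sample=True with the default num_beams
print(decoding_strategy(do_sample=True))
print(decoding_strategy(num_beams=4))
```

With `do_sample=True` and `num_beams` left at 1, the rules select multinomial sampling, which is what the `GenerationConfig` cell in the Code section requests.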


> A large number of these flags control the logits or the stopping criteria of the generation. Make sure you check
> the [generate-related classes](https://huggingface.co/docs/transformers/internal/generation_utils) for a full
> description of the possible manipulations, as well as examples of their usage.



## Args:



### Parameters that control the length of the output

- `max_length` (`int`, *optional*, defaults to 20):
    The maximum length the generated tokens can have. Corresponds to the length of the input prompt +
    `max_new_tokens`. Its effect is overridden by `max_new_tokens`, if also set.
- `max_new_tokens` (`int`, *optional*):
    The maximum number of tokens to generate, ignoring the number of tokens in the prompt.
- `min_length` (`int`, *optional*, defaults to 0):
    The minimum length of the sequence to be generated. Corresponds to the length of the input prompt +
    `min_new_tokens`. Its effect is overridden by `min_new_tokens`, if also set.
- `min_new_tokens` (`int`, *optional*):
    The minimum number of tokens to generate, ignoring the number of tokens in the prompt.
- `early_stopping` (`bool` or `str`, *optional*, defaults to `False`):
    Controls the stopping condition for beam-based methods, like beam-search. It accepts the following values:
    `True`, where the generation stops as soon as there are `num_beams` complete candidates; `False`, where a
    heuristic is applied and the generation stops when it is very unlikely to find better candidates;
    `"never"`, where the beam search procedure only stops when there cannot be better candidates (canonical
    beam search algorithm).
- `max_time` (`float`, *optional*):
    The maximum amount of time, in seconds, that you allow the computation to run. Generation will still finish
    the current pass after the allocated time has passed.
- `stop_strings` (`str or List[str]`, *optional*):
    A string or a list of strings that should terminate generation if the model outputs them.
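
The interplay between `max_length` and `max_new_tokens` can be sketched with a little arithmetic. The helper `new_token_budget` is hypothetical and only mirrors the descriptions above. In particular, returning 0 for a prompt that already exceeds `max_length` is a simplification chosen for this example.

```python
def new_token_budget(prompt_len, max_length=20, max_new_tokens=None):
    """Number of tokens that may still be generated for a prompt of `prompt_len` tokens."""
    if max_new_tokens is not None:
        # max_new_tokens overrides max_length and ignores the prompt length
        return max_new_tokens
    # max_length counts prompt tokens plus generated tokens
    return max(0, max_length - prompt_len)


print(new_token_budget(prompt_len=15))                       # 5
print(new_token_budget(prompt_len=15, max_new_tokens=2048))  # 2048
print(new_token_budget(prompt_len=40))                       # 0
```

This is why the cells below set `max_new_tokens=2048` explicitly: with the default `max_length` of 20, a chat-formatted math problem would leave almost no room for the answer.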



### Parameters that control the generation strategy used

- do_sample (`bool`, *optional*, defaults to `False`):
    Whether or not to use sampling; use greedy decoding otherwise.
- num_beams (`int`, *optional*, defaults to 1):
    Number of beams for beam search. 1 means no beam search.
- num_beam_groups (`int`, *optional*, defaults to 1):
    Number of groups to divide `num_beams` into in order to ensure diversity among different groups of beams.
    See [this paper](https://arxiv.org/pdf/1610.02424.pdf) for more details.
- penalty_alpha (`float`, *optional*):
    The value balances the model confidence and the degeneration penalty in contrastive search decoding.
- use_cache (`bool`, *optional*, defaults to `True`):
    Whether or not the model should reuse the key/value attention states cached from previous steps (if
    applicable to the model) to speed up decoding.



### Parameters for manipulation of the model output logits

- temperature (`float`, *optional*, defaults to 1.0):
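    The value used to modulate the next token probabilities. Logits are divided by `temperature` before the
    softmax, so values below 1.0 sharpen the distribution and values above 1.0 flatten it.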
- top_k (`int`, *optional*, defaults to 50):
    The number of highest probability vocabulary tokens to keep for top-k-filtering.
- top_p (`float`, *optional*, defaults to 1.0):
    If set to float < 1, only the smallest set of most probable tokens with probabilities that add up to
    `top_p` or higher are kept for generation.
- min_p (`float`, *optional*):
    Minimum token probability, which will be scaled by the probability of the most likely token. It must be a
    value between 0 and 1. Typical values are in the 0.01-0.2 range, comparably selective as setting `top_p` in
    the 0.99-0.8 range (use the opposite of normal `top_p` values).
- typical_p (`float`, *optional*, defaults to 1.0):
    Local typicality measures how similar the conditional probability of predicting a target token next is to
    the expected conditional probability of predicting a random token next, given the partial text already
    generated. If set to float < 1, the smallest set of the most locally typical tokens with probabilities that
    add up to `typical_p` or higher are kept for generation. See [this
    paper](https://arxiv.org/pdf/2202.00666.pdf) for more details.
- epsilon_cutoff (`float`, *optional*, defaults to 0.0):
    If set to float strictly between 0 and 1, only tokens with a conditional probability greater than
    `epsilon_cutoff` will be sampled. In the paper, suggested values range from 3e-4 to 9e-4, depending on the
    size of the model. See [Truncation Sampling as Language Model
    Desmoothing](https://arxiv.org/abs/2210.15191) for more details.
- eta_cutoff (`float`, *optional*, defaults to 0.0):
    Eta sampling is a hybrid of locally typical sampling and epsilon sampling. If set to float strictly between
    0 and 1, a token is only considered if it is greater than either `eta_cutoff` or `sqrt(eta_cutoff) *
    exp(-entropy(softmax(next_token_logits)))`. The latter term is intuitively the expected next token
    probability, scaled by `sqrt(eta_cutoff)`. In the paper, suggested values range from 3e-4 to 2e-3,
    depending on the size of the model. See [Truncation Sampling as Language Model
    Desmoothing](https://arxiv.org/abs/2210.15191) for more details.
- diversity_penalty (`float`, *optional*, defaults to 0.0):
    This value is subtracted from a beam's score if it generates the same token as any beam from another group
    at a particular time. Note that `diversity_penalty` is only effective if group beam search is enabled.
- repetition_penalty (`float`, *optional*, defaults to 1.0):
    The parameter for repetition penalty. 1.0 means no penalty. See [this
    paper](https://arxiv.org/pdf/1909.05858.pdf) for more details.
- encoder_repetition_penalty (`float`, *optional*, defaults to 1.0):
    The parameter for encoder_repetition_penalty. An exponential penalty on sequences that are not in the
    original input. 1.0 means no penalty.
- length_penalty (`float`, *optional*, defaults to 1.0):
    Exponential penalty to the length that is used with beam-based generation. It is applied as an exponent to
    the sequence length, which in turn is used to divide the score of the sequence. Since the score is the log
    likelihood of the sequence (i.e. negative), `length_penalty` > 0.0 promotes longer sequences, while
    `length_penalty` < 0.0 encourages shorter sequences.
- no_repeat_ngram_size (`int`, *optional*, defaults to 0):
    If set to int > 0, all ngrams of that size can only occur once.
- bad_words_ids (`List[List[int]]`, *optional*):
    List of list of token ids that are not allowed to be generated. Check
    [`~generation.NoBadWordsLogitsProcessor`] for further documentation and examples.
- force_words_ids (`List[List[int]]` or `List[List[List[int]]]`, *optional*):
    List of token ids that must be generated. If given a `List[List[int]]`, this is treated as a simple list of
    words that must be included, the opposite to `bad_words_ids`. If given `List[List[List[int]]]`, this
    triggers a [disjunctive constraint](https://github.com/huggingface/transformers/issues/14081), where one
    can allow different forms of each word.
- renormalize_logits (`bool`, *optional*, defaults to `False`):
    Whether to renormalize the logits after applying all the logits processors or warpers (including the custom
    ones). It's highly recommended to set this flag to `True` as the search algorithms suppose the score logits
    are normalized but some logit processors or warpers break the normalization.
- constraints (`List[Constraint]`, *optional*):
    Custom constraints that can be added to the generation to ensure that the output will contain the use of
    certain tokens as defined by `Constraint` objects, in the most sensible way possible.
- forced_bos_token_id (`int`, *optional*, defaults to `model.config.forced_bos_token_id`):
    The id of the token to force as the first generated token after the `decoder_start_token_id`. Useful for
    multilingual models like [mBART](../model_doc/mbart) where the first generated token needs to be the target
    language token.
- forced_eos_token_id (`Union[int, List[int]]`, *optional*, defaults to `model.config.forced_eos_token_id`):
    The id of the token to force as the last generated token when `max_length` is reached. Optionally, use a
    list to set multiple *end-of-sequence* tokens.
- remove_invalid_values (`bool`, *optional*, defaults to `model.config.remove_invalid_values`):
    Whether to remove possible *nan* and *inf* outputs of the model to prevent the generation method from crashing.
    Note that using `remove_invalid_values` can slow down generation.
- exponential_decay_length_penalty (`tuple(int, float)`, *optional*):
    This Tuple adds an exponentially increasing length penalty, after a certain amount of tokens have been
    generated. The tuple shall consist of: `(start_index, decay_factor)` where `start_index` indicates where
    the penalty starts and `decay_factor` represents the factor of exponential decay.
- suppress_tokens (`List[int]`, *optional*):
    A list of tokens that will be suppressed at generation. The `SuppressTokensLogitsProcessor` will set their
    log probs to `-inf` so that they are not sampled.
- begin_suppress_tokens (`List[int]`, *optional*):
    A list of tokens that will be suppressed at the beginning of the generation. The
    `SuppressTokensAtBeginLogitsProcessor` will set their log probs to `-inf` so that they are not sampled.
- forced_decoder_ids (`List[List[int]]`, *optional*):
    A list of pairs of integers which indicates a mapping from generation indices to token indices that will be
    forced before sampling. For example, `[[1, 123]]` means the second generated token will always be a token
    of index 123.
- sequence_bias (`Dict[Tuple[int], float]`, *optional*):
    Dictionary that maps a sequence of tokens to its bias term. Positive biases increase the odds of the
    sequence being selected, while negative biases do the opposite. Check
    [`~generation.SequenceBiasLogitsProcessor`] for further documentation and examples.
- token_healing (`bool`, *optional*, defaults to `False`):
    Heal tail tokens of prompts by replacing them with their appropriate extensions.
    This enhances the quality of completions for prompts affected by greedy tokenization bias.
- guidance_scale (`float`, *optional*):
    The guidance scale for classifier free guidance (CFG). CFG is enabled by setting `guidance_scale > 1`.
    Higher guidance scale encourages the model to generate samples that are more closely linked to the input
    prompt, usually at the expense of poorer quality.
- low_memory (`bool`, *optional*):
    Switch to sequential beam search and sequential topk for contrastive search to reduce peak memory.
    Used with beam search and contrastive search.
- watermarking_config (Union[`WatermarkingConfig`, `dict`], *optional*):
    Arguments used to watermark the model outputs by adding a small bias to randomly selected set of "green" tokens.
    If passed as `Dict`, it will be converted to a `WatermarkingConfig` internally.
    See [this paper](https://arxiv.org/abs/2306.04634) for more details. Accepts the following keys:
    - greenlist_ratio (`float`):
        Used for watermarking. The ratio of "green" tokens used to the vocabulary size. Defaults to 0.25.
    - bias (`float`):
        Used with watermarking. The bias added to the selected "green" tokens' logits. Defaults to 2.0.
    - hashing_key (`int`):
        Hashing key used for watermarking. Defaults to 15485863 (the millionth prime).
    - seeding_scheme (`str`):
        Algorithm to use for watermarking. Accepts values:
            - "lefthash" (default): "green" tokens selection depends on the last token (Algorithm 2 from the paper)
            - "selfhash": "green" tokens selection depends on the current token itself (Algorithm 3 from the paper)
                The downside of this scheme is that it considers all possible next tokens and can be slower than "lefthash".
    - context_width (`int`):
        The context length of previous tokens to use in seeding. Higher context length makes watermarking more robust.
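
To make `temperature`, `top_p` and `min_p` concrete, here is a minimal pure-Python sketch of what each does to a toy distribution. The helpers `softmax`, `top_p_keep` and `min_p_keep` are written for this example and are not the library's logits processors. They only follow the descriptions given above.

```python
import math


def softmax(logits, temperature=1.0):
    """Turn logits into probabilities after dividing them by `temperature`."""
    scaled = [x / temperature for x in logits]
    peak = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(x - peak) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]


def top_p_keep(probs, top_p):
    """Indices of the smallest set of most probable tokens whose mass reaches `top_p`."""
    order = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)
    kept, cumulative = [], 0.0
    for i in order:
        kept.append(i)
        cumulative += probs[i]
        if cumulative >= top_p:
            break
    return kept


def min_p_keep(probs, min_p):
    """Indices of tokens whose probability is at least `min_p` times the top probability."""
    threshold = min_p * max(probs)
    return [i for i, p in enumerate(probs) if p >= threshold]


probs = [0.5, 0.3, 0.15, 0.05]
print(top_p_keep(probs, 0.9))   # [0, 1, 2]
print(min_p_keep(probs, 0.5))   # [0, 1]

# A lower temperature concentrates probability on the most likely token
logits = [2.0, 1.0, 0.0]
print(softmax(logits, temperature=0.5)[0] > softmax(logits, temperature=1.5)[0])  # True
```

The last comparison hints at why a high value such as `temperature=1.5` produces more varied samples: the distribution is flatter, so less likely tokens are drawn more often.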

### Parameters that define the output variables of generate

- num_return_sequences (`int`, *optional*, defaults to 1):
    The number of independently computed returned sequences for each element in the batch.
- output_attentions (`bool`, *optional*, defaults to `False`):
    Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned
    tensors for more details.
- output_hidden_states (`bool`, *optional*, defaults to `False`):
    Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
    more details.
- output_scores (`bool`, *optional*, defaults to `False`):
    Whether or not to return the prediction scores. See `scores` under returned tensors for more details.
- output_logits (`bool`, *optional*):
    Whether or not to return the unprocessed prediction logit scores. See `logits` under returned tensors for
    more details.
- return_dict_in_generate (`bool`, *optional*, defaults to `False`):
    Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
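
When `num_return_sequences` is greater than 1, the returned batch holds several candidates per prompt. The sketch below regroups such a flat list, assuming the candidates arrive prompt by prompt (all sequences for the first prompt, then all for the second, and so on). The helper `group_by_prompt` is hypothetical, and the ordering is an assumption worth checking against your installed version.

```python
def group_by_prompt(sequences, num_return_sequences):
    """Split a flat list of generated sequences into one list per input prompt."""
    if num_return_sequences < 1 or len(sequences) % num_return_sequences != 0:
        raise ValueError("sequence count must be a multiple of num_return_sequences")
    return [
        sequences[start:start + num_return_sequences]
        for start in range(0, len(sequences), num_return_sequences)
    ]


# Two prompts with three sampled candidates each
flat = ["p0-a", "p0-b", "p0-c", "p1-a", "p1-b", "p1-c"]
print(group_by_prompt(flat, 3))
```

Grouping this way makes it straightforward to vote over the candidates that belong to a single problem.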

### Special tokens that can be used at generation time

- pad_token_id (`int`, *optional*):
    The id of the *padding* token.
- bos_token_id (`int`, *optional*):
    The id of the *beginning-of-sequence* token.
- eos_token_id (`Union[int, List[int]]`, *optional*):
    The id of the *end-of-sequence* token. Optionally, use a list to set multiple *end-of-sequence* tokens.

### Generation parameters exclusive to encoder-decoder models

- encoder_no_repeat_ngram_size (`int`, *optional*, defaults to 0):
    If set to int > 0, all ngrams of that size that occur in the `encoder_input_ids` cannot occur in the
    `decoder_input_ids`.
- decoder_start_token_id (`Union[int, List[int]]`, *optional*):
    If an encoder-decoder model starts decoding with a different token than *bos*, the id of that token or a list of length
    `batch_size`. Passing a list enables different start ids for each element in the batch
    (e.g. multilingual models with different target languages in one batch).
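
The effect of `encoder_no_repeat_ngram_size` can be sketched as a lookup of forbidden next tokens. The function `banned_next_tokens` below is an illustration written for this notebook, not the library's implementation: it bans any token that would complete an n-gram already present in the encoder input.

```python
def banned_next_tokens(encoder_ids, decoder_ids, ngram_size):
    """Tokens that would complete an encoder n-gram given the current decoder prefix."""
    if ngram_size <= 0 or len(decoder_ids) + 1 < ngram_size:
        return set()
    # The last (ngram_size - 1) generated tokens form the prefix of the next n-gram
    prefix_len = ngram_size - 1
    prefix = tuple(decoder_ids[len(decoder_ids) - prefix_len:])
    banned = set()
    for start in range(len(encoder_ids) - ngram_size + 1):
        ngram = tuple(encoder_ids[start:start + ngram_size])
        if ngram[:-1] == prefix:
            banned.add(ngram[-1])
    return banned


encoder_ids = [1, 2, 3, 4]
print(banned_next_tokens(encoder_ids, decoder_ids=[9, 2], ngram_size=2))  # {3}
print(banned_next_tokens(encoder_ids, decoder_ids=[7], ngram_size=3))     # set()
```

In the first call the decoder has just produced token 2, and the bigram (2, 3) occurs in the encoder input, so token 3 is banned for the next step.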

### Generation parameters exclusive to assistant generation

- num_assistant_tokens (`int`, *optional*, defaults to 5):
    Defines the number of _speculative tokens_ that shall be generated by the assistant model before being
    checked by the target model at each iteration. Higher values for `num_assistant_tokens` make the generation
    more _speculative_: if the assistant model is performant, larger speed-ups can be reached; if the assistant
    model requires lots of corrections, lower speed-ups are reached.
- num_assistant_tokens_schedule (`str`, *optional*, defaults to `"heuristic"`):
    Defines the schedule at which max assistant tokens shall be changed during inference.
    - `"heuristic"`: When all speculative tokens are correct, increase `num_assistant_tokens` by 2 else
      reduce by 1. `num_assistant_tokens` value is persistent over multiple generation calls with the same assistant model.
    - `"heuristic_transient"`: Same as `"heuristic"` but `num_assistant_tokens` is reset to its initial value after each generation call.
    - `"constant"`: `num_assistant_tokens` stays unchanged during generation
- prompt_lookup_num_tokens (`int`, *optional*, defaults to `None`):
    The number of tokens to be output as candidate tokens.
- max_matching_ngram_size (`int`, *optional*, defaults to `None`):
    The maximum ngram size to be considered for matching in the prompt. Defaults to 2 if not provided.
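
The `"heuristic"` schedule described above amounts to a small update rule, sketched here. The function name `update_num_assistant_tokens` and the floor of one token are assumptions for illustration. The reset performed by `"heuristic_transient"` between generation calls is not modelled.

```python
def update_num_assistant_tokens(current, all_accepted, schedule="heuristic"):
    """Next value of num_assistant_tokens after one round of speculative decoding."""
    if schedule in ("heuristic", "heuristic_transient"):
        if all_accepted:
            return current + 2       # every speculative token was correct: speculate more
        return max(1, current - 1)   # a correction was needed: speculate less (floor assumed)
    return current                   # "constant": never changes


n = 5
for accepted in [True, True, False]:
    n = update_num_assistant_tokens(n, accepted)
print(n)  # 8
```

Starting from the default of 5, two fully accepted rounds followed by one rejected round give 5 → 7 → 9 → 8.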

### Parameters specific to the caching mechanism

- cache_implementation (`str`, *optional*, defaults to `None`):
    Cache class that should be used when generating.
- cache_config (`Union[CacheConfig, dict]`, *optional*, defaults to `None`):
    Arguments used in the key-value cache class can be passed in `cache_config`. Can be passed as a `Dict` and
    it will be converted to its respective `CacheConfig` internally.
    Otherwise can be passed as a `CacheConfig` class matching the indicated `cache_implementation`.

### Wild card

- generation_kwargs:
    Additional generation kwargs will be forwarded to the `generate` function of the model. Kwargs that are not
    present in `generate`'s signature will be used in the model forward pass.

# Code

In [None]:
GenerationConfig(
    max_new_tokens=2048,
    temperature=1.5,
    do_sample=True,
    num_return_sequences=10,
    renormalize_logits=True,
    # top_k=50,
    # top_p=0.95,
        
)

In [None]:
# vars(mistral_generation_config)

In [None]:
# vars(deepseek_generation_config)

In [None]:
class Prompt:
    def __init__(self, tokenizer, prompt_format, examples=None, tokenize=False, add_examples=0):
        self.tokenizer = tokenizer
        self.prompt_format = prompt_format
        self.examples = examples
        self.tokenize = tokenize
        self.add_examples = add_examples
        
    def __call__(self, problem_text):
        examples = ""
        if self.add_examples > 0  and self.examples is not None:
            for example in random.choices(self.examples, k=self.add_examples):
                question = example['problem']
                solution = random.choice(example['solutions'])
                examples += f"Question: {question}\nAnswer: {solution}\n"

        prompt = self.prompt_format.format(
            examples=examples,
            problem=problem_text
        )
        return self.tokenizer.apply_chat_template([
                {
                'role': 'user',
                'content': prompt
                }
            ],
            tokenize=self.tokenize,
            return_tensors='pt',
            add_generation_prompt=True
        )


In [None]:
@contextmanager
def tokenized_prompt(prompt):
    # Temporarily make the prompt return token ids; restore the previous setting on exit
    tokenize = prompt.tokenize
    prompt.tokenize = True
    try:
        yield prompt
    finally:
        prompt.tokenize = tokenize

class MathSolver:

    def __init__(self, llm, prompt):
        self.llm = llm
        self.prompt = prompt

    def __call__(self, problem_text):

        with tokenized_prompt(self.prompt) as prompt_maker:
            prompt = prompt_maker(problem_text)

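        input_ids = prompt.to(self.llm.device)
        # `generate` returns token ids (prompt + completion), so decode only the new tokens
        output_ids = self.llm.generate(input_ids)
        response = self.prompt.tokenizer.decode(
            output_ids[0][input_ids.shape[-1]:], skip_special_tokens=True
        )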

        return postprocess.get_answer(response)
        


In [None]:
deepseek_noinstruct_prompt = Prompt(
    tokenizer=deepseek_tokenizer, 
    prompt_format=(
        "{examples}{problem}\n" 
        "Please reason step by step, and put your final answer within \\boxed{{}}."
    )
)

deepseek_instruct_prompt = Prompt(
    tokenizer=deepseek_tokenizer, 
    prompt_format=(
        "{examples}{problem}\n" 
        "Please integrate natural language reasoning with programs to solve the problem above, " 
        "and put your final answer within \\boxed{{}}."
    )
)

mistral_template = """You are great at solving math problems and Python!
Please solve the following problem using Python code and step by step natural language reasoning.
Put the final answer within \\boxed{{}}.

{problem}
"""

mistral_prompt = Prompt(
    tokenizer=mistral_tokenizer, 
    prompt_format=mistral_template 
)

In [None]:
print(deepseek_instruct_prompt("What is the sum of 2 and 2?"))
print(deepseek_noinstruct_prompt("What is the sum of 2 and 2?"))
print(mistral_prompt("What is the sum of 2 and 2?"))


In [None]:
for prompt in [deepseek_instruct_prompt, deepseek_noinstruct_prompt, mistral_prompt]:
    with tokenized_prompt(prompt) as prompt_maker:
        print(prompt_maker("What is the sum of 2 and 2?"))


In [None]:
import os
submission_mode = bool(os.getenv('KAGGLE_IS_COMPETITION_RERUN'))
logger.info("submission_mode=%s", submission_mode)

In [None]:
if submission_mode:
    import aimo
else:
    import aimo_fake as aimo
    logger.info("Running in fake mode")

env = aimo.make_env()
problems = env.iter_test()
submit_answer = env.predict

In [None]:
# import importlib
# importlib.reload(aimo)

In [None]:
AutoModelForCausalLM.from_pretrained?

In [None]:
deepseek_model = AutoModelForCausalLM.from_pretrained(
    deepseek_model_name,
)
math_solver = MathSolver(llm=deepseek_model, prompt=deepseek_instruct_prompt)

In [None]:
for test, submission in problems:
    logger.info("Q: %s", test["problem"].values[0])
    # Pass the problem text itself, not the one-row pandas Series
    answer = math_solver(test['problem'].values[0])
    submission['answer'] = answer
    submit_answer(submission)
    logger.info("submission:\n---\n%s\n---", submission)
    break

In [None]:
options = {
    "model_name": "/kaggle/input/deepseek-math",
  
}

temperature = 0.85
do_test = True

In [None]:
import torch
from transformers import (
    AutoModelForCausalLM, 
    AutoTokenizer, 
    BitsAndBytesConfig, 
    AutoConfig,
    set_seed,
    pipeline as transformer_pipeline,
    __version__ as transformers_version
)


In [None]:
print(f"Transformers Version: {transformers_version}")

In [None]:
MODEL_PATH = "/kaggle/input/deepseek-math"

tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)

In [None]:
print(tokenizer.chat_template)

In [None]:
set_seed(42)

MODEL_PATH = "/kaggle/input/deepseek-math"

quantization_config = BitsAndBytesConfig(
    load_in_4bit = True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_use_double_quant=True,
)

config = AutoConfig.from_pretrained(MODEL_PATH)
config.gradient_checkpointing = True


tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)

base_LLM = AutoModelForCausalLM.from_pretrained(
    MODEL_PATH,
    device_map="auto",
    torch_dtype="auto",
    trust_remote_code=True,
#     quantization_config=quantization_config,
    config=config
)



In [None]:
import functools
import gc

pipeline = transformer_pipeline(
    "text-generation",
    model=base_LLM,
    tokenizer=tokenizer,
    torch_dtype='auto',
    device_map="auto",
)

pipeline = functools.partial(
    pipeline,
    max_new_tokens=2048, 
    do_sample=True, 
    temperature=temperature,
    return_full_text=False
)

class ExtPipeline:
    def __init__(self, pipeline, tokenizer, message_template):
        self.pipeline = pipeline
        self.tokenizer = tokenizer
        self.msg_template = message_template
    
    
    def prompt(self, question):
        return self.tokenizer.apply_chat_template(
            [
                {
                    'role': 'user',
                    'content': self.msg_template.format(problem=question)
                }
            ],
            tokenize=False,
            # Append the assistant turn marker, matching the `Prompt` class above
            add_generation_prompt=True
        )
        
        
    def __call__(self, question):
        
        prompt = self.prompt(question)
        logger.info('prompt:\n---\n%s\n---', prompt)
        
        result = self.pipeline(prompt)[0]['generated_text']
        
        torch.cuda.empty_cache()
        gc.collect()

        return result


In [None]:
pipe = ExtPipeline(pipeline, tokenizer, deepseek_template)
raw_output = pipe(random_equation_question()[0])

In [None]:
#raw_output = pipe(random_arithmetic_question()[0])
print(raw_output)

In [None]:
code_chunk(raw_output), get_answer(raw_output)

In [None]:
chunks = re.split(r"(?=[\n\s]*?)```+?[a-zA-Z\s]*?(?=\n)", raw_output)
print(chunks)

In [None]:
base_LLM.dtype

In [None]:
device = 'cuda'

In [None]:
torch.backends.cuda.enable_mem_efficient_sdp(False)

In [None]:
keras_team_template = """Role:
You are an advanced AI system with exceptional mathematical reasoning and problem-solving capabilities, specifically designed to solve tricky math problems (whose answer is a non-negative integer) written in LaTeX format from the AI Mathematical Olympiad (AIMO) competition. Your task is to accurately analyze and solve intricate mathematical problems, demonstrating a deep understanding of mathematical concepts and a strong ability to apply logical reasoning strategies.

Instruction:
1. Carefully read and comprehend the problem statement provided in the "Problem" section.
2. In the "Solution" section, provide a solution of the problem with detailed explanation of your logical reasoning process. Keep in mind that answer must be a non-negative integer number.
3. At the end, create an "Answer" section where you will state only the final numerical or algebraic answer, without any additional text or narrative.

Problem:
{problem}

Solution:
{solution}
"""

deepseek_template = """Problem:
{problem}

Instructions:
Please integrate natural language reasoning with programs to solve the problem above,
and put your final answer into an "Answer" section where you will state only the final numerical
or algebraic answer, without any additional text or narrative.
"""

def colorize_text(text):
    for word, color in zip(["Role", "Instruction", "Problem", "Solution", "Answer"],
                           ["blue", "orange", "red", "brown", "green"]):
        text = text.replace(f"{word}:", f"\n\n**<font color='{color}'>{word}:</font>**")
    return text

def is_integer(text):
    try:
        if int(text) >= 0:
            return True
        else:
            return False
    except ValueError:
        return False
    

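import re  # required below; the notebook otherwise imports `re` only in a later cell


# Extract the answer from the model response
def get_answer(text):
    match = re.search(r'Answer:\s*([\s\S]+)', text)
    if match is None:
        return 0
    answer = match.group(1).strip().replace(",", "")
    # Answers are reported modulo 1000; fall back to 0 when parsing fails
    return int(answer) % 1000 if is_integer(answer) else 0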
    
    
def infer(problem, model, template):
    # Generate Prompt using template
    prompt = template.format(
            problem=problem,
            solution=""
        )

    # Infer
    return model(prompt) 

In [None]:
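import contextlib
import os
import subprocess
import tempfile
import textwrap


@contextlib.contextmanager
def new_tempfile():
    # Create the file before entering `try` so `fname` is always bound in `finally`
    fd, fname = tempfile.mkstemp(suffix=".py")
    os.close(fd)
    logger.info('temp file: %s', fname)
    try:
        yield fname
    finally:
        if os.path.exists(fname):
            os.unlink(fname)
            logger.info('temp file %s deleted', fname)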

            
def run_code_chunk(chunk, timeout=7):
    with new_tempfile() as code_file: 
        with open(code_file, "w") as f:
            f.write(chunk)

        logger.info("running python3 on \n```\n%s\n```", chunk)

        try:
            result = subprocess.run(
                ["python3", code_file], 
                timeout=timeout, 
                capture_output=True
            )
        except subprocess.TimeoutExpired:
            logger.info("timeout occurred")
            return -1

        if result.returncode:
            logger.info("error occurred %s", result.stderr.decode('utf8'))
            return -1

        stdout = result.stdout.decode('utf8')
        logger.info("output: %s", stdout)

        try:
            answer = int(stdout)
        except ValueError:
            answer = -1

        return answer
    

In [None]:
if do_test:
    inf_loop = """while True:
        pass
    """
    ! ls /tmp
    result = run_code_chunk(inf_loop, 1)
    print(result)
    
    result = run_code_chunk(code_chunk(example))
    print(result)
    
    chunk = textwrap.dedent("""
    def f(a, b):
        return a*b
    
    print(f(2, 12))
    """)
    result = run_code_chunk(chunk)
    print(result)
    ! ls /tmp
    

In [None]:
# def process_output(output):
#     result = output
    
#     try:
#         code = output.split('```')[1][7:]

#         with open('code.py', 'w') as fout:
#             fout.write(code)

#         batcmd = 'timeout 7 ' + sys.executable + ' code.py'
#         try:
#             shell_output = subprocess.check_output(batcmd, shell=True).decode('utf8')
#             print(shell_output)
#             code_output = round(float(eval(shell_output))) % 1000
#         except:
#             code_output = -1

#         print('CODE RESULTS', code_output)
    
#     except Exception as e:
#         print(e)
#         print('ERROR PARSING')
#         code_output = -1
    
#     try:
#         result_output = re.findall(r'\\boxed\{(.*)\}', result)

#         print('BOXED', result_output)
#         if not len(result_output):
#             result_output = naive_parse(result)
#         else:
#             result_output = result_output[-1]

#         print('BOXED', result_output)
#         if not len(result_output):
#             result_output = -1
        
#         else:
#             result_output = round(float(eval(result_output))) % 1000
    
#     except Exception as e:
#         print(e)
#         print('ERROR PARSING')
#         result_output = -1
    
#     return result_output, code_output

In [None]:
import gc
import re
from collections import defaultdict

from tqdm import tqdm


tool_instruction = " The answer should be given as a non-negative integer modulo 1000."
tool_instruction += '\nPlease integrate natural language reasoning with programs to solve the problem above, and put your final answer within \\boxed{}.'


n_repetitions = 8 if PRIVATE else 2
temperature = 0.8964

total_results = []
total_answers = []


for i in tqdm(range(len(df))):
    id_ = df['id'].loc[i]
    problem = df['problem'].loc[i]

    messages = [
        {
            "role": "user", 
            "content": problem + tool_instruction
        }
    ]
    
    query_prompt = tokenizer.apply_chat_template(
        messages,
        tokenize=False
    )
    
    results = []
    answers = []
     
    
    for _ in tqdm(range(n_repetitions)):
        try:
            raw_output = pipeline(
                query_prompt, 
                max_new_tokens=2048, 
                do_sample=True, 
                temperature=temperature,
                return_full_text=False
            )
            raw_output = raw_output[0]['generated_text']

            result_output, code_output = process_output(raw_output)

            torch.cuda.empty_cache()
            gc.collect()

        except Exception as e:
            print(e)
            result_output, code_output = -1, -1
        
        results.append(result_output)
        answers.append(code_output)
    
    total_results.append(results)
    total_answers.append(answers)

In [None]:
import numpy as np
from collections import Counter

df['leng'] = df['problem'].astype(str).map(len)
df['orig_index'] = df.index.values
df = df.sort_values(by=['leng', 'id']).reset_index(drop=True)
df['enumerates'] = range(0, len(df))
df = df.sort_values('orig_index').reset_index(drop=True)

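enumerate_i = 0
final_answers = []
for a, b in zip(total_answers, total_results):
    a = np.array(a)
    b = np.array(b)
    # Where the code run failed (-1), fall back to the \boxed{} answer
    a[a < 0] = b[a < 0]
    # Two most frequent answers, padded with a sentinel so pred[1] always exists
    pred = Counter(a.tolist()).most_common(2)
    pred = pred + [(-1, 0)]
    val_previously, freq_previously = pred[0]
    # On a tie in frequency, prefer the smaller value
    for val, freq in pred[1:]:
        if freq == freq_previously:
            val_previously = min(val_previously, val)
    enumerates = df.enumerates.values[enumerate_i]
    # If the winner is the failure marker, take the runner-up instead
    ans = val_previously if not val_previously < 0 else pred[1][0]
    enumerate_i += 1
    final_answers.append(ans)
    print(ans)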

In [None]:
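class ProblemSolver:
    def __init__(
        self,
        pipeline,
        repeat=1,
    ):
        self.pipeline = pipeline
        self.repeat = repeat

    def answer(self, question):
        messages = [
            {
                "role": "user",
                # Assumption: the unfinished cell left this empty; the question is used as-is
                "content": question
            }
        ]

        query_prompt = tokenizer.apply_chat_template(
            messages,
            tokenize=False
        )

        answers = [self.solve(query_prompt) for _ in range(self.repeat)]
        # Assumption: return the most frequent answer, as in the voting cell above
        return Counter(answers).most_common(1)[0][0]

    def solve(self, query_prompt):
        # Assumption: draw one sample from the pipeline and parse it with `get_answer`
        raw_output = self.pipeline(query_prompt)[0]['generated_text']
        return get_answer(raw_output)


if 'solve_problem' not in dir():
    def solve_problem(text):
        # Placeholder fallback: always answer 0
        return 0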

In [None]:
import pandas as pd
class FakeEnv:
    def __init__(self, questions):
        self.questions = questions
        self.answers = []
        
    def iter_test(self):
        for i, (q, a) in enumerate(self.questions):
            yield (
                pd.DataFrame([{'id':i, 'problem': q}]).set_index('id'), 
                pd.DataFrame([{'id':i, 'answer':0, 'true answer': a}]).set_index('id')
            )
        
        
    def predict(self, submission):
        self.answers.append(submission)
        


In [None]:
fake_env = FakeEnv(
    [random_arithmetic_question() for _ in range(5)]+
    [random_equation_question() for _ in range(5)]
)
problems, submit_answer = fake_env.iter_test(), fake_env.predict 

In [None]:
import random

def random_arithmetic_question():
    while True:
        op = random.choice("+-*")
        a, b = random.choices(range(1000), k=2)
        expression = f"{a}{op}{b}"
        result = eval(expression)
        if result >= 0:
            break
    answer = result % 1000
    if random.uniform(0, 2) > 1:
        # str.replace returns a new string, so the result must be reassigned
        expression = expression.replace("*", "\\times ")
    return f"What is ${expression}$?", answer


def random_equation_question():
    while True:
        op = random.choice("+-*")
        a, x = random.choices(range(100), k=2)
        expression = f"{a}{op}{x}"
        b = eval(expression)
        # Skip 0*x=0, which does not determine x
        if b >= 0 and not (op == "*" and a == 0):
            break
    answer = x % 1000
    var_name = random.choice("abcxyz")
    expression = f"{a}{op}{var_name}={b}"
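    if random.uniform(0, 2) > 1:
        # Reassign the result; the trailing space keeps LaTeX from reading e.g. `\timesx` as one macro
        expression = expression.replace("*", "\\times ")
    return f"Solve ${expression}$ for ${var_name}$.", answer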



In [None]:
if do_test:
    print(random_arithmetic_question())
    print(random_equation_question())