In [None]:
!pip install openai==0.28
!pip install tiktoken
!pip install tqdm
!pip install matplotlib
!pip install sympy
!pip install langchain

In [None]:
import numpy as np
import openai
import tiktoken
from tqdm.auto import trange, tqdm
import time
import os
import json
from tqdm import tqdm
import re
from types import NoneType
import multiprocessing.dummy
from io import StringIO
from contextlib import redirect_stdout
import signal
from contextlib import contextmanager
import matplotlib.pyplot as plt
import sys
import ast
import copy

In [None]:
import pandas as pd
from IPython.display import display_latex
from langchain.callbacks import get_openai_callback
from tqdm.auto import tqdm


## LLM Setup

In [None]:
# Read the key from the environment so the secret never has to be typed into
# (or committed with) the notebook. Falls back to the empty placeholder when
# the OPENAI_API_KEY variable is not set.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")

In [None]:
# Code generating prompts
# system_prompt: system message asking the model for Python code only (no sympy, no explanation).
system_prompt = """
You are a helpful code assistant. 
You have knowledge of math. 
You can provide code solutions to math problems. 
Your language of choice is Python, you can use public python libraries. 
Do not use sympy. 
Don't explain the code, just generate the code block itself.
"""
# my_prompt: user message template. `{problem}` is its only placeholder, and the
# generated code is asked to leave its answer in a variable called 'result'.
my_prompt = """
Provide good python code to solve the following problem: {problem}, no need to implement it. 
The code should save the answer as a variable called 'result'.
Each test answer should be a number—either an integer or finite decimal float with "." as a separator.
Correct format examples:
4231
-12
0.75
Incorrect format examples:
4 2 3 1
-12.0
0,2
12/35
"""

In [None]:
# Chain of thought
# CoT_system_prompt: system message for the free-text (non-code) solving path.
CoT_system_prompt = """
You are a helpful assistanst with general knowledge and knowledge of math and geometry.
"""
# CoT_prompt: user message template with one worked example; `{problem_text}` is
# its only placeholder. Declared as a raw string because the LaTeX in the example
# contains backslashes (`\log`), which are invalid escape sequences in a normal
# string literal. The resulting text is unchanged.
CoT_prompt = r"""
Solve the following problem: {problem_text}

Example of good solutions:
problem_text1: Each of the four inequalities in the left column corresponds to one of the solutions in the right column. Establish the correspondence between the inequalities and their solutions.

A) $\log _4 x>1$ & 1) $0<x<1/4 $ 
B) $\log _4 x>-1$ & 2) $x>1/4 $  
C) $\log _4 x<-1$ & 3) $0<x<4$  
D) $\log _4 x<1$ & 4) $x>4$ 

Fill in the table provided in the answer with the corresponding solution number under each letter.
A & B & C & D


Solution1: To solve this problem, let's analyze each inequality one by one:
A) $\log_4 x > 1$
To solve this inequality, we need to rewrite it in exponential form:
$4^1 < x$
Simplifying, we have:
$4 < x$
So the solution to this inequality is $x > 4$. This corresponds to solution number 4.
B) $\log_4 x > -1$
To solve this inequality, we need to rewrite it in exponential form:
$4^(-1) < x$
Simplifying, we have:
$ 1/4 < x$
So the solution to this inequality is $x > 1/4$. This corresponds to solution number 2.
C) $\log_4 x < -1$
To solve this inequality, we need to rewrite it in exponential form:
$4^(-1) > x$
Simplifying, we have:
$1/4 > x$
So the solution to this inequality is $x < 1/4$. This corresponds to solution number 1.
D) $\log_4 x < 1$
To solve this inequality, we need to rewrite it in exponential form:
$4^1 > x$
Simplifying, we have:
$4 > x$
So the solution to this inequality is $x < 4$. This corresponds to solution number 3.
Now, let's fill in the table with the corresponding solution numbers:
Answer is 4213
"""


In [None]:
# User message template for the answer-extraction step. `{problem_text}` and
# `{solution}` are its two placeholders; the model is asked to output only the
# final numeric answer in the format described below.
extract_answer_prompt = """Task:
{problem_text}
Solution:
{solution}
Extract the answer according to the task, output the answer value ONLY. 
Each test answer should be a number—either an integer or finite decimal float with "." as a separator.
Correct format examples:
4231
-12
0.75
Incorrect format examples:
4 2 3 1
-12.0
0,2
12/35
"""

In [None]:
from langchain.prompts import ChatPromptTemplate

# Code-generation prompt: system instructions followed by the user task template.
prompt = ChatPromptTemplate.from_messages(
    [("system", system_prompt), ("user", my_prompt)]
)

In [None]:
# Chain-of-thought prompt: system persona followed by the worked-example user template.
chain_of_thought_prompt = ChatPromptTemplate.from_messages(
    [("system", CoT_system_prompt), ("user", CoT_prompt)]
)

In [None]:
# Answer-extraction prompt: the system message is intentionally empty, so the
# user template carries all of the instructions.
prompt_extract = ChatPromptTemplate.from_messages(
    [("system", ""), ("user", extract_answer_prompt)]
)

In [None]:
from langchain.chat_models import ChatOpenAI

# Chat model client built from the API key defined above; the model name is hard-coded here.
llm = ChatOpenAI(openai_api_key=OPENAI_API_KEY, model="gpt-4-1106-preview")

In [None]:
from langchain_core.output_parsers import StrOutputParser

# Parser placed at the end of each chain (presumably reduces the chat reply to
# its text content — confirm against the langchain documentation).
output_parser = StrOutputParser()

# Code-generation chain: code prompt -> chat model -> output parser.
chain_solution = prompt | llm | output_parser

In [None]:
# Chain-of-thought chain: CoT prompt -> chat model -> output parser.
CoT_solution = chain_of_thought_prompt | llm | output_parser

In [None]:
# Answer-extraction chain: extraction prompt -> chat model -> output parser.
chain_extract = prompt_extract | llm | output_parser

## Helper Functions

In [None]:
def generate_code_llm(problem_text, new_code_col, chain_solution=chain_solution):
    """
    Ask the code-generation chain for a solution to a single problem.

    Parameters:
    - problem_text: Problem statement, passed to the chain under the "problem" key.
    - new_code_col: Value handed back unchanged as the first element of the result.
    - chain_solution: Object whose `invoke` is called with {"problem": problem_text};
      defaults to the module-level `chain_solution`.

    Returns:
    - tuple: (new_code_col, value returned by the chain).
    """
    payload = {"problem": problem_text}
    generated = chain_solution.invoke(payload)
    return new_code_col, generated


In [None]:
def get_test_answer_llm(problem_text, chain_solution=CoT_solution, chain_extract=chain_extract):
    """
    Solve one problem with the chain-of-thought chain, then extract its answer.

    Parameters:
    - problem_text: Problem statement, passed to both chains as "problem_text".
    - chain_solution: Chain that produces the worked solution; defaults to `CoT_solution`.
    - chain_extract: Chain that receives the problem and the worked solution and
      returns the extracted answer; defaults to the module-level `chain_extract`.

    Returns:
    - pandas.Series with entries 'test_answer' (extracted answer) and
      'solution' (full worked solution), in that order.
    """
    worked_solution = chain_solution.invoke({"problem_text": problem_text})
    extraction_input = {
        "problem_text": problem_text,
        "solution": worked_solution,
    }
    extracted = chain_extract.invoke(extraction_input)
    return pd.Series({'test_answer': extracted, 'solution': worked_solution})

In [None]:
def run_python_code(input_code):
    """
    Execute Python code provided as input and return the result.

    The code is normally wrapped in a Markdown fence (```python ... ```).
    The first ```python / ```py fence is used if present, otherwise the first
    plain ``` fence, otherwise the whole input is run as-is. A fence that is
    never closed (e.g. a truncated reply) is read up to the end of the input.

    Parameters:
    - input_code (str): The text containing the Python code to be executed.

    Returns:
    - result: The value of the 'result' variable after execution, or None when
      the code does not define it.
      If the input is not a string, or an error occurs during execution,
      returns a string starting with "Error: ".
    """
    if not isinstance(input_code, str):
        return f"Error: expected code as str, got {type(input_code).__name__}"

    # Locate the code instead of slicing fixed offsets, so surrounding prose or
    # a missing/unterminated fence does not corrupt the code.
    fence = re.search(
        r"```(?:python|py)[ \t]*\r?\n(.*?)(?:```|\Z)",
        input_code,
        re.DOTALL | re.IGNORECASE,
    )
    if fence is None:
        fence = re.search(r"```[ \t]*\r?\n(.*?)(?:```|\Z)", input_code, re.DOTALL)
    code_to_run = fence.group(1) if fence else input_code

    # Execute the Python code
    try:
        # Create a namespace for the code to run in
        namespace = {}
        # SECURITY NOTE: this executes model-generated code with full
        # interpreter privileges; only run it in a disposable environment.
        exec(code_to_run, namespace)
        # Retrieve the result
        return namespace.get('result', None)
    except (Exception, SystemExit) as e:
        # SystemExit is caught too so that a stray sys.exit()/exit() in the
        # generated code cannot abort the whole batch.
        return f"Error: {str(e)}"

In [None]:
def to_ints(data):
    """
    Convert a value to a normalised numeric string.

    The value is parsed with float() and rounded to 2 decimals. Whole numbers
    are rendered without a decimal part ("12.0" -> "12"), everything else as
    the rounded float ("3.14159" -> "3.14").

    Parameters:
    - data: The value to convert (string or number).

    Returns:
    - result: The formatted number as a string, or pandas NA when the value
      cannot be parsed as a float (bad string, None, out-of-range integer).
      NaN is rendered as the string 'nan' and infinities as 'inf'/'-inf';
      the majority-vote cell later in this notebook filters on the 'nan'
      string, so that representation is preserved here on purpose.
    """
    try:
        float_value = round(float(data), 2)
    except (ValueError, TypeError, OverflowError):
        # Not parseable as a float: report a missing value instead of raising.
        return pd.NA

    # NaN cannot be detected with `==` (NaN never equals itself) and is never
    # an integer, so it falls through to str() below and becomes 'nan'.
    if float_value.is_integer():
        # Drop the ".0" decimal part of whole numbers.
        return str(int(float_value))
    return str(float_value)

In [None]:
def add_strong_majority_column(df, cols, threshold=3):
  """
  Adds a boolean column named 'strong_majority' to the DataFrame 'df'.

  A row is marked True when its most frequent *valid* answer across 'cols'
  occurs more than 'threshold' times. Missing values and the 'nan'
  placeholder strings produced for failed runs are not counted as answers,
  so a row in which most runs failed is never reported as a strong majority.

  Args:
      df: The DataFrame to process (modified in place and returned).
      cols: A list of column names to consider for finding the mode.
      threshold: The most frequent answer must occur strictly more than this
          many times. Defaults to 3.

  Returns:
      The DataFrame with the added 'strong_majority' column.
  """
  def _has_strong_majority(row):
    # Keep only real answers: drop NA/NaN and the literal 'nan' placeholder.
    answers = [
        val for val in row
        if not pd.isna(val) and str(val).lower() != 'nan'
    ]
    if not answers:
      return False
    top_count = pd.Series(answers, dtype=object).value_counts().max()
    return bool(top_count > threshold)

  # A list (rather than DataFrame.apply) keeps empty frames working and does
  # not rely on the index labels being unique.
  df['strong_majority'] = [
      _has_strong_majority(row) for _, row in df[cols].iterrows()
  ]
  return df

## Run on the test set

In [None]:

# Load the test problems. Later cells read the 'translation' and 'problem_id'
# columns of this frame and add their result columns to it.
test = pd.read_csv('/kaggle/input/prompt-engineering-math/test_with_translation.csv')
test.head()

Generate python code to solve the problems using our model. Repeat 5 times and save each result.

In [None]:
from langchain.callbacks import get_openai_callback
from tqdm.auto import tqdm

tqdm.pandas()

with get_openai_callback() as cb:
    # Generate values for 'new_code_{i}' columns
    for i in range(1, 6):
        new_code_col = f'new_code_{i}'
        # Create the column only if it does not exist yet. Resetting it on
        # every run made the mask below always all-True, so re-running the
        # cell discarded (and paid again for) completions already generated.
        if new_code_col not in test.columns:
            test[new_code_col] = None
        missing_values_mask = test[new_code_col].isna()
        print(f"Generating values for {new_code_col}...")

        # Only rows that still lack generated code are sent to the model.
        if missing_values_mask.any():
            test.loc[missing_values_mask, new_code_col] = (
                test.loc[missing_values_mask, 'translation'].progress_apply(
                    lambda x: generate_code_llm(x, new_code_col)[1]
                )
            )

        # Debugging: Print the generated values
        print(test.loc[missing_values_mask, [new_code_col]])

    # Token usage / cost summary collected by the callback.
    print(cb)


Run the generated Python code and save the answers.

In [None]:

for i in range(1, 6):
    new_code_col = f'new_code_{i}'
    code_ans_col = f'code_ans_{i}'
    # Start from an empty answer column, then fill every still-missing entry
    # with the outcome of executing the matching generated snippet.
    test[code_ans_col] = None
    pending_rows = test[code_ans_col].isna()
    executed_answers = test.loc[pending_rows, new_code_col].progress_apply(run_python_code)
    test.loc[pending_rows, [code_ans_col]] = executed_answers


Convert the results to the required answer format.

In [None]:
for i in range(1, 6):
    code_ans_col = f'code_ans_{i}'
    # Anything that is not a number (error messages, None, ...) is coerced to
    # NaN first, then each value is rendered through to_ints.
    test[code_ans_col] = (
        pd.to_numeric(test[code_ans_col], errors='coerce').apply(to_ints)
    )


Decide on the final answer based on the majority of 5 runs

In [None]:
import pandas as pd
import random
import statistics

count_errors = 0
# Placeholder answer written when none of the runs produced a usable value.
FILLER_NUMBER = '9999'
code_ans_columns = [f'code_ans_{i}' for i in range(1, 6)]
test['final_answer'] = None

# Row-wise modes of the five answers. The frame keeps the index of `test`
# (no rows are dropped), so it can be looked up by label below. The previous
# `dropna()` + positional `iloc[i]` lookup misaligned rows as soon as a row
# was dropped or the index was not 0..n-1.
mode = test[code_ans_columns].mode(axis=1)
for i, row in test.iterrows():
    # Usable answers of this row: neither missing nor the 'nan' placeholder.
    values = [
        val for val in row[code_ans_columns].values
        if not pd.isna(val) and str(val).lower() != 'nan'
    ]
    # Distinct most-frequent answers of this row (NaN padding is ignored).
    row_mode = mode.loc[i].mode()
    tied_answers = [val for val in row_mode.values if str(val).lower() != 'nan']
    if row_mode.count() == 1:
        row_ans = row_mode.iloc[0]
    elif tied_answers:
        # Several answers are equally frequent: pick one of them at random.
        row_ans = random.choice(tied_answers)
    else:
        # No mode at all (every answer missing): handled by the branch below.
        row_ans = 'nan'
    if row_ans == 'nan':
        # Failed runs won the vote: fall back to the most common usable
        # answer, or to the filler when there is none.
        if len(values) > 0:
            row_ans = statistics.mode(values)
        else:
            row_ans = FILLER_NUMBER
            count_errors += 1
            print(f"ERROR AT ROW NUMBER {i}")

    test.at[i, 'final_answer'] = row_ans

print(f"NUMBER OF ERRORS: {count_errors}")



In [None]:
# Display the frame with the generated code, per-run answers and 'final_answer'.
test

Re-solve the rows without a strong majority (no answer repeated more than 3 times across the 5 runs) using the chain-of-thought prompt.

In [None]:

tqdm.pandas()

answer_cols = ['code_ans_1', 'code_ans_2', 'code_ans_3', 'code_ans_4', 'code_ans_5']
# Work on a copy so `test` keeps the pure code-based results.
test_majority = add_strong_majority_column(test.copy(), answer_cols)
test_majority[['solution', 'test_answer']] = None
# Rows that need the chain-of-thought fallback: no strong majority, or the
# filler answer. 'final_answer' holds strings and the filler is the string
# FILLER_NUMBER, so the comparison is done on strings; comparing against the
# integer 9999 never matched any row.
test_mask = (test_majority['strong_majority'] == False) | (
    test_majority['final_answer'].astype(str) == str(FILLER_NUMBER)
)
with get_openai_callback() as cb:
    # Skip the model calls (and the assignment) when no row needs the fallback.
    if test_mask.any():
        test_majority.loc[test_mask, ['test_answer', 'solution']] = (
            test_majority.loc[test_mask, 'translation'].progress_apply(get_test_answer_llm)
        )
    # Token usage / cost summary collected by the callback.
    print(cb)


In [None]:
# Show only the rows that were sent through the chain-of-thought fallback.
test_majority[test_mask]

Choose the new answer if numeric, otherwise keep the old answer.

In [None]:
# For the fallback rows, take the chain-of-thought answer when it parses as a
# number; where it does not, keep the answer that was already there.
test_majority.loc[test_mask, 'final_answer'] = (
    pd.to_numeric(test_majority.loc[test_mask, 'test_answer'], errors='coerce')
    .fillna(test_majority.loc[test_mask, 'final_answer'])
)
test_majority[test_mask]

Format the answers

In [None]:
# Coerce every final answer to a number, then render it through to_ints.
test_majority['final_answer'] = (
    pd.to_numeric(test_majority['final_answer'], errors='coerce').apply(to_ints)
)

Submission format + export

In [None]:
# Two-column submission frame: 'problem_id' plus the final answer, with the
# latter exposed under the column name 'answer'.
majority_test_to_submit = (
    test_majority[['problem_id', 'final_answer']]
    .rename(columns={'final_answer': 'answer'})
)

In [None]:
# Write the two-column submission file, then the full working table
# (all columns of test_majority); neither file includes the index.
majority_test_to_submit.to_csv('submission_majority_organized_2.csv', index=False)
test_majority.to_csv('submission_majority_full_table_2.csv', index=False)