In [None]:
import pathlib
import dspy
from dspy import Example
from dspy.evaluate.evaluate import Evaluate
from dspy.evaluate.metrics import answer_exact_match
from functools import partial
from dspy import Signature, InputField, OutputField
from pydantic import BaseModel, Field
from datetime import datetime
from dspy.teleprompt import MIPROv2
from dspy.teleprompt.random_search import BootstrapFewShotWithRandomSearch


In [None]:

# Record describing one problem on disk: where its files live plus the text
# that has already been read from them. (Kept as `#` comments rather than a
# class docstring so the pydantic model's own metadata is left unchanged.)
class Problem(BaseModel):
    # Locations on disk.
    problem_dir: pathlib.Path = Field(..., description="The path to the problem directory")
    problem_name: str = Field(..., description="The name of the problem")

    # Text content of the statement and of the sample test case.
    problem_description: str = Field(
        ..., description="The description of the problem"
    )
    sample_input: str = Field(
        ..., description="The sample input of the problem"
    )
    sample_output: str = Field(
        ..., description="The sample output of the problem"
    )

    # Full input/output files are referenced by path only; they are not read here.
    problem_input: pathlib.Path = Field(
        ..., description="The path to the input file"
    )
    problem_output: pathlib.Path = Field(
        ..., description="The path to the output file"
    )

    solution: str = Field(
        ..., description="The solution to the problem"
    )

    @property
    def as_xml(self) -> str:
        """Render the statement and the sample test case as an XML-like snippet.

        The result starts and ends with a newline, and every tag and every
        field value sits on its own line. Field values are inserted verbatim
        (no XML escaping is performed).
        """
        pieces = [
            "",  # leading newline
            "<problem>",
            "<problem_statement>",
            self.problem_description,
            "</problem_statement>",
            "<sample_test_cases>",
            "<sample_input>",
            self.sample_input,
            "</sample_input>",
            "<sample_output>",
            self.sample_output,
            "</sample_output>",
            "</sample_test_cases>",
            "</problem>",
            "",  # trailing newline
        ]
        return "\n".join(pieces)


def load_problem(problem_name: str, problem_dir: pathlib.Path) -> Problem:
    """Build a `Problem` from the files named after `problem_name` in `problem_dir`.

    Files read eagerly (as text):
        ``<name>.md``                 problem description
        ``<name>_sample_input.txt``   sample input
        ``<name>_sample_output.txt``  sample output
        ``<name>_sol.md``             reference solution

    Files only referenced by path (not opened here):
        ``<name>.in`` and ``<name>.out``

    Args:
        problem_name: Base name shared by all of the problem's files.
        problem_dir: Directory that contains those files.

    Returns:
        The populated `Problem`.

    Raises:
        FileNotFoundError: If any of the four text files is missing.
        UnicodeDecodeError: If a text file is not valid UTF-8.
    """

    def read(suffix: str) -> str:
        # Decode explicitly as UTF-8: a bare read_text() uses the platform's
        # locale encoding, which is not UTF-8 everywhere (e.g. Windows) and
        # would garble or reject non-ASCII characters in the statements.
        return (problem_dir / f"{problem_name}{suffix}").read_text(encoding="utf-8")

    return Problem(
        problem_dir=problem_dir,
        problem_name=problem_name,
        problem_description=read(".md"),
        sample_input=read("_sample_input.txt"),
        sample_output=read("_sample_output.txt"),
        problem_input=problem_dir / f"{problem_name}.in",
        problem_output=problem_dir / f"{problem_name}.out",
        solution=read("_sol.md"),
    )

In [None]:
# Discover every problem through its full-input file. rglob yields paths in an
# arbitrary, filesystem-dependent order, so sort them to keep the splits (and
# anything that indexes into them, such as trainset[0]) reproducible.
problems = sorted(pathlib.Path("data/dataset/").rglob("*.in"))

# Match against the POSIX form of each path so the "2022/final" and
# "2023/practice" markers are also found on Windows, where str(path) uses
# backslashes as separators.
eval_problems = [path for path in problems if "2022/final" in path.as_posix()]
test_problems = [path for path in problems if "2023/practice" in path.as_posix()]

# Everything not held out for evaluation or testing is training data. The
# held-out set is built once instead of re-concatenating both lists (and
# scanning them linearly) for every candidate path.
_held_out = set(eval_problems) | set(test_problems)
train_problems = [path for path in problems if path not in _held_out]


def load_problem_from_path(problem_path: pathlib.Path):
    """Load the problem that a given file belongs to.

    The file's stem is used as the problem name and its parent directory as
    the problem directory; both are handed to `load_problem`.
    """
    return load_problem(problem_path.stem, problem_path.parent)



    
# Replace each split's list of paths with the corresponding loaded problems.
train_problems = [load_problem_from_path(path) for path in train_problems]
eval_problems = [load_problem_from_path(path) for path in eval_problems]
test_problems = [load_problem_from_path(path) for path in test_problems]


In [None]:
# Split sizes, in (train, eval, test) order.
tuple(map(len, (train_problems, eval_problems, test_problems)))

In [None]:
def _as_example(problem):
    """Wrap one loaded problem in an `Example`.

    The statement and the sample input/output are marked as inputs; the
    reference solution is the remaining (label) field.
    """
    example = Example(
        problem_statement=problem.problem_description,
        sample_input=problem.sample_input,
        sample_output=problem.sample_output,
        solution=problem.solution,
    )
    return example.with_inputs("problem_statement", "sample_input", "sample_output")


# Training examples come from the train split, dev examples from the eval split.
trainset = [_as_example(problem) for problem in train_problems]

devset = [_as_example(problem) for problem in eval_problems]

In [None]:


# Language-model settings.
# Note: we did not find much of a difference between gpt-4o-mini and full gpt-4o.
_LM_KWARGS = {
    "model": "gpt-4o-mini",
    "max_tokens": 4000,
    "temperature": 0.1,
}
lm = dspy.OpenAI(**_LM_KWARGS)

# Register the model as the default LM, then switch on the experimental flag.
dspy.settings.configure(lm=lm)
dspy.configure(experimental=True)


def validate_solution(example, prediction, trace=None, frac=0.8):
    """Metric: compare a predicted solution with the reference solution.

    Both texts are wrapped as `Example(answer=...)` and passed to
    `answer_exact_match`, together with `trace` and `frac`.

    Args:
        example: Object whose `solution` attribute holds the reference text.
        prediction: Object whose `solution` attribute holds the generated text.
        trace: Forwarded unchanged to `answer_exact_match`.
        frac: Forwarded unchanged to `answer_exact_match`.
            NOTE(review): presumably the overlap threshold used for the
            match - confirm against the DSPy metric documentation.

    Returns:
        False when the prediction has no usable `solution` (attribute missing,
        not a string, or only whitespace); otherwise whatever
        `answer_exact_match` returns.
    """
    predicted = getattr(prediction, "solution", None)

    # A failed or empty generation counts as a miss. Without this guard a
    # missing/None solution is passed straight into the matcher instead of
    # simply being scored as wrong.
    if not isinstance(predicted, str) or not predicted.strip():
        return False

    result = answer_exact_match(
        example=Example(answer=example.solution),
        pred=Example(answer=predicted),
        trace=trace,
        frac=frac,
    )
    return result
    

# Evaluator shared by the baseline and the optimized programs: scores a
# program on the dev set with the validate_solution metric.
evaluate = Evaluate(
    metric=validate_solution,
    devset=devset,
    display_table=5,
    display_progress=True,
    num_threads=6,  # Note: Set this to 1 for debugging purposes
)

In [None]:

# Signature for the explanation-generation step: three string inputs (problem
# statement, sample input, sample output) and one string output, `solution`.
# NOTE(review): the docstring below is presumably what DSPy sends to the model
# as task instructions - treat edits to it as prompt changes rather than as
# documentation changes; confirm against the DSPy Signature documentation.
class GenerateSolution(Signature):
    """You are an expert problem solver. Your task is to solve the problem at hand.

    When solving a competitive programming problem, start by thoroughly reading and understanding the problem statement, including all constraints and input/output formats.
    Identify the key elements, analyze sample inputs/outputs, and consider edge cases.
    Look for patterns or mathematical relationships, then develop a logical approach, breaking the problem into subproblems if needed. 
    Explain the core insight in simple terms, followed by a step-by-step explanation of the solution.
    Use clear language and visual aids if helpful. Discuss optimization techniques, alternative approaches, and special cases. 
    Address time and space complexity, and provide pseudocode if appropriate. 
    Finally, proofread the solution for clarity and completeness, ensuring it's accessible to readers with varying levels of programming experience.
    Note: You are not expected to write the code for the solution, just provide a solution explanation so that an experienced developer can write the code for the solution.
    """

    # Inputs. Only the two sample fields carry a description.
    problem_statement: str = InputField(format=str)
    sample_input: str = InputField(format=str, desc="The sample input provided with the problem statement.")
    sample_output: str = InputField(format=str, desc="The sample output provided with the problem statement.")
    # Output: a prose explanation of the approach (the instructions above ask
    # for an explanation rather than code).
    solution: str = OutputField(
        format=str, desc="a solution explanation for how we should go about solving this problem."
    )

In [None]:
# Smoke test: run a bare ChainOfThought over the first training example and
# display the generated solution text.
_first_example = trainset[0]
_smoke_predictor = dspy.ChainOfThought(GenerateSolution)
_smoke_predictor(
    problem_statement=_first_example.problem_statement,
    sample_input=_first_example.sample_input,
    sample_output=_first_example.sample_output,
).solution


In [None]:
class SimpleGenerateSolution(dspy.Module):
    """Single-step program: one ChainOfThought call over `GenerateSolution`."""

    def __init__(self):
        super().__init__()
        # Despite its name, this predictor produces a solution explanation
        # (the `solution` field of GenerateSolution).
        self.generate_code = dspy.ChainOfThought(GenerateSolution)

    def forward(self, problem_statement, sample_input, sample_output):
        """Run the predictor and return its `solution` wrapped in a Prediction."""
        completion = self.generate_code(
            problem_statement=problem_statement,
            sample_input=sample_input,
            sample_output=sample_output,
        )
        return dspy.Prediction(solution=completion.solution)


In [None]:
# prediction = SimpleGenerateSolution()(problem_statement=trainset[0].problem_statement, 
#     sample_input=trainset[0].sample_input, 
#     sample_output=trainset[0].sample_output)

# print(prediction.solution)

In [None]:
# Unoptimized baseline program; it is evaluated as-is and later handed to both optimizers.
simple_program = SimpleGenerateSolution()

In [None]:
# Baseline score: run the unoptimized program over the dev set.
evaluate(program=simple_program, devset=devset)


In [None]:


def optimize_with_mipro(program, prompt_model, task_model, metric, trainset):
    """Optimize a copy of `program` with MIPROv2 and save the result.

    Args:
        program: Program to optimize; a deep copy is what gets compiled.
        prompt_model: Passed to MIPROv2 as `prompt_model`.
        task_model: Passed to MIPROv2 as `task_model`.
        metric: Metric passed to MIPROv2.
        trainset: Examples passed to `compile` as the training set.

    Returns:
        The optimized program. It is also saved to
        ``mipro_optimized_<YYYYmmdd_HHMMSS>`` (relative path, timestamp taken
        after compilation finishes).
    """
    optimizer = MIPROv2(
        prompt_model=prompt_model,
        task_model=task_model,
        metric=metric,
        num_candidates=5,
        init_temperature=0.5,
        verbose=False,
        log_dir="./logs",
    )

    compile_options = {
        "trainset": trainset,
        "eval_kwargs": {"num_threads": 16},
        # 0-shot optimization: no bootstrapped and no labeled demos.
        "max_bootstrapped_demos": 0,
        "max_labeled_demos": 0,
        "num_batches": 20,
        # turning this off bc we have a small trainset already
        "minibatch": False,
        "seed": 9,
    }
    compiled = optimizer.compile(program.deepcopy(), **compile_options)

    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    compiled.save(f"mipro_optimized_{stamp}")

    return compiled


def optimize_with_bootstrap_fewshot(program, task_model, teacher_model, metric, trainset):
    """Optimize `program` with BootstrapFewShotWithRandomSearch and save the result.

    Args:
        program: Program handed to the optimizer's `compile`.
        task_model: LM the program should run on while it is compiled. When it
            is None, the globally configured LM is left in place.
        teacher_model: LM passed to the optimizer through `teacher_settings`.
        metric: Metric passed to the optimizer.
        trainset: Examples passed to `compile` as the training set.

    Returns:
        The optimized program. It is also saved to
        ``fewshot_optimized_<YYYYmmdd_HHMMSS>`` (relative path, timestamp taken
        after compilation finishes).
    """
    # Local import keeps this function self-contained.
    from contextlib import nullcontext

    rs_optimizer = BootstrapFewShotWithRandomSearch(
        metric=metric,
        num_threads=8,
        num_candidate_programs=5,
        max_labeled_demos=0,
        max_bootstrapped_demos=2,
        max_errors=10000,
        teacher_settings=dict(lm=teacher_model),
    )

    # `task_model` used to be accepted but never used, so the program always
    # ran on whatever LM was configured globally. Scope the requested model
    # around compilation so the argument actually takes effect.
    task_lm_scope = dspy.context(lm=task_model) if task_model is not None else nullcontext()
    with task_lm_scope:
        optimized_program = rs_optimizer.compile(
            program,
            trainset=trainset,
        )

    date_time = datetime.now().strftime("%Y%m%d_%H%M%S")
    optimized_program.save(f"fewshot_optimized_{date_time}")

    return optimized_program

In [None]:
# Few-shot optimization; the same `lm` is passed as both the task model and the teacher model.
optimized_program = optimize_with_bootstrap_fewshot(simple_program, lm, lm, validate_solution, trainset)

In [None]:
# Score the few-shot-optimized program on the same dev set as the baseline.
evaluate(program=optimized_program, devset=devset)

In [None]:
# MIPROv2 optimization; the same `lm` is passed as both the prompt model and the task model.
mipro_optimized_program = optimize_with_mipro(simple_program, lm, lm, validate_solution, trainset)

In [None]:
# Score the MIPROv2-optimized program on the same dev set as the baseline.
evaluate(program=mipro_optimized_program, devset=devset)