In [None]:
!pip install -q -U google-genai

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/43.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.1/43.1 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/231.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━[0m [32m225.3/231.9 kB[0m [31m9.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m231.9/231.9 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
from typing import Any, List, Tuple, Callable
import sys
sys.set_int_max_str_digits(0)
import random
import numpy as np
from google import genai
import asyncio
import time

# ------------------- Database -------------------

class Database:
    """In-memory store of evaluated programs ("results") and prompt ideas ("inspirations").

    Results are grouped into categories: the first ``num_categories`` results
    each open a new category, later results join the category of the stored
    result whose embedding is most similar (cosine similarity).

    Args:
        num_categories: Number of results that each start their own category.
        mutation_rate: Probability, per ``sample()`` call, of borrowing
            inspirations from an elite of a different category.
        num_inspirations: Maximum number of unused inspirations returned by
            ``sample()``.
        num_elites: Number of top-scoring results per category that are
            eligible to be sampled.
    """

    def __init__(self, num_categories=5, mutation_rate=0.1, num_inspirations=5, num_elites=5):
        self.results = []  # main generated programs
        self.inspirations = []  # new ideas to try
        self._next_result_id = 1
        self._next_inspiration_id = 1
        # Kept for backward compatibility; no method of this class uses it.
        self.client = genai.Client()
        self.num_categories = num_categories
        self.mutation_rate = mutation_rate
        self.num_inspirations = num_inspirations
        self.num_elites = num_elites

    def print_categories(self):
        """Print all categories and their associated results (ids + scores)."""
        print("\n=== Categories ===")

        # Group results by category, preserving first-seen category order.
        categories = {}
        for entry in self.results:
            cat = entry.get("category", None)
            if cat is not None:
                categories.setdefault(cat, []).append(entry)

        for cat, entries in categories.items():
            print(f"\nCategory {cat}:")
            for e in entries:
                score = e["results"].get("score", 0)
                print(f"  - Result {e['id']}: score={score}")

    def cosine_similarity(self, a, b):
        """Return the cosine similarity of vectors ``a`` and ``b``.

        A zero-norm vector has no direction, so the similarity is defined as
        0.0 here instead of dividing by zero (which would yield NaN and make
        the ``max`` in ``add_result`` unreliable).
        """
        denominator = np.linalg.norm(a) * np.linalg.norm(b)
        if denominator == 0:
            return 0.0
        return np.dot(a, b) / denominator

    def add_result(self, program, results, base_prompt, embedding):
        """Store an evaluated program and return the new result entry (a dict).

        Args:
            program: Source text of the program.
            results: Evaluation dict; its ``"score"`` key is read elsewhere.
            base_prompt: Base prompt that produced the program.
            embedding: Vector used to find the closest existing result.
        """
        if len(self.results) < self.num_categories:
            # Still seeding: every early result opens its own category.
            category = f"cat-{len(self.results) + 1}"
            print("creating new category")
        else:
            # Join the category of the most similar stored result.
            closest_result = max(
                self.results,
                key=lambda r: self.cosine_similarity(embedding, r["embedding"]),
            )
            category = closest_result["category"]

        result_entry = {
            "id": self._next_result_id,
            "program": program,
            "results": results,
            "base_prompt": base_prompt,
            "embedding": embedding,
            "category": category,
        }

        self.results.append(result_entry)
        self._next_result_id += 1
        return result_entry

    def add_inspiration(self, parent_result_id, description, result_id=None):
        """Record an inspiration for ``parent_result_id`` and return its id.

        ``result_id`` stays ``None`` until the inspiration has produced a
        result (see ``mark_inspiration_as_used``).
        """
        inspiration_entry = {
            "id": self._next_inspiration_id,
            "parent_result_id": parent_result_id,
            "description": description,
            "result_id": result_id
        }
        self.inspirations.append(inspiration_entry)
        self._next_inspiration_id += 1
        return inspiration_entry["id"]

    def _top_results(self, category):
        """Return up to ``num_elites`` results of ``category``, best score first."""
        in_category = [r for r in self.results if r["category"] == category]
        in_category.sort(key=lambda r: r["results"].get("score", 0), reverse=True)
        return in_category[:self.num_elites]

    def _inspirations_for(self, result_id):
        """Return every inspiration whose parent is ``result_id``."""
        return [
            insp for insp in self.inspirations
            if insp["parent_result_id"] == result_id
        ]

    def sample(self):
        """Pick a parent result plus inspirations to try on it.

        Returns:
            ``None`` when the database holds no results; otherwise a tuple
            ``(selected_entry, unused_inspirations, all_inspirations)`` where
            ``unused_inspirations`` is capped at ``num_inspirations`` and
            contains only inspirations without a ``result_id``.
        """
        if not self.results:
            return None

        # dict.fromkeys de-duplicates while keeping first-seen order. A set of
        # strings iterates in a per-process order, which would make a seeded
        # `random` run non-reproducible.
        categories = list(dict.fromkeys(r["category"] for r in self.results))
        category = random.choice(categories)
        selected_entry = random.choice(self._top_results(category))

        # By default the parent's own inspirations are used.
        inspiration_source = selected_entry

        # Cross-category mutation: keep the selected parent but borrow the
        # inspirations of an elite from a different category.
        if random.random() < self.mutation_rate and len(categories) > 1:
            other_category = random.choice([c for c in categories if c != category])
            inspiration_source = random.choice(self._top_results(other_category))
            print(f"[Mutation] Swapped inspirations from category {category} → {other_category}")

        all_inspirations = self._inspirations_for(inspiration_source["id"])
        unused_inspirations = [
            insp for insp in all_inspirations if insp["result_id"] is None
        ][:self.num_inspirations]

        return selected_entry, unused_inspirations, all_inspirations

    def best(self):
        """Return the highest-scoring result (latest id wins ties), or ``None`` if empty."""
        if not self.results:
            return None
        return max(self.results, key=lambda r: (r["results"].get("score", 0), r["id"]))

    def mark_inspiration_as_used(self, inspiration: Any, result_id: int):
        """Attach ``result_id`` to the stored inspiration matching ``inspiration["id"]``.

        Only inspirations that have no ``result_id`` yet are updated, so an
        already-used inspiration keeps its original result.
        """
        for insp_entry in self.inspirations:
            if insp_entry["id"] == inspiration["id"] and insp_entry["result_id"] is None:
                insp_entry["result_id"] = result_id

# ------------------- Prompt Sampler -------------------

class PromptSampler:
    """Builds the prompt that asks the LLM for SEARCH/REPLACE diffs of a parent program."""

    def __init__(self, llm):
        self.llm = llm  # LLM instance used to generate new base prompt

    @staticmethod
    def _has_idea(inspiration: Any) -> bool:
        """Return True when ``inspiration`` carries a non-blank idea.

        Inspirations arrive as database entries (dicts with a ``"description"``
        key). A non-empty dict is always truthy, so the description itself has
        to be inspected; plain strings and ``None`` are accepted as well.
        """
        idea = inspiration.get("description") if isinstance(inspiration, dict) else inspiration
        if isinstance(idea, str):
            idea = idea.strip()
        return bool(idea)

    async def build(self, parent_program: str, base_prompt: str, inspiration: Any) -> Tuple[str, str]:
        """Build the diff-generation prompt for one parent/inspiration pair.

        When the inspiration carries an idea, the LLM is asked to rewrite the
        base prompt around it; when it is missing or blank (e.g. the seed
        inspiration with an empty description) the previous base prompt is
        reused and no LLM call is made.

        Args:
            parent_program: Source text of the program to improve.
            base_prompt: Base prompt stored with the parent.
            inspiration: Inspiration entry, passed through unchanged to
                ``llm.generate_new_base_prompt``.

        Returns:
            ``(combined_prompt, new_base_prompt)``.
        """
        if self._has_idea(inspiration):
            new_base_prompt = await self.llm.generate_new_base_prompt(
                base_prompt=base_prompt,
                parent_program=parent_program,
                inspiration=inspiration
            )
        else:
            new_base_prompt = base_prompt

        # Combine into full prompt for diff generation
        combined_prompt = f"""
Base Prompt for this iteration:
{new_base_prompt}

Parent Program:
{parent_program}

Instructions:
Generate diffs to improve the parent program. Use the following format for all changes:

<<<<<<< SEARCH
# Original code block to be found and replaced
=======
# New code block to replace the original
>>>>>>> REPLACE

Make sure the final computed value to be evaluated is assigned to the variable `result`.
"""
        return combined_prompt, new_base_prompt

# ------------------- LLM -------------------

from pydantic import BaseModel
# Structured-output schema for one inspiration. LLM passes `list[Inspiration]`
# as `response_schema` and reads `.description` from each parsed item.
# NOTE: documented with comments rather than a docstring on purpose — pydantic
# copies a model's docstring into its JSON schema, which would change what is
# sent to the API.
class Inspiration(BaseModel):
    # Free-text idea for how to update the base prompt next iteration.
    description: str

class LLM:
    """Async wrapper around the Gemini client for the evolution loop.

    Covers embeddings, diff generation, base-prompt rewriting, and
    structured "inspiration" suggestions.
    """

    def __init__(self):
        self.client = genai.Client() # Gets API key from GEMINI_API_KEY

    async def embed_program(self, program: str):
        """Generate embedding vector for a program string (as a NumPy array)."""
        response = await self.client.aio.models.embed_content(
            model="gemini-embedding-001",
            contents=program,
        )
        return np.array(response.embeddings[0].values)

    async def generate(self, base_prompt: str):
        """Send ``base_prompt`` to the model; return ``(response_text, base_prompt)``."""
        response = await self.client.aio.models.generate_content(
            model="gemini-2.0-flash-lite",
            contents=base_prompt
        )
        diff_text = response.candidates[0].content.parts[0].text
        return diff_text, base_prompt

    def apply_diff(self, parent_program: str, diff: str):
        """Apply every SEARCH/REPLACE block found in ``diff`` to ``parent_program``.

        Text outside the blocks is ignored. Each SEARCH text is replaced
        wherever it occurs in the program; a SEARCH text that does not occur
        leaves the program unchanged.

        Returns:
            The updated program text.
        """
        import re
        pattern = re.compile(
            r"<<<<<<< SEARCH\n(.*?)\n=======\n(.*?)\n>>>>>>> REPLACE",
            re.DOTALL
        )

        updated_program = parent_program
        for match in pattern.finditer(diff):
            original_block = match.group(1).strip("\n")
            new_block = match.group(2).strip("\n")
            if not original_block:
                # str.replace("", x) would insert x between every character
                # of the program, so an empty SEARCH block is skipped.
                continue
            updated_program = updated_program.replace(original_block, new_block)
        return updated_program

    async def _request_inspirations(self, prompt: str) -> list[str]:
        """Ask the model for a JSON list of inspirations; return their descriptions."""
        response = await self.client.aio.models.generate_content(
            model="gemini-2.5-flash",
            contents=prompt,
            config={
                "response_mime_type": "application/json",
                "response_schema": list[Inspiration],
            },
        )
        # `parsed` is None when the reply could not be parsed into the
        # schema; treat that as "no suggestions" instead of crashing.
        inspirations = response.parsed or []
        return [inspiration.description for inspiration in inspirations]

    async def generate_inspiration_regression(
        self,
        parent_base_prompt: str,
        parent_program: str,
        parent_results: dict,
        child_program: str,
        child_results: dict,
        existing_inspirations: "list[dict] | None" = None
    ) -> list[str]:
        """
        Produce short inspiration ideas when a new child performed worse than the parent.
        The ideas should hypothesize about the hidden evaluation criteria and suggest
        prompt updates that could improve score next time.

        ``existing_inspirations`` lists ideas already recorded so the model can
        avoid repeating them; omitting it is the same as passing an empty list.
        """
        if existing_inspirations is None:
            existing_inspirations = []

        prompt = f"""
You are analyzing a code evolution experiment where the goal is to maximize an UNKNOWN (hidden) score.

CONTEXT (Parent performed BETTER than Child):
- Parent Base Prompt:
{parent_base_prompt}

- Parent Program:
{parent_program}

- Parent Results (incl. score):
{parent_results}

- Child Program (performed worse):
{child_program}

- Child Results (incl. score):
{child_results}

- Existing Updates/Inspirations:
{existing_inspirations}

TASK:
1) Diagnose why the Child likely underperformed relative to the Parent.
2) Hypothesize about the hidden scoring function (what it may reward/penalize).
3) Recommend 1–3 concrete, testable updates to the BASE PROMPT that better target the hidden criteria. Do not repeat any of the existing updates.
4) Summarize each recommendation as ONE concise "inspiration idea" that we can try next iteration. Do not repeat any of the existing inspirations.

OUTPUT FORMAT:
- Return ONLY a JSON list of inspirations.
- Each inspiration should be an object of the form: {{ "description": "<string>" }}
- The "description" should briefly capture the suspected scoring factors and how to update the base prompt.
"""
        return await self._request_inspirations(prompt)

    async def generate_new_base_prompt(self, base_prompt: str, parent_program: str, inspiration: dict) -> str:
        """
        Call Gemini to create a new base prompt given previous prompt, program, and inspiration.

        ``inspiration`` is an inspiration entry; only its ``"description"`` is used.
        """
        prompt = f"""
You are improving a code generation prompt.

Previous Base Prompt:
{base_prompt}

Parent Program:
{parent_program}

Inspiration Idea:
{inspiration["description"]}

Generate a new base prompt that incorporates the inspiration idea
and would lead to an improved program.
Return only the new prompt as plain text.
"""
        response = await self.client.aio.models.generate_content(
            model="gemini-2.5-flash",
            contents=prompt
        )
        new_prompt = response.candidates[0].content.parts[0].text
        return new_prompt

    async def generate_recommendation(self, base_prompt: str, program: str, results: dict, existing_inspirations: list[dict]) -> list[str]:
        """
        Ask Gemini to suggest future improvements with explicit reasoning
        about the UNKNOWN scoring function.

        Returns the descriptions of the suggested inspirations.
        """
        prompt = f"""
You are reviewing a code generation experiment where the evaluation score is HIDDEN/UNKNOWN.
Assume we want to maximize that hidden score and learn from this iteration.

Base Prompt:
{base_prompt}

Generated Program:
{program}

Evaluation Results (incl. score):
{results}

- Existing Updates:
{existing_inspirations}

TASK:
1) Infer what the hidden scoring function might reward or penalize based on the current outcome.
2) Propose 1–3 concise, testable updates to the BASE PROMPT that exploit those hypotheses. Do not repeat any of the existing updates.
3) Summarize each update as one short "inspiration idea" (1–3 sentences) to try next iteration.

OUTPUT FORMAT:
- Return ONLY a JSON list of inspirations.
- Each inspiration should be an object of the form: {{ "description": "<string>" }}
- The "description" should briefly capture the suspected scoring factors and how to update the base prompt.
"""
        return await self._request_inspirations(prompt)

# ------------------- Evaluator -------------------

class Evaluator:
    """Runs a candidate program and scores it with a user-supplied function.

    Args:
        eval_fn: Called as ``eval_fn(program, result_value)``; must return the
            results dict (the rest of the pipeline reads its ``"score"`` key).
    """

    def __init__(self, eval_fn: Callable[[str, Any], dict]):
        self.eval_fn = eval_fn

    def execute(self, program: str) -> dict:
        """Execute ``program`` and evaluate the value it assigns to ``result``.

        Returns:
            ``{"score": -1, "error": <message>}`` when the program raises an
            ``Exception``; otherwise whatever ``eval_fn`` returns. ``eval_fn``
            receives ``None`` when the program never assigns ``result``.
        """
        # NOTE(security): this runs model-generated code in-process with no
        # sandbox and no timeout. Only use it in a disposable environment.
        #
        # A single dict serves as both globals and locals. With separate
        # dicts, top-level names land in the locals dict while functions
        # resolve globals in the other one, so a function defined by the
        # program could not see the program's own helpers or imports.
        namespace = {}
        try:
            exec(program, namespace)
        except Exception as e:
            return {"score": -1, "error": str(e)}

        return self.eval_fn(program, namespace.get("result", None))

# ------------------- Example Custom Eval -------------------

def my_custom_eval(program: str, result: Any) -> dict:
    """Example scoring function: the score is ``result`` itself when it is an int.

    Any non-int ``result`` (including ``None`` and floats) scores 0. The
    returned dict also carries an ``"output"`` message echoing the program.
    """
    if isinstance(result, int):
        score = result
    else:
        score = 0
    evaluation = {"score": score}
    evaluation["output"] = f"Custom evaluated {program}"
    return evaluation

# ------------------- Enabler -------------------


class Enabler:
    """Drives the evolution loop.

    Each iteration samples a parent, derives children from its inspirations,
    evaluates them, and records new results and inspirations.

    Args:
        database: Store providing ``sample``, ``add_result``,
            ``add_inspiration``, ``mark_inspiration_as_used``,
            ``print_categories`` and an ``inspirations`` list.
        prompt_sampler: Object whose async ``build`` returns
            ``(combined_prompt, new_base_prompt)``.
        llm: Object providing the async generation/embedding calls and
            ``apply_diff``.
        evaluator: Object whose ``execute(program)`` returns a results dict.
        iterations: Number of sampling rounds to run.
    """

    def __init__(self, database, prompt_sampler, llm, evaluator, iterations=20):
        self.database = database
        self.prompt_sampler = prompt_sampler
        self.llm = llm
        self.evaluator = evaluator
        self.iterations = iterations

    async def run(self):
        """Run the evolution loop, then print the final categories.

        Stops early when the database has nothing to sample: without a parent
        no child can be created, so later iterations could not succeed either.
        """
        for i in range(self.iterations):
            iter_start = time.perf_counter()
            print(f"\n=== Iteration {i+1}/{self.iterations} ===")

            sampled = self.database.sample()
            if sampled is None:
                # sample() returns None for an empty database; unpacking it
                # would raise TypeError.
                print("Database is empty; nothing to evolve.")
                break
            parent_entry, inspirations, all_inspirations = sampled

            parent_id = parent_entry["id"]
            print(f"Chosen Parent Id {parent_id}")

            # Explore every inspiration concurrently and wait for all of them
            # before moving on to the next parent.
            await asyncio.gather(*(
                self.explore_inspiration(parent_entry, inspiration, all_inspirations)
                for inspiration in inspirations
            ))

            iter_end = time.perf_counter()
            print(f"Iteration {i+1} took {iter_end - iter_start:.2f} seconds")

            print(self.database.inspirations)
            self.database.print_categories()

        print("Final Categories: \n")
        self.database.print_categories()

    async def explore_inspiration(self, parent_entry: dict, inspiration: dict, all_inspirations: list[dict]):
        """Create, evaluate and store one child of ``parent_entry``.

        Steps: build the prompt from the inspiration, ask the LLM for a diff,
        apply it, evaluate the child, ask for follow-up inspirations for both
        parent and child, then store the child result, mark ``inspiration`` as
        used and record the new inspirations.

        Args:
            parent_entry: Result entry of the parent program.
            inspiration: Inspiration entry to try on the parent.
            all_inspirations: Already-known inspirations, shown to the LLM so
                it can avoid repeating them.
        """
        parent_program = parent_entry["program"]
        base_prompt = parent_entry["base_prompt"]
        parent_score = parent_entry["results"].get("score", 0)

        # Build combined prompt (returns new base prompt as well)
        combined_prompt, new_base_prompt = await self.prompt_sampler.build(
            parent_program, base_prompt, inspiration
        )

        # Get LLM diff, apply it, evaluate the child
        diff, _ = await self.llm.generate(combined_prompt)
        child_program = self.llm.apply_diff(parent_program, diff)

        results = self.evaluator.execute(child_program)
        child_score = results.get("score", 0)

        if child_score < parent_score:
            # Regression: ask why the child did worse than the parent.
            parent_insps = await self.llm.generate_inspiration_regression(
                parent_base_prompt=base_prompt,
                parent_program=parent_program,
                parent_results=parent_entry["results"],
                child_program=child_program,
                child_results=results,
                existing_inspirations=all_inspirations
            )
        else:
            # No regression: still create inspirations for the parent
            parent_insps = await self.llm.generate_recommendation(
                base_prompt,
                parent_program,
                parent_entry["results"],
                existing_inspirations=all_inspirations
            )

        # Child always gets inspirations
        child_insps = await self.llm.generate_recommendation(
            new_base_prompt,
            child_program,
            results,
            existing_inspirations=all_inspirations
        )

        embedding = await self.llm.embed_program(child_program)

        result = self.database.add_result(child_program, results, new_base_prompt, embedding)

        self.database.mark_inspiration_as_used(inspiration=inspiration, result_id=result["id"])

        print(f"Inspirations Length Parent Insps:{len(parent_insps)}, Child Insps: {len(child_insps)}")

        for insp in parent_insps:
            self.database.add_inspiration(
                parent_result_id=parent_entry["id"],
                description=insp
            )
            print("\nParent Inspiration:\n", insp)
        for insp in child_insps:
            self.database.add_inspiration(
                parent_result_id=result["id"],
                description=insp
            )
            print("\nChild Inspiration:\n", insp)

# ------------------- Example Usage -------------------

# Seed database
database = Database()
initial_program = """
def compute():
    return 10

result = compute()
"""
initial_base_prompt = """
This prompt will be used to generate code, but it is unclear exactly what the evaluation
metric is. You should write code that you think would maximize some kind of score. Keep the code relatively short and simple.
Focus on producing code that is correct, well-structured, and likely to achieve a high score.
"""
llm = LLM()

embedding = await llm.embed_program(initial_program)

database.add_result(initial_program, my_custom_eval(initial_program, 10), initial_base_prompt, embedding)
database.add_inspiration(parent_result_id=1, description="")

# Initialize components

prompt_sampler = PromptSampler(llm)
evaluator = Evaluator(eval_fn=my_custom_eval)
enabler = Enabler(database, prompt_sampler, llm, evaluator, iterations=2)

# Run the evolution
await enabler.run()


creating new category

=== Iteration 1/2 ===
Chosen Parent Id 1
creating new category
Inspirations Length Parent Insps:2, Child Insps: 2

Parent Inspiration:
 The scoring function appears to be directly the numerical value of the 'result' variable; update the prompt to instruct the model to explicitly maximize this final numerical value.

Parent Inspiration:
 The Child's added complexity (parameters, docstrings) did not improve the score and might be unnecessary; prompt should clarify that simpler code is preferred unless complexity directly leads to a higher 'result' value.

Child Inspiration:
 The score likely rewards a more complex 'meaningful calculation' than simple addition. I will update the prompt to suggest a function that performs a multi-step operation or uses a more specialized mathematical function to increase its perceived utility.

Child Inspiration:
 The scoring might favor a function that demonstrates greater versatility in handling input types. I will update the promp