In [1]:
from compiler.core import Compiler, CompileResult

In [2]:
from typing import Callable, Generic, Optional, TypeVar
import concurrent.futures

In [3]:
class Node[T]:
    data: T
    score: float
    parent: Optional["Node[T]"]
    children: list["Node[T]"]
    depth: int

    def __init__(self, data: T, score, parent: Optional["Node[T]"] = None):
        self.data = data
        self.score = score
        self.parent = parent
        self.children = []
        self.depth = parent.depth + 1 if parent else 0

    @property
    def is_terminal(self) -> bool:
        return not self.children
    
    def get_trajectory(self) -> list["Node[T]"]:
        trajectory = []
        node = self
        while node:
            trajectory.append(node)
            node = node.parent
        return trajectory[::-1]
    
    def best_solution(self) -> "Node[T]":
        all_nodes = [self] + self._get_all_children()
        return max(all_nodes, key=lambda x: int(x.is_terminal) * x.score)
    
    def _get_all_children(self) -> list["Node[T]"]:
        all_nodes, stack = [], [self]
        while stack:
            node = stack.pop()
            all_nodes.extend(node.children)
            stack.extend(node.children)
        return all_nodes

In [4]:
import re
import os
import jinja2
from anthropic import AnthropicBedrock
from core import stages
import json
import requests
import subprocess
from shutil import copytree, ignore_patterns

In [5]:
class CompilerService:
    """Compile TypeSpec / Drizzle schemas through a remote HTTP service or a local ``Compiler``.

    With a non-empty ``address`` every compile call is POSTed to
    ``{address}/compile/<target>``; otherwise a local ``Compiler`` is created
    and used directly.
    """

    def __init__(self, address: str | None = None):
        """
        Args:
            address: Base URL of the remote compile service. ``None`` or an
                empty string selects the local compiler.
        """
        self.address = address
        # Use the same truthiness test as the compile methods. Checking
        # ``address is None`` here while the methods check ``if self.address``
        # left an empty-string address with neither backend available.
        if not address:
            self.compiler = Compiler("botbuild/tsp_compiler", "botbuild/app_schema")

    def _compile_remote(self, target: str, schema: str) -> CompileResult:
        """POST ``schema`` to the remote ``/compile/{target}`` endpoint and wrap the JSON reply."""
        response = requests.post(
            f"{self.address}/compile/{target}",
            json={"payload": schema},
        )
        # Surface HTTP error statuses instead of trying to parse an error body.
        response.raise_for_status()
        return CompileResult(**response.json())

    def compile_typespec(self, schema: str) -> CompileResult:
        """Compile a TypeSpec schema remotely when an address is set, locally otherwise."""
        if self.address:
            return self._compile_remote("typespec", schema)
        return self.compiler.compile_typespec(schema)

    def compile_drizzle(self, schema: str) -> CompileResult:
        """Compile a Drizzle schema remotely when an address is set, locally otherwise."""
        if self.address:
            return self._compile_remote("drizzle", schema)
        return self.compiler.compile_drizzle(schema)

In [6]:
from typing import TypedDict

In [7]:
# Model identifier passed as `model=` to client.messages.create by run_typespec / run_drizzle.
LLM_MODEL = "anthropic.claude-3-5-sonnet-20241022-v2:0"

In [8]:
class TypespecData(TypedDict):
    """Payload stored on each search node of the TypeSpec stage (shape returned by run_typespec)."""
    # Assistant chat turn: {"role": "assistant", "content": <raw model text>}.
    message: dict
    # Parsed model output as returned by stages.typespec.parse_output.
    output: stages.typespec.TypespecOutput
    # Result of compiling the generated schema with CompilerService.compile_typespec.
    feedback: CompileResult

In [9]:
def run_typespec(messages: list[dict], client: AnthropicBedrock, compiler: CompilerService) -> TypespecData:
    """Ask the model for a TypeSpec definition and compile what it returns.

    Args:
        messages: Chat history sent to the model as-is.
        client: Bedrock client used for the completion request.
        compiler: Service used to compile the generated schema.

    Returns:
        A dict with the assistant message, the parsed output and the compile feedback.
    """
    completion = client.messages.create(
        model=LLM_MODEL,
        max_tokens=8192,
        messages=messages,
    )
    raw_text = completion.content[0].text
    parsed = stages.typespec.parse_output(raw_text)
    # The compiler input is the helpers import line followed by the generated definitions.
    schema_lines = ('import "./helpers.js";', parsed["typespec_definitions"])
    compile_feedback = compiler.compile_typespec("\n".join(schema_lines))
    assistant_turn = {"role": "assistant", "content": raw_text}
    return {
        "message": assistant_turn,
        "output": parsed,
        "feedback": compile_feedback,
    }


def bfs_typespec(
    init_message: dict,
    root: Node[TypespecData],
    client: AnthropicBedrock,
    compiler: CompilerService,
    max_depth: int = 3,
    branch_factor: int = 3,
    max_workers: int = 5,
) -> Node[TypespecData]:
    """Grow the search tree level by level until a compiling TypeSpec is found.

    Each round expands every leaf that failed to compile and is shallower than
    ``max_depth`` with ``branch_factor`` fresh generations, run on a thread pool.
    The loop stops when the best leaf has score 1 or nothing is left to expand.

    Args:
        init_message: The initial user prompt that starts every conversation.
        root: Root node of the tree; it is extended in place.
        client: Bedrock client forwarded to ``run_typespec``.
        compiler: Compiler service forwarded to ``run_typespec``.
        max_depth: Leaves at this depth or deeper are not expanded.
        branch_factor: Number of generations requested per expanded leaf.
        max_workers: Thread pool size.

    Returns:
        ``root.best_solution()`` after the search ends.
    """
    while root.best_solution().score != 1:
        frontier: list[Node[TypespecData]] = [
            leaf
            for leaf in [root, *root._get_all_children()]
            if leaf.is_terminal
            and leaf.depth < max_depth
            and leaf.data["feedback"]["exit_code"] != 0
        ]
        if not frontier:
            break

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
            owner_of: dict[concurrent.futures.Future, Node[TypespecData]] = {}
            for leaf in frontier:
                # Replay the path from the root: each assistant turn is followed
                # by a user turn carrying that attempt's compiler stdout.
                conversation = [init_message]
                for step in leaf.get_trajectory():
                    conversation += [
                        step.data["message"],
                        {"role": "user", "content": step.data["feedback"]["stdout"]},
                    ]
                for _ in range(branch_factor):
                    job = pool.submit(run_typespec, conversation, client, compiler)
                    owner_of[job] = leaf

            for finished in concurrent.futures.as_completed(owner_of):
                parent = owner_of[finished]
                outcome = finished.result()
                compiled = outcome["feedback"]["exit_code"] == 0
                parent.children.append(Node(outcome, 1 if compiled else 0, parent=parent))
    return root.best_solution()

In [10]:
from search import SearchPolicy

In [11]:
# No address is passed, so CompilerService creates a local Compiler rather than calling a remote service.
compiler = CompilerService()
# Bedrock client; the AWS profile and region are hard-coded in this cell.
client = AnthropicBedrock(aws_profile="dev", aws_region="us-west-2")

In [12]:
# SearchPolicy is imported from the project-local `search` module; cells below call its
# run_typespec / bfs_typespec / run_drizzle / bfs_drizzle methods.
policy = SearchPolicy(client, compiler)

In [13]:
# Turn the stage prompt strings into Jinja templates so they can be rendered with parameters.
jinja_env = jinja2.Environment()
typespec_tpl = jinja_env.from_string(stages.typespec.PROMPT)
drizzle_tpl = jinja_env.from_string(stages.drizzle.PROMPT)

In [14]:
# Render the TypeSpec prompt for the example application and wrap it as the first user message.
typespec_prompt_params = {"application_description": "Bot that records my gym excercises and suggests workout routines."}
prompt_typespec = typespec_tpl.render(**typespec_prompt_params)
init_typespec = {"role": "user", "content": prompt_typespec}

In [16]:
# First attempt for the TypeSpec stage.
# NOTE(review): presumably policy.run_typespec mirrors run_typespec defined in this notebook — confirm in search.SearchPolicy.
data_typespec = policy.run_typespec([init_typespec], client, compiler, policy._model)
# The score is a bool here (True when the compile exit code is 0); True compares equal to 1.
root_typespec = Node(data_typespec, data_typespec["feedback"]["exit_code"] == 0)

In [17]:
# Search from the first attempt; the result's data["output"] feeds the Drizzle prompt below.
best_typespec = policy.bfs_typespec(init_typespec, root_typespec, max_depth=3, branch_factor=3, max_workers=5)

In [18]:
# Render the Drizzle prompt from the best TypeSpec definitions and wrap it as the first user message.
drizzle_prompt_params = {"typespec_definitions": best_typespec.data["output"]["typespec_definitions"]}
prompt_drizzle = drizzle_tpl.render(**drizzle_prompt_params)
init_drizzle = {"role": "user", "content": prompt_drizzle}

In [19]:
# First attempt for the Drizzle stage.
data_drizzle = policy.run_drizzle([init_drizzle], client, compiler, policy._model)
# The root is scored by "stderr is empty/None" (a bool), whereas bfs_drizzle in this notebook
# scores children by exit_code == 0.
# NOTE(review): confirm the two criteria agree.
root_drizzle = Node(data_drizzle, not data_drizzle["feedback"]["stderr"])

In [24]:
# Scratch check: the root score expression converted to an int.
int(not data_drizzle["feedback"]["stderr"])

1

In [20]:
# Search the Drizzle stage starting from the first attempt.
best_drizzle = policy.bfs_drizzle(init_drizzle, root_drizzle, max_depth=3, branch_factor=3, max_workers=5)

In [21]:
# Inspect the payload (message / output / feedback) of the selected Drizzle node.
best_drizzle.data

{'message': {'role': 'assistant',
  'content': '<reasoning>\nLooking at the TypeSpec models and interface, we need to design a schema that can:\n1. Store exercises with their common and specific attributes (strength vs cardio)\n2. Store workout routines and their relationship with exercises\n3. Track workout history\n4. Handle arrays (muscle groups, equipment) in PostgreSQL\n5. Use appropriate data types for duration and timestamps\n</reasoning>\n\n<drizzle>\nimport { serial, text, integer, real, timestamp, pgTable, pgEnum } from "drizzle-orm/pg-core";\n\n// Enums\nexport const exerciseTypeEnum = pgEnum("exercise_type", ["strength", "cardio"]);\nexport const intensityEnum = pgEnum("intensity_level", ["low", "medium", "high"]);\nexport const difficultyEnum = pgEnum("difficulty_level", ["beginner", "intermediate", "advanced"]);\nexport const weightUnitEnum = pgEnum("weight_unit", ["lbs", "kg"]);\nexport const distanceUnitEnum = pgEnum("distance_unit", ["km", "miles", "m"]);\n\n// Base ex

In [36]:
# Rebinds root_drizzle with an int score, using an `is None` test instead of the falsy test used earlier.
# NOTE(review): this cell's execution count is out of order with the cells around it.
root_drizzle = Node(data_drizzle, int(data_drizzle["feedback"]["stderr"] is None))

In [35]:
# Scratch check: 1 when the compile feedback has no stderr at all.
int(data_drizzle["feedback"]["stderr"] is None)

1

In [33]:
# Scratch check of the stored root score (the saved output is a bool, i.e. from the bool-scored root).
root_drizzle.score

True

In [31]:
# Inspect the raw first-attempt Drizzle payload.
data_drizzle

{'message': {'role': 'assistant',
  'content': '<reasoning>\nBased on the TypeSpec models and interface, we need to create tables for:\n1. Exercises - to store individual exercise records\n2. WorkoutRoutines - to store workout routines\n3. TargetMuscleGroups - to store muscle groups for workouts (many-to-many relationship)\n4. A junction table for workout routines and exercises\nWe\'ll use appropriate PostgreSQL data types and create necessary relationships between tables.\n</reasoning>\n\n<drizzle>\nimport { \n  integer, \n  pgTable, \n  text, \n  timestamp, \n  real,\n  interval,\n  serial,\n  primaryKey\n} from "drizzle-orm/pg-core";\n\nexport const exercisesTable = pgTable("exercises", {\n  id: serial("id").primaryKey(),\n  name: text("name").notNull(),\n  sets: integer("sets").notNull(),\n  reps: integer("reps").notNull(),\n  weight: real("weight").notNull(),\n  duration: interval("duration").notNull(),\n  timestamp: timestamp("timestamp").notNull().defaultNow()\n});\n\nexport con

In [22]:
# NOTE(review): the saved output below is a dict rather than a Node repr, so it appears to come
# from an earlier binding of this name — re-run to confirm.
root_typespec

{'message': {'role': 'assistant',
  'content': '<reasoning>\nFor a gym exercise tracking bot, users would send messages like:\n"I did 3 sets of bench press with 150lbs" or "Completed 30 minutes of cardio on treadmill"\nThe LLM can extract exercise details, duration, and other metrics from natural language.\n\nKey functions needed:\n1. Record exercise (with sets, reps, weight if applicable)\n2. Record cardio session (with duration and intensity)\n3. Get workout suggestions based on history\n4. Track personal records\n\nThe interface should handle both strength training and cardio exercises, with appropriate\ndata models for each type. Workout suggestions should consider previous exercises to avoid\novertraining same muscle groups.\n</reasoning>\n\n<typespec>\nmodel Exercise {\n  name: string;\n  type: string; // "strength" or "cardio"\n  muscleGroups: string[];\n  sets?: integer;\n  reps?: integer;\n  weight?: float;\n  duration?: duration;\n  intensity?: string; // "low", "medium", "hi

In [13]:
# Hand-written TypeSpec text used to build a fixed search root without calling the model.
fake_typespec = r"""
model Exercise {
  name: string;
  type: string; // "strength" or "cardio"
  equipment: string;
  timestamp: Date;
}

model StrengthExercise extends Exercise {
  sets: integer;
  reps: integer;
  weight: float;
  weightUnit: string; // "lbs" or "kg"
}

model CardioExercise extends Exercise {
  duration: duration;
  intensity: string; // "low", "medium", "high"
}

model WorkoutRoutine {
  targetMuscleGroup: string;
  exercises: Exercise[];
  estimatedDuration: duration;
  difficultyLevel: string; // "beginner", "intermediate", "advanced"
}

interface GymBot {
  @llm_func(2)
  recordStrengthExercise(exercise: StrengthExercise): void;

  @llm_func(2)
  recordCardioExercise(exercise: CardioExercise): void;

  @llm_func(3)
  suggestWorkout(muscleGroup: string, difficultyLevel: string): WorkoutRoutine;

  @llm_func(1)
  getExerciseHistory(from: utcDateTime, to: utcDateTime): Exercise[];
}
""".strip()

# Hand-written reasoning text that is placed inside the <reasoning> tag of fake_response.
fake_reasoning = r"""
For a gym exercise tracking bot, I expect users to send messages like:
"I did 3 sets of bench press with 150lbs" or "Completed 30 minutes of cardio on treadmill"
The LLM needs to extract:
- Exercise details (name, type, equipment used)
- Sets, reps, weights for strength training
- Duration for cardio/endurance exercises
- Timestamp (inferred from message time)

For workout suggestions, user might say:
"Suggest a chest workout" or "What should I do for leg day?"
LLM would need to consider:
- Target muscle group
- Available equipment (if specified)
- User's fitness level
- Previous workouts

The interface should support both recording exercises and getting recommendations.
""".strip()

# Assemble a synthetic assistant reply in the <reasoning>/<typespec> tag layout.
fake_response = f"""
<reasoning>
{fake_reasoning}
</reasoning>

<typespec>
{fake_typespec}
</typespec>
""".strip()

# Run the synthetic reply through the same parse -> prepend helpers import -> compile steps as run_typespec.
fake_output = stages.typespec.parse_output(fake_response)
fake_schema = "\n".join(['import "./helpers.js";', fake_output["typespec_definitions"]])
fake_feedback = compiler.compile_typespec(fake_schema)

In [14]:
# NOTE(review): these three statements repeat the earlier template-setup cell verbatim.
jinja_env = jinja2.Environment()
typespec_tpl = jinja_env.from_string(stages.typespec.PROMPT)
drizzle_tpl = jinja_env.from_string(stages.drizzle.PROMPT)

In [15]:
# Render the TypeSpec prompt again; this rebinds prompt_typespec.
prompt_params = {"application_description": "Bot that records my gym excercises and suggests workout routines."}
prompt_typespec = typespec_tpl.render(**prompt_params)

In [16]:
# First user message for the fixed-root experiment below.
init_message = {"role": "user", "content": prompt_typespec}

In [37]:
# Root node built from the synthetic reply. The score is set to 0 regardless of fake_feedback,
# so the search does not stop immediately on its score check.
fake_root = Node(
    {
        "message": {"role": "assistant", "content": fake_response},
        "output": fake_output,
        "feedback": fake_feedback,
    },
    score=0,
)

In [38]:
# NOTE(review): `builder` is not bound by any visible cell; this only runs with leftover kernel state.
builder.bfs_typespec(init_message, fake_root, max_depth=3, branch_factor=3, max_workers=5)

<__main__.Node at 0xffffa12f7650>

In [29]:
# Run the notebook-level bfs_typespec on the synthetic root; fake_root is extended in place.
bfs_typespec(init_message, fake_root, client, compiler, max_depth=3, branch_factor=3, max_workers=5)

<__main__.Node at 0xffffa12f4200>

In [39]:
# Pick the best-scoring leaf of the tree grown under fake_root.
solution = fake_root.best_solution()

In [40]:
# Depth of the selected leaf (0 would mean the root itself was returned).
solution.depth

1

## Manage Drizzle

In [42]:
class DrizzleData(TypedDict):
    """Payload stored on each search node of the Drizzle stage (shape returned by run_drizzle)."""
    # Assistant chat turn: {"role": "assistant", "content": <raw model text>}.
    message: dict
    # Parsed model output as returned by stages.drizzle.parse_output.
    output: stages.drizzle.DrizzleOutput
    # Result of compiling the generated schema with CompilerService.compile_drizzle.
    feedback: CompileResult

In [None]:
def run_drizzle(messages: list[dict], client: AnthropicBedrock, compiler: CompilerService) -> DrizzleData:
    """Ask the model for a Drizzle schema and compile what it returns.

    Args:
        messages: Chat history sent to the model as-is.
        client: Bedrock client used for the completion request.
        compiler: Service used to compile the generated schema.

    Returns:
        A dict with the assistant message, the parsed output and the compile feedback.
    """
    completion = client.messages.create(
        model=LLM_MODEL,
        max_tokens=8192,
        messages=messages,
    )
    raw_text = completion.content[0].text
    parsed = stages.drizzle.parse_output(raw_text)
    compile_feedback = compiler.compile_drizzle(parsed["drizzle_schema"])
    assistant_turn = {"role": "assistant", "content": raw_text}
    return {
        "message": assistant_turn,
        "output": parsed,
        "feedback": compile_feedback,
    }


def bfs_drizzle(
    init_message: dict,
    root: Node[DrizzleData],
    client: AnthropicBedrock,
    compiler: CompilerService,
    max_depth: int = 3,
    branch_factor: int = 3,
    max_workers: int = 5,
) -> Node[DrizzleData]:
    """Grow the search tree level by level until a compiling Drizzle schema is found.

    Each round expands every leaf that failed to compile and is shallower than
    ``max_depth`` with ``branch_factor`` fresh generations, run on a thread pool.
    The loop stops when the best leaf has score 1 or nothing is left to expand.

    Args:
        init_message: The initial user prompt that starts every conversation.
        root: Root node of the tree; it is extended in place.
        client: Bedrock client forwarded to ``run_drizzle``.
        compiler: Compiler service forwarded to ``run_drizzle``.
        max_depth: Leaves at this depth or deeper are not expanded.
        branch_factor: Number of generations requested per expanded leaf.
        max_workers: Thread pool size.

    Returns:
        ``root.best_solution()`` after the search ends.

    Raises:
        Any exception raised inside a worker is re-raised by ``future.result()``.
    """
    while True:
        if root.best_solution().score == 1:
            break

        candidates: list[Node[DrizzleData]] = []
        for node in [root] + root._get_all_children():
            if (
                node.is_terminal
                and node.depth < max_depth
                and node.data["feedback"]["exit_code"] != 0
            ):
                candidates.append(node)
        if not candidates:
            break
        # Expand each failing leaf with branch_factor attempts on a thread pool.
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_node: dict[concurrent.futures.Future, Node[DrizzleData]] = {}
            for node in candidates:
                # Replay the path from the root: each assistant turn is followed
                # by a user turn carrying that attempt's compiler stdout.
                # NOTE(review): only stdout is fed back; confirm Drizzle compile
                # errors are reported there rather than in stderr.
                messages = [init_message]
                for n in node.get_trajectory():
                    messages.append(n.data["message"])
                    messages.append({"role": "user", "content": n.data["feedback"]["stdout"]})
                for _ in range(branch_factor):
                    # Must call the Drizzle runner: the copy-pasted run_typespec
                    # parsed and compiled the reply as TypeSpec instead.
                    future_to_node[executor.submit(run_drizzle, messages, client, compiler)] = node
            for future in concurrent.futures.as_completed(future_to_node):
                node = future_to_node[future]
                result = future.result()
                score = 1 if result["feedback"]["exit_code"] == 0 else 0
                child = Node(result, score, parent=node)
                node.children.append(child)
    return root.best_solution()

In [47]:
# NOTE(review): `messages` is not assigned at notebook level in any visible cell; this relies on
# leftover kernel state. Despite the name, the result is a TypespecData dict, not a Node.
new_node = run_typespec(messages, client, compiler)

In [53]:
# Show the reasoning section of the parsed model output.
print(new_node['output']['reasoning'])

For a gym exercise tracking bot, I expect users to send messages like:
"I did 3 sets of bench press with 150lbs" or "Completed 30 minutes of cardio on treadmill"
The LLM needs to extract:
- Exercise details (name, type, equipment used)
- Sets, reps, weights for strength training
- Duration for cardio/endurance exercises
- Timestamp (inferred from message time)

For workout suggestions, user might say:
"Suggest a chest workout" or "What should I do for leg day?"
LLM would need to consider:
- Target muscle group
- Available equipment (if specified)
- User's fitness level
- Previous workouts

The interface should support both recording exercises and getting recommendations.


In [None]:
# TypeSpec preamble text with a {{typespec_definitions}} placeholder (placed inside a `//` comment line).
# NOTE(review): no other visible cell reads this variable.
typespec_schema = """
extern dec llm_func(target: unknown, history: valueof int32);

//{{typespec_definitions}}
""".strip()

In [15]:
# Minimal hand-written Drizzle schema used to exercise compiler.compile_drizzle directly.
drizzle_schema = """
import { integer, pgTable, text } from "drizzle-orm/pg-core";

export const gymEquipmentTable = pgTable("gym_equipment", {
    id: integer().primaryKey().generatedAlwaysAsIdentity(),
    name: text(),
    description: text(),
    price: integer(),
    stock: integer(),
});
""".strip()

In [16]:
# Compile the hand-written schema without involving the model.
result = compiler.compile_drizzle(drizzle_schema)

running local


In [17]:
# Inspect the compile result (exit_code / stdout / stderr).
result

{'exit_code': 0,
 'stdout': "No config path provided, using default 'drizzle.config.ts'\nReading config file '/app/drizzle.config.ts'\nUsing 'pg' driver for database querying\n[⣷] Pulling schema from database...\n\x1b[2K\x1b[1G[✓] Pulling schema from database...\n[✓] Changes applied\n",
 'stderr': None}

In [None]:
def bfs_apply(
    branching_factor: int = 5,
    max_depth: int = 3,
):
    """Unimplemented placeholder: accepts search parameters, does nothing, returns None."""
    return None

In [None]:
# Smoke test for Node._get_all_children: a root with two leaves should yield both leaves.
simple_tree = Node(0, 0)
# Children are attached by hand; Node.__init__ does not register a node with its parent.
simple_tree.children = [Node(1, 1, simple_tree), Node(2, 2, simple_tree)]

for node in simple_tree._get_all_children():
    print(node.data)