# Import the Zenbase Library

In [0]:
import sys
import subprocess

def install_package(package):
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
    except subprocess.CalledProcessError as e:
        print(f"Failed to install {package}: {e}")
        raise

def install_packages(packages):
    for package in packages:
        install_package(package)

try:
    # Check if running in Google Colab
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    # Install the zenbase package if running in Google Colab
    # install_package('zenbase')
    # Install the zenbse package from a GitHub branch if running in Google Colab
    install_package('git+https://github.com/zenbase-ai/lib.git@main#egg=zenbase&subdirectory=py')

    # List of other packages to install in Google Colab
    additional_packages = [
        'python-dotenv',
        'langsmith[vcr]',
        'openai',
        'langchain',
        'langchain_openai'
    ]
    
    # Install additional packages
    install_packages(additional_packages)

# Now import the zenbase library
try:
    import zenbase
except ImportError as e:
    print("Failed to import zenbase: ", e)
    raise

# Configure the Environment

In [0]:
from pathlib import Path
from dotenv import load_dotenv

# import os
#
# os.environ["OPENAI_API_KEY"] = "..."
# os.environ["LANGCHAIN_API_KEY"] = "..."
# os.environ["LANGCHAIN_TRACING_V2"] = "true"

load_dotenv(Path("../../.env.test"), override=True)

In [0]:
import nest_asyncio

nest_asyncio.apply()

# Initial Setup

In [0]:
from langsmith.wrappers import wrap_openai
from openai import OpenAI

openai = wrap_openai(OpenAI())

# Now, you probably already have some LLM code.

It could use the OpenAI SDK, LangChain, or anything really. But it looks something like this:

In [0]:
import json
from langsmith import traceable
from langsmith.schemas import Run, Example
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

@traceable
def solver(inputs):
    messages = [
        ("system", """You are an expert math solver. Solve the given problem using the provided plan and operations.
        Return only the final numerical answer, without any additional text or explanation."""),
        ("user", "Question: {question}"),
        ("user", "Plan: {plan}"),
        ("user", "Mathematical Operation: {operation}"),
        ("user", "Provide the final numerical answer:")
    ]

    chain = ChatPromptTemplate.from_messages(messages) | ChatOpenAI(model="gpt-3.5-turbo") | StrOutputParser()

    plan = planner_chain(inputs)
    operation = operation_finder({"plan": plan["plan"], "question": inputs["question"]})
    
    inputs_to_answer = {
        "question": inputs["question"],
        "plan": plan["plan"],
        "operation": operation["operation"],
    }
    answer = chain.invoke(inputs_to_answer)
    return {"answer": answer}

@traceable
def planner_chain(inputs):
    messages = [
        ("system", """You are an expert math solver. Create a step-by-step plan to solve the given problem.
        Be clear and concise in your steps."""),
        ("user", "Problem: {question}\n\nProvide a step-by-step plan to solve this problem:")
    ]

    chain = ChatPromptTemplate.from_messages(messages) | ChatOpenAI(model="gpt-3.5-turbo") | StrOutputParser()
    plan = chain.invoke(inputs)
    return {"plan": plan}

@traceable
def operation_finder(inputs):
    messages = [
        ("system", """You are an expert math solver. Identify the overall mathematical operation needed to solve the problem 
        based on the given plan. Use simple operations like addition, subtraction, multiplication, and division."""),
        ("user", "Question: {question}"),
        ("user", "Plan: {plan}"),
        ("user", "Identify the primary mathematical operation needed:")
    ]

    chain = ChatPromptTemplate.from_messages(messages) | ChatOpenAI(model="gpt-3.5-turbo") | StrOutputParser()
    operation = chain.invoke(inputs)
    return {"operation": operation}

## And let's say you have an eval function like this

In [0]:
def score_answer(run: Run, example: Example):
    output = run.outputs["answer"].split("#### ")[-1]
    target = example.outputs["answer"].split("#### ")[-1]
    return {
        "key": "correctness",
        "score": int(output == target),
    }

## Then you're probably evaluating like this

In [0]:
# Evaluate using LangSmith
from langsmith import Client, evaluate

langsmith = Client()
evalset = list(langsmith.list_examples(dataset_name="GSM8K_test_set_langsmith_dataset_2j24kEFx8T718mqwRblcoNK3S0L"))

evaluate_kwargs = dict(
    data=evalset,
    evaluators=[score_answer],
    client=langsmith,
    max_concurrency=2,
)

evaluate(solver, **evaluate_kwargs)

 # Now, how can we optimize this score?

## First, initialize the Zenbase ZenbaseTracer

In [0]:
from zenbase.core.managers import ZenbaseTracer
zenbase_tracer = ZenbaseTracer()


## Hook up Zenbase to your functions

1. Use the `zenbase_tracer` decorator.
2. Change function inputs to request
3. Use request's `zenbase.task_demos` to get the few-shot examples for the task and add them however you would like into your prompt.
4. If you need to use just a few examples, you can use `request.zenbase.task_demos[:2]` to get the first two examples.

In [0]:
from zenbase.types import LMRequest


@zenbase_tracer  # it is 1
@traceable
def solver(request: LMRequest):  # it is 2
    messages = [
        ("system", """You are an expert math solver. Solve the given problem using the provided plan and operations.
        Return only the final numerical answer, without any additional text or explanation."""),
    ]

    for demo in request.zenbase.task_demos: # it is 3
        messages += [
            ("user", f'Example Question: {demo.inputs["question"]}'),
            ("assistant", f'Example Answer: {demo.outputs["answer"]}'),
        ] # it is 4
    
    messages.extend([
        ("user", "Question: {question}"),
        ("user", "Plan: {plan}"),
        ("user", "Mathematical Operation: {operation}"),
        ("user", "Provide the final numerical answer:")
    ])

    chain = ChatPromptTemplate.from_messages(messages) | ChatOpenAI(model="gpt-3.5-turbo") | StrOutputParser()

    plan = planner_chain(request.inputs)
    operation = operation_finder({"plan": plan["plan"], "question": request.inputs["question"]})

    inputs_to_answer = {
        "question": request.inputs["question"],
        "plan": plan["plan"],
        "operation": operation["operation"],
    }
    answer = chain.invoke(inputs_to_answer)
    return {"answer": answer}


@zenbase_tracer  # it is 1
@traceable
def planner_chain(request: LMRequest):  # it is 2
    messages = [
        ("system", """You are an expert math solver. Create a step-by-step plan to solve the given problem.
        Be clear and concise in your steps."""),
        ("user", "Problem: {question}\n\nProvide a step-by-step plan to solve this problem:")
    ]

    if request.zenbase.task_demos:  # it is 3
        for demo in request.zenbase.task_demos[:2]:  # it is 4
            messages += [
                ("user", demo.inputs["question"]),
                ("assistant", demo.outputs["plan"]),
            ]

    chain = ChatPromptTemplate.from_messages(messages) | ChatOpenAI(model="gpt-3.5-turbo") | StrOutputParser()
    plan = chain.invoke(request.inputs)
    return {"plan": plan}


@zenbase_tracer  # it is 1
@traceable
def operation_finder(request: LMRequest):  # it is 2
    messages = [
        ("system", """You are an expert math solver. Identify the overall mathematical operation needed to solve the problem 
        based on the given plan. Use simple operations like addition, subtraction, multiplication, and division."""),
        ("user", "Question: {question}"),
        ("user", "Plan: {plan}"),
        ("user", "Identify the primary mathematical operation needed:")
    ]

    if request.zenbase.task_demos:  # it is 3 
        for demo in request.zenbase.task_demos[:2]:  # it is 4
            messages += [
                ("user", demo.inputs["question"]),
                ("user", demo.inputs["plan"]),
                ("assistant", demo.outputs["operation"]),
            ]

    chain = ChatPromptTemplate.from_messages(messages) | ChatOpenAI(model="gpt-3.5-turbo") | StrOutputParser()
    operation = chain.invoke(request.inputs)
    return {"operation": operation}

## Now we can optimize!

### Set up your optimizer:

In [0]:
from zenbase.adaptors.langchain import ZenLangSmith
from zenbase.optim.metric.bootstrap_few_shot import BootstrapFewShot

# Define your Langsmith and helper
langsmith = Client()
zen_langsmith_adaptor = ZenLangSmith(client=langsmith)

TRAIN_SET = "GSM8K_train_set_langsmith_dataset_2j24jtX6pr5OFyi9IRlj2Pk29NX"
TEST_SET = "GSM8K_test_set_langsmith_dataset_2j24kEFx8T718mqwRblcoNK3S0L"
VALIDATION_SET = "GSM8K_validation_set_langsmith_dataset_2j24kCxc6WhYDe27hEcnhUEHPmE"
SHOTS = 2
SAMPLES = 2

train_set = zen_langsmith_adaptor.fetch_dataset(dataset_name=TRAIN_SET)
test_set = zen_langsmith_adaptor.fetch_dataset(dataset_name=TEST_SET)
validation_set = zen_langsmith_adaptor.fetch_dataset(dataset_name=VALIDATION_SET)

evaluator_kwargs = dict(
    evaluators=[score_answer],
    client=langsmith,
    max_concurrency=1,
)

bootstrap_few_shot = BootstrapFewShot(
    shots=SHOTS,
    training_set=train_set,
    test_set=test_set,
    validation_set=validation_set,
    evaluator_kwargs=evaluator_kwargs,
    zen_adaptor=zen_langsmith_adaptor,
)




### Do the optimization

In [0]:
best_fn, candidates = bootstrap_few_shot.perform(
    solver,
    samples=SAMPLES,
    rounds=1,
    trace_manager=zenbase_tracer,
)

### Introspect evaluation improvement

You can see in this example that the best function has improved the evaluation score by 50%.

In [0]:
bootstrap_few_shot.base_evaluation.evals

In [0]:
bootstrap_few_shot.best_evaluation.evals

### Use your optimized function

In [None]:
zenbase_tracer.flush()

In [0]:
# Now you can use your zenbase fn
best_fn({"question": "If I have 30% of shares, and Mo has 24.5% of shares, how many of our 10M shares are unassigned?"})

### Introspect function traces

In [0]:
function_traces = [v for k, v in zenbase_tracer.all_traces.items()][0]["optimized"]

### Check the optimized parameters for solver


In [0]:
from pprint import pprint

pprint(function_traces["solver"]["args"]["request"].zenbase.task_demos)


### Check the optimized parameters for operation_finder


In [0]:
from pprint import pprint

pprint(function_traces["operation_finder"]["args"]["request"].zenbase.task_demos)

### Check the optimized parameters for planner_chain


In [0]:
from pprint import pprint

pprint(function_traces["planner_chain"]["args"]["request"].zenbase.task_demos)

## How to save the function and load it later

### Save the optimized function args to a file

In [0]:
bootstrap_few_shot.save_optimizer_args("bootstrap_few_shot_args.zenbase")

### Load the optimized function args with the function

In [0]:
optimized_function = bootstrap_few_shot.load_optimizer_and_function("bootstrap_few_shot_args.zenbase", solver, zenbase_tracer)

### Use the loaded function and make sure it loaded the demos.