# Automatic Prompt Engineering for classification

Given (text -> label), this notebook generates and optimizes system and user prompts.

This is how the text will be labelled
- (system prompt, user prompt prefix + text + user prompt suffix) -Haiku-> bot response -function-> label
- The function will be defined by you (it could be just a string match)

The notebook will produce
- the system prompt
- the user prompt prefix
- the user prompt suffix

To use this tool, you will need
- an Anthropic API key
- a dataset (text -> label)
- define the function bot_response -> label
- describe the expected bot_response that Haiku should produce

This is how prompt tuning is done
- Sample from the full dataset.
- Haiku takes in (system prompt, user prompt prefix + text + user prompt suffix) and produces bot_response.
- The function takes in bot_response and produces the label. The (text -> label) process is the forward pass.
- Sample from the mistakes.
- Opus takes in the mistakes and summarizes the mistakes (this is the gradient).
- Opus takes in the summary of mistakes (the gradient) and the current prompts (the model parameters), and updates the prompts.
- Repeat.

You will need to have these Python modules installed
- pandas
- anthropic
- scikit-learn
- ipython

In [1]:
import collections
import concurrent.futures
import html
import itertools
import os
import random
import re
import textwrap

import anthropic
import pandas as pd
from IPython.display import display, HTML
from sklearn.metrics import precision_score, recall_score

# Use your Anthropic API key here

In [2]:
# Read the key from the ANTHROPIC_API_KEY environment variable so the secret
# does not have to be written into the notebook itself.
anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
# anthropic_api_key = "sk-ant-"
client = anthropic.Anthropic(api_key=anthropic_api_key)

In [3]:
# Confirm that a key was found without echoing any part of the secret into the
# saved notebook output; this also avoids the TypeError that slicing None would
# raise when ANTHROPIC_API_KEY is not set.
print("Anthropic API key configured:", bool(anthropic_api_key))

[output cleared: do not keep any part of an API key in saved notebook output]


In [4]:
# Thread-pool size for the forward pass; keep it within your account's rate limits
# (see https://docs.anthropic.com/claude/reference/rate-limits).
NUM_PARALLEL_FORWARD_PASS_API_CALLS = 100

# Per-label sample budget drawn from the dataset for each forward pass.
NUM_SAMPLES_FORWARD_PASS_FOR_EACH_LABEL = 100

# Per-label budgets for the misclassified / correctly classified examples that
# are shown to Opus when it summarizes the mistakes.
NUM_SAMPLES_MISTAKE_GRADIENT_CALCULATION_FOR_EACH_LABEL = 10
NUM_SAMPLES_CORRECT_GRADIENT_CALCULATION_FOR_EACH_LABEL = 5

# Number of forward-pass / prompt-update rounds to run.
NUM_ITERATIONS = 5

# Define the dataset here
You will need to edit this if your task is different.

In [5]:
# from https://www.kaggle.com/c/quora-insincere-questions-classification/data
# The file is read relative to the notebook's working directory.
df = pd.read_csv("qiqc_truncated.csv")
# Show how many rows carry each value of the "target" column.
df["target"].value_counts()

0    1000
1    1000
Name: target, dtype: int64

In [6]:
# Draw 100 random rows per class, stack them, then shuffle the combined rows.
insincere_rows = df[df["target"] == 1].sample(100)
sincere_rows = df[df["target"] == 0].sample(100)
df = pd.concat([insincere_rows, sincere_rows], ignore_index=True).sample(frac=1)

# you can also just define the dataset with code
# Each entry is (question text, descriptive label).
target_names = {0: "sincere", 1: "insincere"}
dataset = list(zip(df["question_text"], df["target"].map(target_names)))

In [7]:
# make sure the number of types of labels is small
# prefer descriptive labels
# The cell output is the number of dataset entries carrying each label.
collections.Counter(label for _, label in dataset)

Counter({'insincere': 100, 'sincere': 100})

In [8]:
# Inspect one entry to confirm the (text, label) structure the rest of the notebook expects.
dataset[0]  # should be tuple[string, label]

('Is it generally acceptable if someone grew up in the USA and considers Mediterranean Europeans like French or Spaniards not white, but rather black, because they are not Germanic like Dutch or Scandinavian people?',
 'insincere')

# Define your task here
You will need to edit this if your task is different.

In [9]:
def predict_from_final_layer(final_layer_value):
    """Map the model's raw response text to a predicted label.

    Returns "sincere" only when the response contains the exact sentence
    "In conclusion, this question is sincere." (period included). Every other
    response, including malformed or empty ones, falls back to "insincere".
    """
    sincere_marker = "In conclusion, this question is sincere."
    return "sincere" if sincere_marker in final_layer_value else "insincere"

In [10]:
# System prompt for the model that rewrites the prompts. It has to describe the
# same contract that predict_from_final_layer enforces: the exact sentence
# "In conclusion, this question is sincere." (with the period) yields "sincere",
# and anything else yields "insincere".
PROMPT_UPDATE_SYSTEM_PROMPT = """
You will write a set of prompts for an LLM to classify whether a question is insincere.

The LLM will take the following input
- system_prompt
- user_prompt_prefix + question + user_prompt_suffix

The LLM is expected to produce the following output
- reasoning on whether the question is insincere
- with an ending "In conclusion, this question is insincere." or "In conclusion, this question is sincere."

A function will take the LLM output and check for the exact string "In conclusion, this question is sincere." (including the final period).
If the string appears in the LLM output, the question will be predicted to be sincere.
Otherwise, the question will be predicted to be insincere.

Please remember to include the instruction to produce the exact string at the end of the LLM output.
""".strip()

In [11]:
# Start from empty prompts; usually Opus is good enough to produce working prompts.
model_parameters = dict.fromkeys(
    ("system_prompt", "user_prompt_prefix", "user_prompt_suffix"), ""
)

# Model configuration

This should be general enough for classification tasks with a small number of classes.

In [12]:
def get_samples_and_labels(dataset):
    """Draw a shuffled sample from the dataset with a per-label cap.

    Args:
        dataset: iterable of (text, label) pairs. It is copied before
            shuffling, so the caller's sequence is left untouched.

    Returns:
        (texts, labels): two parallel lists holding at most
        NUM_SAMPLES_FORWARD_PASS_FOR_EACH_LABEL examples of every label,
        in random order.
    """
    shuffled = list(dataset)
    random.shuffle(shuffled)

    picked = []
    for wanted in {label for _, label in shuffled}:
        matching = [(text, label) for text, label in shuffled if label == wanted]
        picked.extend(matching[:NUM_SAMPLES_FORWARD_PASS_FOR_EACH_LABEL])
    # Shuffle again so the examples are not grouped by label.
    random.shuffle(picked)

    texts = [text for text, _ in picked]
    labels = [label for _, label in picked]
    return texts, labels

In [13]:
def compute_final_layer(sample, model_parameters):
    """Send one sample to Haiku and return the raw response text.

    The user turn is user_prompt_prefix + sample + user_prompt_suffix and the
    system prompt is model_parameters["system_prompt"]. The request uses
    temperature 0 and a 10 second timeout.
    """
    prompt_text = "".join(
        (model_parameters["user_prompt_prefix"], sample, model_parameters["user_prompt_suffix"])
    )
    user_turn = {"role": "user", "content": [{"type": "text", "text": prompt_text}]}

    response = client.messages.create(
        model="claude-3-haiku-20240307",
        max_tokens=2000,
        temperature=0,
        system=model_parameters["system_prompt"],
        messages=[user_turn],
        timeout=10,
    )

    # Only the first content block of the reply is used.
    return response.content[0].text

In [14]:
def forward_pass(samples, model_parameters):
    """Run every sample through the model and the label function concurrently.

    Args:
        samples: sequence of input texts.
        model_parameters: the current prompt dict, shared by all samples.

    Returns:
        (final_layer_values, predicted_labels): two lists aligned with
        `samples`, holding the raw responses and the labels derived from them.
    """
    shared_parameters = [model_parameters] * len(samples)
    pool_size = NUM_PARALLEL_FORWARD_PASS_API_CALLS

    with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as pool:
        pending_values = pool.map(compute_final_layer, samples, shared_parameters)
    # Leaving the `with` block waits for all calls; results are collected afterwards.
    final_layer_values = list(pending_values)

    with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as pool:
        pending_labels = pool.map(predict_from_final_layer, final_layer_values)
    predicted_labels = list(pending_labels)

    return final_layer_values, predicted_labels

In [15]:
def _format_gradient_example(sample, final_layer_value, predicted_label, actual_label):
    """Render one classified example as tagged text for the mistake-summary request."""
    # Built line by line (no textwrap.dedent) so multi-line samples or responses
    # cannot disturb the layout, and with proper "</tag>" closers: the previous
    # "<\final_layer_value>" / "<\actual_label>" spellings were turned into
    # form-feed and bell characters by Python's string escapes.
    return (
        f"\n<sample>{sample}</sample>\n"
        f"\n<final_layer_value>{final_layer_value}</final_layer_value>\n"
        f"\n<predicted_label>{predicted_label}</predicted_label>\n"
        f"\n<actual_label>{actual_label}</actual_label>\n"
    )


def calculate_gradient(samples, final_layer_values, predicted_labels, actual_labels, metrics):
    """Ask Opus to summarize the classification mistakes (the "gradient").

    A bounded number of examples per actual label is embedded in the request:
    at most NUM_SAMPLES_MISTAKE_GRADIENT_CALCULATION_FOR_EACH_LABEL
    misclassified ones and at most
    NUM_SAMPLES_CORRECT_GRADIENT_CALCULATION_FOR_EACH_LABEL correctly
    classified ones.

    Args:
        samples: input texts.
        final_layer_values: raw model responses, aligned with `samples`.
        predicted_labels: labels derived from the responses.
        actual_labels: ground-truth labels.
        metrics: current metrics dict, included in the request as context.

    Returns:
        The text of the summary produced by Opus.
    """
    system_message = "You will provide a concise summary of the mistakes in the classification."

    # Number of examples already included, per actual label.
    mistake_counts = collections.defaultdict(int)
    correct_counts = collections.defaultdict(int)

    message_parts = [
        "\nPlease summarize the mistakes in the classification where predicted_label != actual_label\n"
        "\n"
        f"The current metrics is {str(metrics)}\n"
    ]

    for sample, final_layer_value, predicted_label, actual_label in zip(
        samples, final_layer_values, predicted_labels, actual_labels
    ):
        if predicted_label == actual_label:
            counts, limit = correct_counts, NUM_SAMPLES_CORRECT_GRADIENT_CALCULATION_FOR_EACH_LABEL
        else:
            counts, limit = mistake_counts, NUM_SAMPLES_MISTAKE_GRADIENT_CALCULATION_FOR_EACH_LABEL

        # ">=" so that exactly `limit` examples are admitted (">" let one extra through).
        if counts[actual_label] >= limit:
            continue
        counts[actual_label] += 1

        message_parts.append(
            _format_gradient_example(sample, final_layer_value, predicted_label, actual_label)
        )

    user_message = "".join(message_parts)

    message = client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=2000,
        temperature=0,
        system=system_message,
        messages=[{"role": "user", "content": [{"type": "text", "text": user_message}]}]
    )

    return message.content[0].text

In [16]:
def update_model_parameters(gradient, model_parameters, metrics):
    """Ask Opus to rewrite the prompts based on the mistake summary.

    Args:
        gradient: text feedback describing the current mistakes.
        model_parameters: dict with "system_prompt", "user_prompt_prefix"
            and "user_prompt_suffix".
        metrics: current metrics dict, included in the request as context.

    Returns:
        A new dict with the same three keys. Each value is the text found
        between the matching tags in the reply (surrounding whitespace is
        kept as returned). If a tag pair is missing from the reply, the
        previous value for that key is kept instead of being reset to "".
    """
    system_message = PROMPT_UPDATE_SYSTEM_PROMPT
    prompt_names = ("system_prompt", "user_prompt_prefix", "user_prompt_suffix")

    # The message is assembled explicitly rather than with textwrap.dedent on an
    # f-string: dedent runs after interpolation, so multi-line prompts or
    # feedback would stop the template's indentation from being removed.
    current_prompts = "\n\n".join(
        f"<{name}>\n{model_parameters[name]}\n</{name}>" for name in prompt_names
    )
    reply_format = "\n\n".join(
        f"<{name}>\n(the new {name.replace('_', ' ')} here)\n</{name}>" for name in prompt_names
    )
    user_message = (
        f"\nThe current metrics is {str(metrics)}\n"
        "\n"
        "This is the current set of prompts\n"
        f"{current_prompts}\n"
        "\n"
        "This is the feedback on the prompt\n"
        f"<feedback>\n{gradient}\n</feedback>\n"
        "\n"
        "Please reply in the following format\n"
        "\n"
        f"{reply_format}\n"
    )

    message = client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=2000,
        temperature=0,
        system=system_message,
        messages=[{"role": "user", "content": [{"type": "text", "text": user_message}]}],
    )

    bot_message = message.content[0].text

    updated_parameters = {}
    for name in prompt_names:
        # Non-greedy + DOTALL: capture everything up to the first closing tag, newlines included.
        match = re.search(rf"<{name}>(.*?)</{name}>", bot_message, re.DOTALL)
        # A missing or truncated section should not wipe a working prompt.
        updated_parameters[name] = match.group(1) if match else model_parameters[name]

    return updated_parameters

# Display functions

In [17]:
def calculate_metrics(predicted_labels, actual_labels):
    """Compute one-vs-rest precision and recall for every label in actual_labels.

    Returns:
        dict with a "<label>_precision" and a "<label>_recall" entry per
        distinct actual label. Precision is reported as 0 when a label is
        never predicted.
    """
    metrics = {}
    for label in set(actual_labels):
        # Reduce the multi-class problem to "is this label" vs "is not".
        is_actual = [actual == label for actual in actual_labels]
        is_predicted = [predicted == label for predicted in predicted_labels]

        metrics[f"{label}_precision"] = precision_score(is_actual, is_predicted, zero_division=0)
        metrics[f"{label}_recall"] = recall_score(is_actual, is_predicted)
    return metrics

In [18]:
_HTML_OUTPUT_DIR = "html_output"

# Alternating row colours shared by the plain (non-highlighted) tables.
_ZEBRA_TABLE_STYLES = [
    dict(selector="tr:nth-child(even)", props=[("background-color", "#f2f2f2")]),
    dict(selector="tr:nth-child(odd)", props=[("background-color", "white")]),
]


def _html_safe_frame(df):
    """Return a copy of df whose string cells are HTML-escaped, with newlines shown as <br>."""
    def to_html_text(value):
        if isinstance(value, str):
            # Escape first so tag-like text in prompts and responses stays
            # visible in the browser, then add the line breaks.
            return html.escape(value, quote=False).replace("\n", "<br>")
        return value

    return df.apply(lambda column: column.map(to_html_text))


def _styler_to_html(styler):
    """Render a pandas Styler to an HTML string on both old and new pandas."""
    # Styler.render was deprecated in pandas 1.4 and removed in 2.0; to_html replaces it.
    if hasattr(styler, "to_html"):
        return styler.to_html()
    return styler.render()


def _write_html_and_display_link(styler, file_name):
    """Write the rendered table to file_name and show a clickable link to it."""
    os.makedirs(_HTML_OUTPUT_DIR, exist_ok=True)
    with open(file_name, 'w') as f:
        f.write(_styler_to_html(styler))

    link = f'<a href="{file_name}" target="_blank">{file_name}</a>'
    display(HTML(link))


def save_and_display_prompt_history(model_parameters_history, gradient_history, metrics_history):
    """Save one table row per iteration (prompts, metrics, gradient) and link to it.

    The three histories may differ in length (the prompt history holds the
    initial prompts as an extra entry); missing values are left blank.
    The table is written to html_output/prompt-history-classification.html.
    """
    iteration_data_all = []

    for model_parameter, gradient, metrics in itertools.zip_longest(
        model_parameters_history, gradient_history, metrics_history, fillvalue={}
    ):
        iteration_data = {}
        iteration_data.update(model_parameter)
        iteration_data.update(metrics)
        # The fill value {} is falsy, so rows without a gradient get no "gradient" entry.
        if gradient:
            iteration_data["gradient"] = gradient
        iteration_data_all.append(iteration_data)

    df = pd.DataFrame(iteration_data_all).fillna("")

    prompt_info_file_name = f"{_HTML_OUTPUT_DIR}/prompt-history-classification.html"
    _write_html_and_display_link(
        _html_safe_frame(df).style.set_table_styles(_ZEBRA_TABLE_STYLES),
        prompt_info_file_name,
    )


def save_and_display_current_iteration(iteration_idx, samples, final_layer_values, predicted_labels, actual_labels):
    """Save the per-sample results of one iteration as two HTML tables and link to them.

    Writes html_output/iteration-classification-NNN.html with every sample
    (green rows are correct, red rows are mistakes) and
    html_output/iteration-classification-NNN-diff.html with only the mistakes,
    sorted by actual label.
    """
    df = pd.DataFrame({
        "sample": samples,
        "final_layer_value": final_layer_values,
        "predicted_label": predicted_labels,
        "actual_label": actual_labels,
    })

    def highlight_diff(row):
        if row['predicted_label'] == row['actual_label']:
            return ['background-color: #90EE90'] * len(row)  # green
        return ['background-color: #FFB6C1'] * len(row)  # red

    iteration_info_file_name = f"{_HTML_OUTPUT_DIR}/iteration-classification-{iteration_idx:03}.html"
    _write_html_and_display_link(
        _html_safe_frame(df).style.apply(highlight_diff, axis=1),
        iteration_info_file_name,
    )

    # Select the mistakes on the raw frame, then make the text HTML-safe.
    mistakes = df[df["predicted_label"] != df["actual_label"]].sort_values("actual_label")
    iteration_info_file_name = f"{_HTML_OUTPUT_DIR}/iteration-classification-{iteration_idx:03}-diff.html"
    _write_html_and_display_link(
        _html_safe_frame(mistakes).style.set_table_styles(_ZEBRA_TABLE_STYLES),
        iteration_info_file_name,
    )

# Execution

In [19]:
# Histories start with a snapshot of the initial prompts; one gradient and one
# metrics dict are added per iteration.
model_parameters_history = [dict(model_parameters)]
gradient_history = []
metrics_history = []

for iteration_idx in range(NUM_ITERATIONS):
    # Forward pass: sample, classify with the current prompts, and score.
    samples, actual_labels = get_samples_and_labels(dataset)
    final_layer_values, predicted_labels = forward_pass(samples, model_parameters)
    metrics = calculate_metrics(predicted_labels, actual_labels)

    # Backward pass: summarize the mistakes, then let Opus rewrite the prompts.
    gradient = calculate_gradient(samples, final_layer_values, predicted_labels, actual_labels, metrics)
    model_parameters = update_model_parameters(gradient, model_parameters, metrics)

    metrics_history.append(metrics)
    gradient_history.append(gradient)
    model_parameters_history.append(dict(model_parameters))

    # Refresh the HTML reports after every round so progress can be inspected early.
    save_and_display_prompt_history(model_parameters_history, gradient_history, metrics_history)
    save_and_display_current_iteration(iteration_idx, samples, final_layer_values, predicted_labels, actual_labels)