In [None]:
from typing import TypedDict, List, Dict, Any, Literal, Callable, NamedTuple
import json
from pydantic import BaseModel, Field
from prompt_engineer import *
import pandas as pd
from tqdm import notebook
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
import os


## Experiment 1: IMDB Sentiment Analysis with Prompt Optimization

### Creating a dataset

In [None]:
class InitialPromptOutputFormat(BaseModel):
    """Output format for the initial prompt."""
    # NOTE(review): pydantic copies the class docstring into the generated JSON
    # schema description, so the docstring above is deliberately left unchanged
    # -- confirm whether llm_call forwards that schema to the model.

    # Boolean verdict: True means positive sentiment, False means negative.
    # (Description typo "negarive" fixed so the schema text reads correctly.)
    sentiment: bool = Field(..., description="sentiment analysis of the text either positive/True or negative/False")
    # Free-form catch-all for anything else the prompt asks the model to return.
    other: str = Field(..., description="Everything else requested by the prompt in string format.")

# Baseline prompt template for the sentiment task; it is handed to
# PromptOptimizer as `initial_prompt` in the experiment cell further down.
# The `{text}` placeholder in `prompt` is where the review text goes, and the
# response is expected in the InitialPromptOutputFormat schema.
# NOTE(review): `text_format_function` is not defined in this notebook --
# presumably it comes from the `prompt_engineer` star-import; confirm.
initial_prompt=PromptTemplateData(
    prompt="For the following given text return true if the sentiment is positive, otherwise return false. \nText: {text}",
    system_prompt="You are an expert in extracting the comment sentiment from the given text.",
    output_format=InitialPromptOutputFormat,
    prompt_format_function=text_format_function
)

In [None]:
# Labelled reviews; the 'text' and 'label' columns are read below.
train = pd.read_csv("Train.csv")

# "bad_data.csv" acts as a cache: the expensive LLM sweep only runs when the
# file does not exist yet, otherwise the previously saved examples are reused.
if not os.path.exists("bad_data.csv"):
    # Function to process a single row for multiprocessing
    def process_single_row(row_data):
        """Classify one review with gpt-4o-mini.

        Defined as a plain named function because ProcessPoolExecutor pickles
        the callable it is given.

        Args:
            row_data: ``(index, row)`` pair as produced by ``DataFrame.iterrows()``;
                ``row['text']`` is the review that gets classified.

        Returns:
            ``(index, row, llm_response)`` so the caller can compare the
            response's ``sentiment`` with ``row['label']``.
        """
        index, row = row_data

        # The prompt (including its lambda formatter) is created here, inside
        # the worker, rather than being passed in from the parent process.
        prompt = PromptTemplateData(
            prompt="For the following given text return true if the sentiment is positive, otherwise return false. \nText: {text}",
            system_prompt="You are an expert in extracting the comment sentiment from the given text.",
            output_format=InitialPromptOutputFormat,
            prompt_format_function=lambda x, y: x.format(text=y)
        )

        # Format the prompt with the text
        formatted_prompt = prompt(row['text'])

        # Make the LLM call
        llm_response = llm_call(
            prompt=formatted_prompt,
            model="gpt-4o-mini"
        )
        return index, row, llm_response

    # creating a dataset where gpt-4o-mini performs poorly
    bad_data = {'text': [], 'label': []}

    # Prepare data for multiprocessing
    rows_to_process = [(index, row) for index, row in train.iterrows()]

    # Use multiprocessing with progress tracking
    max_workers = min(mp.cpu_count(), 8)  # Adjust based on API rate limits
    batch_size = 50  # Process in batches to avoid overwhelming the API
    target_bad_examples = 100  # Stop once this many misclassified reviews are collected

    print(f"Using {max_workers} workers for parallel processing...")

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit batches of work
        futures = []
        for i in range(0, len(rows_to_process), batch_size):
            batch = rows_to_process[i:i + batch_size]
            # Remember which row each future belongs to so a failure can be
            # reported against the right row: `future.result()` raises before
            # anything is unpacked, so the row index is not otherwise known.
            future_to_index = {
                executor.submit(process_single_row, row_data): row_data[0]
                for row_data in batch
            }
            futures.extend(future_to_index)

            # Process completed futures from this batch
            for future in as_completed(future_to_index):
                try:
                    index, row, llm_response = future.result()
                    # Keep only the reviews the model got wrong (label 1 == positive).
                    if llm_response.sentiment != (row['label'] == 1):
                        bad_data['text'].append(row['text'])
                        bad_data['label'].append(row['label'])

                    # Break early if we have enough bad examples
                    if len(bad_data['text']) >= target_bad_examples:
                        # Cancel remaining futures (only not-yet-started ones
                        # can actually be cancelled).
                        for f in futures:
                            f.cancel()
                        break

                except Exception as e:
                    # Best effort: report the failing row and keep going.
                    print(f"Error processing row {future_to_index[future]}: {e}")

            # Break if we have enough data
            if len(bad_data['text']) >= target_bad_examples:
                break

            print(f"Processed batch {i//batch_size + 1}, found {len(bad_data['text'])} bad examples so far...")

    training_dataset = pd.DataFrame(bad_data)
    print(f"Created training dataset with {len(training_dataset)} examples")
    training_dataset.to_csv("bad_data.csv", index=False)
else:
    training_dataset = pd.read_csv("bad_data.csv")
    print(f"Loaded existing training dataset with {len(training_dataset)} examples")

### Evaluation Method

In [None]:

def process_single_evaluation(args):
    """Worker that scores a single dataset row with the LLM.

    Kept as a plain named function because it is submitted to a
    ProcessPoolExecutor, which has to pickle the callable.

    Args:
        args: ``(prompt, (index, row), model)`` -- the prompt template to call,
            one ``iterrows()`` item, and the model name passed to ``llm_call``.

    Returns:
        ``(index, prediction, label, text)`` where ``prediction`` is 1 when the
        response's ``sentiment`` is truthy and 0 otherwise.
    """
    prompt, (index, row), model = args
    review_text = row['text']

    # temperature=0 is requested for every evaluation call.
    response = llm_call(prompt(review_text), temperature=0, model=model)

    return index, int(bool(response.sentiment)), row['label'], review_text

def evaluation_method(prompt: PromptTemplateData, dataset: pd.DataFrame) -> Dict[str, Any]:
    """Score ``prompt`` on ``dataset`` using gpt-4o-mini in parallel workers.

    Every row's ``text`` is classified through ``process_single_evaluation``
    and the 0/1 prediction is compared with the row's ``label``.

    Rows whose LLM call raises are reported and left out of the metrics.
    Previously they were recorded as a fabricated ``(prediction=0, label=0)``
    pair, which counted as a correct answer and so inflated accuracy and
    deflated the false-positive rate. Precision, recall and F1 are unaffected
    by this change because that pair never contributed to them.

    Args:
        prompt: Prompt template that is called with the row text.
        dataset: Frame with ``text`` and ``label`` columns (label 1 == positive).

    Returns:
        Dict with ``accuracy``, ``recall``, ``precision``, ``f1_score`` and
        ``false_positive_rate`` (each 0 when its denominator is 0, including
        the case where no row was scored) plus ``failure_cases``, a list of
        human-readable strings describing each misclassified row.
    """
    def convert_to_sentiment(x):
        return "positive" if x == 1 else "negative"

    def safe_ratio(numerator, denominator):
        # Metrics fall back to 0 instead of raising ZeroDivisionError.
        return numerator / denominator if denominator > 0 else 0

    # Prepare data for multiprocessing
    rows_to_process = [(index, row) for index, row in dataset.iterrows()]

    # Create arguments for each process
    process_args = [(prompt, row_data, "gpt-4o-mini") for row_data in rows_to_process]

    # Cap the pool size
    max_workers = min(mp.cpu_count(), 8)  # Adjust based on API rate limits

    predictions = []
    actual = []
    failure_cases = []

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        futures = [executor.submit(process_single_evaluation, args) for args in process_args]

        # Collect results as they finish (completion order, not row order)
        for future in as_completed(futures):
            try:
                index, prediction, label, text = future.result()
            except Exception as e:
                # Skip the row: there is no prediction to score.
                print(f"Error processing row: {e}")
                continue

            predictions.append(prediction)
            actual.append(label)

            # Check for failure cases
            if prediction != label:
                failure_cases.append(f"text: {text}| predicted: {convert_to_sentiment(prediction)}| actual: {convert_to_sentiment(label)}")

    # Calculate metrics
    scored_pairs = list(zip(predictions, actual))
    true_positives = sum(p == a == 1 for p, a in scored_pairs)
    false_positives = sum(p == 1 and a == 0 for p, a in scored_pairs)

    precision = safe_ratio(true_positives, sum(p == 1 for p in predictions))
    recall = safe_ratio(true_positives, sum(a == 1 for a in actual))

    return {
        "accuracy": safe_ratio(sum(p == a for p, a in scored_pairs), len(actual)),
        "recall": recall,
        "precision": precision,
        "f1_score": safe_ratio(2 * (precision * recall), precision + recall),
        "false_positive_rate": safe_ratio(false_positives, sum(a == 0 for a in actual)),
        "failure_cases": failure_cases
    }

In [None]:
# Settings shared by every run of the sweep.
MAX_ITERATIONS = 100
TARGET_SCORE = 0.99
# Both names below are rebound by the sweep loops, so these are only the
# values they hold before the first iteration.
EPSILON = 0.5  # exploration rate
DROP_OUT = 0.5 # dropout rate for improvement actions

# Grid sweep: one optimizer run per (EPSILON, DROP_OUT) pair, with the
# notebook progress bar tracking the outer EPSILON loop.
for EPSILON in notebook.tqdm([0.1, 0.3, 0.5]):
    for DROP_OUT in [0.1, 0.3, 0.5]:
        print(f"Running with EPSILON={EPSILON}, DROP_OUT={DROP_OUT}, MAX_ITERATIONS={MAX_ITERATIONS}")

        # Gather the constructor arguments first so the run configuration is
        # readable in one place.
        optimizer_settings = dict(
            evaluation_method=evaluation_method,
            training_dataset=training_dataset,
            action_selection=action_selection_prompt,
            action_application=action_application,
            initial_prompt=initial_prompt,
            action_list=IMPROVEMENT_ACTIONS,
            train_test_ratio=0.5,
            max_iterations=MAX_ITERATIONS,
            target_score=TARGET_SCORE,
            epsilon=EPSILON,
            drop_out=DROP_OUT,
        )
        po = PromptOptimizer(**optimizer_settings)

        # Run the optimizer, then hand its final state to the saver together
        # with the settings that produced it.
        final_state = po.run()
        save_final_state(final_state, EPSILON, DROP_OUT, MAX_ITERATIONS)

#po.visualize()

In [None]:
# NOTE(review): not defined in this notebook -- presumably provided by the
# `prompt_engineer` star-import and plots validation F1 for the runs saved
# above; confirm where it reads its data from.
plot_validation_f1_scores()

In [None]:
# NOTE(review): not defined in this notebook -- presumably provided by the
# `prompt_engineer` star-import and plots the running best F1 per run;
# confirm where it reads its data from.
plot_cumulative_max_f1_scores()