In [None]:
! pip install langchain langchain_huggingface torch transformers langchain_groq

In [None]:
unfairness_categories = ['A', 'CH', 'CR', 'J', 'LAW', 'LTD', 'TER', 'USE']

In [None]:
train_doc_ids = pd.read_csv('dataset/claudette_train_merged.tsv', sep='\t')['document'].unique()
val_doc_ids = pd.read_csv('dataset/claudette_val_merged.tsv', sep='\t')['document'].unique()
test_doc_ids = pd.read_csv('dataset/claudette_test_merged.tsv', sep='\t')['document'].unique()

df = pd.read_csv('dataset/claudette_all_merged.tsv', sep='\t').rename(columns={"file_name": "document"})
df_train = df.loc[df['document'].isin(train_doc_ids)]
df_train_neg = df_train.loc[df_train['label'] == 0]
df_train_pos = df_train.loc[df_train['label'] == 1]
df_val = df.loc[df['document'].isin(val_doc_ids)]
df_test = df.loc[df['document'].isin(test_doc_ids)]

In [None]:
df_train_pos_per_cat = {}
for category in unfairness_categories:
    df_train.loc[:, category] = df_train.loc[:, 'label_type'].apply(lambda cats: int(category.upper() in cats))
    df_train_pos.loc[:, category] = df_train_pos.loc[:, 'label_type'].apply(lambda cats: int(category.upper() in cats))
    df_train_pos_per_cat[category] = df_train.loc[df_train[category] == 1]

In [None]:
def make_answer_instruction(n_words = 50):
    return f'Start your answer with "yes" or "no" and then justify your response in no more than {n_words} words.' 

In [None]:
def sample_example(filtered_set, cat):
    
    example = filtered_set['text'].sample(n=1).iloc[0]

    return example

In [None]:
def add_positive_example(part_of_the_prompt: str, cat: str) -> str:
    
    """
    Add positive example to the prompt.
    Maintains a list of added examples for possible removal.

    Args:
        part_of_the_prompt (str): The part of the prompt to which the example will be added or from which it will be removed.
        cat (str): The category column name in the test set.
        example_type (str): The type of example to find ('positive' or 'negative').
        mode (str): Mode of operation - 'add' to add an example, 'remove' to remove an example.

    Returns:
        str: The updated part of the prompt with example clauses
    """
    
    global added_examples
    
    example_type = "positive"
    mode = "add"
    
    
    if mode == "add":
        if example_type == "positive":
            filtered_set = df_train[df_train[cat] == 1]
        elif example_type == "negative":
            filtered_set = df_train[df_train[cat] == 0]

        if filtered_set.empty:
            return "No positive clause found of the category in data set."
        
        sampled_example = sample_example(filtered_set, cat)
            
        added_examples[example_type].append(sampled_example)
        
        
        examples_text = ""
        for example_type, examples in added_examples.items():
            for example in examples:
                examples_text += f"For example, consider this clause of the same category: \"{example}\""
        
        return examples_text + ". " + part_of_the_prompt

In [None]:
def add_negative_example(part_of_the_prompt: str, cat: str) -> str:
    
    """
    Add negative example to the prompt.
    Maintains a list of added examples for possible removal.

    Args:
        part_of_the_prompt (str): The part of the prompt to which the example will be added or from which it will be removed.
        cat (str): The category column name in the test set.
        example_type (str): The type of example to find ('positive' or 'negative').
        mode (str): Mode of operation - 'add' to add an example, 'remove' to remove an example.

    Returns:
        str: The updated part of the prompt with example clauses
    """
    
    global added_examples
    
    example_type = "negative"
    mode = "add"
    
    
    if mode == "add":
        if example_type == "positive":
            filtered_set = df_train[df_train[cat] == 1]
        elif example_type == "negative":
            filtered_set = df_train[df_train[cat] == 0]

        if filtered_set.empty:
            return "No negative clause found of the category in data set."
        
        sampled_example = sample_example(filtered_set, cat)
            
        added_examples[example_type].append(sampled_example)

        
        examples_text = ""
        for example_type, examples in added_examples.items():
            for example in examples:
                examples_text += f"For example, consider this clause which is not of this category: \"{example}\""
        
        return examples_text + ". " + part_of_the_prompt

In [None]:
 def remove_positive_example(part_of_the_prompt: str, cat: str) -> str:
    
    """
    Remove positive example from the prompt.
    Maintained a list of added examples for possible removal.

    Args:
        part_of_the_prompt (str): The part of the prompt to which the example will be added or from which it will be removed.
        cat (str): The category column name in the test set.
        example_type (str): The type of example to find ('positive' or 'negative').
        mode (str): Mode of operation - 'add' to add an example, 'remove' to remove an example.

    Returns:
        str: The updated part of the prompt with example clauses
    """
    
    global added_examples
    
    example_type = "positive"
    mode = "remove"
    
    
    if mode == "remove":
        if not added_examples[example_type]:
            print(f"No {example_type} example to remove. \n" )
            examples_text = ""
            for example_type, examples in added_examples.items():
                for example in examples:
                    examples_text += f"For example, consider this clause of the same category: \"{example}\""

            return examples_text + part_of_the_prompt  
        
        removed_example = added_examples[example_type].pop(0)
        
        examples_text = ""
        for example_type, examples in added_examples.items():
            for example in examples:
                examples_text += f"For example, consider this clause of the same category: \"{example}\""
        
        return examples_text + part_of_the_prompt 

In [None]:
def remove_negative_example(part_of_the_prompt: str, cat: str) -> str:
    
    """
    Remove negative example from the prompt.
    Maintained a list of added examples for possible removal.

    Args:
        part_of_the_prompt (str): The part of the prompt to which the example will be added or from which it will be removed.
        cat (str): The category column name in the test set.
        example_type (str): The type of example to find ('positive' or 'negative').
        mode (str): Mode of operation - 'add' to add an example, 'remove' to remove an example.

    Returns:
        str: The updated part of the prompt with example clauses
    """
    
    global added_examples
    
    example_type = "negative"
    mode = "remove"
    
    
    if mode == "remove":
        if not added_examples[example_type]:
            print(f"No {example_type} example to remove. \n" )
            examples_text = ""
            for example_type, examples in added_examples.items():
                for example in examples:
                    examples_text += f" For example, consider this clause which is not of this category: \"{example}\""

            return examples_text + part_of_the_prompt 
        
        removed_example = added_examples[example_type].pop(0)
        
        examples_text = ""
        for example_type, examples in added_examples.items():
            for example in examples:
                examples_text += f" For example, consider this clause which is not of this category: \"{example}\""
        
        return examples_text + part_of_the_prompt 

In [None]:
from langchain.prompts import PromptTemplate
from langchain.tools import BaseTool, StructuredTool, tool
from langchain_huggingface.llms import HuggingFacePipeline
from transformers import pipeline, PegasusForConditionalGeneration, PegasusTokenizer
import torch

def paraphrase_huggingface(inital_prompt: str, pipe: pipeline) -> str:
    """
    Description = "Generates a variation of the input text while keeping the semantic meaning using HuggingFace pipeline."
    """
    name = "paraphrase_huggingface"

    hf = HuggingFacePipeline(pipeline=pipe)

    # Prepare the prompt template for the paraphrasing task
    template = """Generate a variation of the input text while keeping the semantic meaning: \n Input:{text} \n Output: """

    # Create the chain and run it
    prompt = PromptTemplate.from_template(template)
    chain = prompt | hf.bind(skip_prompt=True)
    print(f"Starting to paraphrase the following prompt: {inital_prompt}\n Using {pipeline} as pipeline:")
    result = chain.invoke({"text": inital_prompt})
    
    return result

class ParaphrasePegasus():
    def __init__(self, model_name: str = 'tuner007/pegasus_paraphrase') -> None:    
        self.torch_device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.para_tokenizer = PegasusTokenizer.from_pretrained(model_name)
        self.para_model = PegasusForConditionalGeneration.from_pretrained(model_name).to(self.torch_device)

    def paraphrase_pegasus(self, inital_prompt: str, num_beams: int = 10, num_return_sequences: int = 1) -> str:
        """
        Description: "Generates a variation of the input text while keeping the semantic meaning using Pegasus model."
        """
        # Paraphrase pipeline
        print(f"Starting to paraphrase the following prompt: {inital_prompt} \n Using Pegasus:")
        batch = self.para_tokenizer([inital_prompt], truncation=True, padding='longest', max_length=60, return_tensors="pt").to(self.torch_device)
        translated = self.para_model.generate(**batch, max_length=60, num_beams=num_beams, num_return_sequences=num_return_sequences, temperature=1.5, do_sample=True)
        tgt_text = self.para_tokenizer.batch_decode(translated, skip_special_tokens=True)
        
        return tgt_text[0]  


In [None]:
from transformers import AutoTokenizer, T5ForConditionalGeneration
import torch

class GrammarAdjustment():
    def __init__(self) -> None:
        self.torch_device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.tokenizer = AutoTokenizer.from_pretrained("grammarly/coedit-large")
        self.model = T5ForConditionalGeneration.from_pretrained("grammarly/coedit-large")

    def grammatic_adjustment(self, inital_prompt: str):
        gram_check_promtp = f"Fix grammatical errors in this sentence: {inital_prompt}"
        input_ids = self.tokenizer(gram_check_promtp, return_tensors="pt").input_ids
        outputs = self.model.generate(input_ids, max_length=256)
        edited_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        return edited_text



In [None]:
def run_prompt_optimization(
    part_of_prompt_to_modify: str, model, num_iterations: int = 3, category=category
) -> List[Dict]:
    """Optimizes the prompt using an LLM to choose the best edit operation."""
    ## For this.. online: <current clause, > <Legal Description, > <Examples, >
    ## [{'iteration': 1, "prompt": "<intro> <current_clause><legal description> <examples> <part_of_prompt_to_modify>", "score": 80}]
    self.optimization_history = []  # Initialize as an empty list attribute
    current_part_of_prompt_to_modify = part_of_prompt_to_modify

    for iteration in range(num_iterations):  # Iterate for the specified number of times
        # 1. Construct LLM Prompt
        if iteration == 0:
            # Special prompt for the first iteration (no history)
            llm_prompt = (
                f"You are tasked with optimizing a prompt. This is iteration {iteration + 1}.\n" 
                "Full Prompt: <intro><current_clause><legal_description><examples><part_of_prompt_to_modify>"
                "<intro> is: Consider the following online terms of service clause:"
                "current_clause> is the variable clause to be classified."
                "<legal_description> is: question helping to classify clauses"
                "<examples> is: optional, example clauses that are either positive or negative to the classification"
                f"<part_of_prompt_to_modify> is {current_part_of_prompt_to_modify}, only modify this part\n\n"
                
                "Choose the most suitable action to improve the prompt:\n"
                "- shorten: Remove unnecessary details.\n"
                "- add_positive_example: Include a positive example.\n"
                "- add_negative_example: Include a negative example.\n"
                "- paraphrase: Rephrase the prompt to be more clear and concise.\n"
                "- reformat: Change the structure or formatting of the prompt.\n"
                "- grammar_adjustment: Fix any grammatical errors in the prompt.\n"
            )
        else:
            # Prompt for subsequent iterations (with history)
            llm_prompt = (
                f"You are tasked with optimizing a prompt. This is iteration {iteration + 1}.\n"  # Add 1 to display 1-based iteration
                "Full Prompt: <intro><current_clause><legal_description><examples><part_of_prompt_to_modify>"
                "<intro> is: Consider the following online terms of service clause:"
                "current_clause> is the variable clause to be classified."
                "<legal_description> is: question helping to classify clauses"
                "<examples> is: optional, example clauses that are either positive or negative to the classification"
                f"<part_of_prompt_to_modify> is {current_part_of_prompt_to_modify}, only modify this part\n\n"
                f"Optimization History: {self.optimization_history}\n\n"
                "Choose the most suitable action to further improve the prompt based on the score trends:\n"
                "- shorten: Remove unnecessary details.\n"
                "- add_positive_example: Include a positive example.\n"
                "- add_negative_example: Include a negative example.\n"
                "- remove_positive_example: Remove the existing positive example.\n"
                "- remove_negative_example: Remove the existing negative example.\n"                
                "- paraphrase: Rephrase the prompt to be more clear and concise.\n"
                "- reformat: Change the structure or formatting of the prompt.\n"
                "- grammar_adjustment: Fix any grammatical errors in the prompt.\n"
            )

        # 2. Get LLM's Decision
        response = model(llm_prompt)  
        chosen_action = response.lower().strip()

        
        # 3. Apply Chosen Action
        if chosen_action == "shorten":
            part_of_prompt_to_modify = shorten_prompt(part_of_prompt_to_modify)
        elif chosen_action == "add_positive_example":
            part_of_prompt_to_modify = add_positive_example(part_of_prompt_to_modify,category)
        elif chosen_action == "add_negative_example":
            part_of_prompt_to_modify = add_negative_example(part_of_prompt_to_modify,category)
        elif chosen_action == "remove_positive_example":
            part_of_prompt_to_modify = remove_positive_example(part_of_prompt_to_modify,category)
        elif chosen_action == "remove_negative_example":
             part_of_prompt_to_modify = remove_negative_example(part_of_prompt_to_modify,category)            
        elif chosen_action == "paraphrase":
            paraphraser = ParaphrasePegasus() 
            part_of_prompt_to_modify = paraphraser.paraphrase_pegasus(part_of_prompt_to_modify)
        elif chosen_action == "reformat":
            part_of_prompt_to_modify = reformat(part_of_prompt_to_modify)
        elif chosen_action == "grammar_adjustment":
            grammarAdjustment = GrammarAdjustment()
            part_of_prompt_to_modify = grammarAdjustment.grammar_adjustment(current_prompt)
        else:
            raise ValueError(f"Invalid action chosen by LLM: {chosen_action}")

        # 4. Score and Log (using self.optimization_history)
        score = score_prompt(part_of_prompt_to_modify)
        self.optimization_history.append(
            {"iteration": iteration + 1, "prompt": part_of_prompt_to_modify, "edit": chosen_action, "score": score}  # Log 1-based iteration
        )

    return self.optimization_history


In [None]:
# Define your model (using your actual API key/setup)
import json
from typing import Dict, List

from langchain_groq import ChatGroq
from langchain.schema import HumanMessage, SystemMessage

model = ChatGroq(
    model_name="llama3-70b-8192",
    groq_api_key="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",
    temperature=0
)

initial_prompt = make_answer_instruction()

# Initial prompt to be optimized
#initial_prompt = "Consider the following online terms of service clause: 'websites & communications terms of use'. Does this clause describe an arbitration dispute resolution process that is not fully optional to the consumer? Begin your answer with 'yes' or 'no' and then justify your response in no more than 50 words. For example, consider these example clauses: <positive, if you are not a consumer in the eea , the exclusive place of jurisdiction for all disputes arising from or in connection with this agreement is san francisco county , california , or the united states district court for the northern district of california and our dispute will bedetermined under california law .'>, <negative, you are prohibited from using any services or facilities provided in connection with this service to compromise security or tamper with system resources and/or accounts .>"

#cat_results = {}
#df_train_sample = sample_equal_distribution(df_train, unfairness_categories, 100, 123)
#score_set_df = sample_equal_distribution(df_train, unfairness_categories, 25, 234)
#...

for cat in unfairness_categories:
    
   # Run prompt optimization, specifying we're on the 4th and last iteration
   optimized_history = run_prompt_optimization(
        initial_prompt,
        model,
        num_iterations=4,
        category=cat
    )
   optimized_prompt = optimized_history[-1]["prompt"]
    

print(json.dumps(optimized_history, indent=2))