# Evolutionary Prompt Selection
## For Planner/Worker/Solver Framework

### Install Dependencies

In [None]:
!pip install accelerate
!pip install bitsandbytes
!pip install sentencepiece
!pip install torch
!pip install transformers
!pip install wikipedia


### Define Prompt Prefixes/Suffixes

In [None]:
# Prompt fragments read by Planner: "prefix" opens the planner prompt and
# "suffix" is appended after the few-shot examples (see Planner.generate_prompt).
PLANNER_PROMPT = {"prefix": "For the following tasks, make plans that can solve the problem step-by-step. For each plan, indicate which external tool together with tool input to retrieve evidence. You can store the evidence into a variable #E that can be called by later tools. (Plan, #E1, Plan, #E2, Plan, ...) \n\n","suffix": "Begin! Describe your plans with rich details. Each Plan should be followed by only one #E.\n\n"}
# Prompt fragments read by Solver: "prefix" opens the solver prompt and
# "suffix" is appended after the plan/evidence pairs (see Solver.run).
SOLVER_PROMPT = {"prefix": "Solve the following task or problem. To assist you, we provide some plans and corresponding evidences that might be helpful. Notice that some of these information contain noise so you should trust them with caution.\n\n", "suffix": "\nNow begin to solve the task or problem. Respond with the answer directly with no extra words.\n\n"}


### Define classes and functions

#### Nodes

In [None]:
import re

import wikipedia

class Node:
    """Common interface shared by every node of the framework.

    The base class cannot be used directly: both the constructor and
    ``run`` raise ``NotImplementedError`` until a subclass overrides them.
    """

    def __init__(self):
        raise NotImplementedError()

    def run(self, inputs):
        """Process ``inputs``; subclasses provide the implementation."""
        raise NotImplementedError()

class LLMNode(Node):
    """Node whose work is delegated to a language model"""

    def __init__(self, model):
        # The model only needs to expose a ``generate(prompt)`` method.
        self.model = model

    def call_llm(self, prompt):
        """Send a prompt to the wrapped model and hand back its reply
        Parameters:
        ------------
        prompt: str
            Text passed to the model's ``generate`` method

        Returns:
        ------------
        response: str
            Whatever ``self.model.generate`` returns for the prompt
        """
        return self.model.generate(prompt)

class Planner(LLMNode):
    """Planner node for making plans within the PWS framework"""
    def __init__(self, model):
        super().__init__(model)
        self.prefix = PLANNER_PROMPT['prefix']
        self.suffix = PLANNER_PROMPT['suffix']

    def run(self, task, examples, tools):
        """Generate plans for the given task, examples and tools
        Parameters:
        ------------
        task: str
            Task for which the plan is to be generated
        examples: list(str)
            Examples related to the task for the fewshot prompt
        tools: dict(str:str)
            Tools that can be used to solve the task

        Returns:
        ------------
        planner_response: dict(str:obj)
            Dictionary with two keys: 'plans' (list of plan lines) and
            'tool_calls' (evidence variable -> tool call string)
        """
        prompt = self.generate_prompt(task, examples, tools)
        response = self.call_llm(prompt)
        plans, tool_calls = self.parse_response(response)
        return {'plans': plans, 'tool_calls': tool_calls}

    def generate_prompt(self, task, examples, tools):
        """Generates a planner prompt for the given task, examples and tools
        Parameters:
        ------------
        task: str
            Task for which the plan is to be generated
        examples: list(str)
            Examples related to the task for the fewshot prompt
        tools: dict(str:str)
            Tools that can be used to solve the task

        Returns:
        ------------
        prompt: str
            planner prompt
        """
        prompt = self.prefix
        prompt += self.generate_worker_prompt(tools)
        if examples:
            # Terminate the example section with a blank line; otherwise the
            # last example line runs straight into the suffix text.
            prompt += '\n\n'.join(examples) + '\n\n'
        prompt += self.suffix
        prompt += f"Question: {task}\n"
        return prompt

    def generate_worker_prompt(self, tools):
        """Generates a worker prompt for given tools
        Parameters:
        ------------
        tools: dict(str:str)
            contains the names and descriptions of the tools

        Returns:
        ------------
        prompt: str
            worker prompt
        """
        lines = ["Tools can be one of the following:\n"]
        lines.extend(
            f"{tool}[input]: {description}\n"
            for tool, description in tools.items()
        )
        return "".join(lines) + "\n"

    def parse_response(self, response):
        """Parse the planner response into plans and tool calls
        Lines starting with "Plan:" are collected verbatim. Lines of the form
        "#E<digits> = <tool call>" are stored in the tool call dictionary.
        Evidence lines without "=" are skipped, and lines too short to carry
        an evidence variable are ignored instead of raising.

        Parameters:
        ------------
        response: str
            Planner response

        Returns:
        ------------
        plans: list(str)
            List that contains the plan lines
        tool_calls: dict(str:str)
            Maps each evidence variable (e.g. "#E1") to its tool call. A
            malformed left-hand side (anything other than "#E" followed only
            by digits) is mapped to "No evidence found"
        """
        plans = []
        tool_calls = {}
        for line in response.splitlines():
            if line.startswith("Plan:"):
                plans.append(line)
                continue
            # Slicing (instead of indexing) keeps short lines such as "#" or
            # "#E" from raising IndexError.
            if not (line.startswith("#E") and line[2:3].isdigit()):
                continue
            variable, separator, tool_call = line.partition("=")
            if not separator:
                # No assignment on this line, so there is nothing to call.
                continue
            variable, tool_call = variable.strip(), tool_call.strip()
            # Accept any number of digits so that "#E10" and beyond are kept.
            if variable[2:].isdigit():
                tool_calls[variable] = tool_call
            else:
                tool_calls[variable] = "No evidence found"
        return plans, tool_calls

class WikipediaWorker(Node):
    """Worker that searches Wikipedia"""
    def __init__(self):
        # Stateless worker; overrides Node.__init__, which always raises.
        pass

    def run(self, inputs):
        """Searches Wikipedia for the given inputs and returns the leading
        text of the first page in the search results
        Parameters:
        ------------
        inputs: str
            String input for Wikipedia search

        Returns:
        ------------
        evidence: str
            Page content up to the first blank line, or "No evidence found."
            when the search is empty or the page cannot be loaded
        """
        no_evidence = "No evidence found."
        pages = wikipedia.search(inputs, results=1)
        if not pages:
            return no_evidence
        try:
            content = wikipedia.page(pages[0], auto_suggest=False).content
        except (wikipedia.exceptions.DisambiguationError,
                wikipedia.exceptions.PageError):
            # A search hit can still be a disambiguation page or fail to
            # resolve; treat that as missing evidence rather than aborting
            # the whole run.
            return no_evidence
        return content.split('\n\n', 1)[0]

class LLMWorker(LLMNode):
    """Worker that answers a tool call by querying the LLM directly"""

    def run(self, inputs):
        """Answer a tool call with the LLM
        Parameters:
        ------------
        inputs: str
            Input for the tool call

        Returns:
        ------------
        evidence: str
            LLM response with leading and trailing newlines removed
        """
        instruction = "Respond in short directly with no extra words."
        prompt = f"{instruction}\n\n{inputs}\n\n"
        return self.call_llm(prompt).strip("\n")

class Worker(Node):
    """Worker node that calls appropriate workers for each tool call"""
    def __init__(self, model):
        self.wiki_worker = WikipediaWorker()
        self.llm_worker = LLMWorker(model)

    def run(self, inputs):
        """Facilitates all tool calls and returns evidences
        Tool calls are processed in the iteration order of ``inputs``, so an
        evidence variable can only be substituted into tool calls that come
        after the one that produced it.

        Parameters:
        ------------
        inputs: dict(str:str)
            A dictionary of evidence variables and associated tool calls

        Returns:
        ------------
        evidences: dict(str:str)
            A dictionary of evidence variables and the outputs of the
            associated tool calls
        """
        workers = {"Wikipedia": self.wiki_worker, "LLM": self.llm_worker}
        evidences = {}

        def substitute(match):
            # Unknown variables are left untouched in the tool input.
            var = match.group()
            if var in evidences:
                return f"[{evidences[var]}]"
            return var

        for e, tool_call in inputs.items():
            # Do not process tools without input
            if "[" not in tool_call:
                evidences[e] = tool_call
                continue

            # Separate tool and tool input; only drop the closing bracket
            # when it is actually there.
            tool, tool_input = tool_call.split("[", 1)
            tool = tool.strip()
            tool_input = tool_input.removesuffix("]")

            # Replace whole evidence variables in a single pass: "#E1" must
            # not match inside "#E10", and text that was just inserted is
            # not scanned again.
            tool_input = re.sub(r"#E\d+", substitute, tool_input)

            worker = workers.get(tool)
            if worker is None:
                evidences[e] = "No evidence found."
            else:
                evidences[e] = worker.run(tool_input)

        return evidences

class Solver(LLMNode):
    """Solver node that answers a task from its plans and evidences"""

    def __init__(self, model):
        self.prefix = SOLVER_PROMPT['prefix']
        self.suffix = SOLVER_PROMPT['suffix']
        self.model = model

    def run(self, task, plans, evidences):
        """Build the solver prompt and ask the LLM for the final answer
        Parameters:
        ------------
        task: str
            Task to be solved
        plans: list(str)
            Plans generated by the Planner; the n-th plan is paired with
            the evidence variable "#E<n>" (counting from 1)
        evidences: dict(str:str)
            Evidences generated by the Worker, keyed by evidence variable

        Returns:
        ------------
        output: str
            LLM response to the assembled prompt
        """
        sections = [self.prefix, task + '\n']
        for number, plan in enumerate(plans, start=1):
            # Plans without a matching evidence get a fixed placeholder.
            evidence = evidences.get(f"#E{number}", "No evidence found.")
            sections.append(f"{plan}\nEvidence:\n{evidence}\n")
        sections.append(self.suffix)
        sections.append(task + '\n\n')
        return self.call_llm("".join(sections))


#### Utils

In [None]:
import re
import time
import torch
from transformers import GenerationConfig, LlamaForCausalLM, LlamaTokenizer
from transformers import StoppingCriteria, StoppingCriteriaList

class ParagraphStoppingCriteria(StoppingCriteria):
    """Stops generation once the last two tokens are both newlines."""

    def __init__(self, newline_id):
        # Token id that is compared against the tail of the sequence.
        self.newline = newline_id

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        # Only the first sequence of the batch is inspected.
        sequence = input_ids[0]
        if sequence[-1] != self.newline:
            return False
        return bool(sequence[-2] == self.newline)
        
class LanguageModel:
    """Language model wrapper to be used in nodes"""
    def __init__(self, model_path, generation_config):
        """Load tokenizer and model and prepare the stopping criteria
        Parameters:
        ------------
        model_path: str
            Path or identifier passed to ``from_pretrained``
        generation_config: GenerationConfig
            Generation settings used for every ``generate`` call
        """
        self.tokenizer = LlamaTokenizer.from_pretrained(model_path)
        self.model = LlamaForCausalLM.from_pretrained(
            model_path, torch_dtype=torch.float16, device_map='auto')
        self.generation_config = generation_config
        # Use the last id of the encoded newline as the newline token id.
        newline_id = self.tokenizer.encode('\n')[-1]
        self.stopping_criteria = StoppingCriteriaList([ParagraphStoppingCriteria(newline_id)])

    def generate(self, prompt):
        """Generate text based on given prompt
        Parameters:
        ------------
        prompt: str
            Prompt for the LLM

        Returns:
        ------------
        llm_response: str
            Decoded text of the newly generated tokens only (the prompt
            tokens are cut off before decoding)
        """
        tokens = self.tokenizer.encode(prompt)
        tokens = torch.LongTensor(tokens).unsqueeze(0)
        # Follow the device the model was placed on instead of assuming
        # 'cuda'; the model is loaded with device_map='auto'.
        tokens = tokens.to(self.model.device)

        prompt_length = tokens.shape[1]
        with torch.no_grad():
            generated = self.model.generate(
                input_ids=tokens,
                generation_config=self.generation_config,
                stopping_criteria=self.stopping_criteria
            )
        completion = generated[0][prompt_length:]
        llm_response = self.tokenizer.decode(completion, skip_special_tokens=True)
        return llm_response

class PWS:
    """Planner Worker Solver pipeline"""

    def __init__(self, model):
        # All three stages share the same language model.
        self.planner = Planner(model=model)
        self.worker = Worker(model=model)
        self.solver = Solver(model=model)

    def run(self, task, examples, tools, verbose=False):
        """Plan, gather evidence and solve a task in one pass
        Parameters:
        ------------
        task: str
            Task for which the PWS is to be run
        examples: list(str)
            Examples related to the task for the fewshot prompt
        tools: dict(str:str)
            Tools that can be used to solve the task
        verbose: bool
            When true, intermediate results are added to the response

        Returns:
        ------------
        pws_response: dict(str:obj)
            Contains "output" (solver answer) and "wall_time" (elapsed
            seconds). With verbose, also "planner_response" and
            "worker_response"
        """
        started_at = time.time()

        # Plan -> Work -> Solve
        plan_result = self.planner.run(task, examples, tools)
        plans, tool_calls = plan_result["plans"], plan_result["tool_calls"]
        evidences = self.worker.run(tool_calls)
        answer = self.solver.run(task, plans, evidences)

        elapsed = time.time() - started_at

        result = {"output": answer, "wall_time": elapsed}
        if verbose:
            result.update(planner_response=plan_result,
                          worker_response=evidences)
        return result


class EPS:
    """Evolutionary Prompt Selection (placeholder, not implemented yet)"""
    # TODO: implement the selector

    def __init__(self):
        raise NotImplementedError()

    def select_examples(self, task, num_examples):
        """Not implemented; intended to return ``examples, tools``."""
        raise NotImplementedError()


#### Initialize the model

In [None]:
from transformers import GenerationConfig

# Model identifier handed to LanguageModel, which passes it to from_pretrained.
MODEL_PATH = "lmsys/vicuna-7b-v1.3"
# Sampling settings forwarded to GenerationConfig below.
TEMPERATURE = 0.7
TOP_K = 100
TOP_P = 0.7
REPETITION_PENALTY= 1.0
MAX_NEW_TOKENS = 512

# Sampling is enabled (do_sample=True) with the values defined above.
generation_config = GenerationConfig(
    do_sample=True,
    temperature=TEMPERATURE,
    top_k=TOP_K,
    top_p=TOP_P,
    repetition_penalty=REPETITION_PENALTY,
    max_new_tokens=MAX_NEW_TOKENS
)

# Loads the tokenizer and the model (see LanguageModel.__init__).
model = LanguageModel(MODEL_PATH, generation_config=generation_config)


#### Initialize Evolutionary Prompt Selector

In [None]:
# TODO
# EPS()

#### Example/tool dummies
Temporary hard-coded examples and tools; remove once EPS is implemented.

In [None]:
# Tool name -> description. Planner.generate_worker_prompt lists these in the
# prompt, and Worker.run dispatches on the names "Wikipedia" and "LLM".
tools = {"Wikipedia": "Worker that search for similar page contents from Wikipedia. Useful when you need to get holistic knowledge about people, places, companies, historical events, or other subjects. The response are long and might contain some irrelevant information. Input should be a search query.",
"LLM": "A pretrained LLM like yourself. Useful when you need to act with general world knowledge and common sense. Prioritize it when you are confident in solving the problem yourself. Input can be any instruction."}

# Hard-coded few-shot examples for the planner prompt. Each entry is a
# question followed by alternating "Plan:" lines and "#E<n> = Tool[input]"
# lines, the same format Planner.parse_response reads back from the model.
examples = ["""Question: What is the elevation range for the area that the eastern sector of the Colorado orogeny extends into?
Plan: Search for more information about Colorado orogeny.
#E1 = Wikipedia[Colorado orogeny]
Plan: Find out the area that eastern sector of the Colorado orogeny extends into.
#E2 = LLM[What is the name of the area that eastern sector of Colorado extends into? Given context: #E1]
Plan: Search for more information about the area.
#E3 = Wikipedia[#E2]
Plan: Find out the elevation range for the area.
#E4 = LLM[What is elevation range for the area #E2? Given context: #E3]""",
"""Question: Musician and satirist Allie Goertz wrote a song about the "The Simpsons" character Milhouse, who Matt Groening named after who?
Plan: Search for more information about Milhouse.
#E1 = Wikipedia[Milhouse]
Plan: Find out who Matt Groening named Milhouse after.
#E2 = LLM[Who did Matt Groening name Milhouse after? Given context: #E1]""",
"""Question: Which documentary is about Finnish rock groups, Adam Clayton Powell or The Saimaa Gesture?
Plan: Search for more information about Adam Clayton Powell.
#E1 = Wikipedia[Adam Clayton Powell]
Plan: Search for more information about The Saimaa Gesture.
#E2 = Wikipedia[The Saimaa Gesture]
Plan: Compare the two and determine which is a documentary about Finnish rock groups.
#E3 = LLM[Which documentary is about Finnish rock groups, Adam Clayton Powell or The Saimaa Gesture? Given context: #E1, #E2]""",
"""Question: What profession does Nicholas Ray and Elia Kazan have in common?
Plan: Search for more information about Nicholas Ray.
#E1 = Wikipedia[Nicholas Ray]
Plan: Search for more information about Elia Kazan.
#E2 = Wikipedia[Elia Kazan]
Plan: Compare the two and determine what profession they have in common.
#E3 = LLM[What profession does Nicholas Ray and Elia Kazan have in common? Given context: #E1, #E2]""",
"""Question: Which magazine was started first Arthur's Magazine or First for Women?
Plan: Search for more information about Arthur's Magazine.
#E1 = Wikipedia[Arthur's Magazine]
Plan: Search for more information about First for Women.
#E2 = Wikipedia[First for Women]
Plan: Compare the two start dates and determine which magazine was started first.
#E3 = LLM[Which magazine was started first Arthur's Magazine or First for Women? Given context: #E1, #E2]""",
"""Question: Were Pavel Urysohn and Leonid Levin known for the same type of work?
Plan: Search for more information about Pavel Urysohn.
#E1 = Wikipedia[Pavel Urysohn]
Plan: Search for more information about Leonid Levin.
#E2 = Wikipedia[Leonid Levin]
Plan: Compare the two and determine if they were known for the same type of work.
#E3 = LLM[Were Pavel Urysohn and Leonid Levin known for the same type of work? Given context: #E1, #E2]"""]


#### Answer questions

In [None]:
# Build the Planner/Worker/Solver pipeline around the loaded language model.
agent = PWS(model)


In [None]:
# Swap in the input() line to ask for a question interactively.
#task = input("Enter your question...")
task = "Which city is situated near Thames?"
# Placeholder until EPS is implemented; the hard-coded examples/tools are used.
#examples, tools = EPS.select_examples(task, num_examples=3)
response = agent.run(task, examples, tools, verbose=True)
# Index the response dict (the original indexed the string literal 'response',
# which raises TypeError).
print(response['output'])


In [None]:
# Notebook cell output: show the full verbose response dict.
response


# Experiment below this block