In [1]:
from langchain import HuggingFacePipeline
from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent, AgentOutputParser, initialize_agent, AgentType
from langchain.chains import LLMChain
from langchain.prompts import StringPromptTemplate
from langchain.agents import tool
from langchain.schema import AgentAction, AgentFinish, OutputParserException
from langchain.schema.output_parser import OutputParserException
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
from typing import List, Union
import re

In [2]:
# from huggingface_hub import login
# from langchain import HuggingFaceHub
# from dotenv import load_dotenv
# import os
# load_dotenv()
# HUGGINGFACEHUB_API_TOKEN=os.getenv("HUGGINGFACEHUB_API_TOKEN")
# login(HUGGINGFACEHUB_API_TOKEN)

# llm = HuggingFaceHub(
#     repo_id="bigcode/starcoder",
#     # model_id="meta-llama/Llama-2-7b-hf",
#     task="text-generation",
#     model_kwargs={"temperature":0.3, "max_length":2000, "min_length":100},
#     huggingfacehub_api_token=HUGGINGFACEHUB_API_TOKEN
# )

In [3]:
# model_name = "TheBloke/CollectiveCognition-v1.1-Mistral-7B-GPTQ"
# model_name = "PY007/TinyLlama-1.1B-Chat-v0.2"
model_name = "TheBloke/Llama-2-7B-chat-GPTQ"

model = AutoModelForCausalLM.from_pretrained(model_name, 
                                             device_map="auto"                                             )
# model=model.to("cuda")
tokenizer = AutoTokenizer.from_pretrained(model_name,
                                          device_map="auto"
                                          )
pipe = pipeline(task="text-generation", model=model, tokenizer=tokenizer)
llm = HuggingFacePipeline(pipeline=pipe)

In [4]:
# llm = HuggingFacePipeline.from_model_id(
#     model_id="Salesforce/codegen-350M-mono",
#     task="text-generation",
#     model_kwargs={"temperature": .7, "max_length":300},
# )

In [5]:
@tool
def get_word_length(word:str) -> int:
    """Returns length of a word in number of characters"""
    return len(word)

tools = [get_word_length]

In [6]:
# Set up the base template
template = """Answer the following questions as best you can. Once you have the information you need to answer the question, return a Final Answer. Do not ask any follow up questions once you have the required information.

You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]. Should not include the input, just the tool_name.
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Question: {input}
{agent_scratchpad}"""

In [7]:
# Set up a prompt template
class CustomPromptTemplate(StringPromptTemplate):
    # The template to use
    template: str
    # The list of tools available
    tools: List[Tool]

    def format(self, **kwargs) -> str:
        # Get the intermediate steps (AgentAction, Observation tuples)
        # Format them in a particular way
        intermediate_steps = kwargs.pop("intermediate_steps")
        thoughts = ""
        for action, observation in intermediate_steps:
            thoughts += action.log
            thoughts += f"\nObservation: {observation}\nThought: "
        # Set the agent_scratchpad variable to that value
        kwargs["agent_scratchpad"] = thoughts
        # Create a tools variable from the list of tools provided
        kwargs["tools"] = "\n".join([f"{tool.name}: {tool.description}" for tool in self.tools])
        # Create a list of tool names for the tools provided
        kwargs["tool_names"] = ", ".join([tool.name for tool in self.tools])
        return self.template.format(**kwargs)

In [8]:
prompt = CustomPromptTemplate(
    template=template,
    tools=tools,
    # This omits the `agent_scratchpad`, `tools`, and `tool_names` variables because those are generated dynamically
    # This includes the `intermediate_steps` variable because that is needed
    input_variables=["input", "intermediate_steps"]
)

In [9]:
class CustomOutputParser(AgentOutputParser):

    def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
        # Check if agent should finish
        if "Final Answer:" in llm_output:
            return AgentFinish(
                # Return values is generally always a dictionary with a single `output` key
                # It is not recommended to try anything else at the moment :)
                return_values={"output": llm_output.split("Final Answer:")[-1].strip()},
                log=llm_output,
            )
        # Parse out the action and action input
        regex = r"Action\s*\d*\s*:(.*?)\nAction\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
        match = re.search(regex, llm_output, re.DOTALL)
        if not match:
            raise OutputParserException(f"Could not parse LLM output: `{llm_output}`")
        action = match.group(1).strip()\
            .split("(")[0] #added this to avoid situations where LLM passes input to action directly, instead of just naming the action (function) to use
        action_input = match.group(2)
        # Return the action and action input
        return AgentAction(tool=action, tool_input=action_input.strip(" ").strip('"'), log=llm_output)

In [10]:
output_parser = CustomOutputParser()

In [11]:
llm_chain = LLMChain(llm=llm,prompt=prompt)

In [12]:
tool_names = [tool.name for tool in tools]
agent = LLMSingleActionAgent(
    llm_chain=llm_chain,
    output_parser=output_parser,
    stop=["\nObservation:"],
    allowed_tools=tool_names
)

In [13]:
agent_executor = AgentExecutor.from_agent_and_tools(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)

In [14]:
agent_executor.run("How long is the word car")



[1m> Entering new AgentExecutor chain...[0m




[32;1m[1;3m
Thought: I will use get_word_length to find out the length of the word car.
Action: get_word_length(car)
Action Input: car[0m

Observation:[36;1m[1;3m3[0m
[32;1m[1;3m3 is the length of the word car.
...
Thought: I now know the final answer to the original input question.
Final Answer: 3[0m

[1m> Finished chain.[0m


'3'

In [15]:
agent_executor.run("How long is the word doggie?")



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Thought: I will use get_word_length to find out the length of the word doggie.
Action: get_word_length(word: doggie)
Action Input: doggie[0m

Observation:[36;1m[1;3m6[0m
[32;1m[1;3m6 is the length of the word doggie.
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer to the original input question.
Final Answer: 6[0m

[1m> Finished chain.[0m


'6'

In [16]:
@tool
def get_user_input(information_name:str) -> int:
    """Can be used to collect a piece of information from a user."""
    info = input(f"Enter your {information_name}")
    return str(info)

In [17]:
tools = [get_user_input]

In [18]:
tools

[StructuredTool(name='get_user_input', description='get_user_input(information_name: str) -> int - Can be used to collect a piece of information from a user.', args_schema=<class 'pydantic.main.get_user_inputSchemaSchema'>, func=<function get_user_input at 0x7f15baf948b0>)]

In [19]:
prompt = CustomPromptTemplate(
    template=template,
    tools=tools,
    # This omits the `agent_scratchpad`, `tools`, and `tool_names` variables because those are generated dynamically
    # This includes the `intermediate_steps` variable because that is needed
    input_variables=["input", "intermediate_steps"]
)

In [20]:
llm_chain = LLMChain(llm=llm,prompt=prompt)

In [21]:
tool_names = [tool.name for tool in tools]
agent = LLMSingleActionAgent(
    llm_chain=llm_chain,
    output_parser=output_parser,
    stop=["\nObservation:"],
    allowed_tools=tool_names
)

In [22]:
agent_executor = AgentExecutor.from_agent_and_tools(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)

In [23]:
agent_executor.run("How old is the user?")



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mThought: Think about how old the user is.
Action: get_user_input(information_name: "age")
Action Input: age[0m

Observation:[36;1m[1;3m30[0m
[32;1m[1;3m30 is a young age.
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer.
Final Answer: 30[0m

[1m> Finished chain.[0m


'30'