<a href="https://colab.research.google.com/github/joshuaalpuerto/ML-guide/blob/main/langchain_agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install/upgrade the packages this notebook imports (quiet mode, progress bar disabled).
# NOTE(review): versions are not pinned and -U always pulls the latest release,
# so later cells may hit deprecation warnings or moved import paths — consider pinning.
!pip install -qU langchain --progress-bar off
!pip install -qU langchainhub --progress-bar off
!pip install -qU duckduckgo-search --progress-bar off
!pip install -qU fireworks-ai --progress-bar off
!pip install -qU openai --progress-bar off

In [2]:
# @title load fireworks API key
# Credentials live in a JSON file on Google Drive, so Drive has to be mounted first.
import json
import os

from google.colab import drive

drive.mount('/content/drive')

# `env` is kept as a notebook-level variable: later cells read the key from it.
with open('/content/drive/MyDrive/env/env.json') as env_file:
    env = json.load(env_file)

# Also publish the key as an environment variable for this process.
os.environ.update({"FIREWORKS_API_KEY": env['fireworks.ai']['apiKey']})


Mounted at /content/drive


In [3]:
import openai
# Module-level settings are only needed by old versions of the openai package (< 1):
# point the client at the Fireworks inference endpoint and authenticate with the
# Fireworks key loaded into `env` above.
openai.api_base = "https://api.fireworks.ai/inference/v1"
openai.api_key = env['fireworks.ai']['apiKey']

In [4]:
from langchain.globals import set_llm_cache, set_debug
from langchain.cache import InMemoryCache

# Register an in-memory LLM cache for this kernel session.
set_llm_cache(InMemoryCache())
# Debug mode logs every chain step. Turn it on only while debugging;
# otherwise the trace makes the conversation hard to follow.
set_debug(True)

In [103]:
from pydantic import BaseModel, Field
from langchain.agents import tool
from langchain.tools import BaseTool
from typing_extensions import Annotated
from typing import Literal, Optional, Type

# Temperature units the weather tool accepts.
# NOTE(review): "celcius" is a misspelling of "celsius". It is left unchanged here
# because the literal is part of the tool's input schema — confirm before renaming.
TemperatureUnitSymbol = Literal["celcius", "fahrenheit"]


class GetCurrentWeatherInput(BaseModel):
    # Argument schema for the weather tool (comments are used instead of a
    # docstring so nothing extra ends up in the generated schema).
    location: str = Field(description="The city and state, e.g. San Francisco, CA")
    # An explicit default keeps `unit` optional on every pydantic version:
    # without it, pydantic v2 treats an Optional[...] field as required.
    unit: Optional[TemperatureUnitSymbol] = Field(
        default=None, description="The temperature unit value"
    )

class GetCurrentWeather(BaseTool):
    """Mock weather tool that always reports 72 degrees for the requested location."""

    # Annotated overrides: pydantic-based tools expect field overrides to carry a type.
    name: str = "get_current_weather"
    description: str = "Get the current weather in a given location"
    args_schema: Type[BaseModel] = GetCurrentWeatherInput
    return_direct: bool = True

    def _run(self, **payload) -> str:
        """Build the canned weather sentence.

        Args:
            **payload: Tool arguments; reads ``location`` and the optional ``unit``.

        Returns:
            A human-readable sentence with the (hard-coded) temperature.
        """
        location = payload.get("location")
        # `unit` is optional; fall back to fahrenheit so the sentence never reads "72 None".
        unit = payload.get("unit") or "fahrenheit"

        return f"Current weather in {location} is 72 {unit}"

class GetFlightOffersInput(BaseModel):
    # Argument schema for the flight-offers tool.
    originLocationCode: str = Field(
        description="City/airport IATA code from which the traveler will depart, e.g., BOS for Boston (Required)"
    )
    destinationLocationCode: str = Field(
        description="City/airport IATA code to which the traveler is going, e.g., PAR for Paris (Required)"
    )
    departureDate: str = Field(
        description="The date on which the traveler will depart from the origin to go to the destination, in YYYY-MM-DD format (Required)"
    )
    # The only field not marked "(Required)": the tool drops it when it is None,
    # so it is declared optional to let one-way searches pass validation.
    returnDate: Optional[str] = Field(
        default=None,
        description="The date on which the traveler will depart from the destination to return to the origin, in YYYY-MM-DD format",
    )
    adults: int = Field(
        description="The number of adult travelers (age 12 or older on the date of departure) (Required)"
    )


class GetFlightOffers(BaseTool):
    """Mock flight-search tool that always quotes a price of 500 EUR."""

    # Annotated overrides: pydantic-based tools expect field overrides to carry a type.
    name: str = "get_flights_offer"
    description: str = "Search for flight information"
    args_schema: Type[BaseModel] = GetFlightOffersInput

    def _run(self, **payload) -> str:
        """Assemble the request parameters and return the canned offer.

        Args:
            **payload: Tool arguments (origin/destination codes, dates, adults).

        Returns:
            A sentence describing the cheapest (hard-coded) price.
        """
        # Set up the parameters for the request; `adults` defaults to one traveler.
        params = {
            **payload,
            "adults": payload.get("adults", 1),
            "returnDate": payload.get("returnDate"),
            "currencyCode": "EUR",
            "max": 5,
        }

        # Remove None values (e.g. a missing return date on a one-way trip).
        params = {k: v for k, v in params.items() if v is not None}

        departure = params.get("departureDate")
        if "returnDate" in params:
            dates = f"given the dates {departure} - {params['returnDate']}"
        else:
            # One-way search: do not render the missing return date as "None".
            dates = f"given the departure date {departure}"

        return (
            f"The cheapest price from {params.get('originLocationCode')} to {params.get('destinationLocationCode')} "
            f"{dates} is 500 {params.get('currencyCode')}."
        )

# Function calling

In [None]:
from langchain.chat_models import ChatOpenAI
from langchain.callbacks import StdOutCallbackHandler
from langchain.callbacks.base import BaseCallbackHandler
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools.render import format_tool_to_openai_function


# Tools offered to the model. `GetCurrentWeather` is the tool class defined above;
# the previous `get_current_weather` name is not defined anywhere in this notebook
# and raised NameError on a fresh kernel.
functions = [GetCurrentWeather()]
# OpenAI "tools" payload: one function spec per tool.
tools = [{"type": "function", "function": format_tool_to_openai_function(f)} for f in functions]

# Initialize a Fireworks chat model
# For function calling we cannot use ChatFireworks integration as it doesn't properly pass functions
llm = ChatOpenAI(
    model="accounts/fireworks/models/fw-function-call-34b-v0",
    openai_api_key=env['fireworks.ai']['apiKey'],
    openai_api_base="https://api.fireworks.ai/inference/v1",
    # verbose=True,
    temperature=0,
    max_tokens=1024,
    # Send the tool specs with every request.
    model_kwargs={"tools": tools},
)


# Prompt layout: system message, the user's question, then the agent scratchpad
# (tool calls and tool results accumulated so far).
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are very powerful assistant, but don't know current events",
        ),
        ("user", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

In [None]:
from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser
from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages
from langchain.schema.output_parser import BaseLLMOutputParser
from langchain.agents import AgentExecutor
from langchain.schema.runnable import RunnableLambda
from langchain_core.messages import (
    AIMessage,
    AIMessageChunk,
    BaseMessage,
    BaseMessageChunk,
    ChatMessage,
    FunctionMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
)

def format_agent_scratchpad_from_intermediate_steps(x):
  """Convert the ``intermediate_steps`` entry of ``x`` into OpenAI tool messages."""
  intermediate_steps = x["intermediate_steps"]
  return format_to_openai_tool_messages(intermediate_steps)

# Workaround for an error hit when LangChain calls convert_message_to_dict
# (https://github.com/langchain-ai/langchain/blob/4c47f39fcb539fdeff6dd6d9b1f483cd9a1af69b/libs/community/langchain_community/adapters/openai.py#L104C5-L104C28):
# when a tool is used, its message is submitted with a 'name' field in the payload,
# which Fireworks does not support yet. By default LangChain also sends
# function/tool messages to the OpenAI-style API, so the prompt is adjusted here.
def prepare_prompt_for_llm(x):
  """Return the prompt's messages with every ToolMessage rebuilt without its name.

  Prints a marker line and the resulting message list for debugging.
  """
  print('prepare_prompt_for_llm')
  messages = [
      # Keep only content and tool_call_id so no 'name' is passed to Fireworks.
      ToolMessage(content=message.content, tool_call_id=message.tool_call_id)
      if isinstance(message, ToolMessage)
      else message
      for message in x.messages
  ]

  print(messages)
  return messages


# Agent pipeline: map the executor state to prompt variables, render the prompt,
# strip what Fireworks cannot accept, call the model, then parse its tool calls.
agent = (
    {
        "input": lambda x: x["input"],
        "agent_scratchpad": format_agent_scratchpad_from_intermediate_steps,
    }
    | prompt
    | RunnableLambda(prepare_prompt_for_llm)
    | llm
    | OpenAIToolsAgentOutputParser()
)


# `GetCurrentWeather` is the tool class defined earlier; the previous
# `get_current_weather` name is not defined in this notebook (NameError on a fresh kernel).
agent_executor = AgentExecutor(agent=agent, tools=[GetCurrentWeather()], verbose=True)


agent_executor.invoke({"input": "what is the weather is sf?"})

[32;1m[1;3m[chain/start][0m [1m[1:chain:AgentExecutor] Entering Chain run with input:
[0m{
  "input": "what is the weather is sf?"
}
[32;1m[1;3m[chain/start][0m [1m[1:chain:AgentExecutor > 2:chain:RunnableSequence] Entering Chain run with input:
[0m{
  "input": "what is the weather is sf?",
  "intermediate_steps": []
}
[32;1m[1;3m[chain/start][0m [1m[1:chain:AgentExecutor > 2:chain:RunnableSequence > 3:chain:RunnableParallel<input,agent_scratchpad>] Entering Chain run with input:
[0m{
  "input": "what is the weather is sf?",
  "intermediate_steps": []
}
[32;1m[1;3m[chain/start][0m [1m[1:chain:AgentExecutor > 2:chain:RunnableSequence > 3:chain:RunnableParallel<input,agent_scratchpad> > 4:chain:RunnableLambda] Entering Chain run with input:
[0m{
  "input": "what is the weather is sf?",
  "intermediate_steps": []
}
[36;1m[1;3m[chain/end][0m [1m[1:chain:AgentExecutor > 2:chain:RunnableSequence > 3:chain:RunnableParallel<input,agent_scratchpad> > 4:chain:RunnableLamb

{'input': 'what is the weather is sf?',
 'output': 'The current weather in sf is 72 fahrenheit. '}

# Regular LLM

In [6]:
from langchain.llms import Fireworks
# Fireworks LLM (Mixtral 8x7B Instruct) passed to the ReAct agent built further down.
# NOTE: this rebinds `llm`, replacing the ChatOpenAI model from the function-calling section.
llm = Fireworks(
          fireworks_api_key=env['fireworks.ai']['apiKey'],
          model="accounts/fireworks/models/mixtral-8x7b-instruct",
          # temperature 0 and a 4096-token generation limit
          model_kwargs={"temperature": 0, "max_tokens": 4096},
      )

  warn_deprecated(


In [7]:
from langchain.prompts import PromptTemplate

# ReAct prompt: lists the tools, fixes the Thought/Action/Observation format,
# and ends with the running scratchpad so the model continues from its last step.
REACT_PROMPT = """
Answer the following questions as best you can. You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin!

Question: {input}
Thought:{agent_scratchpad}
"""

# The declared variables must match the placeholders in the template:
# it uses {tool_names}, not "tool_name" as previously listed.
prompt = PromptTemplate(
    input_variables=["tools", "tool_names", "input", "agent_scratchpad"],
    template=REACT_PROMPT,
)

In [101]:
# @title We need our own parser to handle our prompt
import re
import json
from typing import Union

from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.exceptions import OutputParserException

from langchain.agents.agent import AgentOutputParser
from langchain.agents.mrkl.prompt import FORMAT_INSTRUCTIONS

# Marker that introduces the final answer in the model's output.
FINAL_ANSWER_ACTION = "Final Answer:"
# Observations and errors reported when the output does not follow the ReAct format.
MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE = (
    # Closing quote after 'Thought:' was missing.
    "Invalid Format: Missing 'Action:' after 'Thought:'"
)
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE = (
    "Invalid Format: Missing 'Action Input:' after 'Action:'"
)
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE = (
    # No trailing colon: the parser appends ": <text>" itself, which used to yield "::".
    "Parsing LLM output produced both a final answer and a parse-able action"
)


# Observation sent back to the model when the Action Input cannot be decoded.
INVALID_ACTION_INPUT_ERROR_MESSAGE = (
    "Invalid Format: 'Action Input:' must be a JSON object, "
    'e.g. {"location": "San Francisco"}'
)


def _load_tool_input(raw: str):
    """Decode an ``Action Input`` payload into a Python object.

    Attempts, in order: strict JSON, JSON after replacing single quotes with
    double quotes, and finally a Python literal. Trying the untouched text first
    keeps valid JSON containing apostrophes (e.g. "O'Hare") intact.

    Raises:
        ValueError: if none of the strategies can decode ``raw``.
    """
    import ast  # local import: only this helper needs it

    for candidate in (raw, raw.replace("'", '"')):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue
    try:
        return ast.literal_eval(raw)
    except (ValueError, SyntaxError, TypeError) as exc:
        raise ValueError(f"Action Input is not valid JSON: {raw!r}") from exc


class ReActSingleActionInputJsonOutputParser(AgentOutputParser):
    """Parses ReAct-style LLM calls whose tool input is a JSON object.

    Expects output to be in one of two formats.

    If the output signals that an action should be taken,
    should be in the below format. This will result in an AgentAction
    being returned, with the Action Input decoded into a dict.

    ```
    Thought: agent thought here
    Action: search
    Action Input: {"location": "San Francisco"}
    ```

    If the output signals that a final answer should be given,
    should be in the below format. This will result in an AgentFinish
    being returned.

    ```
    Thought: agent thought here
    Final Answer: The temperature is 100 degrees
    ```

    """

    def get_format_instructions(self) -> str:
        """Return the MRKL format instructions imported from LangChain."""
        return FORMAT_INSTRUCTIONS

    def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
        """Turn raw LLM output into an AgentAction or an AgentFinish.

        Args:
            text: The raw completion produced by the LLM.

        Returns:
            AgentAction with a dict tool input when an Action / Action Input pair
            is found, or AgentFinish when the text contains the final answer.

        Raises:
            OutputParserException: if the text contains both an action and a
                final answer, if the Action Input is not a decodable JSON object,
                or if the expected markers are missing.
        """
        includes_answer = FINAL_ANSWER_ACTION in text
        regex = (
            r"Action\s*\d*\s*:[\s]*(.*?)[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
        )
        action_match = re.search(regex, text, re.DOTALL)
        if action_match:
            if includes_answer:
                raise OutputParserException(
                    f"{FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE}: {text}"
                )
            action = action_match.group(1).strip()
            # Drop surrounding whitespace and any quotes wrapping the whole payload.
            raw_input = action_match.group(2).strip().strip('"')
            try:
                tool_input = _load_tool_input(raw_input)
            except ValueError as exc:
                # Raise a parser exception (not a bare JSON error) so the executor's
                # parsing-error handling can feed the observation back to the model.
                raise OutputParserException(
                    f"Could not parse LLM output: `{text}`",
                    observation=INVALID_ACTION_INPUT_ERROR_MESSAGE,
                    llm_output=text,
                    send_to_llm=True,
                ) from exc
            if not isinstance(tool_input, dict):
                # The tools take keyword arguments, so only a JSON object is usable.
                raise OutputParserException(
                    f"Could not parse LLM output: `{text}`",
                    observation=INVALID_ACTION_INPUT_ERROR_MESSAGE,
                    llm_output=text,
                    send_to_llm=True,
                )

            return AgentAction(action, tool_input, text)

        elif includes_answer:
            return AgentFinish(
                {"output": text.split(FINAL_ANSWER_ACTION)[-1].strip()}, text
            )

        if not re.search(r"Action\s*\d*\s*:[\s]*(.*?)", text, re.DOTALL):
            raise OutputParserException(
                f"Could not parse LLM output: `{text}`",
                observation=MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE,
                llm_output=text,
                send_to_llm=True,
            )
        elif not re.search(
            r"[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)", text, re.DOTALL
        ):
            raise OutputParserException(
                f"Could not parse LLM output: `{text}`",
                observation=MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
                llm_output=text,
                send_to_llm=True,
            )
        else:
            raise OutputParserException(f"Could not parse LLM output: `{text}`")

    @property
    def _type(self) -> str:
        """Identifier of this parser type."""
        return "react-single-action-input-json"

In [104]:
from langchain.agents import create_react_agent, AgentExecutor
from langchain.tools.render import render_text_description_and_args
from langchain.agents.output_parsers import ReActSingleInputOutputParser

# Tool instances available to the ReAct agent.
tools = [GetCurrentWeather(), GetFlightOffers()]

# `create_tool_calling_agent` is not used because Mistral doesn't support tool
# calling (unless we use the fire function model); a regular ReAct agent is built instead.
react_agent = create_react_agent(
    llm=llm,
    tools=tools,
    prompt=prompt,
    # Custom parser defined above: decodes the Action Input into a dict.
    output_parser=ReActSingleActionInputJsonOutputParser(),
    # Render each tool together with its argument schema in the prompt.
    tools_renderer=render_text_description_and_args,
)

# The executor runs the agent's reasoning/tool-call steps.
agent_executor = AgentExecutor(
    agent=react_agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True,
    # Upper bound on steps; useful when the agent is stuck in a loop.
    max_iterations=5,
)

agent_executor.invoke({"input": "What's the cheapest flight to madrid?"})

[32;1m[1;3m[chain/start][0m [1m[1:chain:AgentExecutor] Entering Chain run with input:
[0m{
  "input": "What's the cheapest flight to madrid?"
}
[32;1m[1;3m[chain/start][0m [1m[1:chain:AgentExecutor > 2:chain:RunnableSequence] Entering Chain run with input:
[0m{
  "input": ""
}
[32;1m[1;3m[chain/start][0m [1m[1:chain:AgentExecutor > 2:chain:RunnableSequence > 3:chain:RunnableAssign<agent_scratchpad>] Entering Chain run with input:
[0m{
  "input": ""
}
[32;1m[1;3m[chain/start][0m [1m[1:chain:AgentExecutor > 2:chain:RunnableSequence > 3:chain:RunnableAssign<agent_scratchpad> > 4:chain:RunnableParallel<agent_scratchpad>] Entering Chain run with input:
[0m{
  "input": ""
}
[32;1m[1;3m[chain/start][0m [1m[1:chain:AgentExecutor > 2:chain:RunnableSequence > 3:chain:RunnableAssign<agent_scratchpad> > 4:chain:RunnableParallel<agent_scratchpad> > 5:chain:RunnableLambda] Entering Chain run with input:
[0m{
  "input": ""
}
[36;1m[1;3m[chain/end][0m [1m[1:chain:AgentExec

{'input': "What's the cheapest flight to madrid?",
 'output': 'The cheapest flight to Madrid I found is 500EUR.'}