In [1]:
# replace the standard sqlite3 module with pysqlite3
# for compatibility with Chroma
__import__('pysqlite3')
import sys
sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')

import json
import langchain
import os
import bs4
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_google_vertexai import VertexAI
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.tools.retriever import create_retriever_tool
from langchain_community.vectorstores import FAISS
from langchain_chroma import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import chat_agent_executor
from langchain_core.messages import HumanMessage
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_history_aware_retriever
from langchain_core.prompts import MessagesPlaceholder
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompts import PipelinePromptTemplate, PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain.pydantic_v1 import BaseModel, Field
from langchain_core.tools import StructuredTool
from typing import Optional, Type
from langchain.tools import BaseTool
from langchain_core.messages import SystemMessage
from langgraph.checkpoint import MemorySaver  # an in-memory checkpointer
from langgraph.prebuilt import create_react_agent

# load environment variables
load_dotenv()

True

In [2]:
# Turn on LangChain tracing (LANGCHAIN_TRACING_V2) for the runs in this notebook.
os.environ["LANGCHAIN_TRACING_V2"] = "true"

# Service-account JSON for Vertex AI; not required if you are working within
# Vertex AI Workbench. setdefault keeps a value that is already exported
# (shell or .env) instead of overwriting it with this machine-specific path.
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/workspaces/LLM-agent-with-Gemini/fleet-anagram-244304-7dafcc771b2f.json",
)

# API keys read from the environment (populated by load_dotenv()); each is
# None when the corresponding variable is not set.
LANGCHAIN_API_KEY = os.getenv("LANGCHAIN_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Only needed if you are using a text embedding model from Google.
# os.environ only accepts strings, so assigning the result of os.getenv()
# directly raises TypeError when the variable is missing; guard against None.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if GOOGLE_API_KEY is not None:
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

In [3]:
# Chat model shared by every chain and agent in this notebook.
# Alternative: gemini-1.5-flash, which is faster and cheaper but less accurate.
llm = ChatVertexAI(
    model="gemini-1.5-pro",
)

### Example of Chain / Runnable Sequence

In [4]:

# Outer skeleton: three named slots, each filled by its own sub-prompt below.
full_template = """{introduction}

{example}

{start}"""


full_prompt = PromptTemplate.from_template(full_template)

# Slot 1 - task framing.
introduction_template = """You are a helpful assistant that can help me to complete the following API payload:"""
introduction_prompt = PromptTemplate.from_template(introduction_template)

# Slot 2 - few-shot examples followed by the output-format instruction.
# Keep every "A:" line in the same `name:<key>, value:<val>` shape so the
# examples are consistent with each other.
example_template = """Here are some examples of interactions you might have with me:

Q: B16C, 1126911, FPP
A: name:p.dsid, value:B16C, name:p.lot, value:1126911, name:p.pid, value:FPP

Q: Y42M, 11952591, FQQP
A: name:p.dsid, value:Y42M, name:p.lot, value:11952591, name:p.pid, value:FQQP

Q: Y42M, 1252391, FPC
A: name:p.dsid, value:Y42M, name:p.lot, value:1252391, name:p.pid, value:FPC

Wrap the answer in a dictionary with the structure of the example above. The input will be a string with the format "dsid, lot, pid" 
and the output should be a dictionary with the keys "name:p.dsid", "name:p.lot", and "name:p.pid" with the corresponding values from the input string.

"""


example_prompt = PromptTemplate.from_template(example_template)

# Slot 3 - the actual request; "{input}" is the only variable the caller supplies.
start_template = """Now, do this for real!

Q: {input}
A:"""
start_prompt = PromptTemplate.from_template(start_template)

# (slot name, sub-prompt) pairs; the slot names match the placeholders in full_template.
input_prompts = [
    ("introduction", introduction_prompt),
    ("example", example_prompt),
    ("start", start_prompt),
]
pipeline_prompt = PipelinePromptTemplate(
    final_prompt=full_prompt, pipeline_prompts=input_prompts
)


In [5]:
# Show which variables the caller still has to supply; the recorded output
# lists only "input", i.e. the three slots are filled by the sub-prompts.
pipeline_prompt.input_variables

['input']

In [6]:
# Render the fully assembled prompt for one sample input to check the final text.
rendered_prompt = pipeline_prompt.format(input="B16C, 1126911, FPP")
print(rendered_prompt)

You are a helpful assistant that can help me to complete the following API payload:

Here are some examples of interactions you might have with me:

Q: B16C, 1126911, FPP
A: name:p.dsid,value, value:B16C, name:p.lot, value:1126911, name:p.pid, value:FPP

Q: Y42M, 11952591, FQQP
A: name:p.dsid, value:Y42M, name:p.lot, value:11952591, name:p.pid, value:FQQP

Q: Y42M, 1252391, FPC
A: name:p.dsid, value:Y42M, name:p.lot, value:1252391, name:p.pid, value:FPC

Wrap the answer in a dictionary with the structure of the example above. The input will be a string with the format "dsid, lot, pid" 
and the output should be a dictionary with the keys "name:p.dsid", "name:p.lot", and "name:p.pid" with the corresponding values from the input string.



Now, do this for real!

Q: B16C, 1126911, FPP
A:


In [7]:
# Runnable sequence: assembled prompt -> chat model -> plain string.
chain = (
    pipeline_prompt
    | llm
    | StrOutputParser()
)

# A well-formed "dsid,lot,pid" request.
payload_request = {"input": "B47R,1952591,FPC"}
output = chain.invoke(payload_request)
print(output)


```json
{
"name:p.dsid": "B47R",
"name:p.lot": "1952591",
"name:p.pid": "FPC"
}
``` 



In [10]:
# Probe the chain with an input that does not follow the "dsid, lot, pid" format.
malformed_request = {"input": "test"}
output = chain.invoke(malformed_request)
print(output)

```python
def format_api_payload(input_string):
  """Formats the input string into an API payload dictionary.

  Args:
    input_string: A string containing dsid, lot, and pid separated by commas.

  Returns:
    A dictionary representing the API payload, or an error message if the input
    is invalid.
  """
  try:
    dsid, lot, pid = input_string.split(',')
    return {
        "name:p.dsid": dsid.strip(),
        "name:p.lot": lot.strip(),
        "name:p.pid": pid.strip()
    }
  except ValueError:
    return "Invalid input format. Please use the format 'dsid, lot, pid'."

# Example usage with your test case:
input_string = "test"
output_payload = format_api_payload(input_string)
print(output_payload)  
```

**Explanation:**

1. **Function Definition:**
   - We define a function called `format_api_payload` that takes the input string as an argument.

2. **Input Validation:**
   - We use `try-except` to handle cases where the input string might not be in the expected "dsid, lot, pi

In [12]:
# Probe the chain with an unrelated question instead of a payload string.
off_topic_request = {"input": "What is langchain"}
output = chain.invoke(off_topic_request)
print(output)

I understand! You want me to take an input like "dsid, lot, pid" and give you a dictionary like this:

```python
{
  "name:p.dsid": "dsid",
  "name:p.lot": "lot",
  "name:p.pid": "pid"
} 
```

Let's forget about Langchain for now, I can definitely help you with that! Ask me your "dsid, lot, pid" string and I'll format it for your API payload. 😊  



### Retrieval Augmented Generation

In [13]:
# Knowledge base for retrieval: the pages the LLM will answer questions from.
web_paths = [
    "https://google.github.io/styleguide/pyguide.html",
    "https://google.github.io/styleguide/Rguide.html",
]

# Load each page with its own loader and collect all documents in one list.
docs = []
for url in web_paths:
    loader = WebBaseLoader(web_paths=(url,))
    docs.extend(loader.load())

In [14]:
# Cut the pages into overlapping chunks so each one fits comfortably in a prompt.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
)
splits = text_splitter.split_documents(docs)

# NOTE(review): the original comment says this needs a Hugging Face API key —
# confirm for the embedding backend actually in use.
embeddings = HuggingFaceEmbeddings()
vectorstore = FAISS.from_documents(documents=splits, embedding=embeddings)

# Retriever view over the index; reused by the RAG chain and the retriever tool.
retriever = vectorstore.as_retriever()

  from tqdm.autonotebook import tqdm, trange


In [15]:
# Rewrite the latest question into a standalone one using the chat history.
# Needed when the user's question refers back to earlier turns.

contextualize_q_system_prompt = (
    "Given a chat history and the latest user question "
    "which might reference context in the chat history, "
    "formulate a standalone question which can be understood "
    "without the chat history. Do NOT answer the question, "
    "just reformulate it if needed and otherwise return it as is."
)

# system instruction, then the running history, then the new question
contextualize_q_messages = [
    ("system", contextualize_q_system_prompt),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}"),
]
contextualize_q_prompt = ChatPromptTemplate.from_messages(contextualize_q_messages)

history_aware_retriever = create_history_aware_retriever(
    llm,
    retriever,
    contextualize_q_prompt,
)

In [16]:
# System prompt for the answering step of the RAG chain (used by qa_prompt).
# "{context}" is a template slot — presumably filled with the retrieved
# documents by the stuff-documents chain; confirm against the chain setup.
system_prompt = (
    "You are an assistant for question-answering tasks. "
    "Use the following pieces of retrieved context to answer "
    "the question. If you don't know the answer, say that you "
    "don't know. Use three sentences maximum and keep the "
    "answer concise."
    "\n\n"
    "{context}"
)

In [17]:
# Answering prompt: system instruction (with the context slot), the chat
# history, then the user's question.
qa_messages = [
    ("system", system_prompt),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}"),
]
qa_prompt = ChatPromptTemplate.from_messages(qa_messages)

# Chain that feeds documents plus question to the model.
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)

# Full RAG pipeline: history-aware retrieval followed by answering.
rag_chain = create_retrieval_chain(
    history_aware_retriever,
    question_answer_chain,
)

In [18]:
# Print the composed runnable's repr to inspect how the retrieval chain is wired.
print(rag_chain)

bound=RunnableAssign(mapper={
  context: RunnableBinding(bound=RunnableBranch(branches=[(RunnableLambda(lambda x: not x.get('chat_history', False)), RunnableLambda(lambda x: x['input'])
           | VectorStoreRetriever(tags=['FAISS', 'HuggingFaceEmbeddings'], vectorstore=<langchain_community.vectorstores.faiss.FAISS object at 0x7946035fd990>))], default=ChatPromptTemplate(input_variables=['chat_history', 'input'], input_types={'chat_history': typing.List[typing.Union[langchain_core.messages.ai.AIMessage, langchain_core.messages.human.HumanMessage, langchain_core.messages.chat.ChatMessage, langchain_core.messages.system.SystemMessage, langchain_core.messages.function.FunctionMessage, langchain_core.messages.tool.ToolMessage]]}, messages=[SystemMessagePromptTemplate(prompt=PromptTemplate(input_variables=[], template='Given a chat history and the latest user question which might reference context in the chat history, formulate a standalone question which can be understood without the cha

In [19]:
# Chat histories keyed by session id. A plain dict, so nothing is persisted.
store = {}


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the chat history for ``session_id``, creating it on first use.

    Args:
        session_id: Key identifying the conversation in ``store``.

    Returns:
        The history object registered under ``session_id``; a new empty
        ``ChatMessageHistory`` is stored and returned if none existed yet.
    """
    try:
        return store[session_id]
    except KeyError:
        history = ChatMessageHistory()
        store[session_id] = history
        return history


# Wrap the RAG chain with per-session message history. The key names tell
# the wrapper where the question, the history and the answer live.
history_keys = {
    "input_messages_key": "input",
    "history_messages_key": "chat_history",
    "output_messages_key": "answer",
}
conversational_rag_chain = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    **history_keys,
)

In [20]:
# The session id becomes the key "testing" in `store`, under which the chat
# history of this session is kept.
session_config = {"configurable": {"session_id": "testing"}}

rag_response = conversational_rag_chain.invoke(
    {"input": "What are the best practices for Python code style?"},
    config=session_config,
)
rag_response["answer"]

'The Google Python Style Guide suggests using a consistent style for things like docstrings, comments, and variable names. Docstrings should use the three double-quotes format, and descriptiveness in naming should be proportional to the scope of visibility. Always strive for consistency with the existing codebase you are working with. \n'

### Agent with custom tools

In [22]:
# Argument schema used by the url navigator tool: a single "domain" string.
# NOTE(review): pydantic usually exposes the class docstring and the Field
# description in the generated schema, so they may act as prompt text for the
# model — confirm before rewording them.
class GenericInputSchema(BaseModel):
    """Inputs for url navigation tool."""
    domain: str = Field(
        description="The domain of the website you want to navigate to.")

# Argument schema used by the payload formatter tool: one raw "params" string.
# NOTE(review): pydantic usually exposes the class docstring and the Field
# description in the generated schema, so they may act as prompt text for the
# model — confirm before rewording them.
class ParamInputSchema(BaseModel):
    """Inputs API payload."""
    params: str = Field(
        description="The input string with the format 'dsid, lot, type', which can be used to create a dictionary with the keys 'name:p.dsid', 'name:p.lot', and 'name:p.type'.")
    
class payload_formatter(BaseTool):
    # Tool that turns a loosely formatted "dsid, lot, type" string into the
    # payload dictionary. `name`, `description` and `args_schema` are the
    # tool's declared metadata, so their text is kept exactly as it was.
    name: str = "payload_formatter"
    args_schema: Optional[Type[BaseModel]] = ParamInputSchema
    description: str = """
    
    payload formatter is a tool that takes an input string with the format 'dsid, lot, type' and returns a dictionary with the keys 'name:p.dsid', 'name:p.lot', and 'name:p.type'.
    """

    def format_input(self, input_string: str) -> dict:
        """Parse ``input_string`` into the payload dictionary.

        The comma-separated parts may come in any order and may be separated
        by empty fields (e.g. ``"1952591,,,,,B47R,FPC"``). Each non-empty part
        is classified by its shape:

        * dsid - a letter, then one or more digits, then a letter (``"B47R"``)
        * lot  - digits only (``"1952591"``)
        * type - anything else (``"FPC"``)

        Args:
            input_string: Raw text. Dots are removed before splitting on commas.

        Returns:
            A dict with the keys ``"name:p.dsid"``, ``"name:p.lot"`` and
            ``"name:p.type"``.

        Raises:
            ValueError: If the input does not contain exactly three non-empty
                parts, or if those parts are not exactly one dsid, one lot
                and one type.
        """
        stripped = (part.strip() for part in input_string.replace('.', '').split(','))
        parts = [part for part in stripped if part]
        if len(parts) != 3:
            raise ValueError("Please check that the input contains exactly three parts: dsid, lot, and type.")

        classified = {}
        for part in parts:
            if part[0].isalpha() and part[-1].isalpha() and part[1:-1].isdigit():
                key = "name:p.dsid"
            elif part.isdigit():
                key = "name:p.lot"
            else:
                key = "name:p.type"
            # Two parts of the same kind mean another kind is missing. Fail
            # with a clear message instead of silently overwriting one value
            # and then hitting an unbound variable.
            if key in classified:
                raise ValueError(
                    "Could not identify exactly one dsid, one lot and one type in the input."
                )
            classified[key] = part

        # Three parts and no duplicate kind: all three keys are present.
        return {
            "name:p.dsid": classified["name:p.dsid"],
            "name:p.lot": classified["name:p.lot"],
            "name:p.type": classified["name:p.type"],
        }

    def _run(self, params: str):
        # Tool entry point; `params` matches the field declared in ParamInputSchema.
        return self.format_input(params)
    
class url_navigator(BaseTool):
    # Tool that maps a bare domain name to a "www.<domain>.com" address.
    # NOTE(review): the registered tool name is "map_viewer_url" while the
    # class and the prompts call it "url navigator" — confirm which is intended.
    name: str = "map_viewer_url"
    args_schema: Optional[Type[BaseModel]] = GenericInputSchema
    description: str = """
    
    url navigator is a tool that takes a domain as input and returns the url of the website you want to navigate to.
    """

    def _run(self, domain: str):
        # Pure string formatting: no lookup or validation of the domain.
        return "www.{}.com".format(domain)

In [23]:
# Tool instances that will be passed to create_react_agent.
tools = [
    url_navigator(),
    payload_formatter(),
]

In [24]:
# Not compulsory, but the system message reminds the model which tools it
# has access to.
system_message = "You are a helpful assistant and you know how to use url navigator tool and payload formatter tool. Pick the most appropriate tools to solve different tasks"

# In-memory checkpointer: conversation state is kept per thread id.
memory = MemorySaver()

app = create_react_agent(
    llm,
    tools,
    messages_modifier=system_message,
    checkpointer=memory,
)


In [25]:
# The thread id selects which conversation the checkpointer reads and updates.
config = {"configurable": {"thread_id": "chat-1"}}

# Messy payload string: parts out of order and separated by empty fields.
payload_messages = [("user", "1952591,,,,,B47R,FPC")]
payload_state = app.invoke({"messages": payload_messages}, config)

# The last message in the returned state is the agent's final reply.
print(payload_state["messages"][-1].content)

```json
{"name:p.dsid": "B47R", "name:p.lot": "1952591", "name:p.type": "FPC"}
```


In [26]:
# Ask the agent to use the url navigator tool; `config` (and therefore the
# thread) is reused from the previous request.
# The message is an explicit ("user", text) pair: ("text") without a comma is
# only a parenthesised string, not a tuple.
print(
    app.invoke(
        {
            "messages": [
                ("user", "Navigate to langchain")
            ]
        },
        config,
    )["messages"][-1].content
)

www.langchain.com


### Adding in other built-in tools (e.g., search)

In [None]:
# Built-in web search tool, limited to two results per query.
search = TavilySearchResults(max_results=2)

# The custom tools plus the search tool.
tools = [
    url_navigator(),
    payload_formatter(),
    search,
]

# Not compulsory, but the system message reminds the model which tools it
# has access to.
system_message = "You are a helpful assistant and you know how to use url navigator tool, payload formatter tool and tavily search."

# Fresh checkpointer and agent for the extended tool set.
memory = MemorySaver()
app = create_react_agent(
    llm,
    tools,
    messages_modifier=system_message,
    checkpointer=memory,
)

config = {"configurable": {"thread_id": "test-thread"}}



In [None]:
# Ask the agent a question that needs the web search tool.
# The message is an explicit ("user", text) pair: ("text") without a comma is
# only a parenthesised string, not a tuple.
print(
    app.invoke(
        {
            "messages": [
                ("user", "Search for the latest Conor McGregor news")
            ]
        },
        config,
    )["messages"][-1].content
)

In [None]:
# Expose the style-guide retriever as a tool (create_retriever_tool is already
# imported in the notebook's first cell, so it is not imported again here).
# The tool name is an identifier without spaces: function-calling APIs
# generally restrict tool names to letters, digits, underscores and dashes.
knowledge_retriever = create_retriever_tool(
    retriever,
    "google_style_guide_retriever",
    "Searches and returns key pointers for programming languages from google style guide",
)
tools = [url_navigator(), payload_formatter(), search, knowledge_retriever]

In [None]:
# System message listing all four tools now available to the agent.
system_message = "You are a helpful assistant and you know how to use url navigator tool, payload formatter tool, tavily search and google style guide retriever."


# Rebuild the agent with the extended tool list; the existing `memory`
# checkpointer is reused.
app = create_react_agent(
    llm,
    tools,
    messages_modifier=system_message,
    checkpointer=memory,
)

config = {"configurable": {"thread_id": "chat-thread"}}

In [None]:
# Ask a style-guide question and display the full returned state (all
# messages, not just the final reply). The input uses the same
# list-of-("user", text) shape as the other requests in this notebook.
app.invoke({"messages": [("user", "Python rules")]}, config)