# Building Self-RAG using LangGraph

## Install and load dependencies

In [None]:
!pip install langchain_core~=0.3.6 langgraph~=0.2.28 langchain langchain_aws~=0.2.1 langchain_community~=0.3.1 langchain_pinecone~=0.2.0 boto3



In [None]:
from typing import Any, Dict
import os

from pydantic import BaseModel
from langchain_core.runnables import RunnableLambda
from langgraph.graph import StateGraph, START
from langgraph.graph.graph import END
from typing_extensions import TypedDict

from langchain.prompts import PromptTemplate
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langchain_community.tools.tavily_search import TavilySearchResults

import boto3
from langchain_community.embeddings import BedrockEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_aws import ChatBedrock
from langchain.chains.query_constructor.base import AttributeInfo, StructuredQueryOutputParser,get_query_constructor_prompt
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.retrievers.self_query.pinecone import PineconeTranslator

### Environment Variables
Make sure to add your own keys here.

In [None]:
# Read the API credentials from the Colab secrets store
from google.colab import userdata

TAVILY_API_KEY = userdata.get("TAVILY_API_KEY")
PINECONE_API_KEY = userdata.get("PINECONE_API_KEY")
AWS_ACCESS_KEY_ID = userdata.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = userdata.get("AWS_SECRET_ACCESS_KEY")

# Expose the credentials (and the AWS region) as environment variables
os.environ.update(
    {
        "TAVILY_API_KEY": TAVILY_API_KEY,
        "PINECONE_API_KEY": PINECONE_API_KEY,
        "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
        "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
        "AWS_REGION": "us-east-1",
    }
)

## Vectorstore
This section assumes you already have an up-and-running vectorstore.
If you want more details on how to create a vectorstore from scratch, check out my previous Medium article (https://medium.com/@lorevanoudenhove/production-ready-chatbots-with-langchain-langserve-pinecone-and-aws-e65a00e832e3) or YouTube video (https://youtu.be/KqEIBGU0rf0).

The vectorstore that we will use in this example contains some scientific papers.

In [None]:
# Bedrock runtime client and the Titan embedding model that uses it
bedrock_client = boto3.client("bedrock-runtime", region_name="us-east-1")
bedrock_embeddings = BedrockEmbeddings(
    client=bedrock_client,
    model_id="amazon.titan-embed-text-v1",
)

# Vectorstore backed by the Pinecone index named below
vectorstore = PineconeVectorStore(
    embedding=bedrock_embeddings,
    index_name="tutorial-202408",
)

# Large Language Model (LLM)
Our AI agent will use one main LLM for its interactions: Anthropic's Claude 3.5 Sonnet.
We will access this model using AWS Bedrock.

In [None]:
# Initialize the Claude 3.5 Sonnet LLM (accessed through the Bedrock client, temperature 0.6)
model = ChatBedrock(model_id='anthropic.claude-3-5-sonnet-20240620-v1:0', client=bedrock_client, model_kwargs={'temperature': 0.6}, region_name='us-east-1')

# Retriever
Function to retrieve text chunks from the vectorstore

In [None]:
# Retriever over the vectorstore, created with its default settings
retriever = vectorstore.as_retriever()


def retrieve(state: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch text chunks for the question from the vectorstore retriever.

    Args:
        state: Current graph state; only ``state["question"]`` is read.

    Returns:
        A state update with the ``page_content`` of every retrieved document
        under ``"documents"`` and the unchanged ``"question"``.
    """
    print("---RETRIEVE---")
    user_question = state["question"]
    hits = retriever.invoke(user_question)
    chunk_texts = [hit.page_content for hit in hits]
    return {"documents": chunk_texts, "question": user_question}



# Websearch
Use TavilySearch to obtain information from the web

In [None]:
def web_search(state: Dict[str, Any]) -> Dict[str, Any]:
    """Run a Tavily web search for the question and add the results to the documents.

    Args:
        state: Current graph state; ``state["question"]`` and
            ``state["documents"]`` are read.

    Returns:
        A state update with the previous documents plus one extra entry (the
        newline-joined ``content`` of the search hits), the unchanged
        ``"question"``, and ``"web_search"`` set to ``"Yes"``.
    """
    print("---WEB SEARCH---")
    question = state["question"]
    # Work on a copy so the list owned by the incoming state is not mutated in place.
    documents = list(state["documents"])
    # `max_results` is the TavilySearchResults field that caps the number of hits;
    # the previously used `k` keyword is not a field of the tool.
    docs = TavilySearchResults(max_results=3).invoke({"query": question})
    web_results = "\n".join(d["content"] for d in docs)
    documents.append(web_results)
    return {"documents": documents, "question": question, "web_search": "Yes"}

# Generate
Function to generate a reply to the user based on text chunks from the vectorstore or the websearch.

In [None]:
# Prompt used to generate the reply. Its inputs are `question` (the user input) and
# `context` (the text chunks gathered from the vectorstore and/or the web search).
answer_prompt = PromptTemplate(
    template="""You are a scientific research assistant and engage in a professional and informative conversation with a person.

            **RULES**
            - Use a neutral, objective, and respectful tone.
            - Never fabricate information. Use only the information provided in the context.
            - Answer from the perspective of a neutral and independent researcher.
            - Avoid phrases like "according to the website" or "based on the documentation" or "in the given context."
            - Do not end an answer with "for further information contact..."
            - Be precise, but feel free to be creative with your explanations while remaining professional.
            - Use markdown formatting for your output, but do not use headings or titles.
            - Do not ask follow-up questions.
    Question: {question}
    Context: {context}
    Answer:""",
    input_variables=["question", "context"],
)

# RAG chain: fill in the prompt, call the model, and parse the model output into a string
rag_chain = answer_prompt | model | StrOutputParser()

# Function to invoke the RAG chain
def generate(state: Dict[str, Any]) -> Dict[str, Any]:
    """Write an answer to the question from the documents held in the state.

    Args:
        state: Current graph state; ``state["question"]`` and
            ``state["documents"]`` are read.

    Returns:
        A state update with the unchanged ``"documents"`` and ``"question"``
        plus the output of the RAG chain under ``"answer"``.
    """
    print("---GENERATE---")
    user_question = state["question"]
    chunks = state["documents"]

    # The chain receives all chunks as one context string, separated by blank lines.
    chain_inputs = {"context": "\n\n".join(chunks), "question": user_question}
    reply = rag_chain.invoke(chain_inputs)

    return {"documents": chunks, "question": user_question, "answer": reply}

# Grade Documents (i.e. chunks)
Function to grade each text chunk obtained from the vectorstore for relevance to the question.

In [None]:
# Prompt that instructs the LLM to grade one retrieved document for relevance to the question.
# The model is asked to reply with a JSON object holding a single key 'score' ('yes' or 'no').
retrieval_grader_prompt = PromptTemplate(
    template="""You are a grader assessing relevance of a retrieved document to a user question. If the document contains keywords related to the user question, grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals.
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.
    Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
    Here is the retrieved document:
    {document}
    Here is the user question:
    {question}""",
    input_variables=["question", "document"],
)

# Grader chain: fill in the prompt, call the model, and parse the JSON reply
retrieval_grader = retrieval_grader_prompt | model | JsonOutputParser()

def grade_documents(state: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the documents that the retrieval grader marks as relevant.

    Args:
        state: Current graph state; ``state["question"]`` and
            ``state["documents"]`` are read.

    Returns:
        A state update with the relevant documents (original order kept)
        under ``"documents"`` and the unchanged ``"question"``.
    """
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    user_question = state["question"]
    candidates = state["documents"]

    kept = []
    for chunk in candidates:
        # One grader call per chunk; the parsed reply carries the verdict under 'score'.
        verdict = retrieval_grader.invoke({"question": user_question, "document": chunk})
        if verdict["score"].lower() != "yes":
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue
        print("---GRADE: DOCUMENT RELEVANT---")
        kept.append(chunk)

    return {"documents": kept, "question": user_question}


# Hallucination Grader
Function to grade whether the answer is grounded in the documents and whether it actually answers the question.

In [None]:
# Prompt that instructs the LLM to grade whether an answer is supported by the given facts
# and actually answers the question. The model is asked to reply with a JSON object holding
# a single key 'score' ('yes' or 'no').
hallucination_grader_prompt = PromptTemplate(
    template="""You are a grader assessing whether an answer is grounded in / supported by a set of facts. Give a binary score 'yes' or 'no' score to indicate whether the answer to the question is grounded in / supported by a set of facts, also make sure that the answer does actually answer the question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
    Here is the question:
    {question}
    Here are the facts:
    {documents}
    Here is the answer:
    {answer}""",
    input_variables=["answer", "documents", "question"],
)

# Grader chain: fill in the prompt, call the model, and parse the JSON reply
hallucination_grader = hallucination_grader_prompt | model | JsonOutputParser()

def grade_answer_v_documents_and_question(state: Dict[str, Any]) -> str:
    """Decide whether the generated answer is final or a web search is needed.

    Args:
        state: Current graph state; ``"web_search"``, ``"documents"``,
            ``"answer"`` and ``"question"`` are read.

    Returns:
        ``"useful"`` when a web search was already performed or when the
        hallucination grader scores the answer ``yes``; ``"not supported"``
        otherwise (including when the grader reply has no usable ``score``).
    """
    print("---CHECK HALLUCINATIONS---")
    # If a web search was already performed, do not re-check for hallucinations.
    # This avoids an infinite generate -> websearch -> generate loop.
    if state.get("web_search") == "Yes":
        return "useful"

    # No web search yet: only retrieved documents were used, so grade the answer.
    score = hallucination_grader.invoke(
        {"documents": state["documents"], "answer": state["answer"], "question": state["question"]}
    )
    print('hallu score',score)
    # Normalise the verdict the same way grade_documents does, so that "Yes"/"YES"
    # count as a pass; a missing key is treated as "not supported".
    verdict = str(score.get("score", "")).strip().lower()
    return "useful" if verdict == "yes" else "not supported"

# Graph = AI Agent

## State
Define the state: these are parameters that are stored throughout the AI Agent flow.
The State includes: the question, the answer, web_search flag, retrieved documents from vectorstore or websearch

In [None]:
# Define the State
class GraphState(TypedDict):
    """State that is passed between the nodes of the LangGraph workflow."""
    # The user question that the workflow answers
    question: str
    # The answer written by the generate node
    answer: str
    # Flag that the websearch node sets to "Yes" once a web search has been performed
    web_search: str
    # Text chunks retrieved from the vectorstore and/or obtained from the web search
    documents: list[str]

## Workflow

This workflow includes:
- nodes = tasks
- edges = links between nodes
- conditional edges = links that choose the next node based on the current state

In [None]:
# Define the LangGraph workflow; every node reads and updates a GraphState
workflow = StateGraph(GraphState)

# Add Nodes: each node name is mapped to the function that runs for it
workflow.add_node("websearch", web_search)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)


# Add Edges: retrieve -> grade_documents -> generate, and websearch -> generate
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_edge("grade_documents", "generate")
workflow.add_edge("websearch", "generate")

# Add Conditional Edge: after "generate", the grading function returns either
# "not supported" (a web search is performed) or "useful" (the workflow ends)
workflow.add_conditional_edges(
    "generate",
    grade_answer_v_documents_and_question,
    {
        "not supported": "websearch",
        "useful": END,
    },
)
# Compile the workflow
compiled_workflow = workflow.compile()

# Run AI Agent

### Relevant question
Here we test a question for which we know the information is in the vectorstore.

In [None]:
# User question
question = 'what is a cancer vaccine?'

# Initial state: only the question is filled in
state = dict(question=question, documents=[], answer="", web_search="")

# Stream the workflow, print every update and collect one {node: update} entry per node
outputs = []
for output in compiled_workflow.stream(state):
    print(output)
    outputs.extend({node: update} for node, update in output.items())
print('outputs', outputs)

---RETRIEVE---
{'retrieve': {'question': 'what is a cancer vaccine?', 'documents': ['Tumor-associated antigen. Shared antigen vaccines can be developed using these two approaches.11 One common strategy involves employing TAAs, which are proteins highly expressed by the ﬁrst approved cancer cells. An illustrative instance is autologous dendritic cell vaccine, sipuleucel-T, which prolonged survival of 2–4 months in patients with metastatic resistant prostate cancer. This therapeutic intervention targets prostate acid phosphatase, a TAA that exhibits high expression levels in prostate cancer cells.20,21 Additionally, a messenger RNA (mRNA) vaccine comprising four TAAs has demonstrated the capacity to elicit robust and durable immune responses directed against these antigens, with or without in patients with unresectable melanoma.22 A lipid nanoparticle (LNP)-based cancer vaccine encoding the four most frequent Kirsten rat sarcoma virus (KRAS) mutation antigens (G12D, G12V, G13D, and G12C)

We can see that the information in the documents is relevant, and so is the answer (see the hallucination score).

In [None]:
# Return the last answer: the workflow only ends after the `generate` node,
# so the final collected update is a `generate` update
outputs[-1]['generate']['answer']

"A cancer vaccine is an immunotherapy approach designed to stimulate the body's immune system to recognize and attack cancer cells. There are two main types of cancer vaccines discussed in the context:\n\n1. Tumor-associated antigen (TAA) vaccines:\n- These target proteins that are highly expressed by cancer cells.\n- Examples include:\n  - Sipuleucel-T: An autologous dendritic cell vaccine for prostate cancer\n  - mRNA vaccines encoding multiple TAAs for melanoma\n  - Lipid nanoparticle-based vaccines targeting KRAS mutations\n  - Vaccines targeting antigens like WT1, MAGE family proteins, Mucin 1, and HER2/neu\n\n2. Viral antigen vaccines:\n- These target antigens from viruses associated with certain cancers.\n- For example, vaccines targeting Epstein-Barr virus antigens for lymphomas and nasopharyngeal cancer.\n\nCancer vaccines can be designed as:\n- Shared antigen vaccines: Target common antigens across multiple patients\n- Personalized neoantigen vaccines: Tailored to an individu

### Irrelevant question
Here we test a question for which we know the information is NOT in the vectorstore.

In [None]:
# User question
question = 'who is elon musk?'

# Initial state: only the question is filled in
state = dict(question=question, documents=[], answer="", web_search="")

# Stream the workflow, print every update and collect one {node: update} entry per node
outputs = []
for output in compiled_workflow.stream(state):
    print(output)
    outputs.extend({node: update} for node, update in output.items())
print('outputs', outputs)

---RETRIEVE---
{'retrieve': {'question': 'who is elon musk?', 'documents': ['sequences with high codon adaptation indices. mRNA stability is enhanced by optimizing the secondary structure of mRNAs and calculating and selecting mRNA sequences with high minimum free codon adaptation indices.108–110 mRNA is a negatively charged biomolecule that enter the cell through the negatively charged cell membrane to achieve therapeutic effects. Therefore, to reduce the extracellular degradation of naked mRNA by RNA enzymes, several mRNA delivery systems have been designed to lengthen the mRNA improve translation efﬁciency, and circulation time in vivo, increase antigens uptake by APCs.111 Positively charged cationic liposomes binding to negatively charged mRNA contributes to APC endocytosis. Protamine is a polycationic natural peptide with a positive charge that can bind to mRNA to form complexes and maintain mRNA stability.112 The self-adjuvanted mRNA CV9103 coated with protamine, encoding several

We can see that the information in the documents is irrelevant, and so is the answer based on them (see the hallucination score).

In [None]:
# Return the last answer: the workflow only ends after the `generate` node,
# so the final collected update is a `generate` update
outputs[-1]['generate']['answer']

"Elon Musk is a prominent entrepreneur and business magnate known for his involvement in several high-profile technology companies. He is most notably recognized as the founder of SpaceX, a private space exploration company, and the CEO of Tesla, an electric vehicle and clean energy company.\n\nBorn in South Africa, Musk has made significant contributions to various industries:\n\n1. Space exploration: Founded SpaceX in 2002 with the goal of reducing space transportation costs and enabling the colonization of Mars.\n\n2. Electric vehicles: Joined Tesla Motors in 2004 as an early investor and became CEO in 2008, leading the company's efforts in developing and manufacturing electric cars.\n\n3. Online payments: Co-founded X.com, which later merged with Confinity to become PayPal. Musk made a substantial profit when eBay acquired PayPal in 2002.\n\n4. Sustainable energy: Through Tesla, Musk has been involved in developing solar energy products and battery storage systems.\n\nMusk is known