In [1]:
%%capture --no-stderr
%pip install langchain_community langchainhub chromadb langchain langgraph tavily-python langchain-text-splitters langchain_openai

In [15]:
%pip install python-dotenv
import os
from dotenv import load_dotenv
load_dotenv(verbose=True)
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
LANGCHAIN_API_KEY = os.getenv('LANGCHAIN_API_KEY')
TAVILY_API_KEY = os.getenv('TAVILY_API_KEY')
# print(f"""
# - OPENAI_API_KEY:{OPENAI_API_KEY}
# - LANGCHAIN_API_KEY:{LANGCHAIN_API_KEY}
# - TAVILY_API_KEY: {TAVILY_API_KEY}
# """)

Note: you may need to restart the kernel to use updated packages.


In [16]:
from tavily import TavilyClient
tavily = TavilyClient(api_key=TAVILY_API_KEY)

response = tavily.search(query="Where does Messi play right now?", max_results=3)
context = [{"url": obj["url"], "content": obj["content"]} for obj in response['results']]

# You can easily get search result context based on any max tokens straight into your RAG.
# The response is a string of the context within the max_token limit.

response_context = tavily.get_search_context(query="Where does Messi play right now?", search_depth="advanced", max_tokens=500)

# You can also get a simple answer to a question including relevant sources all with a simple function call:
# You can use it for baseline
response_qna = tavily.qna_search(query="Where does Messi play right now?")

In [17]:
from langchain_openai import ChatOpenAI
import os
os.environ['OPENAI_API_KEY'] = OPENAI_API_KEY

llm = ChatOpenAI(model="gpt-4o-mini", temperature = 0)

In [18]:
### Index

from langchain_community.document_loaders import WebBaseLoader
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter

urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=250, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)

# Add to vectorDB
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    collection_name="rag-chroma",
    embedding = OpenAIEmbeddings(model="text-embedding-3-small")
)
retriever = vectorstore.as_retriever()

In [27]:
### Retrieval Grader
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate

system = """You are a grader assessing relevance
    of a retrieved document to a user question. If the document contains keywords related to the user question,
    grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no premable or explanation.
    """

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "question: {question}\n\n document: {document} "),
    ]
)

retrieval_grader = prompt | llm | JsonOutputParser()
question = "What is prompt?"
docs = retriever.invoke(question)
doc_txt = docs[0].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))

{'score': 'yes'}


In [38]:
### Generate

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

system = """You are an assistant for question-answering tasks.
    Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know.
    Use three sentences maximum and keep the answer concise"""

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "question: {question}\n\n context: {context} "),
    ]
)

# Chain
rag_chain = prompt | llm | StrOutputParser()

# Run
question = "What is prompt?"
docs = retriever.invoke(question)
generation = rag_chain.invoke({"context": docs, "question": question})
print(generation)

A prompt is a method used in prompt engineering, also known as In-Context Prompting, to communicate with language models (LLMs) to guide their behavior towards desired outcomes without altering the model's weights. It involves experimentation and heuristics, as the effectiveness of prompts can vary significantly among different models. The primary goal is to achieve alignment and steerability of the model.


In [40]:
### Answer Grader


prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}"),
    ]
)


# Prompt
system = """You are a grader assessing whether an
    answer is useful to resolve a question. Give a binary score 'yes' or 'no' to indicate whether the answer is
    useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation."""

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "question: {question}\n\n answer: {generation} "),
    ]
)

answer_grader = prompt | llm | JsonOutputParser()
answer_grader.invoke({"question": question, "generation": generation})

{'score': 'yes'}

In [39]:
### Hallucination Grader

system = """You are a grader assessing whether
    an answer is grounded in / supported by a set of facts. Give a binary 'yes' or 'no' score to indicate
    whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a
    single key 'score' and no preamble or explanation."""

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "documents: {documents}\n\n answer: {generation} "),
    ]
)

hallucination_grader = prompt | llm | JsonOutputParser()
hallucination_grader.invoke({"documents": docs, "generation": generation})

{'score': 'yes'}

In [20]:
from typing_extensions import TypedDict
from typing import List

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        answer: LLM generation
        web_search: whether to add search
        documents: list of documents
    """

    question: str
    answer: str
    web_search: str
    hallucination: str
    documents: List[str]

In [23]:
retriever

VectorStoreRetriever(tags=['Chroma', 'OpenAIEmbeddings'], vectorstore=<langchain_community.vectorstores.chroma.Chroma object at 0x167144da0>, search_kwargs={})

In [34]:
from langgraph.graph import END
from langchain_core.documents import Document

def doc_retrieval(state: GraphState):
    question = state["question"]

    return {
        "documents": retriever.get_relevant_documents(question)
    }

def relevance_checker(state: GraphState):
    # print(f"@relevance_checker:{state}")

    question = state["question"]
    documents = state["documents"]

    filtered_docs = []
    web_search = "No"
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score["score"]
        # Document relevant
        if grade.lower() == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        # Document not relevant
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # We do not include the document in filtered_docs
            # We set a flag to indicate that we want to run web search
            web_search = "Yes"
            continue

    return {
        "question": question,
        "documents": filtered_docs,
        "web_search": web_search
    }

def generate_answer(state: GraphState):

    question = state["question"]

    answer_grader.invoke({"question": question, "generation": generation})
    return {
        "answer": "Paris"
    }

def hallucination_checker(state: GraphState):
    # print(f"@hallucination_checker:{state}")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke(
        {"documents": documents, "generation": generation}
    )

    grade = score["score"]

    return {
        "hallucination": "no",
        "answer": state["answer"]
    }

def search_tavily(state: GraphState):
    """
    Searches the web for relevant documents.

    Args:
        state: the state of the graph

    Returns:
        the updated state
    """
    # print(f"@search_tavily:{state}")
    question = state["question"]
    documents = None

    docs = tavily.search(query=question)['results']
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)
    if documents is not None:
        documents.append(web_results)
    else:
        documents = [web_results]

    return {
        "question": question,
        "documents": documents
    }

def generate(state: GraphState):
    """
    Generates a response to the question.

    Args:
        state: the state of the graph

    Returns:
        the updated state
    """
    # print(f"@generate:{state}")

    return {
        "generation": "response"
    }

def grade_generation_v_documents_and_question(state):

    print(f"@grade_generation_v_documents_and_question:{state}")

    return "useful"

In [36]:
from langgraph.graph import END, StateGraph

workflow = StateGraph(GraphState)

workflow.set_entry_point("docRetrieval")

workflow.add_node("docRetrieval", doc_retrieval)
workflow.add_node("relevanceChecker", relevance_checker)
workflow.add_node("searchTavily", search_tavily)
workflow.add_node("generateAnswer", generate_answer)
workflow.add_node("hallucinationChecker", hallucination_checker)

workflow.add_edge("docRetrieval", "relevanceChecker")
workflow.add_edge("searchTavily", "relevanceChecker")

workflow.add_conditional_edges(
    "relevanceChecker",
    lambda state: state["web_search"],
    {
        "Yes": "searchTavily",
        "No": "generateAnswer"
    }
)
workflow.add_edge("generateAnswer", "hallucinationChecker")

workflow.add_conditional_edges(
    "hallucinationChecker",
    lambda state: state["hallucination"],
    {
        "yes": "generateAnswer",
        "no": END
    }
)

app = workflow.compile()
inputs = {
    "question": "What is the capital of France?"
}
for output in app.stream(inputs):
    for key, value in output.items():
        print(f"STEP {key}")

print(value["answer"])

STEP docRetrieval
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
STEP relevanceChecker
STEP searchTavily
---GRADE: DOCUMENT RELEVANT---
STEP relevanceChecker
STEP generateAnswer
STEP hallucinationChecker
Paris
