In [22]:
import os
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor
from langchain_community.document_loaders import WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from pprint import pprint
from typing import List

# Read key/value pairs from a local .env file into the process environment
load_dotenv()

# Switch on LangChain tracing and name the endpoint / project it reports to.
# These three values are always written, replacing anything already set.
os.environ.update(
    LANGCHAIN_TRACING_V2='true',
    LANGCHAIN_ENDPOINT='https://api.smith.langchain.com',
    LANGCHAIN_PROJECT='CRAG-LangGraph',
)

# Web pages that make up the corpus to index
urls = [
    "https://div.beehiiv.com/p/advanced-rag-series-indexing",
    "https://div.beehiiv.com/p/advanced-rag-series-retrieval",
    "https://div.beehiiv.com/p/advanced-rag-series-generation-evaluation",
]

# Load documents concurrently
def load_document(url):
    """Load a single web page, tolerating failures.

    Args:
        url: Address of the page to fetch with ``WebBaseLoader``.

    Returns:
        Whatever ``WebBaseLoader(url).load()`` returns, or an empty list if
        any exception is raised (the failure is printed, not re-raised).
    """
    try:
        loader = WebBaseLoader(url)
        pages = loader.load()
    except Exception as e:
        # Best-effort: one bad URL must not abort the whole batch
        print(f"Failed to load {url}: {e}")
        pages = []
    return pages

# Fetch every URL on a thread pool; map() yields results in the order of `urls`
with ThreadPoolExecutor() as executor:
    docs = list(executor.map(load_document, urls))

# Merge the per-URL results into one flat list, skipping empty (failed) loads
docs_list = [page for loaded in filter(None, docs) for page in loaded]

# Splitter built via the tiktoken-based constructor: chunk size 500, overlap 50
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_overlap=50,
    chunk_size=500,
)

# Cut the loaded documents into chunks
doc_splits = text_splitter.split_documents(docs_list)

# Embed the chunks with OpenAI embeddings and store them in a Chroma collection
vectorstore = Chroma.from_documents(
    embedding=OpenAIEmbeddings(),
    collection_name="rag-chroma",
    documents=doc_splits,
)

# Retriever view over the vector store, used by the `retrieve` node
retriever = vectorstore.as_retriever()

### Retrieval Grader

# Data model for grading: the structured answer requested from the grader LLM
# (it is passed to `llm.with_structured_output`).
# Documented with `#` comments rather than a class docstring on purpose.
# NOTE(review): a class docstring may be forwarded to the model as part of the
# schema description — confirm before adding one, since it could alter grading.
class GradeDocuments(BaseModel):
    # Relevance verdict for a single retrieved document; the grading node
    # compares this value against "yes".
    binary_score: str = Field(description="Documents are relevant to the question, 'yes' or 'no'")

# Temperature-0 chat model shared by the grader, generation and re-writer chains
llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-0125")

# Same model, asked to answer in the shape of GradeDocuments
structured_llm_grader = llm.with_structured_output(GradeDocuments)

# System instructions for the grader
system = """You are a grader assessing relevance of a retrieved document to a user question. \n 
    If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant. \n
    Give a binary score 'yes' or 'no' to indicate whether the document is relevant to the question."""

# Chat prompt: the system instructions plus a human turn that carries the
# {document} and {question} placeholders
grade_prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
])

# Grader chain: fill in the prompt, then request the structured verdict
retrieval_grader = grade_prompt | structured_llm_grader

### Generate

# Answering prompt pulled from the LangChain hub
prompt = hub.pull("tomas-herman/rag-prompt")

# Answering chain: prompt -> chat model -> plain string
rag_chain = (
    prompt
    | llm
    | StrOutputParser()
)

### Question Re-writer

# Prompt that asks the model to rephrase {question} for web search
re_write_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a question re-writer that converts an input question to a better version that is optimized \n
                      for web search. Look at the input and try to reason about the underlying semantic intent / meaning."""),
    ("human", "Here is the initial question: \n\n {question} \n Formulate an improved question."),
])

# Re-writing chain: prompt -> chat model -> plain string
question_rewriter = (
    re_write_prompt
    | llm
    | StrOutputParser()
)

### Search

# Tavily web-search tool, invoked by the `web_search` node further down with
# {"query": <question>}.
# NOTE(review): presumably needs a Tavily API key in the environment, and
# max_results=3 presumably caps the number of hits per query — confirm both.
from langchain_community.tools.tavily_search import TavilySearchResults
web_search_tool = TavilySearchResults(max_results=3)

### Graph

from langgraph.graph import END, StateGraph
from typing_extensions import TypedDict
from langchain.schema import Document

class GraphState(TypedDict):
    """State dictionary handed from node to node in the graph.

    Attributes:
        question: The current question text (replaced by ``transform_query``).
        generation: The answer produced by the ``generate`` node.
        web_search: ``"Yes"`` / ``"No"`` flag set by ``grade_documents`` and
            read by ``decide_to_generate``.
        documents: The documents collected so far for answering.
    """
    question: str
    generation: str
    web_search: str
    # The nodes store Document objects here (retriever hits plus the web-search
    # result), not plain strings. Written as a quoted forward reference so the
    # class body does not depend on `Document` being bound when it is evaluated.
    documents: List["Document"]

# Define nodes
def retrieve(state):
    """Look up documents for the current question in the vector store.

    Args:
        state: Graph state; only ``state["question"]`` is read.

    Returns:
        dict with ``documents`` (the retriever's results) and the unchanged
        ``question``.
    """
    print("---RETRIEVE---")
    question = state["question"]
    # Use the Runnable entry point, as the other nodes do for their chains;
    # `get_relevant_documents` is the deprecated spelling of the same lookup.
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}

def grade_documents(state):
    """Grade every retrieved document and keep only the relevant ones.

    Args:
        state: Graph state; ``question`` and ``documents`` are read.

    Returns:
        dict with ``documents`` (those graded relevant, in their original
        order), the unchanged ``question``, and ``web_search`` — ``"Yes"`` if
        at least one document was graded not relevant, otherwise ``"No"``.
    """
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    filtered_docs = []
    web_search = "No"
    for d in state["documents"]:
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})
        # Normalise the verdict so variants such as "Yes" or " yes" still count
        # as relevant; a missing result or missing field counts as not relevant
        # instead of raising AttributeError.
        verdict = (getattr(score, "binary_score", None) or "").strip().lower()
        if verdict == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # A single irrelevant document is enough to request a web search
            web_search = "Yes"
    return {"documents": filtered_docs, "question": question, "web_search": web_search}

def transform_query(state):
    """Replace the question with a re-written version.

    Args:
        state: Graph state; ``question`` and ``documents`` are read.

    Returns:
        dict with the same ``documents`` and the output of
        ``question_rewriter`` as the new ``question``.
    """
    print("---TRANSFORM QUERY---")
    original_question = state["question"]
    rewritten = question_rewriter.invoke({"question": original_question})
    result = {"documents": state["documents"], "question": rewritten}
    return result

def web_search(state):
    """Add web-search results for the current question to the documents.

    Args:
        state: Graph state; ``question`` and ``documents`` are read.

    Returns:
        dict with ``documents`` (the existing documents followed by one new
        Document whose text is the newline-joined ``content`` of every search
        hit) and the unchanged ``question``.
    """
    print("---WEB SEARCH---")
    question = state["question"]
    results = web_search_tool.invoke({"query": question})
    web_results = Document(page_content="\n".join(hit["content"] for hit in results))
    # Build a new list instead of appending in place, so the list object held
    # by the incoming state is not modified behind the graph's back.
    documents = [*state["documents"], web_results]
    return {"documents": documents, "question": question}

def generate(state):
    """Produce the answer from the collected documents.

    Args:
        state: Graph state; ``documents`` and ``question`` are read.

    Returns:
        dict with the unchanged ``documents`` and ``question`` plus
        ``generation``, the output of ``rag_chain``.
    """
    print("---GENERATE---")
    documents = state["documents"]
    question = state["question"]
    answer = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": answer}

def decide_to_generate(state):
    """Choose the node that follows document grading.

    Args:
        state: Graph state; only ``state["web_search"]`` is read.

    Returns:
        ``"transform_query"`` when the flag equals ``"Yes"``, otherwise
        ``"generate"``.
    """
    print("---ASSESS GRADED DOCUMENTS---")
    if state["web_search"] == "Yes":
        # The flag is raised as soon as one document is graded not relevant,
        # so relevant documents may still be present; the message says so
        # rather than claiming that all of them were rejected.
        print("---DECISION: AT LEAST ONE DOCUMENT IS NOT RELEVANT TO QUESTION, TRANSFORM QUERY---")
        return "transform_query"
    # No document was rejected, so answer from the retrieved ones directly
    print("---DECISION: GENERATE---")
    return "generate"

# Build the graph over the shared GraphState
workflow = StateGraph(GraphState)

# Nodes: one per processing step
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("transform_query", transform_query)
workflow.add_node("web_search_node", web_search)
workflow.add_node("generate", generate)

# Edges: retrieve -> grade, then either generate directly or go through
# query re-writing and web search first
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "web_search_node")
workflow.add_edge("web_search_node", "generate")
workflow.add_edge("generate", END)

# Turn the graph definition into a runnable app
app = workflow.compile()


In [24]:
# Run the graph on one question, announcing each node as it reports
inputs = {"question": "What is the best strategy for document indexing for RAG?"}
for output in app.stream(inputs):
    for key in output:
        value = output[key]
        pprint(f"Node '{key}':")
    pprint("\n---\n")

# `value` still refers to the last state streamed; print its answer
pprint(value["generation"])


---RETRIEVE---
"Node 'retrieve':"
'\n---\n'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
"Node 'grade_documents':"
'\n---\n'
---ASSESS GRADED DOCUMENTS---
---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---
---TRANSFORM QUERY---
"Node 'transform_query':"
'\n---\n'
---WEB SEARCH---
"Node 'web_search_node':"
'\n---\n'
---GENERATE---
"Node 'generate':"
'\n---\n'
"Node '__end__':"
'\n---\n'
('Effective document indexing strategies for RAG implementation include using '
 'the lightweight "Retrieval Evaluator" to return confidence scores for each '
 'document retrieved, which then determine the retrieval action to trigger. '
 'Corrective RAG (CRAG) enhances generation by labeling retrieved documents '
 'into categories like Correct, Ambiguous, or Incorrect based on confidence '
 'scores, triggering appropriate actions. Another strategy is t

In [25]:
# Run the graph on one question, announcing each node as it reports
inputs = {"question": "What is rag in ai?"}
for output in app.stream(inputs):
    for key in output:
        value = output[key]
        pprint(f"Node '{key}':")
    pprint("\n---\n")

# `value` still refers to the last state streamed; print its answer
pprint(value["generation"])


---RETRIEVE---
"Node 'retrieve':"
'\n---\n'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
"Node 'grade_documents':"
'\n---\n'
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
---GENERATE---
"Node 'generate':"
'\n---\n'
"Node '__end__':"
'\n---\n'
('RAG in AI stands for Retrieval-Augmented Generation. It is a pipeline that '
 'involves indexing, retrieval, generation, and evaluation steps to produce '
 'high-quality outputs. The evaluation step is crucial in assessing the '
 'performance of the RAG system using various metrics such as faithfulness, '
 'answer relevance, and context precision.')


In [26]:
# Run the graph on one question, announcing each node as it reports
inputs = {"question": "What is RAPTOR for rag?"}
for output in app.stream(inputs):
    for key in output:
        value = output[key]
        pprint(f"Node '{key}':")
    pprint("\n---\n")

# `value` still refers to the last state streamed; print its answer
pprint(value["generation"])


---RETRIEVE---
"Node 'retrieve':"
'\n---\n'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
"Node 'grade_documents':"
'\n---\n'
---ASSESS GRADED DOCUMENTS---
---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---
---TRANSFORM QUERY---
"Node 'transform_query':"
'\n---\n'
---WEB SEARCH---
"Node 'web_search_node':"
'\n---\n'
---GENERATE---
"Node 'generate':"
'\n---\n'
"Node '__end__':"
'\n---\n'
('The purpose of using RAPTOR for RAG is to enhance retrieval-augmented '
 'language models by allowing them to better adapt to changes in the world '
 'state and incorporate long-tail knowledge. Existing RAG methodologies often '
 'only retrieve short, contiguous chunks from a retrieval corpus, limiting the '
 "holistic understanding of the document's overall context. RAPTOR allows for "
 'more advanced usage and additional features to be prov