In [None]:
!pip install -q -U langchain-nomic langchain_community tiktoken langchainhub chromadb langchain langgraph tavily-python gpt4all pypdf colab-xterm gradio

# Note:

### Type in your terminal:

curl -fsSL https://ollama.com/install.sh | sh

ollama serve & ollama pull llama3

In [None]:
### LLM

# Model name passed as `model=` to every ChatOllama instance created in this notebook.
local_llm = "llama3"

# URL Type Router

In [None]:
import requests

def url_type(url, timeout=10):
    """
    Determine the type of the URL: PDF or webpage.

    The URL text is inspected first (no network access). Only when it does not
    name a ``.pdf`` resource is a HEAD request sent and the response headers
    examined.

    Args:
        url (str): The URL to check.
        timeout (float): Seconds to wait for the HEAD request before giving up.

    Returns:
        str: 'pdf', 'webpage', or 'unknown' when the request fails or the
        headers match neither type.
    """
    from urllib.parse import urlparse

    lowered = url.lower()
    try:
        # The path component ignores any "?query" or "#fragment" suffix.
        path = urlparse(lowered).path
    except ValueError:
        path = ''

    # Check if the URL (or its path) ends with .pdf
    if lowered.endswith('.pdf') or path.endswith('.pdf'):
        return 'pdf'

    # Check the Content-Type and Content-Disposition headers
    try:
        # A timeout keeps an unresponsive host from blocking the notebook forever.
        response = requests.head(url, allow_redirects=True, timeout=timeout)
    except requests.RequestException:
        return 'unknown'

    content_type = response.headers.get('Content-Type', '').lower()
    content_disposition = response.headers.get('Content-Disposition', '').lower()

    # Check if Content-Type indicates a PDF
    if 'application/pdf' in content_type:
        return 'pdf'

    # Check if Content-Disposition indicates a PDF filename. The filename is
    # commonly quoted (filename="paper.pdf"), so trailing quotes, semicolons
    # and spaces are stripped before testing the suffix.
    if 'filename' in content_disposition and content_disposition.rstrip(' ;"\'').endswith('.pdf'):
        return 'pdf'

    # Check if Content-Type indicates an HTML page
    if 'text/html' in content_type:
        return 'webpage'

    return 'unknown'

# Example URL to test
# This URL has no ".pdf" suffix, so url_type has to fall back to the HEAD request.
url = "https://arxiv.org/pdf/2403.14403"
print(f"The URL '{url}' is identified as a '{url_type(url)}'.")

# Document Class

In [None]:
import requests

class Document:
    """Minimal document container: text in ``page_content``, a dict in ``metadata``."""

    def __init__(self, page_content, metadata=None):
        """
        Args:
            page_content (str): The document text (raw HTML when produced by fetch_html).
            metadata (dict | None): Extra information such as {"source": url}.
                Defaults to a fresh empty dict so callers may omit it.
        """
        self.page_content = page_content
        # A new dict per instance avoids sharing one mutable default between documents.
        self.metadata = {} if metadata is None else metadata

    def __repr__(self):
        # Keeps the content visible when a document (or a list of them) is
        # printed or interpolated into a prompt.
        return f"Document(page_content={self.page_content!r}, metadata={self.metadata!r})"

    @staticmethod
    def fetch_html(url, timeout=30):
        """
        Download `url` and return the response body as text.

        Args:
            url (str): Page to fetch.
            timeout (float): Seconds to wait for the server before giving up.

        Returns:
            str: The response text, or "" when the request fails (the error is printed).
        """
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()
            return response.text
        except requests.exceptions.HTTPError as err:
            print(f"HTTPError for URL {url}: {err}")
            return ""
        except Exception as err:
            # Best-effort fetch: any other failure is reported and yields an empty page.
            print(f"Error for URL {url}: {err}")
            return ""

    @staticmethod
    def load_documents(urls):
        """
        Fetch every URL and wrap each result in a Document.

        A Document is created for every URL, including those whose fetch failed
        (their page_content is then "").

        Args:
            urls (Iterable[str]): URLs to download.

        Returns:
            list[Document]: One document per URL, in input order.
        """
        documents = []
        for url in urls:
            html = Document.fetch_html(url)
            documents.append(Document(page_content=html, metadata={"source": url}))
            print(f"Loaded document from URL: {url}")  # Debugging statement
        return documents

# Chunk & Embed

In [None]:
!pip install -q html2text faiss-cpu sentence-transformers

In [None]:
import requests
from bs4 import BeautifulSoup
from langchain.document_transformers import Html2TextTransformer
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter

def chunk_embed_documents(urls):
    """
    Load the given URLs, split them into chunks and index them in one FAISS store.

    URLs that url_type() classifies as 'pdf' are loaded with PyPDFLoader; every
    other URL is fetched as HTML and converted with Html2TextTransformer. The
    chunks of both kinds are embedded into a single vector store, so a mixed
    list of PDF and HTML URLs is fully searchable.

    Args:
        urls (Iterable[str]): URLs to ingest.

    Returns:
        A retriever over all chunks, or None when nothing could be loaded.
    """
    pdf_docs_list = []
    html_docs_list = []

    for url in urls:
        if url_type(url) == 'pdf':
            pdf_docs_list.extend(PyPDFLoader(url).load())
        else:
            html_content = Document.fetch_html(url)
            if html_content:
                html_docs_list.append(Document(page_content=html_content, metadata={"source": url}))
            else:
                print(f"Failed to load document from URL: {url}")

    chunks = []
    kinds = []  # which source types contributed, used only for the log line below

    if pdf_docs_list:
        print(f"Number of PDF documents loaded: {len(pdf_docs_list)}")
        pdf_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(chunk_size=250, chunk_overlap=0)
        chunks.extend(pdf_splitter.split_documents(pdf_docs_list))
        kinds.append("PDF")

    if html_docs_list:
        docs_trans = Html2TextTransformer().transform_documents(html_docs_list)
        print(f"Number of HTML documents loaded: {len(docs_trans)}")
        html_splitter = CharacterTextSplitter(chunk_size=100, chunk_overlap=0)
        chunks.extend(html_splitter.split_documents(docs_trans))
        kinds.append("HTML")

    if not chunks:
        print("No documents to process.")
        return None

    # One embedding model and one index for everything; previously the PDF branch
    # returned early and HTML documents in the same request were never indexed.
    embeddings = HuggingFaceEmbeddings(model_name='sentence-transformers/all-mpnet-base-v2')
    retriever = FAISS.from_documents(chunks, embeddings).as_retriever()
    print(f"{' + '.join(kinds)} Retriever created: {retriever}")
    return retriever

In [None]:
def load_pdf(urls):
    """
    Load every PDF in `urls` with PyPDFLoader and split the result into chunks.

    Args:
        urls (Iterable[str]): Locations of the PDFs to load.

    Returns:
        The chunks produced by a RecursiveCharacterTextSplitter built with
        from_tiktoken_encoder(chunk_size=250, chunk_overlap=0).
    """
    # Flatten the per-PDF document lists into one list, preserving URL order.
    loaded = []
    for pdf_url in urls:
        loaded.extend(PyPDFLoader(pdf_url).load())

    splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(chunk_size=250, chunk_overlap=0)
    return splitter.split_documents(loaded)

In [None]:
# Sanity check for load_pdf: download three PDFs and show the resulting chunks.
doc_splits = load_pdf(["https://arxiv.org/pdf/2403.14403",
    "https://arxiv.org/pdf/2401.15884",
    "https://arxiv.org/pdf/2310.11511"])
# Print each document content on a new line
# NOTE(review): this prints every chunk in full, which makes for a very long cell output.
for i, doc in enumerate(doc_splits):
    print(f"Document {i+1}:")
    print(doc.page_content)
    print("\n" + "-"*80 + "\n")

# Agents

### Make sure to run `ollama pull llama3` in your terminal before executing the agents

In [None]:
### Retrieval Grader

from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser

# LLM
# format="json" is requested because the chain below ends in a JsonOutputParser.
llm = ChatOllama(model=local_llm, format="json", temperature=0)

# Prompt asking for a {"score": "yes" | "no"} relevance verdict on one document.
# NOTE(review): the prompt text contains the typo "premable" (preamble) and the
# doubled wording "binary score 'yes' or 'no' score"; left as-is because the
# string is sent to the model verbatim.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing relevance
    of a retrieved document to a user question. If the document contains keywords related to the user question,
    grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no premable or explanation.
     <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the retrieved document: \n\n {document} \n\n
    Here is the user question: {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables=["question", "document"],
)

# Chain: fill the prompt, call the model, parse the JSON reply.
retrieval_grader = prompt | llm | JsonOutputParser()

In [None]:
### Generate

from langchain.prompts import PromptTemplate
from langchain import hub
from langchain_core.output_parsers import StrOutputParser

# Prompt
# The template's placeholders are {question} and {context}; input_variables must
# name exactly those (it previously listed "document", which the template never uses).
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an assistant for question-answering tasks.
    Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know.
    Keep the answer concise <|eot_id|><|start_header_id|>user<|end_header_id|>
    Question: {question}
    Context: {context}
    Answer: <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "context"],
)

# Free-text answer, so no format="json" here (unlike the grader chains).
llm = ChatOllama(model=local_llm, temperature=0)


# Post-processing
def format_docs(docs):
    """Join the page_content of each document in `docs`, separated by blank lines."""
    return "\n\n".join(doc.page_content for doc in docs)


# Chain
rag_chain = prompt | llm | StrOutputParser()

In [None]:
### Hallucination Grader

# LLM
# format="json" is requested because the chain below ends in a JsonOutputParser.
llm = ChatOllama(model=local_llm, format="json", temperature=0)

# Prompt
# Asks for a {"score": "yes" | "no"} verdict on whether {generation} is
# supported by the facts supplied in {documents}.
prompt = PromptTemplate(
    template=""" <|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether
    an answer is grounded in / supported by a set of facts. Give a binary 'yes' or 'no' score to indicate
    whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a
    single key 'score' and no preamble or explanation. <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here are the facts:
    \n ------- \n
    {documents}
    \n ------- \n
    Here is the answer: {generation}  <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "documents"],
)

# Chain: fill the prompt, call the model, parse the JSON reply.
hallucination_grader = prompt | llm | JsonOutputParser()

In [None]:
### Answer Grader

# LLM
# format="json" is requested because the chain below ends in a JsonOutputParser.
llm = ChatOllama(model=local_llm, format="json", temperature=0)

# Prompt
# Asks for a {"score": "yes" | "no"} verdict on whether {generation} is useful
# for resolving {question}.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an
    answer is useful to resolve a question. Give a binary score 'yes' or 'no' to indicate whether the answer is
    useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
     <|eot_id|><|start_header_id|>user<|end_header_id|> Here is the answer:
    \n ------- \n
    {generation}
    \n ------- \n
    Here is the question: {question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "question"],
)

# Chain: fill the prompt, call the model, parse the JSON reply.
answer_grader = prompt | llm | JsonOutputParser()

In [None]:
### Router

from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser

# LLM
# format="json" is requested because the chain below ends in a JsonOutputParser.
llm = ChatOllama(model=local_llm, format="json", temperature=0)


# Prompt asking for {"datasource": "web_search" | "vectorstore"} for a question.
# NOTE(review): the prompt never says which topics the vectorstore holds, so the
# model has nothing concrete to match keywords against. It also contains the
# typos "Return the a JSON" and "premable"; left as-is because the string is
# sent to the model verbatim.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an expert at routing a
    user question to a vectorstore or web search. Use the vectorstore for questions with keywords related to topics in
    the vectorstore. You do not need to be stringent with the keywords
    in the question related to these topics. Otherwise, use web-search. Give a binary choice 'web_search'
    or 'vectorstore' based on the question. Return the a JSON with a single key 'datasource' and
    no premable or explanation. Question to route: {question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question"],
)

# Chain: fill the prompt, call the model, parse the JSON reply.
question_router = prompt | llm | JsonOutputParser()

# Web Search

In [None]:
### Search

import os
from langchain_community.tools.tavily_search import TavilySearchResults

# Set your Tavily API key
# Never hardcode the key in the notebook. A key already exported in the
# environment is reused (it used to be overwritten with a placeholder);
# otherwise it is prompted for, so it is not saved with the file.
if not os.environ.get('TAVILY_API_KEY'):
    import getpass
    os.environ['TAVILY_API_KEY'] = getpass.getpass('Tavily API key: ')

# Initialize the TavilySearchResults tool
web_search_tool = TavilySearchResults(k=3)

# LangGraph Control Flow

### We'll implement these as a control flow in LangGraph.

In [None]:
from typing_extensions import TypedDict
from typing import List, Dict, Any

### State


class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        web_search: whether to add search ("Yes" / "No", set by grade_documents)
        documents: list of documents
        urls: list of URLs to process
        retrievers: retrievers built from `urls` by process_urls_node
    """
    question: str
    generation: str
    web_search: str
    # NOTE(review): annotated as List[str], but grade_documents reads
    # `.page_content` from each entry — confirm and tighten the annotation.
    documents: List[str]
    urls: List[str]  # read by process_urls_node
    retrievers: List[Any]  # written by process_urls_node, read by retrieve


def process_urls_node(state):
    """
    Build a retriever for the URLs in the state and record it under "retrievers".

    The incoming state object is updated in place and returned.

    Args:
        state (dict): The current graph state; "urls" and "retrievers" are optional.

    Returns:
        state (dict): The same state object, with "retrievers" set.
    """
    pending_urls = state.get("urls", [])
    print("Processing URLs: {}".format(pending_urls))  # Debugging statement

    known_retrievers = state.get("retrievers", [])

    # Only hit the network / embedding model when there is something to ingest.
    new_retriever = chunk_embed_documents(pending_urls) if pending_urls else None
    if new_retriever:
        known_retrievers.append(new_retriever)
        print("Retriever added")  # Debugging statement

    state["retrievers"] = known_retrievers  # Store retrievers in the state
    print("Retrievers in state: {}".format(state["retrievers"]))  # Debugging statement
    return state


def retrieve(state):
    """
    Retrieve documents from every retriever stored in the state.

    Args:
        state (dict): The current graph state. "question" is required;
            "retrievers" is optional and treated as empty when absent.

    Returns:
        dict: {"documents": <all retrieved documents, in retriever order>,
               "question": <the unchanged question>}
    """
    print("---RETRIEVE---")
    question = state["question"]
    retrievers = state.get("retrievers", [])
    print(f"Retrievers in state: {retrievers}")  # Debugging statement
    print(f"Number of retrievers: {len(retrievers)}")  # Debugging statement

    # Combine results from all retrievers
    documents = []
    for retriever in retrievers:
        # `invoke` replaces the deprecated `get_relevant_documents`; the old
        # method is kept only as a fallback for objects that lack `invoke`.
        run_query = getattr(retriever, "invoke", None) or retriever.get_relevant_documents
        retrieved_docs = run_query(question)
        print(f"Documents retrieved: {retrieved_docs}")  # Debugging statement
        documents.extend(retrieved_docs)

    print(f"Total number of documents retrieved: {len(documents)}")  # Debugging statement
    return {"documents": documents, "question": question}


def generate(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state (dict): The current graph state; reads "question" and "documents".

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    # Feed the prompt the documents' text rather than the list object itself,
    # whose string form is a Python repr, not readable context.
    context = format_docs(documents)
    generation = rag_chain.invoke({"context": context, "question": question})
    return {"documents": documents, "question": question, "generation": generation}


def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, or nothing was retrieved at all, we set a
    flag to run web search

    Args:
        state (dict): The current graph state; reads "question" and "documents".

    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"] or []

    # Score each doc
    filtered_docs = []
    # With no documents there is nothing to generate from, so ask for a web
    # search up front instead of generating from an empty context.
    web_search = "No" if documents else "Yes"
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        # The model's JSON is not guaranteed to contain "score" or to be a dict;
        # anything other than an explicit "yes" counts as not relevant.
        raw_grade = score.get("score", "no") if isinstance(score, dict) else "no"
        grade = str(raw_grade).strip().lower()
        # Document relevant
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        # Document not relevant
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # We do not include the document in filtered_docs
            # We set a flag to indicate that we want to run web search
            web_search = "Yes"
    return {"documents": filtered_docs, "question": question, "web_search": web_search}


def web_search(state):
    """
    Web search based on the question

    Args:
        state (dict): The current graph state; reads "question" and, if present,
            "documents".

    Returns:
        state (dict): Appended web results to documents
    """

    print("---WEB SEARCH---")
    question = state["question"]
    documents = state.get("documents")

    # Web search
    docs = web_search_tool.invoke({"query": question})

    if isinstance(docs, (list, tuple)):
        snippets = [d["content"] for d in docs if isinstance(d, dict) and "content" in d]
    else:
        # NOTE(review): the tool appears to hand back plain text instead of a
        # result list when the search fails — confirm against the Tavily tool docs.
        print(f"Web search returned no structured results: {docs}")
        snippets = []

    if documents is None:
        documents = []
    if snippets:
        # Document requires a metadata argument in this notebook; omitting it
        # (as the previous version did) raises TypeError.
        web_results = Document(
            page_content="\n".join(snippets), metadata={"source": "web_search"}
        )
        documents.append(web_results)
    return {"documents": documents, "question": question}


### Conditional edge


def route_question(state):
    """
    Route question to web search or RAG.

    Args:
        state (dict): The current graph state; reads "question" and, for the
            fallback decision, "retrievers".

    Returns:
        str: Next node to call, always "websearch" or "vectorstore".
    """

    print("---ROUTE QUESTION---")
    question = state["question"]
    print(question)

    source = question_router.invoke({"question": question})
    print(source)
    # The router's JSON may lack the key or not be a dict at all.
    datasource = source.get("datasource") if isinstance(source, dict) else None
    print(datasource)

    if datasource == "web_search":
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "websearch"
    if datasource == "vectorstore":
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"

    # Unexpected answer: returning None here would leave the graph without a
    # matching edge, so pick a route based on what is available.
    fallback = "vectorstore" if state.get("retrievers") else "websearch"
    print(f"---UNRECOGNIZED ROUTE {datasource!r}, FALLING BACK TO {fallback}---")
    return fallback


def decide_to_generate(state):
    """
    Determines whether to generate an answer, or add web search

    Args:
        state (dict): The current graph state; only "web_search" is read.

    Returns:
        str: "websearch" when state["web_search"] is "Yes", otherwise "generate".
    """

    print("---ASSESS GRADED DOCUMENTS---")

    if state["web_search"] == "Yes":
        # grade_documents raises this flag as soon as one retrieved document is
        # judged irrelevant, so the remaining documents get supplemented by a
        # web search before generating.
        print(
            "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---"
        )
        return "websearch"

    # Every graded document was relevant, so generate answer
    print("---DECISION: GENERATE---")
    return "generate"


### Conditional edge


def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state; reads "question", "documents"
            and "generation".

    Returns:
        str: Decision for next node to call: "useful", "not useful" or
        "not supported".
    """

    def _is_yes(score):
        # Same tolerant parsing as grade_documents: case-insensitive, and a
        # missing or malformed score counts as "no".
        raw = score.get("score", "no") if isinstance(score, dict) else "no"
        return str(raw).strip().lower() == "yes"

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    # Give the grader the documents' text, not the repr of the list object.
    score = hallucination_grader.invoke(
        {"documents": format_docs(documents), "generation": generation}
    )

    # Check hallucination
    if not _is_yes(score):
        # Plain print: `pprint` is only imported in a later cell and would also
        # wrap the message in quotes.
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

    print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
    # Check question-answering
    print("---GRADE GENERATION vs QUESTION---")
    score = answer_grader.invoke({"question": question, "generation": generation})
    if _is_yes(score):
        print("---DECISION: GENERATION ADDRESSES QUESTION---")
        return "useful"

    print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
    return "not useful"


from langgraph.graph import END, StateGraph

# Graph whose shared state has the keys declared in GraphState.
workflow = StateGraph(GraphState)

# Define the nodes
# Each node name is the label the edge definitions refer to.
workflow.add_node("process_urls_node", process_urls_node) # process URLs
workflow.add_node("websearch", web_search)  # web search
workflow.add_node("retrieve", retrieve)  # retrieve from vector store
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generate

### Graph Build

In [None]:
# Build graph
# Set the entry point to process URLs first
workflow.set_entry_point("process_urls_node")

# Route the question after processing URLs
workflow.add_conditional_edges(
    "process_urls_node",  # Current node
    route_question,  # Function to determine the next node
    {
        "vectorstore": "retrieve",  # Mapping of condition to next node
        "websearch": "websearch"
    }
)

# Retrieved documents are always graded before anything is generated.
workflow.add_edge("retrieve", "grade_documents")
# decide_to_generate returns "websearch" or "generate".
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)
# Web search results always flow into generation.
workflow.add_edge("websearch", "generate")
# NOTE(review): "not supported" loops back to "generate" with the same inputs
# and "not useful" goes back to "websearch"; there is no retry cap in this
# wiring, so confirm something else bounds these loops.
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
    },
)

# Backend Inference Test

In [14]:
from pprint import pprint

# Compile the workflow
app = workflow.compile()

# The two triple-quoted blocks below are alternative URL sets kept as inert
# string literals; they are not assigned to anything and are not used.
'''
# Test the workflow
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
'''
'''
urls = [
    "https://www.allrecipes.com/recipe/240290/moms-scalloped-potatoes/",
    "https://littlesunnykitchen.com/scalloped-potatoes/",
]
'''

# URL set actually used for this run.
pdf_urls = [
    "https://arxiv.org/pdf/2403.14403",
    "https://arxiv.org/pdf/2401.15884",
    "https://arxiv.org/pdf/2310.11511",
]

# Initial graph state: every GraphState key is given a starting value.
inputs = {
    #"question": "What is agent memory?",
    #"question": "How do I make scalloped potatoes?",
    "question": "Tell me about: 1) Adaptive-RAG, 2) Self-Reflective Retrieval-Augmented Generation, and 3) Corrective Retrieval Augmented Generation",
    "urls": pdf_urls,
    "retrievers": [],
    "generation": "",
    "web_search": "No",
    "documents": [],
}

# Stream the run; each `output` maps a node name to the value that node produced.
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
        # Print the value dictionary to see all keys
        pprint(value)
        # Check if 'generation' key exists before printing it
        if 'generation' in value:
            pprint(value['generation'])
        else:
            pprint("No generation found yet")

Processing URLs: ['https://arxiv.org/pdf/2403.14403', 'https://arxiv.org/pdf/2401.15884', 'https://arxiv.org/pdf/2310.11511']
Number of PDF documents loaded: 59




PDF Retriever created: tags=['FAISS', 'HuggingFaceEmbeddings'] vectorstore=<langchain_community.vectorstores.faiss.FAISS object at 0x7f8271023bb0>
Retriever added
Retrievers in state: [VectorStoreRetriever(tags=['FAISS', 'HuggingFaceEmbeddings'], vectorstore=<langchain_community.vectorstores.faiss.FAISS object at 0x7f8271023bb0>)]
---ROUTE QUESTION---
Tell me about: 1) Adaptive-RAG, 2) Self-Reflective Retrieval-Augmented Generation, and 3) Corrective Retrieval Augmented Generation
{'datasource': 'vectorstore'}
vectorstore
---ROUTE QUESTION TO RAG---
'Finished running: process_urls_node:'
{'documents': [],
 'generation': '',
 'question': 'Tell me about: 1) Adaptive-RAG, 2) Self-Reflective '
             'Retrieval-Augmented Generation, and 3) Corrective Retrieval '
             'Augmented Generation',
 'retrievers': [VectorStoreRetriever(tags=['FAISS', 'HuggingFaceEmbeddings'], vectorstore=<langchain_community.vectorstores.faiss.FAISS object at 0x7f8271023bb0>)],
 'urls': ['https://arxi

  warn_deprecated(


---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
'Finished running: grade_documents:'
{'documents': [Document(page_content='Self-CRAG dropped as the retrieval performance\ndropped, indicating that the generator relied heavily\non the quality of the retriever. Furthermore, as\nthe retrieval performance dropped, the generation\nperformance of Self-CRAG dropped more slightly\nthan that of Self-RAG. These results imply the\nsuperiority of Self-CRAG over Self-RAG on en-\nhancing the robustness to retrieval performance.\n6 Conclusion\nThis paper studies the problem where RAG-based\napproaches are challenged if retrieval goes wrong,\nthereby exposing inaccurate and misleading knowl-\nedge to generative LMs. Corrective Retrieval\nAugmented Generation is proposed to improve the\nrobustness of generation. Essentially, a lightweight\nretrieval evaluator is to estimate 

# Frontend Inference Test

"https://arxiv.org/pdf/2403.14403",
"https://arxiv.org/pdf/2401.15884",
"https://arxiv.org/pdf/2310.11511",
"https://www.allrecipes.com/recipe/240290/moms-scalloped-potatoes/",

How can I integrate SELF-RAG, Adaptive-RAG, and CRAG together?

In [None]:
import gradio as gr
from pprint import pprint
import io
import contextlib

# Compile the workflow
# NOTE(review): if compile() raises ValueError the error is only printed, so
# `app` keeps whatever value an earlier cell gave it (or stays undefined).
try:
    app = workflow.compile()
except ValueError as e:
    print(f"Error compiling workflow: {e}")

def process_inputs(question, urls):
    """
    Run the compiled workflow for one question and return what it printed.

    Args:
        question (str): The user's question.
        urls (str): URLs separated by commas and/or whitespace.

    Returns:
        str: Everything written to stdout during the run (ending with the last
        "generation" value seen), or the exception message if the run failed.
    """
    # Commas become spaces, then split on any run of whitespace.
    url_list = [piece.strip() for piece in urls.replace(",", " ").split()]

    # Initial graph state for this request.
    graph_input = dict(
        question=question,
        urls=url_list,
        retrievers=[],
        generation="",
        web_search="No",
        documents=[],
    )

    final_output = None

    # Everything printed while the graph runs is collected in `log`.
    log = io.StringIO()
    with log, contextlib.redirect_stdout(log):
        try:
            for step in app.stream(graph_input):
                for node_name, node_state in step.items():
                    pprint(f"Finished running: {node_name}:")
                    final_output = node_state.get("generation", "No generation found")

            pprint(final_output)
            output_string = log.getvalue()
        except Exception as e:
            print(f"Error processing inputs: {e}")
            output_string = str(e)

    return output_string

# Define the Gradio interface
# Two text inputs map positionally to process_inputs(question, urls).
iface = gr.Interface(
    fn=process_inputs,
    inputs=["text", "text"],
    outputs="text",
    title="Advanced RAG UI",
    description=("Ask a question and enter relevant URLs, each URL separated by a comma."
                " Outputs an answer using a combination of vector store retrieval and web search")
)

# NOTE(review): share=True presumably publishes a public link to this app —
# confirm that is intended before running outside a throwaway environment.
iface.launch(share=True)