# Adaptive RAG -- Dev 0.1

## Goal: 
- Create Query analysis to redirect question to Rag or web-search

## Tools used:
- OpenAI Embeddings
- ChatOpenAI
- Pinecone

The reason for these tools is to get the system running efficiently online

Running embeddings locally will make development taking longer than needed

In [9]:
# dotenv
import os
import dotenv
dotenv.load_dotenv()

True

In [5]:
llm_model = "gpt-3.5-turbo"

## Embedding Documents

Documents are loaded from AIM Website (for now). We will utilize Langchains feature to parse and chunk HTML sites.

### Indexing

In [4]:
### Build Index

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# set embeddings
embd = OpenAIEmbeddings()

Load documents from AIM website into notebook

In [5]:
import requests
from bs4 import BeautifulSoup
import re

base_url = "https://www.faa.gov/air_traffic/publications/atpubs/aim_html/"

response = requests.get(base_url)
soup = BeautifulSoup(response.content, "html.parser")

links = soup.find_all("a", href=True)
subpages = set()

for link in links:
        href = link['href']
        if re.search(r'(chap|appendix)', href, re.IGNORECASE):  # Adjust regex to match 'chapter' or 'appendix'
                full_url = f"{base_url.rstrip('/')}/{href.lstrip('/')}"
                subpages.add(full_url)

subpages

{'https://www.faa.gov/air_traffic/publications/atpubs/aim_html/./appendix_1.html',
 'https://www.faa.gov/air_traffic/publications/atpubs/aim_html/./appendix_2.html',
 'https://www.faa.gov/air_traffic/publications/atpubs/aim_html/./appendix_3.html',
 'https://www.faa.gov/air_traffic/publications/atpubs/aim_html/./appendix_4.html',
 'https://www.faa.gov/air_traffic/publications/atpubs/aim_html/./appendix_5.html',
 'https://www.faa.gov/air_traffic/publications/atpubs/aim_html/./chap0_cfr.html',
 'https://www.faa.gov/air_traffic/publications/atpubs/aim_html/./chap0_chap0_policy.html',
 'https://www.faa.gov/air_traffic/publications/atpubs/aim_html/./chap0_faa_desc.html',
 'https://www.faa.gov/air_traffic/publications/atpubs/aim_html/./chap0_info_eoc.html',
 'https://www.faa.gov/air_traffic/publications/atpubs/aim_html/./chap0_subscription_info.html',
 'https://www.faa.gov/air_traffic/publications/atpubs/aim_html/./chap10_section_1.html',
 'https://www.faa.gov/air_traffic/publications/atpubs

create pinecone index

In [6]:
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))

index_name = "adaptive-rag"

pc.create_index(
    name=index_name,
    dimension=1536,
    metric="cosine",
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1"
    )
)

  from tqdm.autonotebook import tqdm


PineconeApiException: (409)
Reason: Conflict
HTTP response headers: HTTPHeaderDict({'content-type': 'text/plain; charset=utf-8', 'x-pinecone-api-version': '2024-04', 'access-control-allow-origin': '*', 'vary': 'origin,access-control-request-method,access-control-request-headers', 'access-control-expose-headers': '*', 'X-Cloud-Trace-Context': 'eba641068fb806805699b36644b4fd4c', 'Date': 'Tue, 11 Jun 2024 02:48:45 GMT', 'Server': 'Google Frontend', 'Content-Length': '85', 'Via': '1.1 google', 'Alt-Svc': 'h3=":443"; ma=2592000,h3-29=":443"; ma=2592000'})
HTTP response body: {"error":{"code":"ALREADY_EXISTS","message":"Resource  already exists"},"status":409}


In [7]:

# Load docs

from langchain_pinecone import PineconeVectorStore

docs = [WebBaseLoader(url).load() for url in subpages] # pulls pages from subpages set
docs_list = [item for sublist in docs for item in sublist] # flattens the list

# Split docs
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=500, chunk_overlap=0,
)
doc_splits = text_splitter.split_documents(docs_list)

In [None]:
# Initializes vector db AND add docs

# Initialize vectorstore and add documents simultaneously
vectorstore_from_docs = PineconeVectorStore.from_documents(
    doc_splits,
    index_name=index_name,
    embedding=embd,
)

Innitializes vector db in notebook

In [10]:
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))

index_name = "adaptive-rag"

vectorstore_from_docs = PineconeVectorStore(index_name=index_name, embedding=embd)

In [11]:
retriever = vectorstore_from_docs.as_retriever()

To add more reccords

Once initialized, you can add more documents to the index with `add_documents` or `add_texts`

## LLMs

In [13]:
### Router

from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI

# Data model
class RouteQuery(BaseModel):
    """Route a user query to the most relevant datasource"""

    datasource: Literal["vectorstore", "web_search"] = Field(
        ...,
        description="Given a user question choose to route it to web search or a vectorstore."
    )

# LLM with function call
llm = ChatOpenAI(model=llm_model, temperature=0)
structured_llm_router = llm.with_structured_output(RouteQuery)

# Prompt
system = """You are an expert at routing a user question to a vectorstore or web search.
The vectorstore contains documents related to agents, prompt engineering, and adversarial attacks.
Use the vectorstore for questions on these topics. Otherwise, use web-search."""
route_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}"),
    ]
)

user_question = "what is an instrument approach?"

question_router = route_prompt | structured_llm_router
print(
    question_router.invoke(
        {"question": user_question}
    )
)

print(question_router.invoke({"question": "whats machine learning?"}))

datasource='vectorstore'
datasource='web_search'


In [14]:
### Retrieval Grader

# Data model
class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""

    binary_score: str = Field(
        description="Documents are relevant to the question, 'yes', or 'no'"
    )

# LLM with function call
llm = ChatOpenAI(model=llm_model, temperature=0)
structured_llm_grader = llm.with_structured_output(GradeDocuments)

# Prompt
system = """You are a grader assessing relevance of a retrieved document to a user question. \n 
    If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question"""
grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
    ]
)

retrieval_grader = grade_prompt | structured_llm_grader
question = "ILS"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))

binary_score='yes'


In [15]:
### Generate

from langchain import hub
from langchain_core.output_parsers import StrOutputParser

# Prompt
prompt = hub.pull("rlm/rag-prompt")

# LLM
llm = ChatOpenAI(model_name=llm_model, temperature=0)

# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# Chain
rag_chain = prompt | llm | StrOutputParser()

# Run
generation = rag_chain.invoke({"context": docs, "question": question})
print(generation)

ILS stands for Instrument Landing System, which provides course guidance to the runway centerline using signals transmitted by the localizer. The localizer operates on one of 40 ILS channels within a specific frequency range. The ILS system is not in service simultaneously on both ends of a runway where it is installed.


In [16]:
### Hallucination Grader

# Data model
class GradeHallucinations(BaseModel):
    """Binary score for hallucination present in generation answer."""

    binary_score: str = Field(
        description="Answer is grounded in the facts, 'yes', or 'no'"
    )

# LLM with function call
llm = ChatOpenAI(model=llm_model, temperature=0)
structured_llm_grader = llm.with_structured_output(GradeHallucinations)

# Prompt
system = """You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. \n 
     Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts."""
hallucination_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Set of facts: \n\n {documents} \n\n LLM generation: {generation}"),
    ]
)

hallucination_grader = hallucination_prompt | structured_llm_grader
hallucination_grader.invoke({"documents": docs, "generation": generation})

GradeHallucinations(binary_score='yes')

In [17]:
### Answer Grader

# Data model
class GradeAnswer(BaseModel): 
    """Binary score to assess answer addresses question."""

    binary_score: str = Field(
        description="Answer addresses the question, 'yes', or 'no'"
    )

# LLM with funciton call
llm = ChatOpenAI(model=llm_model, temperature=0)
structured_llm_grader = llm.with_structured_output(GradeAnswer)

# Prompt
system = """You are a grader assessing whether an answer addresses / resolves a question \n 
     Give a binary score 'yes' or 'no'. Yes' means that the answer resolves the question."""
answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "User question: \n\n {question} \n\n LLM generation: {generation}"),
    ]
)

answer_grader = answer_prompt | structured_llm_grader
answer_grader.invoke({"question": question, "generation": generation})

GradeAnswer(binary_score='yes')

In [18]:
### Question Re-writer

# LLM
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)

# Prompt
system = """You a question re-writer that converts an input question to a better version that is optimized \n 
     for vectorstore retrieval. Look at the input and try to reason about the underlying semantic intent / meaning."""
re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        (
            "human",
            "Here is the initial question: \n\n {question} \n Formulate an improved question.",
        ),
    ]
)

question_rewriter = re_write_prompt | llm | StrOutputParser()
question_rewriter.invoke({"question": question})


'What is the meaning of ILS?'

Web Search Tool

In [19]:
### Search

from langchain_community.tools.tavily_search import TavilySearchResults

web_search_tool = TavilySearchResults(k=3)

Graph

capture the flow in as a graph

Graph State


In [20]:
from typing_extensions import TypedDict
from typing import List

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        documents: list of documents
    """

question: str
generation: str
documents: List[str]

Graph Flow


In [21]:
from langchain.schema import Document

def retrieve(state):
    """
    Retrieve documents

    Args:
        state (dict): The current graph state

    Returns: 
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    try:
        documents = retriever.invoke(question)
    except Exception as e:
        print(f"error during retrieval: {e}")
        documents = []
    return {"documents": documents, "question": question}

def generate(state): 
    """
    Generate answer

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation    
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    try: 
        generation = rag_chain.invoke({"context": documents, "question": question})
    except:
        print(f"error during generation: {e}")
        generation = ""
    return {"documents": documents, "question": question, "generation": generation}

def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question. 

    Args:
        state (dict): The current graph state

    Returns: 
        state (dict): Updates documents key with only filtered relevant documents
    """
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    for d in documents:
        try: 
            score = retrieval_grader.invoke(
                {"question": question, "document": d.page_content}
            )
            grade = score.binary_score
        except Exception as e:
            print(f"error during document grading: {e}")
            continue
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue
    return {"documents": filtered_docs, "question": question}

def transform_query(state):
    """
    Transform the query to produce a better question.

    Args:
        state (dict): The current graph state

    Returns: 
        state (dict): Updates question key with a re-phrased question
    """

    print("---TRANSFORM QUERY---")
    question = state["question"]
    documents = state["documents"]

    # Re-write question
    try: 
        better_question = question_rewriter.invoke({"question": question})
    except Exception as e:
        print(f"error during transform query: {e}")
        better_question = question
    return {"documents": documents, "question": better_question}

def web_search(state):
    """
    Web search based on the re-phrased question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with appended web results
    """

    print("---WEB SEARCH---")
    question = state["question"]

    # Web search
    try: 
        docs = web_search_tool.invoke({"query": question})
        web_results = "\n".join([d["content"] for d in docs])
        web_results = Document(page_content=web_results)
    except Exception as e:
        print(f"error in web search: {e}")
        web_results = Document(page_content="")
    return {"documents": [web_results], "question": question}


### Edges ###

def route_question(state):
    """
    Route question to web search or RAG.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """

    print("---ROUTE QUESTION---")
    question = state["question"]
    try:
        source = question_router.invoke({"question": question})
    except Exception as e:
            print(f"error during question routing: {e}")
            return "error"
    if source.datasource == "web_search":
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "web_search"
    elif source.datasource == "vectorstore":
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"


def decide_to_generate(state):
    """
    Determines whether to generate an answer, or re-generate a question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    question = state["question"]
    filtered_documents = state["documents"]

    if not filtered_documents:
        # All documents have been filtered check_relevance
        # We will re-generate a new query
        print(
            "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---"
        )
        return "transform_query"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"


def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke(
        {"documents": documents, "generation": generation}
    )
    grade = score.binary_score

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        try: 
            score = answer_grader.invoke({"question": question, "generation": generation})
            grade = score.binary_score
        except Exception as e: 
            print(f"error during answer grading {e}")
            return "not useful"
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else: 
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"


Build Graph

In [22]:
from langgraph.graph import END, StateGraph

workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("web_search", web_search)             # web search
workflow.add_node("retrieve", retrieve)                 # retrieve
workflow.add_node("grade_documents", grade_documents)   # grade documents
workflow.add_node("generate", generate)                 # generate
workflow.add_node("transform_query", transform_query)   # transform query

# build graph
workflow.set_conditional_entry_point(                   # set beginning point
    route_question,
    {
        "web_search": "web_search",
        "vectorstore": "retrieve",
    },
)
workflow.add_edge("web_search", "generate")             # web-search node
workflow.add_edge("retrieve", "grade_documents")        # retrieval node
workflow.add_conditional_edges(                         # conditional edge points to right direction
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "retrieve")        # retrieval node
workflow.add_conditional_edges(                         # conditional edge points to right direction
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "transform_query",
    },
)

# compile
app = workflow.compile()

In [23]:
from pprint import pprint

# run
inputs = {
    "question": "what is an ils approach?"
}

for output in app.stream(inputs):
    for key, value in output.items():
        # node
        pprint(f"Node '{key}':")
    pprint("\n---\n")

# final generation
pprint(value["generation"])

InvalidUpdateError: Must write to at least one of []