In [1]:
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_postgres.vectorstores import PGVector
from langchain.vectorstores import Qdrant
from langchain_qdrant import QdrantVectorStore
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from semantic_router import Route
from semantic_router.encoders import HuggingFaceEncoder
from semantic_router.layer import RouteLayer
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader
import os

In [2]:
import getpass


def _ensure_env(name: str) -> None:
    """Prompt for the secret `name` unless the environment already provides it.

    Keeps real keys out of the notebook source and avoids overwriting a key
    that was exported before the kernel started.
    """
    if not os.environ.get(name):
        os.environ[name] = getpass.getpass(f"{name}: ")


# API keys: taken from the environment, otherwise requested interactively.
_ensure_env("TAVILY_API_KEY")
_ensure_env("LANGCHAIN_API_KEY")

# LangSmith tracing configuration (not secret).
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"

In [3]:
import torch

# Pick the best available accelerator instead of assuming Apple Silicon:
# "mps" on Apple GPUs, "cuda" on NVIDIA GPUs, otherwise fall back to the CPU.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    embedding_device = "mps"
elif torch.cuda.is_available():
    embedding_device = "cuda"
else:
    embedding_device = "cpu"

# Embedding model shared by both vector stores below.
embeddings = HuggingFaceEmbeddings(
    model_name="BAAI/bge-large-en-v1.5",
    model_kwargs={"device": embedding_device},
)

  warn_deprecated(
  from pandas.core import (


In [4]:
# Load the files matching `./*.txt` in ./data/pg/ -- the corpus that is split
# and stored in the PGVector collection in the following cells.
loader = DirectoryLoader('./data/pg/', glob="./*.txt")

documents = loader.load()


In [5]:
# Split the loaded files into overlapping chunks (chunk_size=500, chunk_overlap=20)
# so each chunk can be embedded and retrieved on its own.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=20)
docs = text_splitter.split_documents(documents)

In [6]:
# Preview the first five chunks produced by the splitter.
docs[:5]

[Document(page_content='# API Documentation ## Authentication To authenticate using the API, you need to include your API key in the request headers. The API key should be provided to you when you create an account. Example: ``` GET /api/v1/resource Headers: Authorization: Bearer YOUR_API_KEY ```', metadata={'source': 'data/pg/software_development_data.txt'}),
 Document(page_content='## Endpoints ### getUser The `getUser` endpoint retrieves user information based on the user ID. Required parameters: - user_id (string): The unique identifier for the user. Example: ``` GET /api/v1/user/{user_id} Headers: Authorization: Bearer YOUR_API_KEY ```', metadata={'source': 'data/pg/software_development_data.txt'}),
 Document(page_content='### createUser The `createUser` endpoint creates a new user in the system. Required parameters: - name (string): The name of the user. - email (string): The email address of the user. Example: ``` POST /api/v1/user Headers: Authorization: Bearer YOUR_API_KEY Bod

In [7]:
# Placeholder connection string -- replace with the real database URL before running.
# NOTE(review): the langchain-postgres examples use a "postgresql+psycopg://" URL for
# psycopg3; confirm the scheme when filling this in.
connection = "postgresql://url_here"  # Uses psycopg3!
collection_name = "medium-pg-data"
# PGVector store for the technical-documentation chunks, using the shared `embeddings`.
vector_store = PGVector(
        connection=connection, 
        collection_name=collection_name, 
        embeddings=embeddings
    )

In [16]:
# Embed the chunks and insert them into the PGVector collection; the cell output is
# the list of ids assigned to the stored chunks.
# NOTE(review): presumably re-running this cell inserts the same chunks again --
# confirm before executing it a second time.
vector_store.add_documents(docs)

['87393379-ff6e-4311-8adf-00a1bb675db2',
 'ed9bd8dc-0144-46ac-85d6-c5359a4686d9',
 'd5a5e92a-6d9a-43ab-9cbf-a5e7682105cb',
 '946b26ce-74ef-4cb8-9f15-4cbbee75eaac',
 '65032de7-00ad-4292-8ee2-13e00eccfbc8',
 '7e0c7cc8-bd9b-408d-80cc-c41bc9c7d665',
 '7b2c8640-f515-480c-9d00-86947bbc396e']

In [8]:
# Retriever over the PGVector collection, returning at most one chunk whose
# relevance score is at least 0.5.
# `score_threshold` only filters results when search_type is
# "similarity_score_threshold"; with the default search type it is merely forwarded
# as an extra keyword argument and nothing is filtered.
pg_retriever = vector_store.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"score_threshold": 0.5, "k": 1},
)
# Smoke test: a question taken from the indexed API documentation.
pg_docs = pg_retriever.invoke("How do I authenticate using the API?")
pg_docs

[Document(page_content='# API Documentation ## Authentication To authenticate using the API, you need to include your API key in the request headers. The API key should be provided to you when you create an account. Example: ``` GET /api/v1/resource Headers: Authorization: Bearer YOUR_API_KEY ```', metadata={'source': '/Users/fashah/Downloads/medium-test-data/pg/software_development_data.txt'})]

In [18]:
# Load the files matching `./*.txt` in ./data/qdrnt/ -- the corpus stored in Qdrant.
# NOTE: this rebinds `loader` and `documents`, which previously held the ./data/pg/ data.
loader = DirectoryLoader('./data/qdrnt/', glob="./*.txt")

documents = loader.load()

In [19]:
# Same chunking settings as for the PGVector corpus; the result is kept in `qd_docs`
# so the PGVector chunks in `docs` are not overwritten.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=20)
qd_docs = text_splitter.split_documents(documents)

In [20]:
# Build the Qdrant collection from the customer-service chunks, embedding them with
# the shared `embeddings` model.
qdrant_vector_store = QdrantVectorStore.from_documents(
    qd_docs,
    embeddings,
    location=":memory:",  # Local mode with in-memory storage only
    collection_name="medium-qdrnt-data",
)


In [21]:
# Preview the first two chunks of the Qdrant corpus. `docs` holds the PGVector
# chunks, so the Qdrant chunks have to be read from `qd_docs`.
qd_docs[:2]

[Document(page_content='# Return Policy Our return policy allows you to return products within 30 days of purchase. The items must be in their original condition and packaging. To initiate a return, please contact our customer service team with your order details.\n\n## Return Process 1. Contact customer service to request a return authorization. 2. Pack the items securely in the original packaging. 3. Include the return authorization slip in the package. 4. Ship the package to the provided address.', metadata={'source': 'data/qdrnt/customer_service_faqs.txt'}),
 Document(page_content='## Refund Policy Refunds will be processed within 7-10 business days after we receive the returned items. The refund will be issued to the original payment method.\n\n# Shipping Information We offer several shipping options to meet your needs. Shipping times and costs vary based on the selected option and your location.\n\n## Shipping Options\n\n1. *\n\nStandard Shipping*\n\n: 5\n\n7 business days.\n\n2.

In [22]:
# Retriever over the Qdrant collection (k=1, score_threshold=0.5 passed as search kwargs).
qdrnt_retriever = qdrant_vector_store.as_retriever(search_kwargs={"score_threshold": 0.5,"k": 1})
# Smoke test with a customer-service question.
# NOTE: this rebinds `qd_docs` from the list of split chunks to the retrieval result.
qd_docs = qdrnt_retriever.invoke("What is your return policy?")
qd_docs

[Document(page_content='# Return Policy Our return policy allows you to return products within 30 days of purchase. The items must be in their original condition and packaging. To initiate a return, please contact our customer service team with your order details.\n\n## Return Process 1. Contact customer service to request a return authorization. 2. Pack the items securely in the original packaging. 3. Include the return authorization slip in the package. 4. Ship the package to the provided address.', metadata={'source': 'data/qdrnt/customer_service_faqs.txt', '_id': '0d8f58a1e232411e893e44382f537f8c', '_collection_name': 'medium-qdrnt-data'})]

In [23]:
# Local Ollama model used by the grader chains; format="json" requests JSON output,
# which the graders parse with JsonOutputParser.
local_llm = "llama3"
llm = ChatOllama(model=local_llm, format="json", temperature=0)


In [24]:
# Example utterances for the "tech_doc" route. route_question() sends questions that
# resolve to this route name to the PGVector retriever.
tech_doc_route = Route(
    name="tech_doc",
    utterances=[
        "How do I authenticate using the API?",
        "What are the required parameters for the getUser endpoint?",
        "Can you provide an example of a POST request in Python?",
        "How do I set up a development environment?",
        "What is the usage of the createUser endpoint?",
        "How do I install Python on my machine?",
    ],
)

In [25]:
# Example utterances for the "customer_service" route. route_question() sends questions
# that resolve to this route name to the Qdrant retriever.
customer_service_route = Route(
    name="customer_service",
    utterances=[
        "What is your return policy?",
        "How long does shipping take?",
        "How do I reset my password?",
        "How do I track my order?",
        "How do I contact customer service?",
        "How do I update my account information?",
    ],
)


In [26]:
# All routes the router can choose between.
routes = [tech_doc_route,customer_service_route]

In [27]:
# Encoder for the router; uses the same model name as the document embeddings.
encoder = HuggingFaceEncoder(name="BAAI/bge-large-en-v1.5")

In [28]:
# Route layer: calling rl(question) returns a RouteChoice whose `.name` is the matched
# route name (it is None in the third test run, which then goes to web search).
rl = RouteLayer(encoder=encoder, routes=routes)

In [29]:
# Sanity check: a paraphrased API question should resolve to the "tech_doc" route.
rl("What is the process to log in using the API?")

RouteChoice(name='tech_doc', function_call=None, similarity_score=None)

In [30]:
### Retrieval Grader
# Asks the model whether a retrieved document is relevant to the question and expects
# a JSON object with a single "score" key ("yes" / "no").

prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing relevance 
    of a retrieved document to a user question. If the document contains keywords related to the user question, 
    grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
     <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the retrieved document: \n\n {document} \n\n
    Here is the user question: {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables=["question", "document"],
)

# Prompt -> JSON-mode LLM -> parsed dict.
retrieval_grader = prompt | llm | JsonOutputParser()

In [31]:
### Hallucination Grader
# Asks the model whether a generated answer is supported by the supplied documents and
# expects a JSON object with a single "score" key ("yes" / "no").

# LLM (JSON mode, so the output can be parsed by JsonOutputParser)
llm = ChatOllama(model=local_llm, format="json", temperature=0)

# Prompt
prompt = PromptTemplate(
    template=""" <|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether 
    an answer is grounded in / supported by a set of facts. Give a binary 'yes' or 'no' score to indicate 
    whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a 
    single key 'score' and NO preamble or explanation. <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here are the facts:
    \n ------- \n
    {documents} 
    \n ------- \n
    Here is the answer: {generation}  <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "documents"],
)

# Prompt -> JSON-mode LLM -> parsed dict.
hallucination_grader = prompt | llm | JsonOutputParser()

In [32]:
### Answer Grader
# Asks the model whether a generated answer is useful for the question and expects a
# JSON object with a single "score" key ("yes" / "no").

# LLM (JSON mode, so the output can be parsed by JsonOutputParser)
llm = ChatOllama(model=local_llm, format="json", temperature=0)

# Prompt
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an 
    answer is useful to resolve a question. Give a binary score 'yes' or 'no' to indicate whether the answer is 
    useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
     <|eot_id|><|start_header_id|>user<|end_header_id|> Here is the answer:
    \n ------- \n
    {generation} 
    \n ------- \n
    Here is the question: {question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "question"],
)

# Prompt -> JSON-mode LLM -> parsed dict.
answer_grader = prompt | llm | JsonOutputParser()

In [33]:
# Generate answer
# The template consumes {question} and {context}; input_variables must name exactly
# those placeholders.
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a helpful chatbot that can answer questions based on the provided context. 
    You need not use the entire context provided to you.
    When interpreting general questions, you can rephrase the content without altering its meaning.
    For questions that require steps, processes, or procedures, maintain the original content as closely as possible.
    If the context includes Red Hat specific knowledge, include that in your answer.
    Avoid repeating answers.<|eot_id|><|start_header_id|>user<|end_header_id|>
    Question: {question} 
    Context: {context} 
    ### If the question demands steps or a list of points, guide the model to respond accordingly:
    - Please provide the answer in points or a list format where appropriate.
    Answer: <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "context"],
)

# Plain-text (non-JSON) model for answer generation.
# NOTE: this rebinds `llm`; the grader chains defined earlier keep the JSON-mode
# instances they were built with.
llm = ChatOllama(model=local_llm, temperature=0)


# Post-processing
def format_docs(docs):
    """Join the page_content of each document, separated by blank lines."""
    return "\n\n".join(doc.page_content for doc in docs)


# Chain: prompt -> LLM -> plain string
rag_chain = prompt | llm | StrOutputParser()

In [34]:
### Search
# The result-count field of TavilySearchResults is `max_results`; `k` is not a field
# of the tool, so it did not limit the number of results.
web_search_tool = TavilySearchResults(max_results=3)

In [35]:
#create graph functions
from pprint import pprint
from typing import List, Dict, Any

from langchain_core.documents import Document
from typing_extensions import TypedDict

from langgraph.graph import END, StateGraph



class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        web_search: "Yes" / "No" flag set by grade_documents to request a web search
        documents: list of Document objects (retrieved chunks and web-search results)
        retrieval_model: retriever stored by the retrieve nodes; the test cells
            seed it with an empty string
    """

    question: str
    generation: str
    web_search: str
    documents: List[Document]
    retrieval_model: Any
    


### Nodes
def tech_retrieve(state):
    """
    Look up the question in the PGVector-backed retriever.

    Args:
        state (dict): The current graph state; only "question" is read.

    Returns:
        dict: State update with the retrieved "documents", the unchanged
        "question", and the retriever that was used under "retrieval_model".
    """
    print("---RETRIEVE---")
    query = state["question"]
    hits = pg_retriever.invoke(query)

    return dict(documents=hits, question=query, retrieval_model=pg_retriever)

def service_retrieve(state):
    """
    Look up the question in the Qdrant-backed retriever.

    Args:
        state (dict): The current graph state; only "question" is read.

    Returns:
        dict: State update with the retrieved "documents", the unchanged
        "question", and the retriever that was used under "retrieval_model".
    """
    print("---RETRIEVE---")
    query = state["question"]
    hits = qdrnt_retriever.invoke(query)

    return dict(documents=hits, question=query, retrieval_model=qdrnt_retriever)

def web_search(state):
    """
    Run a web search for the question and add the result to the documents.

    Args:
        state (dict): The current graph state. "question" is required;
            "documents" may be missing or None when the router sends the
            question straight to web search.

    Returns:
        dict: State update whose "documents" is the existing documents followed
        by one Document containing the joined web results, plus the unchanged
        "question".
    """

    print("---WEB SEARCH---")
    question = state["question"]
    # Do not index the state directly: no retrieve node has run when web search
    # is the entry point, so "documents" may be absent or None.
    existing = state.get("documents") or []

    # Web search: each result item is read through its "content" field.
    results = web_search_tool.invoke({"query": question})
    web_results = Document(page_content="\n".join(d["content"] for d in results))

    # Build a new list instead of appending to the list held in the graph state.
    return {"documents": [*existing, web_results], "question": question}

def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Every document is graded with `retrieval_grader`; documents graded "yes"
    are kept. Web search is requested when any document is rejected or when no
    relevant document remains (including the case where nothing was retrieved).

    Args:
        state (dict): The current graph state; reads "question" and "documents".

    Returns:
        dict: State update with the filtered "documents", the unchanged
        "question", and "web_search" set to "Yes" or "No".
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state.get("documents") or []

    # Score each doc
    filtered_docs = []
    web_search = "No"
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        # Tolerate a missing key or different casing in the model's JSON answer;
        # anything other than "yes" counts as not relevant.
        grade = str(score.get("score", "no")).strip().lower()
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # The document is dropped and web search is requested instead.
            web_search = "Yes"

    # Without any relevant document there is nothing to ground an answer on,
    # so fall back to web search rather than generating from an empty context.
    if not filtered_docs:
        web_search = "Yes"

    return {"documents": filtered_docs, "question": question, "web_search": web_search}

def generate(state):
    """
    Generate answer using RAG on retrieved documents.

    Args:
        state (dict): The current graph state; reads "question" and "documents".

    Returns:
        dict: State update with the unchanged "documents" and "question" and the
        model's answer under "generation".
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation: pass the documents' text (via format_docs) as context rather
    # than the string form of the Document list.
    generation = rag_chain.invoke(
        {"context": format_docs(documents), "question": question}
    )
    return {"documents": documents, "question": question, "generation": generation}



## Conditional edge


def route_question(state):
    """
    Route the question to one of the retrievers or to web search.

    Args:
        state (dict): The current graph state; "question" is required,
            "retrieval_model" is optional and only printed.

    Returns:
        str: Next branch to take: "tech", "service" or "websearch"
    """
    print("---ROUTE QUESTION---")
    question = state["question"]
    # Only used for the debug print below, so a missing key must not fail routing.
    retrieval_model = state.get("retrieval_model")
    print("retrieval_model",retrieval_model)
    print(question)
    source = rl(question).name
    print(source)

    if source == "tech_doc":
        print("---ROUTE QUESTION TO software rag---")
        return "tech"
    elif source == "customer_service":
        print("---ROUTE QUESTION TO customer service RAG---")
        return "service"
    else:
        # Any other route name (including None) falls back to web search.
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "websearch"
        

def decide_to_generate(state):
    """
    Determines whether to generate an answer, or add web search.

    Args:
        state (dict): The current graph state; only "web_search" is read.

    Returns:
        str: "websearch" when the flag is "Yes", otherwise "generate"
    """

    print("---ASSESS GRADED DOCUMENTS---")
    web_search = state["web_search"]

    if web_search == "Yes":
        # Document grading asked for a web search before generating.
        print(
            "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---"
        )
        return "websearch"

    # No web search requested, so generate the answer from the graded documents.
    print("---DECISION: GENERATE---")
    return "generate"

### Conditional edge


def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state; reads "question", "documents"
            and "generation".

    Returns:
        str: "useful" when the answer is grounded and addresses the question,
        "not useful" when it is grounded but does not address the question,
        "not supported" when it is not grounded in the documents.
    """

    def _is_yes(score):
        # Same normalisation as grade_documents: tolerate casing/whitespace and a
        # missing "score" key in the model's JSON answer.
        return str(score.get("score", "")).strip().lower() == "yes"

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke(
        {"documents": documents, "generation": generation}
    )
    print("score",score)

    # Check hallucination
    if not _is_yes(score):
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

    print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
    # Check question-answering
    print("---GRADE GENERATION vs QUESTION---")
    score = answer_grader.invoke({"question": question, "generation": generation})
    if _is_yes(score):
        print("---DECISION: GENERATION ADDRESSES QUESTION---")
        return "useful"

    print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
    return "not useful"




In [36]:
# Create the graph over GraphState and register one node per pipeline step.
workflow = StateGraph(GraphState)
workflow.add_node("websearch", web_search)  # web search
workflow.add_node("tech_retrieve", tech_retrieve)  # retrieve from PGVector
workflow.add_node("service_retrieve", service_retrieve)  # retrieve from Qdrant
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generate

In [37]:
# Graph build
# Entry point: route_question returns "tech", "service" or "websearch", which is
# mapped to the node that handles that branch.
workflow.set_conditional_entry_point(
    route_question,
    {
        "tech": "tech_retrieve",
        "service": "service_retrieve",
        "websearch": "websearch"
    },
)


# Both retrieve nodes hand their documents to the grader.
workflow.add_edge("tech_retrieve", "grade_documents")
workflow.add_edge("service_retrieve", "grade_documents")

# After grading, either add web results first or generate directly.
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)
workflow.add_edge("websearch", "generate")

# After generation: finish when the answer is useful, search the web when it does not
# address the question, and generate again when it is not grounded in the documents.
# NOTE(review): "not supported" loops back to "generate" with no retry limit visible
# in this graph -- confirm the loop terminates (the generation LLM uses temperature=0).
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
    },
)

In [38]:
# Compile
app = workflow.compile()

# Test: a shipping question, expected to take the customer-service branch.

inputs = {"question": "How can I check the status of my shipment?","retrieval_model":""}
# Each streamed `output` maps the name of the node that just ran to the state
# update it returned.
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
# After the loop `value` is the update from the last node that ran; print its answer.
pprint(value["generation"])

---ROUTE QUESTION---
retrieval_model 
How can I check the status of my shipment?
customer_service
---ROUTE QUESTION TO customer service RAG---
---RETRIEVE---
'Finished running: service_retrieve:'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
'Finished running: grade_documents:'
---GENERATE---
---CHECK HALLUCINATIONS---
score {'score': 'yes'}
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
'Finished running: generate:'
('To check the status of your shipment:\n'
 '\n'
 '• Once your order has shipped, you will receive a tracking number via '
 'email.\n'
 '• You can use this tracking number to track the status of your shipment on '
 'our website.')


In [75]:
# Compile
app = workflow.compile()

# Test: a Python set-up question, expected to take the tech-doc branch.

inputs = {"question": "How do I download and set up Python?","retrieval_model":""}
# Each streamed `output` maps the name of the node that just ran to the state
# update it returned.
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
# After the loop `value` is the update from the last node that ran; print its answer.
pprint(value["generation"])

---ROUTE QUESTION---
retrieval_model 
How do I download and set up Python?
tech_doc
---ROUTE QUESTION TO software rag---
---RETRIEVE---
'Finished running: tech_retrieve:'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
'Finished running: grade_documents:'
---GENERATE---
---CHECK HALLUCINATIONS---
score {'score': 'yes'}
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
'Finished running: generate:'
('To download and set up Python, follow these steps:\n'
 '\n'
 '1. **Install Python**:\n'
 '\t* Download and install Python from the official website: '
 'https://www.python.org/downloads/\n'
 '2. **Verify the installation**: Once installed, open a command prompt or '
 'terminal window and type `python --version` to verify that Python is '
 'installed correctly.\n'
 '\n'
 "That's it! You now have Python set up on your system.")


In [39]:
# Compile
app = workflow.compile()

# Test: a general-knowledge question that matches no route, expected to fall back
# to web search.

inputs = {"question": "What is capital of inida?","retrieval_model":""}
# Each streamed `output` maps the name of the node that just ran to the state
# update it returned.
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
# After the loop `value` is the update from the last node that ran; print its answer.
pprint(value["generation"])

---ROUTE QUESTION---
retrieval_model 
What is capital of inida?
None
---ROUTE QUESTION TO WEB SEARCH---
---WEB SEARCH---
'Finished running: websearch:'
---GENERATE---
---CHECK HALLUCINATIONS---
score {'score': 'yes'}
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
'Finished running: generate:'
'What is the capital of India?\n\nThe capital of India is New Delhi.'
