Classic RAG (Dense Retrieval)

Plaintext:

Q: What are the library opening hours?
A: The library is open from 9am to 8pm on weekdays.

Q: How do I obtain my student ID?
A: Student IDs are issued at the administration desk.

In [17]:
!pip install langchain-community






In [18]:
!pip install faiss-cpu






In [19]:
import requests
from langchain.chains import RetrievalQA
from langchain_community.vectorstores import FAISS
from langchain_core.embeddings import Embeddings
from langchain_core.language_models.llms import LLM
from typing import List, Any

# docs: List of tuples containing question-answer pairs.
# Each tuple represents a document with a question and its corresponding answer.
docs = [
    ("What are the library opening hours?", "The library is open from 9am to 8pm on weekdays."),
    ("How do I obtain my student ID?", "Student IDs are issued at the administration desk."),
]

# texts: List of formatted strings combining questions and answers.
# Each string is formatted as "Q: <question>\nA: <answer>" for embedding and retrieval.
texts = [f"Q: {q}\nA: {a}" for q, a in docs]

# CustomAPIEmbeddings: Custom embedding class using your /embedding API.
# - Inherits from LangChain's Embeddings base class.
# - embed_documents() embeds a list of texts by calling embed_query() for each.
# - embed_query() sends a POST request to your /embedding endpoint with the text and model version.
# - The API returns a dense vector (embedding) for the input text.
# - Example: The text "Q: What are the library opening hours?\nA: ..." is converted into a high-dimensional vector (e.g., 1536 dimensions).
class CustomAPIEmbeddings(Embeddings):
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        return [self.embed_query(text) for text in texts]

    def embed_query(self, text: str) -> List[float]:
        response = requests.post(
            "http://localhost:8000/embedding",
            json={"input": text, "model_version": "text-embedding-ada-002"},
            timeout=10,
        )
        response.raise_for_status()
        return response.json()["embedding"]

# CustomAPILLM: Custom LLM class using your /chat API.
# - Inherits from LangChain's LLM base class.
# - _call() sends a POST request to your /chat endpoint with the system prompt, user prompt, and model version.
# - The API returns a generated answer in the "content" field.
# - Example: When a user asks "When can I go to the library?", the system retrieves the most relevant Q&A pair and uses the LLM to answer in context.
class CustomAPILLM(LLM):
    # stop defaults to None: a mutable default ([]) would be shared across calls.
    def _call(self, prompt: str, stop: List[str] = None, **kwargs: Any) -> str:
        payload = {
            "system_prompt": "You are a helpful assistant.",
            "user_prompt": prompt,
            "model_version": "gpt-4.1-2025-04-14"
        }
        response = requests.post(
            "http://localhost:8000/chat",
            json=payload,
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["content"]

    @property
    def _llm_type(self) -> str:
        return "custom_api_llm"

# vectorstore: FAISS vector store instance.
# - FAISS (Facebook AI Similarity Search) is a library for efficient similarity search and clustering of dense vectors.
# - It is commonly used to quickly find similar items (e.g., documents, images) in large datasets.
# - from_texts() creates a vector store from the provided texts using the specified embedding model.
#   Example: If you have 1000 FAQ pairs, each will be converted to a vector and stored in FAISS for fast retrieval.
# - CustomAPIEmbeddings generates dense vector representations (embeddings) for each text using your custom embedding API.
# - Indexes: FAISS builds an index over these vectors to support nearest neighbor search. This is not a traditional database index. FAISS offers approximate structures such as inverted-file (IVF) or HNSW indexes, but LangChain's from_texts() defaults to a flat index that compares the query against every stored vector, which is exact and fine for small collections.
# - Vector dimensions: Each embedding is a list (array) of floats whose length (dimension) is fixed by the embedding model (e.g., 1536 for ada-002). Higher dimensions can capture more semantic information at the cost of memory and search time.
# - Why arrays? Similarity is computed with arithmetic over the vectors (Euclidean distance, dot product, or cosine similarity), so they are stored as numeric arrays.
vectorstore = FAISS.from_texts(texts, CustomAPIEmbeddings())

# qa: RetrievalQA chain instance.
# - RetrievalQA is a LangChain chain that combines a retriever and a language model (LLM) for question answering.
# - from_chain_type() initializes the chain with:
#     - llm: The language model to generate answers (here, your custom LLM).
#     - retriever: The retriever interface from the vector store, used to fetch relevant documents.
# - Example: When a user asks "When can I go to the library?", the retriever converts the question to a vector, finds the most similar vectors (documents) in FAISS, and passes them to the LLM to generate a final answer.
qa = RetrievalQA.from_chain_type(
    llm=CustomAPILLM(), retriever=vectorstore.as_retriever()
)

# Run the QA chain with a user query.
# - qa.invoke() takes a dict with the question under "query", retrieves relevant documents, and generates an answer using the LLM.
# - The generated answer is returned under the "result" key (qa.run() is deprecated in recent LangChain releases).
# - Example: If the input is "When can I go to the library?", the system retrieves the most relevant Q&A pair and uses the LLM to answer in context.
print(qa.invoke({"query": "When can I go to the library?"})["result"])

You can go to the library on weekdays between 9am and 8pm.
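
The retrieval step described in the comments above (embed the query, then find the closest stored vectors) can be illustrated without FAISS or an embedding API. The following is a minimal brute-force sketch, not the notebook's actual pipeline: the 3-dimensional vectors are hypothetical stand-ins for real embeddings, and cosine similarity is chosen only for illustration (a vector store may use a different metric, such as Euclidean distance).

```python
import math
from typing import List, Tuple

def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def nearest(query_vec: List[float], doc_vecs: List[List[float]], k: int = 1) -> List[Tuple[int, float]]:
    """Return (index, similarity) for the k most similar document vectors."""
    scored = [(i, cosine_similarity(query_vec, v)) for i, v in enumerate(doc_vecs)]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]

# Hypothetical 3-dimensional "embeddings"; real models return far more dimensions.
toy_doc_vecs = [
    [0.9, 0.1, 0.0],  # stands in for the library-hours Q&A pair
    [0.1, 0.9, 0.2],  # stands in for the student-ID Q&A pair
]
toy_query_vec = [0.8, 0.2, 0.1]  # stands in for "When can I go to the library?"

best_index, best_score = nearest(toy_query_vec, toy_doc_vecs, k=1)[0]
print(best_index, round(best_score, 3))
```

With these toy values the library-hours vector (index 0) ranks first. A library like FAISS performs the same kind of search with index structures built for large collections.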


Hybrid RAG (Dense + Sparse)

A Hybrid RAG system combines dense (semantic/vector) retrieval with sparse (keyword/BM25) retrieval, then merges or reranks their results for improved relevance and robustness.

LangChain for orchestration

FAISS for dense retrieval via vector search (embeddings come from a custom /embedding API)

rank_bm25 for sparse retrieval (BM25, pure Python!)
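
One common way to "merge or rerank" two ranked lists is Reciprocal Rank Fusion (RRF). The notebook itself merges by simple concatenation and deduplication, so the sketch below is an alternative shown for comparison, not the method used in the following cell. The document IDs and rankings are hypothetical, and `k=60` is the conventional smoothing constant.

```python
from collections import defaultdict
from typing import Dict, List

def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse ranked lists: each document scores sum(1 / (k + rank)) over the lists it appears in."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken alphabetically for determinism.
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))

# Hypothetical rankings (best first) from a sparse and a dense retriever.
sparse_ranking = ["doc_reset", "doc_printer", "doc_library"]
dense_ranking = ["doc_reset", "doc_library", "doc_student_id"]

fused = reciprocal_rank_fusion([sparse_ranking, dense_ranking])
print(fused)
# ['doc_reset', 'doc_library', 'doc_printer', 'doc_student_id']
```

A document ranked highly by both retrievers rises to the top, while one that appears in only a single list is pushed down. RRF uses only ranks, so BM25 scores and vector distances never need to be put on a common scale.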

In [20]:
!pip install langchain-community faiss-cpu sentence-transformers rank_bm25






In [2]:
from langchain_community.vectorstores import FAISS
from langchain_core.embeddings import Embeddings
from langchain_core.language_models.llms import LLM
from rank_bm25 import BM25Okapi
from typing import List, Any
import requests

# 1. Example data (can be replaced with your own)
docs = [
    "The library is open from 9am to 8pm on weekdays.",
    "Student IDs are issued at the administration desk.",
    "To reset the device, press and hold the power button for 10 seconds.",
    "If the printer jams, open the rear door and remove the stuck paper.",
]

# 2. Prepare BM25 (sparse retrieval)
# Tokenize each document for BM25, which is a keyword-based retrieval algorithm.
doc_tokens = [doc.lower().split() for doc in docs]
bm25 = BM25Okapi(doc_tokens)

class CustomAPIEmbeddings(Embeddings):
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        # Calls embed_query for each text (one-by-one)
        return [self.embed_query(text) for text in texts]

    def embed_query(self, text: str) -> List[float]:
        response = requests.post(
            "http://localhost:8000/embedding",
            json={"input": text, "model_version": "text-embedding-ada-002"},
            timeout=10,
        )
        response.raise_for_status()
        return response.json()["embedding"]

class CustomAPILLM(LLM):
    """
    Custom LLM class that uses a remote API to generate answers.
    This replaces local LLMs with a call to your own /chat endpoint.
    """
    def _call(self, prompt: str, stop: List[str] = None, **kwargs: Any) -> str:
        """
        Generate a response from the custom LLM API.
        Args:
            prompt: The prompt string to send to the LLM.
            stop: Optional stop tokens (not used here).
        Returns:
            The generated answer as a string.
        """
        payload = {
            "system_prompt": "You are a helpful assistant.",
            "user_prompt": prompt,
            "model_version": "gpt-4.1-2025-04-14"
        }
        response = requests.post(
            "http://localhost:8000/chat",
            json=payload,
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["content"]

    @property
    def _llm_type(self) -> str:
        return "custom_api_llm"

# 3. Prepare FAISS (dense retrieval) with custom embeddings
# FAISS is a library for fast similarity search over dense vectors.
# Here, we use our custom embedding API to convert docs to vectors.
embeddings = CustomAPIEmbeddings()
vectorstore = FAISS.from_texts(docs, embeddings)

def hybrid_retrieve(query: str, top_n: int = 2) -> List[str]:
    """
    Retrieve top-N relevant documents using both sparse (BM25) and dense (FAISS) retrieval,
    then merge results without duplicates.
    Args:
        query: The user query string.
        top_n: Number of top results to retrieve from each retriever.
    Returns:
        List of unique relevant documents.
    """
    # Sparse retrieval: BM25 scores based on keyword overlap.
    sparse_scores = bm25.get_scores(query.lower().split())
    sparse_indices = sorted(range(len(sparse_scores)), key=lambda i: -sparse_scores[i])[:top_n]
    # Dense retrieval: FAISS finds semantically similar docs via embeddings.
    dense_results = vectorstore.similarity_search(query, k=top_n)
    dense_indices = [docs.index(res.page_content) for res in dense_results]
    # Merge indices, preserving order and removing duplicates.
    hybrid_indices = []
    for idx in sparse_indices + dense_indices:
        if idx not in hybrid_indices:
            hybrid_indices.append(idx)
    return [docs[idx] for idx in hybrid_indices]

# Instantiate the custom LLM
llm = CustomAPILLM()

def hybrid_qa(query: str) -> str:
    """
    Answer a user query using hybrid retrieval-augmented generation.
    Retrieves relevant context using both sparse and dense retrieval, then generates an answer.
    Args:
        query: The user question.
    Returns:
        The generated answer string.
    """
    # Retrieve relevant contexts using hybrid retrieval.
    context = "\n".join(hybrid_retrieve(query, top_n=2))
    # Compose the prompt for the LLM.
    prompt = f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
    # Generate the answer using the custom LLM (invoke() replaces the deprecated llm(prompt) call).
    answer = llm.invoke(prompt)
    return answer

# Run a demo
user_query = "How do I reset my device if it won't turn on?"
print("RETRIEVED CONTEXTS:")
for ctx in hybrid_retrieve(user_query):
    print("-", ctx)
print("\nMODEL ANSWER:")
print(hybrid_qa(user_query))

"""
---------------------------
How is this different from classical RAG (dense retrieval)?
---------------------------

Classical RAG (Dense Retrieval):
- Uses only dense retrieval: queries and documents are embedded into vectors, and similarity search (e.g., FAISS) is used to find relevant documents.
- Retrieval is based on semantic similarity, not just keyword overlap.
- May miss relevant documents if the embedding model fails to capture certain keywords or domain-specific terms.

Hybrid RAG (Dense + Sparse):
- Combines dense retrieval (semantic similarity) and sparse retrieval (keyword/BM25).
- BM25 (sparse) excels at exact keyword matches and rare terms, while dense retrieval captures semantic meaning.
- Results from both retrievers are merged (and deduplicated), improving recall and robustness.
- Especially useful when queries are ambiguous, contain rare words, or when the embedding model is imperfect.
- In this code, the top results from both retrievers are merged, and their union is passed to the LLM as context.

Summary:
Hybrid RAG leverages the strengths of both retrieval paradigms, often leading to better and more reliable answers than classical (dense-only) RAG.
"""

RETRIEVED CONTEXTS:
- To reset the device, press and hold the power button for 10 seconds.
- If the printer jams, open the rear door and remove the stuck paper.

MODEL ANSWER:
If your device won't turn on, try resetting it by pressing and holding the power button for 10 seconds. If it still doesn't respond, make sure it is properly connected to a power source or charged, then try the reset again.
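
The BM25 step above tokenizes with `lower().split()`, which leaves punctuation attached to words, so a document token like `device,` does not match the query token `device`. The sketch below compares that tokenizer with a hypothetical regex-based alternative (`regex_tokens` is an illustrative name, not part of the notebook or of rank_bm25), using the same document and query as the demo.

```python
import re
from typing import List

def split_tokens(text: str) -> List[str]:
    """Whitespace tokenization; punctuation stays attached to words."""
    return text.lower().split()

def regex_tokens(text: str) -> List[str]:
    """Hypothetical alternative: keep only runs of letters, digits, and apostrophes."""
    return re.findall(r"[a-z0-9']+", text.lower())

doc = "To reset the device, press and hold the power button for 10 seconds."
query = "How do I reset my device if it won't turn on?"

overlap_split = set(split_tokens(doc)) & set(split_tokens(query))
overlap_regex = set(regex_tokens(doc)) & set(regex_tokens(query))

print(sorted(overlap_split))  # ['reset']
print(sorted(overlap_regex))  # ['device', 'reset']
```

If a tokenizer like this were adopted, the same function would need to be applied to both the documents and the query so that the two sides stay comparable.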



CoT RAG (Chain of Thought)

In [3]:
import requests
from typing import List, Any

# --- Custom LLM API for Chain-of-Thought RAG ---

class CustomAPILLM:
    """
    Custom LLM class that uses a remote API to generate answers.
    This replaces a hosted LLM client (like OpenAI's) with a call to your own /chat endpoint.
    """

    def __init__(self, api_url: str = "http://localhost:8000/chat", model_version: str = "gpt-4.1-2025-04-14"):
        """
        Initialize the custom LLM API client.
        Args:
            api_url: The URL of your /chat endpoint.
            model_version: The model version to use.
        """
        self.api_url = api_url
        self.model_version = model_version

    def chat(self, messages: List[dict], max_tokens: int = 100, temperature: float = 0.2) -> str:
        """
        Send a chat completion request to the custom API.
        Args:
            messages: List of message dicts (role/content) for the conversation.
            max_tokens: Maximum tokens to generate.
            temperature: Sampling temperature.
        Returns:
            The generated answer as a string.
        """
        # Compose the prompt from the message history
        system_prompt = ""
        user_prompt = ""
        for msg in messages:
            if msg["role"] == "system":
                system_prompt += msg["content"] + "\n"
            elif msg["role"] == "user":
                user_prompt += msg["content"] + "\n"
            elif msg["role"] == "assistant":
                user_prompt += "Assistant: " + msg["content"] + "\n"

        payload = {
            "system_prompt": system_prompt.strip(),
            "user_prompt": user_prompt.strip(),
            "model_version": self.model_version,
            "max_tokens": max_tokens,
            "temperature": temperature,
        }
        response = requests.post(self.api_url, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()["content"]

# --- Dummy Retriever (replace with your own retriever for production) ---

def retrieve_documents(query: str, top_k: int = 3) -> List[str]:
    """
    Dummy retriever for demonstration.
    Replace with your actual retriever (e.g., FAISS, Elasticsearch, etc.).
    Args:
        query: The query string.
        top_k: Number of top documents to retrieve.
    Returns:
        List of relevant document strings.
    """
    knowledge_base = {
        "Paris": "Paris is the capital city of France.",
        "France": "France is a country in Europe.",
        "Eiffel Tower": "The Eiffel Tower is a landmark in Paris.",
        "Europe": "Europe is a continent that includes France.",
    }
    results = []
    for k, v in knowledge_base.items():
        if k.lower() in query.lower():
            results.append(v)
    return results[:top_k]

# --- Chain-of-Thought RAG using Custom API ---

def chain_of_thought_rag(query: str):
    """
    Chain-of-Thought RAG pipeline using a custom LLM API.
    - Performs step-by-step reasoning, retrieving supporting documents at each step.
    - At each step, the LLM generates a thought, and the retriever fetches relevant facts.
    - The process repeats for a fixed number of steps or until no more facts are found.
    - Finally, the LLM is asked to synthesize a final answer based on the reasoning chain and retrieved facts.
    Args:
        query: The user question.
    """
    llm = CustomAPILLM()  # Initialize the custom LLM client

    # Step 1: Initial step-by-step reasoning prompt
    messages = [
        {"role": "system", "content": "You are a helpful assistant who answers questions step by step, using facts you are provided."},
        {"role": "user", "content": f"{query} Let's think step by step."},
    ]

    for step in range(3):
        # Step 2: Model generates the next step/thought
        assistant_message = llm.chat(messages, max_tokens=100, temperature=0.2)
        print(f"Step {step+1} LLM: {assistant_message}")

        # Step 3: Retrieve supporting documents based on the current thought
        retrieved_docs = retrieve_documents(assistant_message)
        if not retrieved_docs:
            break  # No more info found, stop reasoning

        # Step 4: Add retrieved knowledge as "system" messages for the next step,
        # skipping facts that an earlier step already added
        for doc in retrieved_docs:
            fact = {"role": "system", "content": f"Relevant information: {doc}"}
            if fact not in messages:
                messages.append(fact)

        # Step 5: Continue reasoning by appending the assistant's thought
        messages.append({"role": "assistant", "content": assistant_message})

    # Step 6: Ask for the final answer based on the above reasoning and facts
    messages.append({"role": "user", "content": "Based on the above, what is the final answer?"})
    final_answer = llm.chat(messages, max_tokens=100, temperature=0.2)
    print("Final Answer:", final_answer)

# Example usage
if __name__ == "__main__":
    chain_of_thought_rag("Where is the Eiffel Tower located?")

Step 1 LLM: Sure, let's think step by step:

1. The Eiffel Tower is a famous landmark.
2. It is located in Europe.
3. More specifically, it is in France.
4. Within France, it is in the capital city, Paris.
5. In Paris, the Eiffel Tower stands on the Champ de Mars, near the Seine River.

**Final answer:** The Eiffel Tower is located in Paris, France, on the Champ de Mars near the Seine River.
Step 2 LLM: Let's think step by step:

1. The Eiffel Tower is a landmark.
2. This landmark is in Paris.
3. Paris is the capital city of France.
4. France is a country in Europe.

So, the Eiffel Tower is located in Paris, France, which is in Europe.
Step 3 LLM: Let's think step by step:

1. The Eiffel Tower is a landmark in Paris.
2. Paris is the capital city of France.
3. France is a country in Europe.

Therefore, the Eiffel Tower is located in Paris, France, which is in Europe.
Final Answer: The final answer is: The Eiffel Tower is located in Paris, France, which is in Europe.
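
The think-then-retrieve loop can also be exercised without a running /chat endpoint. The sketch below is a simplified offline variant under stated assumptions: `scripted_think` is a hypothetical stand-in for the LLM call, `FACTS` is a two-entry toy knowledge base, and the loop stops as soon as a step surfaces no new facts (a stricter stopping rule than the cell above uses).

```python
from typing import Callable, Dict, List

# Hypothetical facts, keyed by the keyword that triggers them.
FACTS: Dict[str, str] = {
    "Eiffel Tower": "The Eiffel Tower is a landmark in Paris.",
    "Paris": "Paris is the capital city of France.",
}

def keyword_retrieve(text: str) -> List[str]:
    """Return every fact whose keyword appears in the text (case-insensitive)."""
    return [fact for key, fact in FACTS.items() if key.lower() in text.lower()]

def reasoning_loop(question: str, think: Callable[[str, List[str]], str], max_steps: int = 3) -> List[str]:
    """Alternate thinking and retrieval; stop when a step surfaces no new facts."""
    facts: List[str] = []
    for _ in range(max_steps):
        thought = think(question, facts)
        new_facts = [f for f in keyword_retrieve(thought) if f not in facts]
        if not new_facts:
            break
        facts.extend(new_facts)
    return facts

def scripted_think(question: str, facts: List[str]) -> str:
    """Stand-in for the LLM call: echoes the question first, then the latest fact."""
    return facts[-1] if facts else question

collected = reasoning_loop("Where is the Eiffel Tower located?", scripted_think)
print(collected)
```

Each retrieved fact mentions the next entity, so the loop walks from the Eiffel Tower to Paris and then stops because the fact about Paris triggers nothing new. Swapping `scripted_think` for a function that calls the real LLM keeps the control flow unchanged.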


Minimal Knowledge Graph RAG

In [4]:
import requests

# Example: Toy Knowledge Graph (triples)
KG = [
    ("Eiffel Tower", "located_in", "Paris"),
    ("Paris", "is_capital_of", "France"),
    ("France", "in_continent", "Europe"),
    ("Eiffel Tower", "type", "Landmark"),
    ("Europe", "type", "Continent"),
]

def query_kg(subject=None, predicate=None, obj=None):
    """
    Find all triples in the knowledge graph matching the given pattern.
    Args:
        subject: Subject entity to match (or None for wildcard).
        predicate: Predicate/relation to match (or None for wildcard).
        obj: Object entity to match (or None for wildcard).
    Returns:
        List of matching (subject, predicate, object) triples.
    """
    results = []
    for (s, p, o) in KG:
        if (subject is None or s == subject) and (predicate is None or p == predicate) and (obj is None or o == obj):
            results.append((s, p, o))
    return results

def format_kg_triplets_for_prompt(triplets):
    """
    Format a list of KG triples for inclusion in an LLM prompt.
    Args:
        triplets: List of (subject, predicate, object) tuples.
    Returns:
        String representation for the prompt.
    """
    if not triplets:
        return "No relevant facts found."
    return "\n".join([f"- {s} --[{p}]--> {o}" for s, p, o in triplets])

class CustomAPILLM:
    """
    Custom LLM class that uses a remote API to generate answers.
    This replaces a hosted LLM client (like OpenAI's) with a call to your own /chat endpoint.
    """

    def __init__(self, api_url: str = "http://localhost:8000/chat", model_version: str = "gpt-4.1-2025-04-14"):
        """
        Initialize the custom LLM API client.
        Args:
            api_url: The URL of your /chat endpoint.
            model_version: The model version to use.
        """
        self.api_url = api_url
        self.model_version = model_version

    def chat(self, messages, max_tokens=100, temperature=0.2):
        """
        Send a chat completion request to the custom API.
        Args:
            messages: List of message dicts (role/content) for the conversation.
            max_tokens: Maximum tokens to generate.
            temperature: Sampling temperature.
        Returns:
            The generated answer as a string.
        """
        # Compose the prompt from the message history
        system_prompt = ""
        user_prompt = ""
        for msg in messages:
            if msg["role"] == "system":
                system_prompt += msg["content"] + "\n"
            elif msg["role"] == "user":
                user_prompt += msg["content"] + "\n"
            elif msg["role"] == "assistant":
                user_prompt += "Assistant: " + msg["content"] + "\n"

        payload = {
            "system_prompt": system_prompt.strip(),
            "user_prompt": user_prompt.strip(),
            "model_version": self.model_version,
            "max_tokens": max_tokens,
            "temperature": temperature,
        }
        response = requests.post(self.api_url, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()["content"]

def kg_rag_chain_of_thought(question):
    """
    Minimal Knowledge Graph RAG pipeline using a custom LLM API.
    - Alternates LLM reasoning with knowledge graph lookups over two fixed steps.
    - Step 1 fetches where the Eiffel Tower is located; step 2 fetches all facts about Paris.
    - The lookups are hardcoded for this demo; a real pipeline would parse each LLM thought to choose the entity and relation to query.
    - Finally, the LLM is asked to synthesize a final answer based on the reasoning chain and retrieved facts.
    Args:
        question: The user question.
    """
    llm = CustomAPILLM()  # Initialize the custom LLM client

    # Step 1: Initial user question with structured reasoning prompt
    messages = [
        {"role": "system", "content": "You are a helpful assistant who answers questions step by step using facts from a knowledge graph. Each fact is presented as Subject --[Predicate]--> Object."},
        {"role": "user", "content": f"{question}\nLet's reason with the knowledge graph step by step."},
    ]

    # Step 2: Ask LLM what to look for first (could also be automated)
    assistant_message = llm.chat(messages, max_tokens=100, temperature=0.2)
    print("Step 1 LLM:", assistant_message)

    # Step 3: Retrieve relevant KG facts based on LLM's thought (for demo, hardcoded for Eiffel Tower)
    # In production, parse assistant_message to extract entity/relation of interest
    retrieved_facts = query_kg(subject="Eiffel Tower", predicate="located_in")
    kg_facts = format_kg_triplets_for_prompt(retrieved_facts)
    print("Retrieved from KG:\n", kg_facts)

    # Step 4: Add KG facts and continue reasoning
    messages.extend([
        {"role": "assistant", "content": assistant_message},
        {"role": "system", "content": f"Facts from the KG:\n{kg_facts}"},
        {"role": "user", "content": "What can we infer next?"},
    ])
    second_message = llm.chat(messages, max_tokens=100, temperature=0.2)
    print("Step 2 LLM:", second_message)

    # Step 5: Retrieve more KG facts (e.g., about Paris)
    retrieved_facts2 = query_kg(subject="Paris")
    kg_facts2 = format_kg_triplets_for_prompt(retrieved_facts2)
    print("Retrieved from KG:\n", kg_facts2)

    # Step 6: Continue chain and ask for final answer
    messages.extend([
        {"role": "assistant", "content": second_message},
        {"role": "system", "content": f"More facts from the KG:\n{kg_facts2}"},
        {"role": "user", "content": "Given all these facts, what is the final answer?"},
    ])
    final_response = llm.chat(messages, max_tokens=100, temperature=0.2)
    print("Final Answer:", final_response)

# Example usage:
if __name__ == "__main__":
    kg_rag_chain_of_thought("Which country is the Eiffel Tower in?")

Step 1 LLM: Sure! Let's reason step by step using the knowledge graph:

1. Eiffel Tower --[located in]--> Paris  
2. Paris --[located in]--> France

Therefore, the Eiffel Tower is in France.
Retrieved from KG:
 - Eiffel Tower --[located_in]--> Paris
Step 2 LLM: Since we know the Eiffel Tower is in France, we can infer more about its geographical or cultural context. For example:

- France --[located in]--> Europe

So, the Eiffel Tower is also located in Europe.

If you want to know more, you could ask about its history, significance, or other landmarks in Paris or France!
Retrieved from KG:
 - Paris --[is_capital_of]--> France
Final Answer: Given all these facts, the final answer is:

The Eiffel Tower is in France.
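
The cell above hardcodes its two KG lookups and notes that a production system would parse the LLM's thought instead. The sketch below shows one simple way that could work, under stated assumptions: entities are found by plain substring matching against KG subjects, and the path of predicates to follow is supplied by hand. `mentioned_entities`, `follow_path`, and `TOY_KG` are hypothetical names, and treating `is_capital_of` as implying "is in that country" is an assumption of this toy graph.

```python
from typing import List, Optional, Tuple

Triple = Tuple[str, str, str]

# Same style of toy triples as the cell above, repeated so the sketch is self-contained.
TOY_KG: List[Triple] = [
    ("Eiffel Tower", "located_in", "Paris"),
    ("Paris", "is_capital_of", "France"),
    ("France", "in_continent", "Europe"),
]

def mentioned_entities(text: str, kg: List[Triple]) -> List[str]:
    """Return KG subjects that appear verbatim (case-insensitive) in the text."""
    subjects = sorted({s for s, _, _ in kg})
    return [s for s in subjects if s.lower() in text.lower()]

def follow_path(start: str, predicates: List[str], kg: List[Triple]) -> Optional[str]:
    """Follow one edge per predicate from start; return the final entity or None."""
    current = start
    for predicate in predicates:
        matches = [o for s, p, o in kg if s == current and p == predicate]
        if not matches:
            return None  # The path breaks here: no matching edge
        current = matches[0]
    return current

thought = "First I need to find where the Eiffel Tower is located."
entities = mentioned_entities(thought, TOY_KG)
country = follow_path(entities[0], ["located_in", "is_capital_of"], TOY_KG)
print(entities, country)  # ['Eiffel Tower'] France
```

Substring matching is brittle (it misses aliases and misspellings), so a real system would more likely use entity linking or ask the LLM to emit a structured query.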
