In [37]:
# @title Default title text
!pip install langgraph langsmith langchain langchain_community
!pip install faiss-cpu
!pip install sentence-transformers
!pip install -U langchain-huggingface
!pip install -U langchain-google-genai
!pip install -U langchain_openai
!pip install -U openai
!pip install -U pinecone
!pip install flashrank

Collecting pinecone
  Downloading pinecone-7.2.0-py3-none-any.whl.metadata (9.5 kB)
Downloading pinecone-7.2.0-py3-none-any.whl (524 kB)
   ---------------------------------------- 0.0/524.3 kB ? eta -:--:--
   --------------------------------------- 524.3/524.3 kB 16.4 MB/s eta 0:00:00
Installing collected packages: pinecone
  Attempting uninstall: pinecone
    Found existing installation: pinecone 7.1.0
    Uninstalling pinecone-7.1.0:
      Successfully uninstalled pinecone-7.1.0
Successfully installed pinecone-7.2.0


## PINECONE DB RETRIEVER

In [38]:
import os
from dotenv import load_dotenv
from pinecone import Pinecone
from typing import List, Any
from pydantic import BaseModel, Field

from langchain.schema import BaseRetriever, Document
from langchain.load import dumps, loads
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate


class PineconeDBRetriever(BaseRetriever, BaseModel):
    """
    A custom LangChain retriever for Pinecone.

    The raw query text is sent to the index's ``search`` endpoint and every
    returned hit is converted into a LangChain ``Document``.

    Fields:
        index_name: Name of the Pinecone index to open.
        pinecone_api_key: API key used to build the Pinecone client.
        namespace: Namespace inside the index that is searched.
        top_k: Number of hits requested per query (default 5).
        index: Handle returned by ``Pinecone(...).Index(index_name)``; filled
            in by ``__init__`` and excluded from serialization.
    """
    index_name: str
    # repr=False keeps the secret out of repr()/pprint() output, e.g. when a
    # chain containing this retriever is printed while debugging.
    pinecone_api_key: str = Field(..., repr=False)
    namespace: str
    top_k: int = 5
    index: Any = Field(None, exclude=True)

    def __init__(self, **data):
        """
        Validates the fields, then creates the Pinecone client and opens the index.
        """
        super().__init__(**data)
        pc = Pinecone(api_key=self.pinecone_api_key)
        self.index = pc.Index(self.index_name)

    def _get_relevant_documents(self, query: str) -> List[Document]:
        """
        The core method to retrieve documents. LangChain's retriever system
        calls this method.

        Args:
            query (str): The user's question.

        Returns:
            List[Document]: One document per hit, in the order Pinecone
            returned them. An empty list is returned for a blank query or
            when the response carries no hits.
        """
        # A blank query cannot match anything useful; skip the network call.
        if not query or not query.strip():
            return []

        # Pinecone's hosted embedding model will automatically embed the query text.
        results = self.index.search(
            namespace=self.namespace,
            query={
                "inputs": {"text": query},
                "top_k": self.top_k
            }
        )

        # Convert Pinecone's search results into LangChain Document objects.
        # TODO: Add additional fields as necessary.
        documents = []
        if results and 'result' in results and 'hits' in results['result']:
            for match in results['result']['hits']:
                # The actual text content is in the 'fields' dictionary
                page_content = match.get('fields', {}).get('text', '')
                # The score is deliberately left out of the metadata: it differs
                # per query, which would defeat serialization-based de-duplication.
                metadata = {"id": match.get("_id")}

                documents.append(
                    Document(page_content=page_content, metadata=metadata)
                )

        return documents

    async def _aget_relevant_documents(self, query: str) -> List[Document]:
        """
        Asynchronous version of the document retrieval method.

        The synchronous search is run in a worker thread so the blocking
        network call does not stall the event loop. For a production
        environment, you might want to use an async Pinecone client.
        """
        import asyncio  # local import: keeps this block self-contained

        return await asyncio.to_thread(self._get_relevant_documents, query)

## RAG ORCHESTRATOR

In [50]:
from operator import itemgetter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.runnables import RunnableConfig, RunnableParallel
from langchain_openai import ChatOpenAI
from pprint import pprint
from typing import Any, List
from langchain.load import dumps, loads
from sentence_transformers.cross_encoder import CrossEncoder
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

class RAGOrchestrator:
    """
    Orchestrates the RAG pipeline based on a given configuration.
    """
    def __init__(self, config: dict):
        """
        Initializes the orchestrator with a configuration dictionary.

        The module-level names ``OPENAI_API_KEY`` and ``PINECONE_API_KEY`` are
        read when this constructor runs, so they must be defined before an
        orchestrator is created.

        Args:
            config (dict): A dictionary containing settings for the RAG pipeline,
                           such as model name, index name, and retrieval strategy.
                           Keys read here: "debug", "llm_model",
                           "retrieval_strategy", "index_name", "namespace", "top_k".
        """
        self.config = config
        self.debug = config.get("debug", False)
        self.llm = ChatOpenAI(
            model=config.get("llm_model", "gpt-4o-mini"),
            api_key=OPENAI_API_KEY
        )
        # Always define the attribute so that code inspecting the orchestrator
        # never hits an AttributeError in "llm_only" mode.
        self.retriever = None
        if config.get("retrieval_strategy") != "llm_only":
            self.retriever = PineconeDBRetriever(
                index_name=config.get("index_name"),
                pinecone_api_key=PINECONE_API_KEY,
                namespace=config.get("namespace"),
                top_k=config.get("top_k", 5)
            )

    # --- Debugging Helper ---
    def _print_debug(self, header: str, data: Any):
        if self.debug:
            print("\n" + "="*20)
            print(f"DEBUG: {header}")
            print("="*20)
            pprint(data)
        return data # Pass data through unchanged

    def _tap_and_log(self, x: dict) -> dict:
        """
        A helper method to print debug info and pass the input dictionary through unchanged.
        """
        self._print_debug("Final Context for LLM", x.get("context_str", "Context not available"))
        return x

    def _get_unique_union(self, documents: list[list]) -> List[Document]:
        """
        Takes a list of document lists, merges them, and removes duplicates.
        """
        # Serialize each document to a string to make them hashable
        flattened_docs = [dumps(doc) for sublist in documents for doc in sublist]
        # Use a set to get unique serialized documents
        unique_docs = list(set(flattened_docs))
        # Deserialize unique documents back into Document objects
        return [loads(doc) for doc in unique_docs]

    # --- Retrieval Strategy Helpers ---

    def _get_multi_query_chain(self):
        """
        Build the multi-query retrieval chain.

        The chain asks the LLM for alternative phrasings of the question,
        runs the retriever once per non-blank line of the reply, and merges
        the results with ``_get_unique_union``.

        Returns:
            A runnable that maps a question to a list of documents.
        """
        # Prompt asking the LLM to rephrase the user's question.
        template = """You are an AI language model assistant. Your task is to generate five
        different versions of the given user question to retrieve relevant documents from a vector
        database. By generating multiple perspectives on the user question, your goal is to help
        the user overcome some of the limitations of the cosine-based similarity search.
        Provide these alternative questions separated by newlines. Original question: {question}"""
        rewrite_prompt = ChatPromptTemplate.from_template(template)

        # LLM reply -> list of lines -> non-blank lines -> (optional) debug print.
        to_lines = RunnableLambda(lambda text: text.split("\n"))
        drop_blank = RunnableLambda(lambda lines: [line for line in lines if line.strip()])
        log_queries = RunnableLambda(lambda queries: self._print_debug("Generated Queries", queries))
        query_generator = (
            rewrite_prompt | self.llm | StrOutputParser() | to_lines | drop_blank | log_queries
        )

        # Retrieve for every generated query, then de-duplicate the union.
        log_documents = RunnableLambda(lambda docs: self._print_debug("Retrieved Documents", docs))
        return (
            query_generator
            | self.retriever.map()
            | RunnableLambda(self._get_unique_union)
            | log_documents
        )

    def _get_rag_fusion_chain(self):
        """
        Build the RAG Fusion chain: multi-query retrieval followed by
        reciprocal rank fusion of the per-query result lists.

        Returns:
            A runnable that maps a question to a list of documents ordered by
            fused score, highest first.
        """
        # Same query-generation prompt as the multi-query strategy.
        template = """You are an AI language model assistant. Your task is to generate five
        different versions of the given user question to retrieve relevant documents from a vector
        database. By generating multiple perspectives on the user question, your goal is to help
        the user overcome some of the limitations of the cosine-based similarity search.
        Provide these alternative questions separated by newlines. Original question: {question}"""
        rewrite_prompt = ChatPromptTemplate.from_template(template)

        to_lines = RunnableLambda(lambda text: text.split("\n"))
        drop_blank = RunnableLambda(lambda lines: [line for line in lines if line.strip()])
        log_queries = RunnableLambda(lambda queries: self._print_debug("Generated Queries", queries))
        query_generator = (
            rewrite_prompt | self.llm | StrOutputParser() | to_lines | drop_blank | log_queries
        )

        def reciprocal_rank_fusion(results: list[list], k=60):
            """Fuse several ranked lists; each hit adds 1 / (rank + k), rank starting at 0."""
            score_by_doc = {}
            for ranked_docs in results:
                for position, document in enumerate(ranked_docs):
                    # Serialized form is the key, so identical documents pool their score.
                    key = dumps(document)
                    score_by_doc[key] = score_by_doc.get(key, 0) + 1 / (position + k)

            # Highest fused score first.
            ordered = sorted(score_by_doc.items(), key=lambda pair: pair[1], reverse=True)
            reranked_results = [(loads(key), score) for key, score in ordered]
            self._print_debug("Reranked Documents (RAG Fusion)", reranked_results)
            # Callers only need the documents, not the scores.
            return [document for document, _ in reranked_results]

        return query_generator | self.retriever.map() | reciprocal_rank_fusion

    def _get_decomposition_chain(self):
        """
        Build the decomposition retrieval chain.

        The LLM breaks the question into sub-questions (one per line), the
        retriever runs once per non-blank line, and the results are merged
        with ``_get_unique_union``.

        Returns:
            A runnable that maps a question to a list of documents.
        """
        # Prompt that asks for sub-questions, one per line.
        decomposition_template = """You are a helpful assistant that generates multiple sub-questions
        related to an input question. The goal is to break down the input into a set of sub-problems
        that can be answered in isolation. Generate multiple search queries related to: {question}
        Output (separated by newlines):"""
        sub_question_prompt = ChatPromptTemplate.from_template(decomposition_template)

        # LLM reply -> list of lines -> non-blank lines -> (optional) debug print.
        to_lines = RunnableLambda(lambda text: text.split("\n"))
        drop_blank = RunnableLambda(lambda lines: [line for line in lines if line.strip()])
        log_sub_questions = RunnableLambda(
            lambda queries: self._print_debug("Decomposed Sub-questions", queries)
        )
        sub_question_generator = (
            sub_question_prompt | self.llm | StrOutputParser() | to_lines | drop_blank | log_sub_questions
        )

        # Retrieve per sub-question, then de-duplicate the union.
        log_documents = RunnableLambda(
            lambda docs: self._print_debug("Retrieved Documents (Decomposition)", docs)
        )
        return (
            sub_question_generator
            | self.retriever.map()
            | RunnableLambda(self._get_unique_union)
            | log_documents
        )

    def _get_step_back_chain(self):
        """
        Build the step-back retrieval chain.

        The LLM rewrites the question into a more generic "step-back"
        question, and that rewritten text (the whole reply, unsplit) is used
        as the single retrieval query.

        Returns:
            A runnable that maps a question to a list of documents.
        """
        # Few-shot prompt that produces the more general question.
        step_back_template = """You are an expert at world knowledge. Your task is to step back and
        paraphrase a question to a more generic step-back question, which is easier to answer.

        Here are a few examples:
        Original Question: What is the C29x CPU architecture in the F29H85x microcontroller?
        Step-Back Question: What are the technical specifications of the C29x CPU architecture?

        Original Question: Which TI device was recommended for automotive radar in the 2023 safety seminar?
        Step-Back Question: What are some common TI devices used for automotive radar applications?

        Original Question: {question}
        Step-Back Question:"""
        step_back_prompt = ChatPromptTemplate.from_template(step_back_template)

        log_question = RunnableLambda(
            lambda text: self._print_debug("Generated Step-Back Question", text)
        )
        step_back_generator = step_back_prompt | self.llm | StrOutputParser() | log_question

        # Original question -> step-back question -> documents for that question.
        log_documents = RunnableLambda(
            lambda docs: self._print_debug("Retrieved Documents (Step back)", docs)
        )
        return step_back_generator | self.retriever | log_documents

    def _get_hyde_chain(self):
        """
        Build the HyDE retrieval chain.

        The LLM writes a hypothetical passage answering the question, and
        that passage is used as the retrieval query instead of the question.

        Returns:
            A runnable that maps a question to a list of documents.
        """
        # Prompt that asks for a plausible, document-like answer.
        hyde_template = """Please write a passage to answer the user's question.
        This passage should be detailed and informative, as if it came from a technical document.
        The purpose is to create a rich text for a vector search.

        Question: {question}
        Passage:"""
        passage_prompt = ChatPromptTemplate.from_template(hyde_template)

        log_passage = RunnableLambda(
            lambda text: self._print_debug("Generated Hypothetical Document", text)
        )
        passage_generator = passage_prompt | self.llm | StrOutputParser() | log_passage

        # Question -> hypothetical passage -> documents retrieved with the passage.
        log_documents = RunnableLambda(
            lambda docs: self._print_debug("Retrieved Documents (HyDE)", docs)
        )
        return passage_generator | self.retriever | log_documents

    # --- Post Retrieval Processing Helpers ---
    
    def _get_st_reranking_chain(self):
        """
        Creates a Runnable that performs semantic re-ranking using a
        Cross-Encoder model from the sentence-transformers library.
        """
        # Initialize a cross-encoder model. This model is lightweight and effective.
        # It will be downloaded from the Hugging Face Hub on first use.
        model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

        def rerank_docs(inputs: dict):
            documents = inputs.get("documents", [])
            query = inputs.get("query", "")
            if not documents or not query:
                return []

            self._print_debug(f"Documents going INTO Re-ranker ({len(documents)} docs)", documents)

            # 1. Create pairs of [query, passage] for the cross-encoder
            sentence_pairs = [(query, doc.page_content) for doc in documents]

            # 2. Predict the relevance scores. The output is a list of scores.
            scores = model.predict(sentence_pairs)

            # 3. Combine the original documents with their new scores
            scored_docs = list(zip(scores, documents))

            # 4. Sort the documents by score in descending order
            scored_docs.sort(key=lambda x: x[0], reverse=True)

            # 5. Extract the documents and limit by top_n
            reranked_docs = [doc for score, doc in scored_docs]
            configured_top_n = self.config.get("reranker_top_n", 5)
            effective_top_n = min(configured_top_n, len(reranked_docs))
            final_docs_to_return = reranked_docs[:effective_top_n]
            
            self._print_debug(f"Documents COMING OUT of Re-ranker ({len(final_docs_to_return)} docs)", final_docs_to_return)
            return final_docs_to_return

        return RunnableLambda(rerank_docs)
    
    def _get_contextual_compression_retriever(self, base_retriever):
        """
        Wrap ``base_retriever`` in a ContextualCompressionRetriever.

        The wrapper first runs ``base_retriever`` and then passes the results
        to an ``LLMChainExtractor`` built from ``self.llm``, which uses the LLM
        to extract only the sentences relevant to the query.

        Args:
            base_retriever: The retriever (or retrieval chain) to wrap.

        Returns:
            The ContextualCompressionRetriever wrapping ``base_retriever``.
        """
        wrapped_retriever = ContextualCompressionRetriever(
            base_compressor=LLMChainExtractor.from_llm(self.llm),
            base_retriever=base_retriever,
        )
        self._print_debug("Contextual Compression Retriever", "Initialized and ready.")
        return wrapped_retriever

    # --- Prompt Strategy Helpers ---
    def _get_final_prompt(self):
        """
        Build the answer prompt selected by ``config["prompt_strategy"]``.

        "permissive_context" lets the LLM fall back on its own knowledge;
        any other value (default "strict_context") restricts it to the
        supplied context.

        Returns:
            ChatPromptTemplate: Template with ``{context}`` and ``{question}`` slots.
        """
        prompt_strategy = self.config.get("prompt_strategy", "strict_context")

        if prompt_strategy != "permissive_context":
            # Strict (default): answer from the provided documents only.
            template = """Answer the following question based ONLY on the provided context.
            If the answer is not available in the context, you must say "Based on the provided context, I cannot answer this question."
            
            Context: {context}
            Question: {question}
            """
        else:
            # Permissive: the LLM may use its own knowledge but must say so.
            template = """You are a helpful expert assistant for Texas Instruments products.
            Answer the user's question based on the context provided.
            If the context is not sufficient to answer the question, use your own knowledge to provide a comprehensive answer,
            but you must state that the information comes from your general knowledge.
            
            Context: {context}
            Question: {question}
            """

        self._print_debug(f"Using Prompt Strategy: {prompt_strategy}", template)
        return ChatPromptTemplate.from_template(template)
    
    def invoke(self, question: str) -> dict:
        """
        Assemble the pipeline described by ``self.config`` and run it once.

        Config keys read here: "retrieval_strategy" (default "simple"),
        "post_retrieval_processing" (default "none") and "prompt_strategy"
        (default "strict_context"). An unrecognised retrieval strategy falls
        back to simple retrieval.

        Args:
            question (str): The user's question.

        Returns:
            dict: Keys "question", "answer", "strategy" and "context".
            "context" is the list of retrieved documents, or "N/A" for the
            "llm_only" strategy.
        """
        strategy = self.config.get("retrieval_strategy", "simple")
        post_processing_strategy = self.config.get("post_retrieval_processing", "none")
        prompt_strategy = self.config.get("prompt_strategy", "strict_context")

        # "llm_only" bypasses retrieval completely.
        if strategy == "llm_only":
            self._print_debug("Strategy", "LLM Only (No RAG)")
            return {
                "question": question,
                "answer": self.llm.invoke(question).content,
                "strategy": strategy,
                "context": "N/A",  # No context was used
            }

        # --- RAG-based strategies ---

        # Map each strategy name to the method that builds its retrieval chain.
        chain_builders = {
            "multi_query": self._get_multi_query_chain,
            "rag_fusion": self._get_rag_fusion_chain,
            "decomposition": self._get_decomposition_chain,
            "step_back": self._get_step_back_chain,
            "hyde": self._get_hyde_chain,
        }
        build_chain = chain_builders.get(strategy)
        if build_chain is not None:
            retrieval_chain = build_chain()
        else:
            # Anything else: one plain vector search with the user's question.
            retrieval_chain = self.retriever | RunnableLambda(
                lambda docs: self._print_debug("Retrieved Documents (Simple)", docs)
            )

        # Optional post-processing, layered on top of the base chain.
        if "semantic_re_ranking" in post_processing_strategy:
            # The re-ranker needs both the documents and the original query.
            reranker_inputs = RunnableParallel(
                {"documents": retrieval_chain, "query": RunnablePassthrough()}
            )
            retrieval_chain = reranker_inputs | self._get_st_reranking_chain()

        if "contextual_compression" in post_processing_strategy:
            # Compression wraps whatever chain has been built so far.
            retrieval_chain = self._get_contextual_compression_retriever(retrieval_chain)

        final_prompt = self._get_final_prompt()

        def format_docs(docs: List[Document]) -> str:
            return "\n\n".join(doc.page_content for doc in docs)

        # Adds "context_str" (joined page contents) next to the raw documents.
        context_formatter = RunnablePassthrough.assign(
            context_str=itemgetter("context") | RunnableLambda(format_docs)
        )

        # The prompt receives the formatted string as its "context" variable.
        answer_chain = (
            RunnableLambda(lambda x: {"context": x["context_str"], "question": x["question"]})
            | final_prompt
            | self.llm
            | StrOutputParser()
        )

        rag_chain = (
            RunnableParallel({"context": retrieval_chain, "question": RunnablePassthrough()})
            | context_formatter
            | {"answer": answer_chain, "context": itemgetter("context")}
        )

        result = rag_chain.invoke(question)

        strategy_label = (
            f"{strategy} RAG + {post_processing_strategy} Post-Retrieval Processing"
            f" + {prompt_strategy} Prompt Strategy"
        )
        return {
            "question": question,
            "answer": result['answer'],
            "strategy": strategy_label,
            "context": result['context'],
        }

In [None]:
# --- Example Usage ---
# main.py
from pprint import pprint


# Get API keys using dotenv
load_dotenv()
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY_SHIVAM")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Get API keys using google.colab
# from google.colab import userdata
# OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
# PINECONE_API_KEY = userdata.get('PINECONE_API_KEY_SHIVAM')

# Fail fast with a readable message instead of an opaque error raised later
# from inside the OpenAI / Pinecone clients.
missing_keys = [
    name
    for name, value in (
        ("PINECONE_API_KEY_SHIVAM", PINECONE_API_KEY),
        ("OPENAI_API_KEY", OPENAI_API_KEY),
    )
    if not value
]
if missing_keys:
    raise RuntimeError(f"Missing API key(s): {', '.join(missing_keys)}")


# --- Example 1: Decomposition RAG with re-ranking + compression, debugging enabled ---
print("\n\n" + "--- Running with RAG strategy ---")
config = {
    "llm_model": "gpt-4o-mini",
    "retrieval_strategy": "decomposition",
    "post_retrieval_processing": "semantic_re_ranking+contextual_compression",
    "prompt_strategy": "permissive_context",
    "index_name": "swru526-pine",
    "namespace": "example-namespace",
    "top_k": 10,
    "reranker_top_n": 5,  # This is for the re-ranker, if used
    "debug": True  # <-- Enable debugging
}

orchestrator_RAG = RAGOrchestrator(config)
question = "What are the key features of the 14xx mmWave devices?"
result_RAG = orchestrator_RAG.invoke(question)
print("\n--- FINAL OUTPUT ---")
pprint(result_RAG)


# --- Example 2: LLM Only (No RAG) via Config ---
print("\n\n" + "--- Running with LLM Only Strategy ---")
llm_only_config = {
    "llm_model": "gpt-4o-mini",
    "retrieval_strategy": "llm_only",
    "debug": False # Debug flag works here too
}

orchestrator_llm_only = RAGOrchestrator(llm_only_config)
result_llm_only = orchestrator_llm_only.invoke(question)
print("\n--- FINAL OUTPUT ---")
pprint(result_llm_only)



--- Running with RAG strategy ---

DEBUG: Contextual Compression Retriever
'Initialized and ready.'

DEBUG: Using Prompt Strategy: permissive_context
('You are a helpful expert assistant for Texas Instruments products.\n'
 "            Answer the user's question based on the context provided.\n"
 '            If the context is not sufficient to answer the question, use '
 'your own knowledge to provide a comprehensive answer,\n'
 '            but you must state that the information comes from your general '
 'knowledge.\n'
 '            \n'
 '            Context: {context}\n'
 '            Question: {question}\n'
 '            ')

DEBUG: Retrieved Documents (Simple)
[Document(metadata={'id': 'pdf_doc_1_chunk_13_da6ca75b041245d18674dd726a03d374'}, page_content='the mmWave 14xx device). The accelerator is connected to a 128-bit bus that is present in the main\nprocessor system, as shown in Figure 1.\nThe Radar Hardware Accelerator module comprises an accelerator engine and four memorie

In [None]:
# RAG Orchestrator Configuration Guide

# 1. Core Retrieval Strategy
# This is the main method used to find and fetch the initial set of documents.
# "retrieval_strategy":
#     "simple": A single vector search against the user's query.
#     "multi_query": Generates multiple variations of the query and combines the results.
#     "rag_fusion": Generates multiple variations and combines results using Reciprocal Rank Fusion.
#     "decomposition": Breaks a complex query into sub-questions and retrieves for each.
#     "step_back": Asks a more general question to get broader context.
#     "hyde": Generates a hypothetical document to guide the search.
#     "llm_only": No retrieval at all; asks the LLM directly.

# 2. Post-Retrieval Processing
# This defines what happens to the documents after they are retrieved but before they are sent to the LLM.
# "post_retrieval_processing":
#     "none": No processing; use the documents as-is.
#     "semantic_re_ranking": Uses a Cross-Encoder to re-rank documents for higher relevance.
#     "contextual_compression": Uses an LLM to extract only the most relevant sentences from documents.
#     "semantic_re_ranking+contextual_compression": Applies re-ranking first, then compression for the highest quality context.

# 3. Final Prompting Strategy
# This determines how the LLM is instructed to use the context to formulate the final answer.
# "prompt_strategy":
#     "strict_context": Forbids the LLM from using any knowledge outside of the provided documents.
#     "permissive_context": Allows the LLM to use its own knowledge to supplement the context.

# 4. General Parameters
# These are the basic "knobs" for any given run.
#     "llm_model": e.g., "gpt-4o-mini", "gpt-4o"
#     "index_name": The specific Pinecone index to target.
#     "namespace": The namespace within that index.
#     "top_k": The number of documents to retrieve initially.
#     "reranker_top_n": The number of documents to return after the semantic re-ranking step.
#     "debug": True or False.