# RAG System with Gemini and FAISS
# ================================

In [2]:
import os
from pathlib import Path
from typing import List, Union

import requests
import faiss
from dotenv import load_dotenv
from sentence_transformers import CrossEncoder

from langchain.chains import LLMChain, ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from langchain.schema import Document, BaseRetriever
from langchain.text_splitter import RecursiveCharacterTextSplitter

from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import PyPDFLoader

from langchain_google_genai import GoogleGenerativeAI
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.docstore.in_memory import InMemoryDocstore

from langchain_openai import ChatOpenAI

## 1. Configuration
First, let's set up the configuration for our RAG system.

In [13]:
# Load variables from a local .env file into the process environment so the
# os.getenv() lookups for API keys further down can find them.
load_dotenv()

# Configuration settings
# Name of the sentence-transformers model handed to HuggingFaceEmbeddings.
EMBEDDING = "sentence-transformers/all-MiniLM-L6-v2"
# Shared embedding object; used as the default embedding_model when building
# and loading the FAISS index.
EMBEDDING_MODEL = HuggingFaceEmbeddings(model_name=EMBEDDING)
# Chunking parameters, presumably meant for the chunk_size / chunk_overlap
# arguments of the document-processing helpers.
CHUNK_SIZE = 800
CHUNK_OVERLAP = 80
# Presumably the number of documents the retriever should return (retriever_k).
RETRIEVER_K = 4
# GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# if not GEMINI_API_KEY:
#     raise ValueError("GEMINI_API_KEY not found in environment variables")

# llm = GoogleGenerativeAI(
#     api_key=GEMINI_API_KEY,
#     model="gemini-2.0-flash",
#     verbose=False
# )

In [12]:
# Read the OpenAI key from the environment.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Stop at import time with a clear message instead of failing inside the
# first LLM call.
if not OPENAI_API_KEY:
    raise ValueError("OPENAI_API_KEY not found in environment variables")

# Module-level chat model. Temperature is pinned to 0.0, presumably to keep
# answers as reproducible as possible.
llm = ChatOpenAI(
    api_key=OPENAI_API_KEY,
    model="gpt-3.5-turbo",
    temperature=0.0,
    verbose=False
)
# Report which model is configured; fall back to `model` when the object has
# no `model_name` attribute.
try:
    print("llm.model_name:", llm.model_name)
except AttributeError:
    print("llm.model:", llm.model)

llm.model_name: gpt-3.5-turbo


## 2. Document Processing Functions
Let's define functions to download and process PDF documents.

In [3]:
def download_pdf(url: str, folder: str = 'documents') -> str:
    """
    Download the PDF at ``url`` into ``folder`` and return the local file path.

    The local file name is the last segment of the URL path; the query string
    and fragment are ignored.

    Args:
        url: Address of the PDF to fetch.
        folder: Destination directory; created if it does not exist.

    Returns:
        Path of the saved file inside ``folder``.

    Raises:
        ValueError: If the URL path has no final segment usable as a file name
            (for example it ends with ``/``).
        requests.HTTPError: If the server responds with an error status.
    """
    from urllib.parse import urlparse

    # Derive and validate the file name before touching disk or network, so a
    # URL without a file name fails immediately instead of after the download
    # (where open() would be pointed at the folder itself).
    filename = os.path.basename(urlparse(url).path)
    if not filename:
        raise ValueError(f"Cannot derive a file name from URL: {url!r}")

    os.makedirs(folder, exist_ok=True)
    filepath = os.path.join(folder, filename)

    response = requests.get(url, timeout=10)
    response.raise_for_status()

    with open(filepath, 'wb') as f:
        f.write(response.content)
    return filepath

def process_document(
    documents: Union[List[str], List[Document]],
    chunk_size: Union[int, str],
    chunk_overlap: Union[int, str]) -> List[Document]:
    """
    Split documents into chunks while recording which parent each chunk came from.

    Args:
        documents: Either already-loaded Document objects or PDF URLs. URLs are
            downloaded with ``download_pdf`` and loaded page by page with
            ``PyPDFLoader``. The first element decides which case applies.
        chunk_size: Value forwarded to the text splitter's ``chunk_size``;
            numeric strings are accepted.
        chunk_overlap: Value forwarded to the text splitter's
            ``chunk_overlap``; numeric strings are accepted.

    Returns:
        All chunks, grouped by source. Each chunk's metadata gains
        ``parent_id``, ``parent_source``, ``chunk_id``, ``chunk_index`` and
        ``total_chunks``.
    """
    # Callers pass these as strings in places; normalise once up front.
    chunk_size = int(chunk_size)
    chunk_overlap = int(chunk_overlap)

    if documents and isinstance(documents[0], str):
        loaded_docs = []
        for url in documents:
            pdf_path = download_pdf(url)
            loaded_docs.extend(PyPDFLoader(pdf_path).load())
        documents = loaded_docs

    def get_filename(path):
        if not path or path == "unknown":
            return "unknown_document"
        # Strip only the final extension. Splitting on the first dot would map
        # "rules.v1.pdf" and "rules.v2.pdf" to the same parent id. Note that
        # ids for such dotted names differ from ids produced by the previous
        # first-dot logic.
        return os.path.splitext(os.path.basename(path))[0]

    # Group pages by the file they came from so chunk indices are per parent.
    source_groups = {}
    for doc in documents:
        source = doc.metadata.get("source", "unknown")
        source_groups.setdefault(source, []).append(doc)

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )

    all_chunks = []
    for source, docs in source_groups.items():
        # Keep pages in reading order before splitting.
        docs.sort(key=lambda x: x.metadata.get("page", 0))
        parent_id = get_filename(source)
        chunks = splitter.split_documents(docs)
        total_chunks = len(chunks)

        for i, chunk in enumerate(chunks):
            page_num = chunk.metadata.get("page", 0)
            chunk.metadata.update({
                "parent_id": parent_id,
                "parent_source": source,
                "chunk_id": f"{parent_id}_p{page_num}_c{i}",
                "chunk_index": i,
                "total_chunks": total_chunks
            })
            all_chunks.append(chunk)

    return all_chunks

## 3. Vector Store Creation
Now let's create a function to build our vector store from documents.

In [None]:
def process_and_store_documents(documents, chunk_size, chunk_overlap,
                                embedding_model=EMBEDDING_MODEL,
                                persist_directory="faiss_index") -> Union[FAISS, None]:
    """
    Chunk documents and add them to the FAISS index stored in ``persist_directory``.

    Chunks whose ``parent_id`` is already present in the saved index are
    skipped, so re-running with the same documents does not duplicate them.

    Args:
        documents: Document objects or PDF URLs, as accepted by ``process_document``.
        chunk_size: Forwarded to ``process_document``.
        chunk_overlap: Forwarded to ``process_document``.
        embedding_model: Embedding object used to load and build the index.
        persist_directory: Folder the index is loaded from and saved to.

    Returns:
        The vector store after any additions. ``None`` only when there is
        nothing to add and no saved index exists yet.
    """
    chunks = process_document(documents, chunk_size, chunk_overlap)

    # save_local() below uses the default index name, which stores the index as
    # "index.faiss". Checking for that file (not just the folder) avoids a
    # crash in load_local() when the directory exists but holds no index.
    index_file = os.path.join(persist_directory, "index.faiss")
    if os.path.exists(index_file):
        # NOTE(security): load_local() unpickles the stored docstore; only
        # point persist_directory at index files this project wrote itself.
        vector_store = FAISS.load_local(persist_directory, embedding_model, allow_dangerous_deserialization=True)
        # The docstore offers no public way to iterate its documents, hence
        # the private _dict access.
        existing_ids = {
            doc.metadata["parent_id"]
            for doc in vector_store.docstore._dict.values()
            if "parent_id" in doc.metadata
        }
    else:
        vector_store = None
        existing_ids = set()

    new_chunks = [c for c in chunks if c.metadata["parent_id"] not in existing_ids]

    if not new_chunks:
        print("No new documents to add.")
        return vector_store

    if vector_store is not None:
        vector_store.add_documents(new_chunks)
    else:
        vector_store = FAISS.from_documents(new_chunks, embedding_model)

    vector_store.save_local(persist_directory)
    return vector_store


## 4. Long-Term memory storage in Vectorstore
This section stores past conversations with the LLM in a vector store so they can be supplied as additional context during data retrieval.

In [5]:
def create_memory_vectorstore(embedding_model, path="memory_vectorstore"):
    """
    Return the FAISS store that holds chat memory.

    When ``path`` exists on disk the saved store is loaded from it. Otherwise a
    new, empty store is assembled in memory; nothing is written to disk here.
    """
    store_location = Path(path)
    if not store_location.exists():
        # Embed a throwaway string just to learn the embedding vector length,
        # which the flat L2 index needs at construction time.
        probe_vector = embedding_model.embed_query("test")
        return FAISS(
            embedding_function=embedding_model,
            index=faiss.IndexFlatL2(len(probe_vector)),
            docstore=InMemoryDocstore(),
            index_to_docstore_id={}
        )

    return FAISS.load_local(path, embedding_model, allow_dangerous_deserialization=True)

def store_to_memory_vectorstore(question, answer, vectorstore, path="memory_vectorstore"):
    """
    Append one question/answer exchange to the memory store and persist it.

    Args:
        question: The user question.
        answer: The answer that was given.
        vectorstore: Store to add the exchange to.
        path: Folder the store is saved to. Defaults to the same location
            ``create_memory_vectorstore`` uses by default; pass the same value
            here when a custom path was used there.
    """
    content = f"Q: {question}\nA: {answer}"
    # Tag the entry so chat memory can be told apart from document chunks.
    doc = Document(page_content=content, metadata={"type": "chat_memory"})
    vectorstore.add_documents([doc])
    vectorstore.save_local(path)
    
def get_relevant_memory(query, vectorstore, k=3):
    """
    Return the text of up to ``k`` stored exchanges most similar to ``query``.

    The entries are joined with a ``---`` separator line. If the search raises
    IndexError or ValueError, the reason is printed and an empty string is
    returned instead.
    """
    try:
        hits = vectorstore.similarity_search(query, k=k)
        texts = [hit.page_content for hit in hits]
        combined = "\n---\n".join(texts)
    except (IndexError, ValueError) as e:
        print(f"Skipping memory retrieval (reason: {str(e)})")
        combined = ""
    return combined



## 5. Reranker


In [6]:
def create_parent_document_llm_reranker(vectorstore, top_k_chunks=20, top_k_parents=4):
    """
    Build a retriever that groups retrieved chunks by parent document and
    reranks the parents with a cross-encoder.

    Args:
        vectorstore: Store queried with ``similarity_search_with_score``.
        top_k_chunks: Number of chunks fetched from the store per query.
        top_k_parents: Number of best-scoring parent documents whose chunks
            are returned.

    Returns:
        A retriever whose results are the chunks of the top parents, each
        carrying a ``rerank_score`` entry in its metadata.
    """
    # The cross-encoder is created on the first query and then reused. It was
    # previously re-instantiated on every single query.
    encoder_cache = []

    def get_cross_encoder():
        if not encoder_cache:
            encoder_cache.append(CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2"))
        return encoder_cache[0]

    class LLMRerankerRetriever(BaseRetriever):
        def _get_relevant_documents(self, query: str) -> List[Document]:
            cross_encoder = get_cross_encoder()

            # The vector-store score is only used for the initial candidate
            # selection; ranking below relies on the cross-encoder alone.
            hits = vectorstore.similarity_search_with_score(query, k=top_k_chunks)

            # Group candidate chunks under their parent document.
            parent_docs = {}
            for chunk, _ in hits:
                parent_id = chunk.metadata.get("parent_id") or chunk.metadata.get("doc_id", f"doc_{len(parent_docs)}")
                if parent_id not in parent_docs:
                    parent_docs[parent_id] = {
                        "chunks": [],
                        "source": chunk.metadata.get("parent_source", "unknown")
                    }
                parent_docs[parent_id]["chunks"].append(chunk)

            parent_list = []
            for parent_id, parent in parent_docs.items():
                # Restore reading order so the concatenated text is coherent.
                parent["chunks"].sort(key=lambda x: (x.metadata.get("page", 0), x.metadata.get("chunk_index", 0)))
                full_text = "\n".join([chunk.page_content for chunk in parent["chunks"]])

                try:
                    rerank_score = float(cross_encoder.predict([(query, full_text)])[0])
                except Exception as e:
                    # Best effort: keep the parent with a neutral score, but
                    # say so instead of hiding the failure.
                    print(f"Reranking failed for parent '{parent_id}' (reason: {str(e)}); using score 0.0")
                    rerank_score = 0.0

                # Expose the score on every chunk for downstream reporting.
                for c in parent["chunks"]:
                    c.metadata["rerank_score"] = rerank_score

                parent_list.append({
                    "id": parent_id,
                    "chunks": parent["chunks"],
                    "rerank_score": rerank_score,
                    "source": parent["source"]
                })

            parent_list.sort(key=lambda x: x["rerank_score"], reverse=True)

            top_docs = []
            for parent in parent_list[:top_k_parents]:
                top_docs.extend(parent["chunks"])

            return top_docs

        async def _aget_relevant_documents(self, query: str):
            raise NotImplementedError("Async version not implemented.")

    return LLMRerankerRetriever()

## 6. Different types of prompts


In [7]:
from langchain.prompts import PromptTemplate

def get_prompt(prompt_type: str, role: str = "Ai Assistant") -> PromptTemplate:
    """
    Return the prompt template registered under ``prompt_type``.

    Args:
        prompt_type: One of ``zero_shot``, ``explain_like_5``, ``cot``,
            ``elaborate``, ``meta``, ``role``, ``react`` or ``verify``.
        role: Persona inserted into templates that contain a ``{role}``
            placeholder (currently only the ``role`` template).

    Returns:
        The template with ``{role}`` already filled in where present, so the
        remaining input variables are the ones visible in the template text
        (``context`` plus ``question`` or ``answer``).

    Raises:
        ValueError: If ``prompt_type`` is not a known type.
    """
    if prompt_type == "zero_shot":
        template = """Use the following context to answer the question. Be clear and concise.

        Context: {context}

        Question: {question}

        Answer:"""
    elif prompt_type == "explain_like_5":
        template = """Use the following pieces of context to answer the question. Explain like you are talking to a 5-year-old. 
        If the question is not related to the context, say "I don't know". If you don't know the answer, just say that you don't know.

        Context: {context}

        Question: {question}

        Provide a clear and concise answer.

        Answer:"""
    elif prompt_type == "cot":
        template = """Use the following context to answer the question. Think step-by-step and explain your reasoning.

        Context: {context}

        Question: {question}

        Let's think step by step:

        Answer:"""
    elif prompt_type == "elaborate":
        template = """Use the following context to answer the question in a detailed, formal tone. If you can't answer, say "I don't know".

        Context: {context}

        Question: {question}

        Detailed answer:"""
    elif prompt_type == "meta":
        template = """You are an AI assistant tasked with answering the question using the provided context. 
        First, generate an optimal prompt that would help an LLM perform this task effectively.
        Then, respond to that prompt yourself to complete the task.
        Reflect on your reasoning process as you answer. Clearly state what you know, what you are assuming, and how confident you are.

        Context:
        {context}

        Question:
        {question}

        Answer (include reasoning, assumptions, and confidence level):"""
    elif prompt_type == "role":
        template = """You are acting as {role}. Use the following context to answer the question appropriately for your role.
        If the question is not related to the context, say "I don't know". If you're unsure of the answer, acknowledge that.

        Context:
        {context}

        Question:
        {question}

        As a {role}, your answer:"""
    elif prompt_type == "react":
        template = """You are an intelligent assistant that reasons step-by-step and can use external context to answer questions.

        Use the following format:

        Question: {question}
        Thought: Think about what you need to find.
        Action: Look up relevant information in the context.
        Observation: Summarize what the context says.
        Thought: Reflect on how the observation answers the question.
        Final Answer: Give a complete and clear answer.

        Context:
        {context}

        Now follow the steps to answer:
        """
    elif prompt_type == "verify":
        template = """
        Given the context and the answer, verify if the answer is fully supported. 
        Respond with YES or NO, then explain briefly.
        
        Context:
        {context}
        
        Answer:
        {answer}
        
        Is this answer verifiable?
        """
    else:
        raise ValueError(f"Unknown prompt type: {prompt_type}")

    prompt = PromptTemplate.from_template(template)
    # Bind the role now. Left unbound, {role} stays a required input variable
    # that the retrieval chain never supplies, so the "role" prompt type could
    # not be used and the `role` argument had no effect.
    if "role" in prompt.input_variables:
        prompt = prompt.partial(role=role)
    return prompt


## 7. RAG Chain Creation
Let's create our RAG chain around the configured LLM (Gemini or OpenAI).

In [8]:
def create_rag_chain(
    vectorstore,
    llm,
    prompt_type: str = "zero_shot",
    use_memory: bool = False,
    use_reranking: bool = False,
    retriever_k: int = 4,
    top_k_chunks: int = 20,
    role: str = "Ai Assistant"
):
    """
    Create a conversational RAG chain with optional memory and reranking.

    Args:
        vectorstore: Store the retriever searches.
        llm: Language model used by the chain.
        prompt_type: Name of the answer prompt, as accepted by ``get_prompt``.
        use_memory: When True, attach a ConversationBufferMemory keyed on
            ``chat_history``.
        use_reranking: When True, retrieve through the parent-document
            cross-encoder reranker instead of plain similarity search.
        retriever_k: Number of chunks to return (plain search) or number of
            parent documents to keep (reranking).
        top_k_chunks: Candidate chunks fetched before reranking; only used
            when ``use_reranking`` is True.
        role: Persona forwarded to ``get_prompt``; only relevant to prompt
            types that mention a role.

    Returns:
        Tuple ``(chain, memory)``; ``memory`` is None when ``use_memory`` is False.
    """
    prompt = get_prompt(prompt_type, role=role)

    memory = None
    if use_memory:
        memory = ConversationBufferMemory(
            memory_key="chat_history",
            input_key="question",
            # The chain also returns source documents, so the memory must be
            # told which output key holds the text to remember.
            output_key="answer",
            return_messages=True
        )

    if use_reranking:
        retriever = create_parent_document_llm_reranker(
            vectorstore=vectorstore,
            top_k_chunks=top_k_chunks,
            top_k_parents=retriever_k
        )
    else:
        retriever = vectorstore.as_retriever(
            search_type="similarity",
            search_kwargs={"k": retriever_k}
        )

    base_chain = ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=retriever,
        combine_docs_chain_kwargs={"prompt": prompt},
        return_source_documents=True,
        memory=memory,
        verbose=False
    )

    return base_chain, memory

In [9]:
def print_result_summary(result):
    """
    Print a chain result: the answer first, then one short entry per
    retrieved chunk with its source, page and rerank score.

    Missing metadata is shown as 'unknown' (source, page) or 'N/A' (score).
    """
    answer_text = result['answer']
    print(f"\n🧠 Answer:\n{answer_text}\n")

    # (label, metadata key, text shown when the key is absent)
    fields = (
        ("📚 Source", 'parent_source', 'unknown'),
        ("📄 Page", 'page', 'unknown'),
        ("🏷️ Rerank Score", 'rerank_score', 'N/A'),
    )

    print("🔎 Retrieved Documents:")
    position = 0
    for doc in result["source_documents"]:
        position += 1
        print(f"\n📄 Chunk #{position}")
        meta = doc.metadata
        for label, key, fallback in fields:
            print(f"{label}: {meta.get(key, fallback)}")
        #print("📝 Excerpt:")
        #print(doc.page_content.strip()[:500] + ("..." if len(doc.page_content) > 500 else ""))

In [10]:
def initialize_rag():
    """
    Build (or update) the rulebook index and create the Gemini LLM.

    Returns:
        Tuple ``(faiss_db, llm)``: the FAISS store persisted under
        ``./faiss_index`` and a GoogleGenerativeAI model.

    Raises:
        ValueError: If GEMINI_API_KEY is not set in the environment.
        RuntimeError: If no vector store could be built or loaded.
    """
    # Check the key first: downloading and embedding the rulebooks is the
    # expensive part, so a missing key should fail before that work starts.
    load_dotenv()
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        raise ValueError("GEMINI_API_KEY not found in environment variables")

    docs = [
        'https://assets.pokemon.com/assets/cms2/pdf/trading-card-game/rulebook/sm7_rulebook_en.pdf',
        'https://media.wizards.com/images/magic/tcg/resources/rules/MagicCompRules_21031101.pdf',
        'https://cdn.1j1ju.com/medias/d3/22/83-monopoly-rulebook.pdf',
        'https://fgbradleys.com/wp-content/uploads/rules/Monopoly_Rules.pdf?srsltid=AfmBOorDaiGKyaEWIQFd-au0rl8-tKoqedlzy_6r4EETpj_ZMIUYsNMQ',
    ]

    # Use the module-level chunking constants rather than repeating the
    # numbers, and reuse the returned store instead of re-reading it from disk.
    faiss_db = process_and_store_documents(
        docs, CHUNK_SIZE, CHUNK_OVERLAP, persist_directory="./faiss_index"
    )
    if faiss_db is None:
        raise RuntimeError("No vector store available: nothing was indexed and no saved index exists")

    llm = GoogleGenerativeAI(
        api_key=gemini_api_key,
        model="gemini-2.0-flash",
        verbose=False
    )

    return faiss_db, llm

## 8. Prepare documents and vectorstore


In [11]:
# # Prepare documents
# docs = ['https://assets.pokemon.com/assets/cms2/pdf/trading-card-game/rulebook/sm7_rulebook_en.pdf','https://media.wizards.com/images/magic/tcg/resources/rules/MagicCompRules_21031101.pdf','https://cdn.1j1ju.com/medias/d3/22/83-monopoly-rulebook.pdf','https://fgbradleys.com/wp-content/uploads/rules/Monopoly_Rules.pdf?srsltid=AfmBOorDaiGKyaEWIQFd-au0rl8-tKoqedlzy_6r4EETpj_ZMIUYsNMQ']
# process_and_store_documents(docs,'800','80', persist_directory="./faiss_index")
# faiss_db = FAISS.load_local("./faiss_index", EMBEDDING_MODEL, allow_dangerous_deserialization=True)

In [12]:
# rag_chain, rag_memory = create_rag_chain(
#     vectorstore=faiss_db,
#     llm=llm,
#     prompt_type="react",
#     use_memory=True,
#     retriever_k=3,
#     use_reranking=True,
#     top_k_chunks=10
# )

## 9. Testing with Sample Queries
Let's test our RAG system with some sample queries.

In [13]:
# faiss_memory = create_memory_vectorstore(
#     embedding_model=EMBEDDING_MODEL, path="memory_vectorstore"
# )

# # Test some queries
# test_queries = [
#     "what is EX card?",
#     "what is haste?",
#     "how to build a hotel?"
# ]

# for query in test_queries:
#     print(f"\n🔍 Query: {query}")
#     result = rag_chain({"question": query})
#     print_result_summary(result)
#     store_to_memory_vectorstore(
#         question=query,
#         answer=result["answer"],
#         vectorstore=faiss_memory
#     )

## Rewritten prompt
Here we will use the LLM itself to rewrite prompts for better answers (TO BE IMPLEMENTED).

In [None]:
# test_queries = [
#     "what is EX card?",
#     "what is haste?",
#     "how to build a hotel?"
# ]
# for query in test_queries:
#     result = rag_chain({"question": query, "chat_history": []})
#     print_result_summary(result)
#     store_to_memory_vectorstore(
#         question=query,
#         answer=result["answer"],
#         vectorstore=faiss_memory,         
#         embedding_model=EMBEDDING_MODEL
#     )

## RAGAS EVALUATION

In [16]:
# from ragas.metrics import faithfulness, answer_relevancy, context_precision

# metrics = [faithfulness, answer_relevancy, context_precision]

In [17]:
# from ragas import evaluate
# from ragas.llms import LangchainLLMWrapper

# # Wrap the LLM
# evaluator_llm = LangchainLLMWrapper(llm)

# # Evaluate the dataset
# results = evaluate(dataset=eval_dataset, metrics=metrics, llm=evaluator_llm)

# # Display the results
# print(results)