# Setting Up

In [1]:
# Standard library
import os
from pathlib import Path
from typing import List, Dict, Any, Optional
import logging
from datetime import datetime

# Environment variables
from dotenv import load_dotenv

# LangChain core
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Document loaders
from langchain_community.document_loaders import (
    UnstructuredFileLoader,
    DirectoryLoader,
    WebBaseLoader,
)

# Vector store and embeddings
from langchain_community.vectorstores import Chroma
from langchain_huggingface import HuggingFaceEmbeddings

# LLM
from langchain_google_genai import ChatGoogleGenerativeAI

# Re-ranking
from sentence_transformers import CrossEncoder

# Token counting
import tiktoken

# Load environment variables (e.g. GOOGLE_API_KEY) from a local .env file, if one exists
load_dotenv()

# Setup logging first so configuration problems below can be reported
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger('IndustrialRAG')

# API key for the Gemini LLM used by RAGSystem.build_index()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    # Surface the problem now instead of as an opaque auth error at LLM setup/call time.
    logger.warning(
        "GOOGLE_API_KEY is not set; LLM initialization or generation will likely fail."
    )

USER_AGENT environment variable not set, consider setting it to identify your requests.


# Utility Functions

In [2]:
# Uses tiktoken (OpenAI's tokenizer) as a proxy for token estimation
def count_tokens(text: str, encoding_name: str = "cl100k_base") -> int:
    """
    Count the number of tokens in a text string.

    Args:
        text: Text to measure.
        encoding_name: tiktoken encoding name (default "cl100k_base").

    Returns:
        Number of tokens under the chosen encoding. If tiktoken raises for any
        reason, a rough ``len(text) // 4`` estimate is returned instead and the
        error is logged.
    """
    try:
        encoding = tiktoken.get_encoding(encoding_name)
        # disallowed_special=() treats strings such as "<|endoftext|>" as ordinary
        # text; the default raises ValueError, which would push documents that
        # merely mention special tokens onto the inaccurate fallback below.
        tokens = encoding.encode(text, disallowed_special=())
        return len(tokens)
    except Exception as e:
        logger.error(f"Error counting tokens: {e}")
        # Fallback: rough estimation (1 token ≈ 4 characters)
        return len(text) // 4

def analyze_document(text: str) -> Dict[str, Any]:
    """
    Measure a text and suggest chunking parameters scaled to its length.

    The tier is chosen from the token count (see count_tokens):
    - under 2000 tokens:        chunk_size=500,  overlap=100  ("small")
    - 2000 to under 10000:      chunk_size=1000, overlap=200  ("medium")
    - 10000 tokens and above:   chunk_size=1500, overlap=300  ("large")

    Note: the suggested sizes are plain numbers; when passed to
    RAGSystem.create_chunks they are interpreted by its splitter's length function.

    Returns:
        Dict with token_count, word_count, char_count, suggested_chunk_size,
        suggested_overlap and strategy.
    """
    n_tokens = count_tokens(text)

    # (exclusive token upper bound, chunk_size, chunk_overlap, label);
    # the final tier has no upper bound and catches everything else.
    tiers = (
        (2000, 500, 100, "small"),
        (10000, 1000, 200, "medium"),
        (float("inf"), 1500, 300, "large"),
    )
    size, overlap, label = next(
        (tier_size, tier_overlap, tier_label)
        for limit, tier_size, tier_overlap, tier_label in tiers
        if n_tokens < limit
    )

    logger.info(f"Document analysis: {n_tokens} tokens, strategy={label}")

    return {
        "token_count": n_tokens,
        "word_count": len(text.split()),
        "char_count": len(text),
        "suggested_chunk_size": size,
        "suggested_overlap": overlap,
        "strategy": label,
    }

# Define RAG System Class

In [3]:
class RAGSystem:
    """
    Production-grade RAG system with indexing and query capabilities.
    Combines document loading, chunking, embedding, retrieval, re-ranking, and generation.

    Typical flow: load_documents() -> create_chunks() -> build_index() -> query().
    """

    # Prompt wrapped around the retrieved context; {context} and {question}
    # are filled in by generate().
    _PROMPT_TEMPLATE = """You are a helpful AI assistant. Answer the question based on the provided context.

Context:
{context}

Question: {question}

Instructions:
- Answer based ONLY on the information in the context
- If the context doesn't contain enough information, say "I don't have enough information to answer that."
- Be concise and accurate
- Use natural language

Answer:"""

    def __init__(self):
        """Initialize RAG system components (models are created later by build_index)."""
        self.documents = []
        self.chunks = []
        self.embeddings = None
        self.vectorstore = None
        self.retriever = None
        self.cross_encoder = None
        self.llm = None

        logger.info("RAGSystem initialized")

    # ==================== INDEXING PHASE ====================

    @staticmethod
    def _load(source: str) -> List[Document]:
        """
        Auto-detect and load a file, directory, or web URL using LangChain loaders.

        URLs (http/https) use WebBaseLoader; a file uses UnstructuredFileLoader;
        a directory loads every file matching "**/*" with UnstructuredFileLoader.

        Raises:
            ValueError: if source is not a URL, an existing file, or an existing directory.
        """
        # Check web URL first
        if source.startswith(("http://", "https://")):
            return WebBaseLoader(source).load()

        source_path = Path(source)

        if source_path.is_file():
            return UnstructuredFileLoader(str(source_path)).load()

        if source_path.is_dir():
            loader = DirectoryLoader(
                str(source_path),
                glob="**/*",
                loader_cls=UnstructuredFileLoader,
                use_multithreading=True
            )
            return loader.load()

        raise ValueError(f"Invalid source: {source}")

    def load_documents(self, source: str) -> None:
        """
        Load documents from a file, directory, or URL and append them to self.documents.

        Args:
            source: Path to file/directory or URL
        """
        docs = RAGSystem._load(source)
        self.documents.extend(docs)
        logger.info(f"Loaded {len(docs)} documents from {source}")
        print(f"Loaded {len(docs)} documents. Total: {len(self.documents)}")

    def create_chunks(self, chunk_size: int = 1000, chunk_overlap: int = 200) -> None:
        """
        Split all loaded documents into chunks, replacing any previous chunks.

        Args:
            chunk_size: Maximum chunk length, measured in characters (length_function=len)
            chunk_overlap: Overlap between consecutive chunks, in characters

        Raises:
            ValueError: if no documents have been loaded.
        """
        if not self.documents:
            raise ValueError("No documents loaded. Call load_documents() first.")

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", ". ", "; ", " ", ""],
            is_separator_regex=False,
        )

        self.chunks = text_splitter.split_documents(self.documents)
        logger.info(f"Created {len(self.chunks)} chunks")
        print(f"Created {len(self.chunks)} chunks from {len(self.documents)} documents")

    @staticmethod
    def _select_device() -> str:
        """Return the first available torch device among 'cuda', 'mps', falling back to 'cpu'."""
        try:
            import torch
        except ImportError:
            return "cpu"
        if torch.cuda.is_available():
            return "cuda"
        mps_backend = getattr(torch.backends, "mps", None)
        if mps_backend is not None and mps_backend.is_available():
            return "mps"
        return "cpu"

    def build_index(self, collection_name: str = "rag_knowledge_base") -> None:
        """
        Create embeddings and build the vector store index.
        Initializes embeddings, vector store, retriever, cross-encoder, and LLM.

        Args:
            collection_name: Chroma collection to (re)create for this index.

        Raises:
            ValueError: if create_chunks() has not produced any chunks.
        """
        if not self.chunks:
            raise ValueError("No chunks available. Call create_chunks() first.")

        # Pick the device at runtime instead of hard-coding Apple's 'mps',
        # which fails on machines without Metal support.
        device = self._select_device()
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={'device': device},
            encode_kwargs={'normalize_embeddings': True}
        )
        logger.info(f"Embeddings model loaded (device={device})")

        # NOTE(review): without a persist_directory Chroma uses an in-memory client that
        # appears to be shared within the process, so a collection with this name may
        # already hold chunks from an earlier build (e.g. a re-run notebook cell).
        # Drop it first so chunks are not indexed twice.
        Chroma(
            collection_name=collection_name,
            embedding_function=self.embeddings,
        ).delete_collection()

        self.vectorstore = Chroma.from_documents(
            documents=self.chunks,
            embedding=self.embeddings,
            collection_name=collection_name
        )
        logger.info(f"Vector store created with {len(self.chunks)} chunks")

        # Default k=10; retrieve() can override it per call
        self.retriever = self.vectorstore.as_retriever(
            search_type="similarity",
            search_kwargs={"k": 10}
        )
        logger.info("Retriever initialized")

        # Cross-encoder for re-ranking
        self.cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
        logger.info("Cross-encoder loaded")

        self.llm = ChatGoogleGenerativeAI(
            model="gemini-2.5-flash",
            google_api_key=GOOGLE_API_KEY,
            temperature=0.3,
            max_tokens=1024,
        )
        logger.info("LLM initialized")

        print("Index built successfully!")
        print(f"  - {len(self.chunks)} chunks indexed")
        print("  - Embeddings: all-MiniLM-L6-v2")
        print("  - Cross-encoder: ms-marco-MiniLM-L-6-v2")
        print("  - LLM: gemini-2.5-flash")

    # ==================== QUERY PHASE ====================

    def retrieve(self, query: str, k: int = 10) -> List[Document]:
        """
        Retrieve top-k candidate documents by vector similarity.

        Args:
            query: User question
            k: Number of documents to retrieve

        Returns:
            List of retrieved documents

        Raises:
            ValueError: if build_index() has not been called.
        """
        if self.retriever is None:
            raise ValueError("Index not built. Call build_index() first.")

        # Per-call kwargs override the retriever's search_kwargs, so k is honored
        # instead of always using the build-time default.
        results = self.retriever.invoke(query, k=k)
        logger.info(f"Retrieved {len(results)} candidates")
        return results

    def rerank(self, query: str, documents: List[Document], top_k: int = 3) -> List[Document]:
        """
        Re-rank documents by cross-encoder relevance score (highest first).

        Args:
            query: User question
            documents: List of candidate documents
            top_k: Number of top documents to return

        Returns:
            Up to top_k documents, best first; empty list if documents is empty.

        Raises:
            ValueError: if build_index() has not been called.
        """
        if self.cross_encoder is None:
            raise ValueError("Cross-encoder not initialized. Call build_index() first.")

        # Nothing to score; avoid calling the model with an empty batch.
        if not documents:
            logger.info("Re-ranked to top 0 documents")
            return []

        pairs = [[query, doc.page_content] for doc in documents]
        scores = self.cross_encoder.predict(pairs)

        ranked = sorted(zip(documents, scores), key=lambda pair: pair[1], reverse=True)
        top_docs = [doc for doc, _ in ranked[:top_k]]
        # Report the real count, which can be below top_k when few candidates exist.
        logger.info(f"Re-ranked to top {len(top_docs)} documents")

        return top_docs

    def generate(self, query: str, context_docs: List[Document]) -> str:
        """
        Generate an answer with the LLM, grounded in the given context documents.

        Args:
            query: User question
            context_docs: Retrieved and re-ranked documents

        Returns:
            The LLM response content

        Raises:
            ValueError: if build_index() has not been called.
        """
        if self.llm is None:
            raise ValueError("LLM not initialized. Call build_index() first.")

        # Number each document so the model can tell the passages apart
        context = "\n\n".join(
            f"[Document {i+1}]\n{doc.page_content}"
            for i, doc in enumerate(context_docs)
        )

        prompt = ChatPromptTemplate.from_template(self._PROMPT_TEMPLATE)
        messages = prompt.format_messages(context=context, question=query)

        response = self.llm.invoke(messages)
        logger.info("Answer generated")

        return response.content

    def query(
        self,
        question: str,
        return_sources: bool = False,
        *,
        k: int = 10,
        top_k: int = 3,
    ) -> str | Dict[str, Any]:
        """
        Complete RAG pipeline: retrieve -> re-rank -> generate.
        This is the main public API for querying the system.

        Args:
            question: User question
            return_sources: If True, return dict with answer and source documents
            k: Number of candidates to retrieve before re-ranking
            top_k: Number of re-ranked documents passed to the LLM

        Returns:
            Generated answer (string) or dict with "answer" and "sources"

        Raises:
            ValueError: if build_index() has not been called.
        """
        if self.retriever is None:
            raise ValueError("System not ready. Call build_index() first.")

        logger.info(f"Query: {question}")

        candidates = self.retrieve(question, k=k)
        top_docs = self.rerank(question, candidates, top_k=top_k)
        answer = self.generate(question, top_docs)

        if return_sources:
            return {
                "answer": answer,
                "sources": top_docs
            }
        return answer

    # ==================== UTILITY METHODS ====================

    def get_stats(self) -> Dict[str, Any]:
        """Return document/chunk counts and whether the vector index exists."""
        return {
            "total_documents": len(self.documents),
            "total_chunks": len(self.chunks),
            "index_built": self.vectorstore is not None,
        }

print("RAGSystem class defined")

RAGSystem class defined


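# Initialize and Build RAG System
- Create a RAG system instance
- Load documents (a local folder and a web page)
- Create chunks (sized adaptively from the total corpus length)
- Build the index (embeddings, vector store, retriever, cross-encoder, and LLM)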

In [5]:
# End-to-end build: instantiate the system, ingest sources, chunk adaptively, index.
rag = RAGSystem()

# Ingest a local folder first, then a web page
for src in ("test_documents", "https://karpathy.ai/"):
    rag.load_documents(src)

# Size chunks from the length of the whole corpus rather than per document
combined_text = "\n\n".join(doc.page_content for doc in rag.documents)
analysis = analyze_document(combined_text)

size_hint = analysis["suggested_chunk_size"]
overlap_hint = analysis["suggested_overlap"]
rag.create_chunks(chunk_size=size_hint, chunk_overlap=overlap_hint)

# Embeddings, vector store, retriever, cross-encoder and LLM are all set up here
rag.build_index()

print("\nRAG System ready for queries!")

2026-02-09 09:58:33 - IndustrialRAG - INFO - RAGSystem initialized
2026-02-09 09:58:34 - IndustrialRAG - INFO - Loaded 3 documents from test_documents
2026-02-09 09:58:34 - IndustrialRAG - INFO - Loaded 1 documents from https://karpathy.ai/
2026-02-09 09:58:34 - IndustrialRAG - INFO - Document analysis: 21618 tokens, strategy=large
2026-02-09 09:58:34 - IndustrialRAG - INFO - Created 80 chunks
2026-02-09 09:58:34 - sentence_transformers.SentenceTransformer - INFO - Load pretrained SentenceTransformer: sentence-transformers/all-MiniLM-L6-v2


Loaded 3 documents. Total: 3
Loaded 1 documents. Total: 4
Created 80 chunks from 4 documents


2026-02-09 09:58:34 - httpx - INFO - HTTP Request: HEAD https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2/resolve/main/modules.json "HTTP/1.1 307 Temporary Redirect"
2026-02-09 09:58:34 - httpx - INFO - HTTP Request: HEAD https://huggingface.co/api/resolve-cache/models/sentence-transformers/all-MiniLM-L6-v2/c9745ed1d9f207416be6d2e6f8de32d1f16199bf/modules.json "HTTP/1.1 200 OK"
2026-02-09 09:58:35 - httpx - INFO - HTTP Request: HEAD https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2/resolve/main/config_sentence_transformers.json "HTTP/1.1 307 Temporary Redirect"
2026-02-09 09:58:35 - httpx - INFO - HTTP Request: HEAD https://huggingface.co/api/resolve-cache/models/sentence-transformers/all-MiniLM-L6-v2/c9745ed1d9f207416be6d2e6f8de32d1f16199bf/config_sentence_transformers.json "HTTP/1.1 200 OK"
2026-02-09 09:58:35 - httpx - INFO - HTTP Request: HEAD https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2/resolve/main/config_sentence_transformers.json "HTTP/1

Loading weights:   0%|          | 0/103 [00:00<?, ?it/s]

[1mBertModel LOAD REPORT[0m from: sentence-transformers/all-MiniLM-L6-v2
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

Notes:
- UNEXPECTED: can be ignored when loading from different task/architecture; not ok if you expect identical arch.
2026-02-09 09:58:36 - httpx - INFO - HTTP Request: HEAD https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2/resolve/main/config.json "HTTP/1.1 307 Temporary Redirect"
2026-02-09 09:58:36 - httpx - INFO - HTTP Request: HEAD https://huggingface.co/api/resolve-cache/models/sentence-transformers/all-MiniLM-L6-v2/c9745ed1d9f207416be6d2e6f8de32d1f16199bf/config.json "HTTP/1.1 200 OK"
2026-02-09 09:58:37 - httpx - INFO - HTTP Request: HEAD https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2/resolve/main/tokenizer_config.json "HTTP/1.1 307 Temporary Redirect"
2026-02-09 09:58:37 - httpx - INFO - HTTP Request: HEAD https://huggingface.co/api/re

Loading weights:   0%|          | 0/105 [00:00<?, ?it/s]

BertForSequenceClassification LOAD REPORT from: cross-encoder/ms-marco-MiniLM-L-6-v2
Key                          | Status     |  | 
-----------------------------+------------+--+-
bert.embeddings.position_ids | UNEXPECTED |  | 

Notes:
- UNEXPECTED: can be ignored when loading from different task/architecture; not ok if you expect identical arch.
2026-02-09 09:58:41 - httpx - INFO - HTTP Request: HEAD https://huggingface.co/cross-encoder/ms-marco-MiniLM-L-6-v2/resolve/main/config.json "HTTP/1.1 307 Temporary Redirect"
2026-02-09 09:58:41 - httpx - INFO - HTTP Request: HEAD https://huggingface.co/cross-encoder/ms-marco-MiniLM-L6-v2/resolve/main/config.json "HTTP/1.1 307 Temporary Redirect"
2026-02-09 09:58:41 - httpx - INFO - HTTP Request: HEAD https://huggingface.co/api/resolve-cache/models/cross-encoder/ms-marco-MiniLM-L6-v2/c5ee24cb16019beea0893ab7796b1df96625c6b8/config.json "HTTP/1.1 200 OK"
2026-02-09 09:58:41 - httpx - INFO - HTTP Request: HEAD https://huggin

Index built successfully!
  - 80 chunks indexed
  - Embeddings: all-MiniLM-L6-v2
  - Cross-encoder: ms-marco-MiniLM-L-6-v2
  - LLM: gemini-2.5-flash

RAG System ready for queries!


# Test RAG System

In [6]:
# Smoke test: answer each question and list which documents backed the answer.
test_questions = [
    "How does retrieval augmented generation work?",
    "What is Zara Zhang's email address?",
]

for question in test_questions:
    print("=" * 80)
    print(f"Question: {question}")
    print("-" * 80)

    result = rag.query(question, return_sources=True)

    print(f"\nAnswer:\n{result['answer']}")
    print("\nSources used:")

    # Web sources are shown as full URLs; local files by their file name only
    for i, doc in enumerate(result["sources"], start=1):
        source = doc.metadata.get("source", "Unknown")
        label = source if source.startswith("http") else Path(source).name
        print(f"  {i}. {label}")

    print("=" * 80)
    print("\n\n")

2026-02-09 09:58:56 - IndustrialRAG - INFO - Query: How does retrieval augmented generation work?
2026-02-09 09:58:56 - IndustrialRAG - INFO - Retrieved 10 candidates


Question: How does retrieval augmented generation work?
--------------------------------------------------------------------------------


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

2026-02-09 09:58:57 - IndustrialRAG - INFO - Re-ranked to top 3 documents
2026-02-09 09:58:57 - google_genai.models - INFO - AFC is enabled with max remote calls: 10.
2026-02-09 09:59:00 - httpx - INFO - HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent "HTTP/1.1 200 OK"
2026-02-09 09:59:00 - IndustrialRAG - INFO - Answer generated
2026-02-09 09:59:00 - IndustrialRAG - INFO - Query: What is Zara Zhang's email address?
2026-02-09 09:59:00 - IndustrialRAG - INFO - Retrieved 10 candidates



Answer:
Retrieval-Augmented Generation (RAG) works through several key steps:
1.  **Document Ingestion**: Documents are loaded and processed.
2.  **Chunking**: Large documents are split into smaller chunks.
3.  **Embedding Generation**: Each chunk is converted into a dense vector representation using embedding models.
4.  **Vector Storage**: Embeddings are stored in a vector database.
5.  **Query Processing**: A user's question is converted into an embedding.
6.  **Retrieval**: The system finds the most relevant chunks from the vector database using similarity search.
7.  **Context Augmentation**: Retrieved chunks are added to the prompt as context.
8.  **Generation**: A large language model (LLM) generates a response based on the question and the retrieved context.

Sources used:
  1. rag_introduction.txt
  2. rag-for-knowledge-intensive-nlp-tasks.pdf
  3. rag-for-knowledge-intensive-nlp-tasks.pdf



Question: What is Zara Zhang's email address?
--------------------------------------

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

2026-02-09 09:59:00 - IndustrialRAG - INFO - Re-ranked to top 3 documents
2026-02-09 09:59:00 - google_genai.models - INFO - AFC is enabled with max remote calls: 10.
2026-02-09 09:59:02 - httpx - INFO - HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent "HTTP/1.1 200 OK"
2026-02-09 09:59:02 - IndustrialRAG - INFO - Answer generated



Answer:
Zara Zhang's email address is zara.r.zhang@gmail.com.

Sources used:
  1. About-Zara-Zhang.docx
  2. rag-for-knowledge-intensive-nlp-tasks.pdf
  3. rag-for-knowledge-intensive-nlp-tasks.pdf



