In [1]:
!pip3 install --quiet langchain langchain_community langchain_core langchain-google-genai pypdf chromadb
!pip install -U --quiet langchain_chroma

In [2]:
# -*- coding: utf-8 -*-
"""
Professional RAG (Retrieval-Augmented Generation) System
A document processing and question-answering system using Google's Gemini models.

Dependencies:
    pip install langchain langchain_community langchain_core langchain-google-genai
    pip install langchain_chroma pypdf chromadb
"""

import logging
import os
import re
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Dict, Any, Optional

from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from langchain_chroma import Chroma
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain.docstore.document import Document

# Module-wide logging: INFO and above, each record prefixed with a timestamp,
# the logger name and the level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)


@dataclass
class RAGConfig:
    """Configuration for the RAG pipeline.

    Only ``file_path`` and ``api_key`` are required; every other field has a
    default. Numeric settings are checked in ``__post_init__`` so an invalid
    value fails when the config is built instead of deep inside the pipeline.

    Raises:
        ValueError: if a numeric setting is outside its valid range.
    """
    file_path: str                          # path of the PDF to index
    api_key: Optional[str]                  # may be None when read from an unset env var; callers check it
    model_name: str = "gemini-2.5-flash-lite"
    embedding_model: str = "models/embedding-001"
    chunk_size: int = 500                   # max chunk length passed to RecursiveCharacterTextSplitter
    chunk_overlap: int = 150                # overlap between consecutive chunks (must not exceed chunk_size)
    temperature: float = 0.0
    retrieval_k: int = 3                    # number of chunks returned per query
    score_threshold: float = 0.3            # minimum relevance score for "similarity_score_threshold" search
    metadata_extraction_delay: float = 2.0  # seconds slept between per-document LLM metadata calls
    persist_directory: str = "./chroma_db"  # where the Chroma store is persisted / reloaded from
    min_page_words: int = 20                # pages with this many words or fewer are dropped
    adaptive_threshold: bool = True         # apply the extra lexical filter after vector retrieval
    min_relevant_docs: int = 1              # fewer retrieved chunks than this -> "not enough information"

    def __post_init__(self) -> None:
        """Validate numeric settings eagerly."""
        if self.chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {self.chunk_size}")
        # Mirrors the text splitter's own rule: overlap may not exceed the chunk size.
        if not 0 <= self.chunk_overlap <= self.chunk_size:
            raise ValueError(
                f"chunk_overlap must be between 0 and chunk_size ({self.chunk_size}), "
                f"got {self.chunk_overlap}"
            )
        if self.retrieval_k < 1:
            raise ValueError(f"retrieval_k must be at least 1, got {self.retrieval_k}")
        # Relevance scores used by the score-threshold retriever live in [0, 1].
        if not 0.0 <= self.score_threshold <= 1.0:
            raise ValueError(
                f"score_threshold must be within [0, 1], got {self.score_threshold}"
            )
        if self.metadata_extraction_delay < 0:
            raise ValueError(
                "metadata_extraction_delay must be non-negative, "
                f"got {self.metadata_extraction_delay}"
            )


class DocumentProcessor:
    """Read the configured PDF and discard pages that carry too little text."""

    def __init__(self, config: RAGConfig):
        self.config = config

    def load_and_clean_documents(self) -> List[Document]:
        """Return the pages of ``config.file_path`` worth indexing.

        A page is kept only when it has strictly more than
        ``config.min_page_words`` whitespace-separated words.

        Raises:
            FileNotFoundError: if ``config.file_path`` does not exist.
            Exception: any loader error is logged and re-raised unchanged.
        """
        try:
            source_path = self.config.file_path
            if not Path(source_path).exists():
                raise FileNotFoundError(f"File not found: {source_path}")

            all_pages = PyPDFLoader(source_path).load()

            word_floor = self.config.min_page_words
            kept_pages = []
            for candidate in all_pages:
                if len(candidate.page_content.split()) > word_floor:
                    kept_pages.append(candidate)

            logger.info(f"Loaded {len(all_pages)} pages, kept {len(kept_pages)} after cleaning")
            return kept_pages

        except Exception as e:
            logger.error(f"Error loading documents: {e}")
            raise


class MetadataExtractor:
    """Annotate documents with an LLM-extracted title, keywords and summary."""

    def __init__(self, config: RAGConfig):
        self.config = config
        self.llm = ChatGoogleGenerativeAI(
            model=config.model_name,
            temperature=config.temperature,
            google_api_key=config.api_key,
        )
        self.schema = self._create_extraction_schema()
        # The result is read with ``.get`` below, i.e. it is expected to be a
        # dict shaped like ``schema["parameters"]``.
        self.extraction_chain = self.llm.with_structured_output(self.schema)

    def _create_extraction_schema(self) -> Dict[str, Any]:
        """Define the function-style schema for metadata extraction."""
        return {
            "name": "extract_metadata",
            "description": "Extract metadata from text",
            "parameters": {
                "type": "object",
                "properties": {
                    "title": {"type": "string"},
                    "keywords": {"type": "array", "items": {"type": "string"}},
                    "summary": {"type": "string"},
                },
                "required": ["title", "keywords", "summary"],
            },
        }

    @staticmethod
    def _fallback_metadata(reason: str) -> Dict[str, str]:
        """Placeholder metadata recorded when extraction yields nothing usable."""
        return {
            "title": "Unknown",
            "keywords": "",
            "summary": "",
            "error": reason,
            "processed_at": time.strftime("%Y-%m-%d %H:%M:%S"),
        }

    @staticmethod
    def _normalize(extracted: Dict[str, Any]) -> Dict[str, str]:
        """Flatten a structured-output result into plain string metadata.

        The keyword list is joined into one comma-separated string. A field
        the model returned as ``None`` falls back to its default instead of
        being stored as ``None``; otherwise ``metadata.get("title", "Unknown")``
        would still yield ``None`` downstream.
        """
        keywords = extracted.get("keywords")
        if keywords is None:
            keywords = ""
        elif isinstance(keywords, (list, tuple)):
            keywords = ", ".join(str(keyword) for keyword in keywords)

        title = extracted.get("title")
        summary = extracted.get("summary")
        return {
            "title": "Unknown" if title is None else str(title),
            "keywords": str(keywords),
            "summary": "" if summary is None else str(summary),
            "processed_at": time.strftime("%Y-%m-%d %H:%M:%S"),
        }

    def extract_metadata_batch(self, documents: List[Document]) -> List[Document]:
        """Extract metadata for each document, sleeping between LLM calls.

        Documents are updated in place (their ``metadata`` dict is extended)
        and returned in the same order. Every document ends up with
        ``title``/``keywords``/``summary``/``processed_at``. An ``error`` entry
        is added when the call failed or returned an empty result.

        Args:
            documents: documents to annotate.

        Returns:
            The same document objects, in input order.
        """
        docs_with_metadata = []
        total = len(documents)

        for i, doc in enumerate(documents):
            try:
                logger.info(f"Processing document {i+1}/{total}")
                extracted_data = self.extraction_chain.invoke(doc.page_content)

                if extracted_data:
                    doc.metadata.update(self._normalize(extracted_data))
                else:
                    # Previously an empty result left the document without any
                    # extracted fields; record defaults like the error path does.
                    logger.warning(f"Empty metadata extraction result for document {i+1}")
                    doc.metadata.update(self._fallback_metadata("empty extraction result"))

            except Exception as e:
                logger.error(f"Error extracting metadata for document {i+1}: {e}")
                doc.metadata.update(self._fallback_metadata(str(e)))

            docs_with_metadata.append(doc)

            # Rate limiting between calls; no sleep after the last document.
            if i < total - 1:
                time.sleep(self.config.metadata_extraction_delay)

        return docs_with_metadata


class VectorStore:
    """Manage the Chroma vector store: creation, loading and filtered retrieval."""

    # Function words ignored when matching query words against metadata
    # keywords; otherwise words like "the" or "of" would count as hits.
    _STOPWORDS = frozenset({
        "a", "an", "and", "are", "as", "at", "be", "by", "can", "do", "does",
        "for", "from", "how", "i", "if", "in", "is", "it", "me", "my", "of",
        "on", "or", "that", "the", "this", "to", "was", "what", "when",
        "where", "which", "who", "why", "with", "you", "your",
    })

    # Domain vocabularies for ``_has_semantic_relevance``:
    # education, legal, policy.
    _DOMAIN_TERMS = (
        ("university", "college", "student", "academic", "education", "course", "degree"),
        ("conviction", "criminal", "legal", "court", "law", "offense"),
        ("policy", "procedure", "rule", "regulation", "guideline"),
    )

    def __init__(self, config: RAGConfig):
        self.config = config
        self.embeddings = GoogleGenerativeAIEmbeddings(
            google_api_key=config.api_key,
            model=config.embedding_model
        )
        self.store = None

    def create_or_load_store(self, documents: List[Document]) -> None:
        """Load the persisted store, or build and persist a new one.

        If ``config.persist_directory`` exists, the store is loaded from it
        and *documents* is ignored. Otherwise a new store is built from
        *documents* and persisted to that directory.

        Raises:
            ValueError: if a new store would be built from an empty list.
                Such a store could be persisted empty and then be reused on
                every later run, because the directory would exist.
            Exception: any Chroma error is logged and re-raised.
        """
        try:
            if Path(self.config.persist_directory).exists():
                logger.info("Loading existing vector store...")
                self.store = Chroma(
                    persist_directory=self.config.persist_directory,
                    embedding_function=self.embeddings
                )
            else:
                if not documents:
                    raise ValueError(
                        "Cannot create a new vector store from an empty document list"
                    )
                logger.info("Creating new vector store...")
                self.store = Chroma.from_documents(
                    documents,
                    self.embeddings,
                    persist_directory=self.config.persist_directory
                )

        except Exception as e:
            logger.error(f"Error with vector store: {e}")
            raise

    def search_similar(self, query: str, k: Optional[int] = None) -> List[Document]:
        """Return up to *k* chunks relevant to *query*.

        Twice as many candidates as needed are fetched with a relevance-score
        threshold. When ``config.adaptive_threshold`` is on, they are then
        re-filtered lexically before truncating to *k*.

        Args:
            query: user question.
            k: number of results; a falsy value falls back to ``config.retrieval_k``.

        Raises:
            ValueError: if the store has not been created/loaded yet.
        """
        if not self.store:
            raise ValueError("Vector store not initialized")

        limit = k or self.config.retrieval_k

        retriever = self.store.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={
                "k": limit * 2,  # over-fetch so filtering still leaves enough
                "score_threshold": self.config.score_threshold,
            },
        )
        candidates = retriever.invoke(query)

        if not candidates:
            return []

        if self.config.adaptive_threshold:
            candidates = self._apply_adaptive_filtering(query, candidates)

        return candidates[:limit]

    @staticmethod
    def _tokenize(text: str) -> set:
        """Lower-case *text* and return its set of word tokens.

        Punctuation is stripped (``re`` ``\\w+``), so "eligible?" yields
        "eligible".
        """
        return set(re.findall(r"\w+", text.lower()))

    def _apply_adaptive_filtering(self, query: str, documents: List[Document]) -> List[Document]:
        """Keep documents that pass a lexical or domain relevance check.

        A document is kept if more than 10% of the query's words appear in
        it, or a query word matches its metadata keywords, or query and
        document share one of the ``_DOMAIN_TERMS`` vocabularies.
        """
        if not documents:
            return []

        # The retriever does not return scores here, so relevance is
        # re-checked from content.
        query_words = self._tokenize(query)
        filtered_docs = []

        for doc in documents:
            doc_words = self._tokenize(doc.page_content)
            overlap = len(query_words & doc_words)
            overlap_ratio = overlap / len(query_words) if query_words else 0

            # NOTE(review): stopwords still count toward this ratio, which
            # keeps the check lenient; consider excluding ``_STOPWORDS`` here.
            if (overlap_ratio > 0.1
                    or self._has_keyword_match(query, doc)
                    or self._has_semantic_relevance(query, doc)):
                filtered_docs.append(doc)

        return filtered_docs

    def _has_keyword_match(self, query: str, document: Document) -> bool:
        """Return True if a non-stopword query word is one of the document's keyword words.

        The match is word-level. The previous substring test against the
        joined keyword string let single letters such as "i" or "a" match
        almost any keyword list.
        """
        keywords = document.metadata.get("keywords", "")
        if not keywords:
            return False

        keyword_words = self._tokenize(str(keywords))
        query_words = self._tokenize(query) - self._STOPWORDS
        return not query_words.isdisjoint(keyword_words)

    def _has_semantic_relevance(self, query: str, document: Document) -> bool:
        """Return True if query and content both mention terms of one domain.

        Uses substring containment on lower-cased text, so plural forms
        (e.g. "students") also count.
        """
        query_lower = query.lower()
        content_lower = document.page_content.lower()

        return any(
            any(term in query_lower for term in terms)
            and any(term in content_lower for term in terms)
            for terms in self._DOMAIN_TERMS
        )


class RAGSystem:
    """Orchestrate document loading, enrichment, indexing and question answering.

    Built from one RAGConfig. It combines a DocumentProcessor, a
    MetadataExtractor, a VectorStore, a RecursiveCharacterTextSplitter and a
    Gemini chat model.
    """

    # Substrings that, anywhere in the lower-cased answer, mark it as a
    # "could not answer" response.
    _INSUFFICIENT_INDICATORS = (
        "don't know",
        "do not know",
        "cannot answer",
        "can't answer",
        "cannot provide",
        "can't provide",
        "no information",
        "not enough information",
        "insufficient information",
        "unable to answer",
        "not available",
        "not found in",
        "not mentioned",
        "not provided",
        "cannot determine",
        "can't determine",
        "does not contain",
        "doesn't contain",
        "no relevant information",
        "not relevant",
        "cannot find",
        "can't find",
        "not specified",
        "not clear from",
        "unable to determine",
    )

    # Answer openings that signal refusal or uncertainty.
    _UNCERTAINTY_STARTS = (
        "i cannot",
        "i can't",
        "i don't",
        "i do not",
        "there is no",
        "there isn't",
        "the context does not",
        "the context doesn't",
    )

    def __init__(self, config: RAGConfig):
        self.config = config
        self.doc_processor = DocumentProcessor(config)
        self.metadata_extractor = MetadataExtractor(config)
        self.vector_store = VectorStore(config)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=config.chunk_size,
            chunk_overlap=config.chunk_overlap
        )
        self.llm = ChatGoogleGenerativeAI(
            google_api_key=config.api_key,
            temperature=config.temperature,
            model=config.model_name
        )
        self.qa_template = self._create_qa_template()

    def _create_qa_template(self) -> PromptTemplate:
        """Create the question-answering prompt template."""
        template = """
        Use the following pieces of context to answer the question at the end.
        Answer based on the provided context. If the context contains relevant information, provide a helpful answer.
        If the context doesn't contain enough relevant information to answer the question properly,
        indicate that you cannot provide a complete answer based on the available information.
        Do not make up answers or provide information not present in the context.

        Context:
        {context}

        Question: {question}

        Answer:
        """
        return PromptTemplate.from_template(template)

    def initialize_system(self, force_reprocess: bool = False) -> None:
        """Prepare the vector store so the system can answer queries.

        If *force_reprocess* is true, or ``config.persist_directory`` does
        not exist yet, the PDF is loaded, cleaned, enriched with metadata,
        split and indexed. Otherwise the persisted store is loaded and the
        PDF is not read.

        Args:
            force_reprocess: rebuild the index even if a persisted store
                exists. The existing collection is deleted first. Previously
                the freshly processed chunks were discarded and the old
                store was reloaded.

        Raises:
            ValueError: if no page survives cleaning.
            Exception: any loading/extraction/indexing error is logged and re-raised.
        """
        try:
            store_exists = Path(self.config.persist_directory).exists()

            if store_exists and not force_reprocess:
                # Reuse the persisted index; no need to parse the PDF.
                self.vector_store.create_or_load_store([])
            else:
                split_docs = self._build_chunks()
                if store_exists:
                    self._rebuild_store(split_docs)
                else:
                    logger.info("Creating vector store...")
                    self.vector_store.create_or_load_store(split_docs)

            logger.info("RAG system initialized successfully!")

        except Exception as e:
            logger.error(f"Failed to initialize RAG system: {e}")
            raise

    def _build_chunks(self) -> List[Document]:
        """Load, clean, enrich and split the PDF into indexable chunks."""
        documents = self.doc_processor.load_and_clean_documents()
        if not documents:
            raise ValueError(
                f"No pages with more than {self.config.min_page_words} words "
                f"found in {self.config.file_path}"
            )

        logger.info("Extracting metadata...")
        docs_with_metadata = self.metadata_extractor.extract_metadata_batch(documents)

        logger.info("Splitting documents...")
        return self.text_splitter.split_documents(docs_with_metadata)

    def _rebuild_store(self, split_docs: List[Document]) -> None:
        """Replace the persisted collection with one built from *split_docs*."""
        # VectorStore.create_or_load_store() only *loads* an existing
        # directory, so attach to the old collection, delete it, then index
        # the new chunks into the same persist directory.
        logger.info("Discarding existing vector store before reprocessing...")
        self.vector_store.create_or_load_store([])
        self.vector_store.store.delete_collection()

        logger.info("Creating vector store...")
        self.vector_store.store = Chroma.from_documents(
            split_docs,
            self.vector_store.embeddings,
            persist_directory=self.config.persist_directory,
        )

    def _is_insufficient_answer(self, answer: str) -> bool:
        """Heuristically decide whether *answer* is a non-answer.

        True when the answer has fewer than 5 words, contains any of
        ``_INSUFFICIENT_INDICATORS``, or starts with one of
        ``_UNCERTAINTY_STARTS`` (case-insensitive).
        """
        if len(answer.split()) < 5:
            return True

        answer_lower = answer.lower().strip()
        if any(indicator in answer_lower for indicator in self._INSUFFICIENT_INDICATORS):
            return True

        return answer_lower.startswith(self._UNCERTAINTY_STARTS)

    def query_documents(self, query: str, return_sources: bool = False) -> Dict[str, Any]:
        """Answer *query* from the indexed documents.

        Args:
            query: user question.
            return_sources: include per-chunk title/keywords/page, but only
                if the answer is not judged insufficient.

        Returns:
            Dict with ``answer``, ``sources``, ``confidence`` (``"low"``,
            ``"medium"``, ``"high"`` or ``"error"``) and
            ``retrieved_docs_count``. Errors are logged and reported in the
            dict rather than raised.
        """
        try:
            retrieved_docs = self.vector_store.search_similar(query)

            if not retrieved_docs or len(retrieved_docs) < self.config.min_relevant_docs:
                return {
                    "answer": "I don't have enough relevant information to answer this question based on the available documents.",
                    "sources": [],
                    "confidence": "low",
                    "retrieved_docs_count": len(retrieved_docs) if retrieved_docs else 0
                }

            # Context is fixed to the chunks retrieved above; the question
            # flows straight through to the prompt.
            retrieve = {
                "context": lambda x: "\n\n".join([doc.page_content for doc in retrieved_docs]),
                "question": RunnablePassthrough()
            }

            rag_chain = (
                retrieve
                | self.qa_template
                | self.llm
                | StrOutputParser()
            )

            answer = rag_chain.invoke(query)
            is_insufficient = self._is_insufficient_answer(answer)

            result = {
                "answer": answer,
                "confidence": "low" if is_insufficient else ("high" if len(retrieved_docs) >= 2 else "medium"),
                "retrieved_docs_count": len(retrieved_docs)
            }

            if return_sources and not is_insufficient:
                result["sources"] = [
                    {
                        "title": doc.metadata.get("title", "Unknown"),
                        "keywords": doc.metadata.get("keywords", ""),
                        "page": doc.metadata.get("page", "Unknown")
                    }
                    for doc in retrieved_docs
                ]
            else:
                result["sources"] = []

            return result

        except Exception as e:
            logger.error(f"Error querying documents: {e}")
            return {
                "answer": f"An error occurred while processing your query: {str(e)}",
                "sources": [],
                "confidence": "error",
                "retrieved_docs_count": 0
            }

    def batch_query(self, queries: List[str]) -> List[Dict[str, Any]]:
        """Run ``query_documents`` (with sources) for each query, in order.

        Each result dict is prefixed with the originating ``query``.
        """
        results = []
        for query in queries:
            logger.info(f"Processing query: {query}")
            result = self.query_documents(query, return_sources=True)
            results.append({
                "query": query,
                **result
            })
        return results

In [3]:
"""Demonstrate the RAG system."""
# Configuration
try:
    # Try to import keys, fallback to environment variables
    try:
        import keys
        config = RAGConfig(
            file_path=keys.FILE_NAME,
            api_key=keys.LLM_API_KEY
        )
    except ImportError:
        config = RAGConfig(
            file_path=os.getenv("FILE_NAME", "document.pdf"),
            api_key=os.getenv("GOOGLE_API_KEY")
        )

    if not config.api_key:
        raise ValueError("Google API key not found. Set GOOGLE_API_KEY environment variable or create keys.py")

    # Initialize RAG system
    rag = RAGSystem(config)
    rag.initialize_system()

    # Example queries
    test_queries = [
        "What do I need to be eligible?",
        "How can I show language proficiency?",
        "What are the ingredients of Carbonara?",
        "What is the additional contribution of a person with ISEE of 30000?"
    ]

    # Process queries
    results = rag.batch_query(test_queries)

    # Display results
    for result in results:
        print(f"\n{'='*60}")
        print(f"Question: {result['query']}")
        print(f"Confidence: {result['confidence']}")
        print(f"Answer: {result['answer']}")
        if result.get('sources'):
            print("\nSources:")
            for source in result['sources']:
                print(f"  - {source['title']} (Page: {source['page']})")

except Exception as e:
    logger.error(f"Application error: {e}")
    raise


Question: What do I need to be eligible?
Confidence: high
Answer: Based on the provided context, to be eligible, you need to meet the following requirements:

*   **General Requirements for Eligibility** (ART. 5.1)
*   **Language Requirements** (ART. 5.3)
*   **Other Requirements** (ART. 5.4)

Sources:
  - Student Mobility Regulations (Page: 1)
  - Student Mobility Regulations (Page: 1)
  - Academic Score and Ranking Criteria for Master’s in Civil Engineering (Page: 13)

Question: How can I show language proficiency?
Confidence: high
Answer: You can show language proficiency through a language proficiency test. These tests are multistage and include multiple-choice questions, cloze questions, and drag-and-drop activities within a text. The tests primarily focus on grammar and text comprehension, and do not include speaking or listening components. The test starts at the A2 level and progresses.

Sources:
  - Language Proficiency Requirements and Test Exemptions (Page: 8)
  - Organizat