In [None]:
import os
import json
import warnings
from pathlib import Path
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv

from langchain.chains import ConversationalRetrievalChain
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_openai import ChatOpenAI
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.schema import Document

# Suppress HuggingFace warnings for cleaner output
warnings.filterwarnings("ignore", category=UserWarning, module="huggingface_hub")

In [None]:
class Config:
    """Configuration management for the Smart Research Assistant."""

    def __init__(self):
        self._setup_environment()

    def _setup_environment(self):
        """Set up environment variables for LangChain and OpenAI."""

        load_dotenv()
        # Verify required keys
        missing_keys = []
        for key in ["OPENAI_API_KEY", "LANGSMITH_API_KEY"]:
            if not os.getenv(key):
                missing_keys.append(key)

        if missing_keys:
            raise EnvironmentError(f"Missing required environment variables: {', '.join(missing_keys)}")

        print("Environment successfully configured from .env")

In [4]:
class DocumentProcessor:
    """Handles document loading and text splitting."""

    def __init__(
        self,
        chunk_size: int = 500,
        chunk_overlap: int = 50
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )

    def load_and_split_documents(self, doc_paths: List[str]) -> List[Document]:
        """Loads PDF documents and split them into text chunks for processing."""
        print(f"Loading and splitting {len(doc_paths)} documents...")
        all_chunks = []

        for path in doc_paths:
            if not Path(path).exists():
                print(f"Warning: Document not found at {path}")
                continue

            print(f"Processing: {path}")
            loader = PyMuPDFLoader(path)
            documents = loader.load()
            chunks = self.text_splitter.split_documents(documents)
            all_chunks.extend(chunks)
            print(f"Generated {len(chunks)} chunks")

        print(f"Total chunks created: {len(all_chunks)}")
        return all_chunks

In [5]:
class VectorStoreManager:
    """Manages ChromaDB vector store operations."""

    def __init__(self, persist_dir: str = "../data/chroma_db",
                 embedding_model: str = "all-MiniLM-L6-v2"):
        self.persist_dir = persist_dir
        self.embedding_model = self._initialize_embeddings(embedding_model)
        self.db: Optional[Chroma] = None

    def _initialize_embeddings(self, model_name: str) -> HuggingFaceEmbeddings:
        """Initialize HuggingFace embeddings with Windows compatibility."""
        print(f"Loading embedding model: {model_name}")

        model_kwargs = {
            'device': 'cpu',
            'trust_remote_code': True
        }

        encode_kwargs = {
            'normalize_embeddings': True,
            'batch_size': 32
        }

        embeddings = HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs=model_kwargs,
            encode_kwargs=encode_kwargs
        )
        print("✓ Embedding model loaded successfully")
        return embeddings

    def build_index(self, documents: list[Document]):
        """Create and persist vector index from document chunks."""
        print("Building ChromaDB index...")

        # Create the persist directory if it doesn't exist
        Path(self.persist_dir).mkdir(parents=True, exist_ok=True)

        self.db = Chroma.from_documents(
            documents,
            self.embedding_model,
            persist_directory=self.persist_dir
        )
        print(f"✓ Index built and saved to {self.persist_dir}")

    def load_index(self) -> None:
        """Load existing vector index from disk."""
        print("Loading existing ChromaDB index...")

        try:
            self.db = Chroma(
                persist_directory=self.persist_dir,
                embedding_function=self.embedding_model
            )

            print(f"✓ ChromaDB index loaded successfully from '{self.persist_dir}'")

        except Exception as e:
            print(f"Failed to load ChromaDB index: {e}")
            raise

    def get_retriever(self, search_kwargs: Dict[str, Any] = None):
        """Get retriever for similarity search."""
        if not self.db:
            raise ValueError("ChromaDB is not initialized. Call build_index() or load_index() first")
        if search_kwargs is None:
            search_kwargs={"k": 3}

        return self.db.as_retriever(search_kwargs=search_kwargs)

    def index_exists(self) -> bool:
        """Check if vector index exists on disk."""
        persist_path = Path(self.persist_dir)

        return persist_path.exists() and any(persist_path.iterdir())

In [6]:
class QAChainBuilder:
    """Builds and manages the conversational QA chain."""

    def __init__(
        self,
        temperature: float = 0,
        model_name: str = "gpt-4o-mini"
    ):
        self.temperature = temperature
        self.model_name = model_name

    def build_chain(self, retriever) -> ConversationalRetrievalChain:
        """Build conversational retrieval chain"""
        llm = ChatOpenAI(
            temperature=self.temperature,
            model_name=self.model_name)

        return ConversationalRetrievalChain.from_llm(
            llm=llm,
            retriever=retriever,
            return_source_documents=True,
            verbose=True
        )

In [7]:
class ResponseFormatter:
    """Formats responses with source citations."""

    @staticmethod
    def format_answer_with_sources(
        result: Dict[str, Any],
        max_source_length: int = 300) -> str:
        """Format answer with source document citations"""
        answer = result.get('answer', 'No answer provided')
        sources = result.get('source_documents', [])

        if not sources:
            return answer

        formatted_response = answer + "\n\n" + "="*50 + "\nSOURCES:\n" + "="*50

        for i, doc in enumerate(sources, 1):
            source_path = doc.metadata.get('source', 'Unknown source')
            source_page = doc.metadata.get('page', 'Unknown page')

            formatted_response += f"\n\nSource {i}: {Path(source_path).name} (Page: {source_page})"
            formatted_response += f"\n{doc.page_content[:max_source_length]}..."

        return formatted_response

In [8]:
class SmartResearchAssistant:
    """Main class that orchestrates the RAG pipeline."""

    def __init__(self, config: Config = None):
        self.config = config or Config()
        self.document_processor = DocumentProcessor()
        self.vector_store = VectorStoreManager()
        self.qa_builder = QAChainBuilder()
        self.formatter = ResponseFormatter()
        self.qa_chain = None
        self.chat_history = []

    def setup(self, document_paths: List[str]):
        """Set up research assistant with documents."""
        # Process documents
        chunks = self.document_processor.load_and_split_documents(document_paths)

        if not chunks:
            raise ValueError("No documents were successfully processed")

        # Setup vector store
        if self.vector_store.index_exists():
            self.vector_store.load_index()
        else:
            self.vector_store.build_index(chunks)

        # Build QA chain
        retriever = self.vector_store.get_retriever()
        self.qa_chain = self.qa_builder.build_chain(retriever)
        print("Smart Research Assistant is ready!")

    def ask_question(self, question: str) -> str:
        """Ask a question and get an answer with sources."""
        if not self.qa_chain:
            raise ValueError("Assistant not set up. Call setup() first.")

        print(f"\nProcessing question: {question}")

        result = self.qa_chain.invoke({
            "question": question,
            "chat_history": self.chat_history
        })

        # Update chat history
        self.chat_history.append((question, result['answer']))
        return self.formatter.format_answer_with_sources(result)

    def clear_history(self):
        """Clear chat history."""
        self.chat_history = []
        print("Chat history cleared")

In [None]:
load_dotenv()

# Initialize the assistant
config = Config()
assistant = SmartResearchAssistant(config)

# Document paths
document_paths = [
    "../data/docs/article1.pdf",
    "../data/docs/article2.pdf",
]

try:
    # Setup the assistant
    assistant.setup(document_paths)

    questions = [
        "How to handle Data Privacy?",
        "What are the steps for question answering from documents?"
    ]

    # Process questions
    for question in questions:
        try:
            answer = assistant.ask_question(question)
            print(f"\nQ: {question}")
            print(f"A: {answer}")
            print("\n" + "="*80)

        except Exception as e:
            print(f"Error processing question '{question}': {e}")

except Exception as e:
    print(f"Setup error: {e}")

Loading embedding model: all-MiniLM-L6-v2
✓ Embedding model loaded successfully
Loading and splitting 2 documents...
Processing: ../data/docs/article1.pdf
Generated 59 chunks
Processing: ../data/docs/article2.pdf
Generated 82 chunks
Total chunks created: 141
Building ChromaDB index...
✓ Index built and saved to ../data/chroma_db
Smart Research Assistant is ready!

Processing question: How to handle Data Privacy?


[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mSystem: Use the following pieces of context to answer the user's question. 
If you don't know the answer, just say that you don't know, don't try to make up an answer.
----------------
details are hidden before being sent to outside AI systems. Only approved people can see or change the data,
and every action is tracked. The whole system can run inside a private company network if needed, meeting
all required data protection laws.
Written by: Su

In [12]:
# Initialize the assistant
config = Config()
assistant = SmartResearchAssistant(config)

# Document paths
document_paths = [
    "../data/docs/article1.pdf",
    "../data/docs/article2.pdf",
]

try:
    # Setup the assistant
    assistant.setup(document_paths)

    questions = [
        "What are the main benefits of using Langchain systems?"
    ]

    # Process questions
    for question in questions:
        try:
            answer = assistant.ask_question(question)
            print(f"\nQ: {question}")
            print(f"A: {answer}")
            print("\n" + "="*80)

        except Exception as e:
            print(f"Error processing question '{question}': {e}")

except Exception as e:
    print(f"Setup error: {e}")

Loading embedding model: all-MiniLM-L6-v2
✓ Embedding model loaded successfully
Loading and splitting 2 documents...
Processing: ../data/docs/article1.pdf
Generated 59 chunks
Processing: ../data/docs/article2.pdf
Generated 82 chunks
Total chunks created: 141
Loading existing ChromaDB index...
✓ ChromaDB index loaded successfully from '../data/chroma_db'
Smart Research Assistant is ready!

Processing question: What are the main benefits of using Langchain systems?


[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mSystem: Use the following pieces of context to answer the user's question. 
If you don't know the answer, just say that you don't know, don't try to make up an answer.
----------------
1052 
 
with our own documents, as discussed in the 
following Question Answering from Documents 
section. 
A.A.3 Chains 
The most important key building block of 
LangChain is the chain. The chain usually 
combi