# CTSE RAG Chatbot
This notebook implements a Retrieval-Augmented Generation (RAG) chatbot for CTSE lecture notes. The implementation is built layer by layer, starting with the imports and the document processor. The overall design follows an object-oriented (OOP) approach.

## Step 1: Import Required Libraries
We start by importing all the necessary libraries for the RAG chatbot.

In [86]:
# Import Required Libraries
import os
import sys
import logging
import json
import time
import random
from pathlib import Path
from typing import List, Dict, Any, Optional, Callable
import requests

# Document processing libraries
import pypdf
from pptx import Presentation

# Vector database and embedding libraries
import chromadb
from chromadb.utils import embedding_functions

# Haystack libraries for retrieval
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.dataclasses import Document
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Terminal UI libraries
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.prompt import Prompt, Confirm
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from rich.layout import Layout
from rich.live import Live
from rich.align import Align
from rich.rule import Rule
from rich.syntax import Syntax
from rich import box, print

# Data processing libraries
import numpy as np
import pandas as pd

# Environment variable management
from dotenv import load_dotenv
load_dotenv()

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

## Step 2: Core Configuration and Settings
This step defines the configuration settings for the chatbot, including constants, environment variables, and reusable templates.
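
The integer settings in this step are read with `int(os.environ.get(...))`, which fails with a generic `ValueError` on a malformed value and accepts combinations such as an overlap larger than the chunk size. The sketch below shows one possible stricter reader. `env_int` is a hypothetical helper that is not part of the notebook, and the minimum values are assumptions made for illustration.

```python
import os


def env_int(name: str, default: int, minimum: int = 0) -> int:
    """Read an integer setting from the environment, falling back to a default."""
    raw = os.environ.get(name, "").strip()
    if not raw:
        return default
    try:
        value = int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None
    if value < minimum:
        raise ValueError(f"{name} must be >= {minimum}, got {value}")
    return value


# Same variable names and defaults as the configuration cell.
chunk_size = env_int("RAGCHITCHAT_CHUNK_SIZE", 1000, minimum=1)
chunk_overlap = env_int("RAGCHITCHAT_CHUNK_OVERLAP", 200)

# An overlap that is not smaller than the chunk size would stall a sliding window.
if chunk_overlap >= chunk_size:
    raise ValueError("RAGCHITCHAT_CHUNK_OVERLAP must be smaller than RAGCHITCHAT_CHUNK_SIZE")
```

Failing early at configuration time keeps a bad `.env` value from surfacing later as an obscure error inside the document processor.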

In [87]:
# Disable Haystack telemetry
os.environ['HAYSTACK_TELEMETRY_ENABLED'] = 'False'

# Paths configuration
DATA_DIR = os.environ.get('RAGCHITCHAT_DATA_DIR', 'data')  # relative to the project root
PROCESSED_DIR = os.environ.get('RAGCHITCHAT_PROCESSED_DIR', 'processed')
DB_DIR = os.environ.get('RAGCHITCHAT_DB_DIR', 'chroma_db')

# Ollama configuration
OLLAMA_URL = os.environ.get('OLLAMA_URL', 'http://localhost:11434')
DEFAULT_MODEL = os.environ.get('RAGCHITCHAT_MODEL', 'mistral:7b-instruct-v0.3-q4_1')

# Document processing
CHUNK_SIZE = int(os.environ.get('RAGCHITCHAT_CHUNK_SIZE', '1000'))
CHUNK_OVERLAP = int(os.environ.get('RAGCHITCHAT_CHUNK_OVERLAP', '200'))

# Retrieval configuration
TOP_K_RESULTS = int(os.environ.get('RAGCHITCHAT_TOP_K', '5'))

# UI Configuration
HISTORY_CAPACITY = int(os.environ.get('RAGCHITCHAT_HISTORY_CAPACITY', '10'))

# Advanced System prompt for Ollama with enhanced instructions
SYSTEM_PROMPT = """
You are 'CTSE Scholar', an educational assistant specialized in Current Trends in Software Engineering (CTSE).
You have been carefully trained on university-level lecture notes from the CTSE course.

YOUR CAPABILITIES:
- Explain complex software engineering concepts with academic precision
- Provide examples relevant to modern software development practices
- Connect theoretical concepts to practical applications in the industry
- Analyze the evolution and future directions of software engineering methodologies

YOUR LIMITATIONS:
- You only possess knowledge contained in the CTSE lecture notes
- You cannot access real-time information beyond your training data
- You should acknowledge when information is not available in your knowledge base

RESPONSE GUIDELINES:
- Begin with a direct, concise answer to the question
- Structure longer responses with appropriate headings and bullet points
- Use academic terminology while remaining accessible to students
- Cite specific lectures or sections when possible (e.g., "According to Lecture 3 on DevOps...")
- When answering coding questions, ensure proper formatting and comments
- Include relevant examples to illustrate concepts
- For complex topics, break down explanations into sequential logical steps

If you don't have enough information to provide a complete answer, clearly acknowledge this limitation and suggest related topics you can address instead.
"""

# Advanced RAG prompt template with chain-of-thought reasoning
RAG_PROMPT_TEMPLATE = """
You are a university-level educational assistant specialized in Current Trends in Software Engineering.
You will receive context information extracted from CTSE lecture notes and a question from a student.

CONTEXT INFORMATION:
{context}

USER QUESTION:
{question}

To answer effectively:
1. First, carefully analyze what the question is asking.
2. Identify which parts of the context are most relevant to the question.
3. Think step-by-step about how these concepts should be explained.
4. Consider any potential misconceptions the student might have.
5. Formulate a clear, structured response that directly addresses the question.

Your answer should:
- Start with a direct response to the question
- Use academic language appropriate for university-level education
- Include specific examples when helpful
- Use markdown formatting for clarity (headings, bullet points, code blocks)
- Cite specific lecture content when possible
- Be factually accurate based only on the provided context

If the provided context doesn't contain sufficient information to answer the question completely:
1. Clearly state what information is not available in your knowledge base
2. Provide what partial information you can based on the available context
3. Suggest related topics you can address based on the available lecture notes

Response format:
---
## [Direct Answer to Question]
[Detailed explanation with structured formatting]

[Examples or elaboration as needed]

[If applicable: "Note: The lecture notes do not provide complete information about X, but I can tell you that..."]
---
"""

## Step 3: Document Processing Layer
This step implements the `DocumentProcessor` base class and its subclasses (`PDFProcessor`, `PPTXProcessor`) for handling lecture notes in PDF and PPTX formats. It also includes the `DocumentProcessorFactory` for selecting the appropriate processor.
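
Pages longer than the chunk size are split with an overlapping sliding window. The sketch below isolates that arithmetic so it can be checked without a PDF. `sliding_window_spans` is a hypothetical helper, and the two guards (rejecting an overlap that is not smaller than the chunk size, and stopping once a window reaches the end of the text) are assumptions of this sketch rather than a description of the processor below.

```python
from typing import List, Tuple


def sliding_window_spans(
    text_length: int, chunk_size: int = 1000, chunk_overlap: int = 200
) -> List[Tuple[int, int]]:
    """Return (start, end) character offsets for overlapping chunks."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap
    spans: List[Tuple[int, int]] = []
    start = 0
    while start < text_length:
        end = min(start + chunk_size, text_length)
        spans.append((start, end))
        if end == text_length:
            # The window already covers the tail; stop here.
            break
        start += step
    return spans


print(sliding_window_spans(2500))
# [(0, 1000), (800, 1800), (1600, 2500)]
```

Each chunk starts `chunk_size - chunk_overlap` characters after the previous one, so neighbouring chunks share 200 characters with the default settings.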

In [88]:
# Document Processing Layer
from abc import ABC, abstractmethod
class DocumentProcessor(ABC):
    """Base class for document processors"""
    
    @abstractmethod
    def process(self, file_path: str) -> List[Dict[str, Any]]:
        """Process a document and return chunks of text with metadata"""
        pass
    
    def save_chunks(self, chunks: List[Dict[str, Any]], output_dir: str, filename: str) -> None:
        """Save processed chunks to a text file"""
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, filename)
        
        with open(output_path, 'w', encoding='utf-8') as f:
            for chunk in chunks:
                f.write(f"--- Chunk from {chunk['metadata']['source']} (Page/Slide {chunk['metadata'].get('page_num', 'N/A')}) ---\n")
                f.write(chunk['content'])
                f.write("\n\n")
        
        logger.info(f"Saved processed content to {output_path}")

class PDFProcessor(DocumentProcessor):
    """Processor for PDF documents"""
    
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
    
    def process(self, file_path: str) -> List[Dict[str, Any]]:
        logger.info(f"Processing PDF: {file_path}")
        chunks = []
        
        try:
            with open(file_path, 'rb') as file:
                pdf = pypdf.PdfReader(file)
                
                for i, page in enumerate(pdf.pages):
                    text = page.extract_text()
                    if text.strip():
                        if len(text) <= self.chunk_size:
                            chunks.append({
                                'content': text,
                                'metadata': {
                                    'source': os.path.basename(file_path),
                                    'page_num': i + 1
                                }
                            })
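                        else:
                            # Slide a window over the page; consecutive chunks
                            # share chunk_overlap characters of context.
                            step = max(1, self.chunk_size - self.chunk_overlap)
                            start = 0
                            while start < len(text):
                                end = min(start + self.chunk_size, len(text))
                                chunk_text = text[start:end]
                                chunks.append({
                                    'content': chunk_text,
                                    'metadata': {
                                        'source': os.path.basename(file_path),
                                        'page_num': i + 1,
                                        'chunk_part': f"{start}-{end}"
                                    }
                                })
                                # Stop once the window reaches the end of the page so
                                # no redundant tail chunk is emitted.
                                if end == len(text):
                                    break
                                start += step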
            
            logger.info(f"Extracted {len(chunks)} chunks from {file_path}")
            return chunks
            
        except Exception as e:
            logger.error(f"Error processing PDF {file_path}: {str(e)}")
            return []

class PPTXProcessor(DocumentProcessor):
    """Processor for PowerPoint documents"""
    
    def process(self, file_path: str) -> List[Dict[str, Any]]:
        logger.info(f"Processing PPTX: {file_path}")
        chunks = []
        
        try:
            prs = Presentation(file_path)
            
            for i, slide in enumerate(prs.slides):
                slide_text = []
                
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        text = shape.text.strip()
                        if text:
                            slide_text.append(text)
                
                if slide_text:
                    chunks.append({
                        'content': '\n'.join(slide_text),
                        'metadata': {
                            'source': os.path.basename(file_path),
                            'page_num': i + 1
                        }
                    })
            
            logger.info(f"Extracted {len(chunks)} chunks from {file_path}")
            return chunks
            
        except Exception as e:
            logger.error(f"Error processing PPTX {file_path}: {str(e)}")
            return []

class DocumentProcessorFactory:
    """Factory for creating document processors based on file extension"""
    
    @staticmethod
    def get_processor(file_path: str) -> Optional[DocumentProcessor]:
        """Get appropriate document processor based on file extension"""
        ext = Path(file_path).suffix.lower()
        
        if ext == '.pdf':
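            # Pass the configured chunking settings instead of the class defaults
            return PDFProcessor(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)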
        elif ext == '.pptx':
            return PPTXProcessor()
        else:
            logger.warning(f"Unsupported file format: {ext}")
            return None

## Step 4: Vector Store Implementation
This step implements the `ChromaVectorStore` class for managing the vector database. It handles the storage and retrieval of document embeddings using ChromaDB.
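
Conceptually, a vector store embeds each chunk as a vector and answers a query by returning the chunks whose vectors are closest to the query vector. The toy sketch below illustrates that idea with brute-force cosine distance over hand-written three-dimensional vectors. It is not how ChromaDB is implemented (ChromaDB uses an approximate index and a real embedding model), and `cosine_distance`, `nearest`, and the vectors are all made up for illustration.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Return 1 - cosine similarity; 0 means the vectors point the same way."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    if norm == 0:
        return 1.0
    return 1.0 - dot / norm


def nearest(
    query: Sequence[float], store: Dict[str, Sequence[float]], n_results: int = 2
) -> List[Tuple[str, float]]:
    """Rank every stored vector by distance to the query and keep the closest."""
    scored = [(doc_id, cosine_distance(query, vec)) for doc_id, vec in store.items()]
    return sorted(scored, key=lambda pair: pair[1])[:n_results]


# Hypothetical embeddings for three lecture chunks.
toy_store = {
    "devops": [1.0, 0.0, 0.0],
    "microservices": [0.6, 0.8, 0.0],
    "testing": [0.0, 0.0, 1.0],
}
results = nearest([1.0, 0.1, 0.0], toy_store)
print([doc_id for doc_id, _ in results])
# ['devops', 'microservices']
```

The `distance` value returned by a query plays the same role as the second element of each tuple here: smaller means more similar.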

In [89]:

class ChromaVectorStore:
    """ChromaDB vector store implementation for document storage and retrieval"""
    
    def __init__(self, 
                 persist_directory: str = "chroma_db",
                 embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.persist_directory = persist_directory
        self.collection_name = "lecture_notes"
        
        # Using sentence-transformers for embeddings
        self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )
        
        self._init_client()
    
    def _init_client(self) -> None:
        """Initialize the ChromaDB client"""
        os.makedirs(self.persist_directory, exist_ok=True)
        
        try:
            self.client = chromadb.PersistentClient(path=self.persist_directory)
            # Get or create collection
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                embedding_function=self.embedding_function,
                metadata={"description": "CTSE Lecture Notes"}
            )
            logging.info(f"ChromaDB initialized at {self.persist_directory}")
        except Exception as e:
            logging.error(f"Failed to initialize ChromaDB: {str(e)}")
            raise
    
    def add_documents(self, documents: List[Dict[str, Any]]) -> None:
        """Add documents to vector store
        
        Args:
            documents: List of document dicts with 'content' and 'metadata' keys
        """
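        if not documents:
            logging.warning("No documents to add to ChromaDB")
            return

        try:
            # Extract required fields
            texts = [doc['content'] for doc in documents]
            metadatas = [doc['metadata'] for doc in documents]
            # Derive IDs from the chunk metadata so separate batches do not
            # collide on doc_0, doc_1, ... This assumes (source, page_num,
            # chunk_part) identifies a chunk, as produced by the processors above.
            ids = [
                f"{m.get('source', 'doc')}:{m.get('page_num', 'na')}:{m.get('chunk_part', 'full')}"
                for m in metadatas
            ]

            # Upsert so that re-ingesting a file overwrites its existing chunks
            self.collection.upsert(
                documents=texts,
                metadatas=metadatas,
                ids=ids
            )
            logging.info(f"Added {len(documents)} documents to ChromaDB")
        except Exception as e:
            logging.error(f"Failed to add documents to ChromaDB: {str(e)}")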
    
    def query(self, query_text: str, n_results: int = 5) -> List[Dict[str, Any]]:
        """Query the vector store for similar documents
        
        Args:
            query_text: The query string
            n_results: Number of results to return
            
        Returns:
            List of document dicts with content and metadata
        """
        try:
            results = self.collection.query(
                query_texts=[query_text],
                n_results=n_results
            )
            
            # Format results
            documents = []
            for i, doc in enumerate(results['documents'][0]):
                documents.append({
                    'content': doc,
                    'metadata': results['metadatas'][0][i],
                    'id': results['ids'][0][i],
                    'distance': results.get('distances', [[0] * n_results])[0][i]
                })
            
            return documents
        except Exception as e:
            logging.error(f"Query failed: {str(e)}")
            return []
    
    def get_collection_stats(self) -> Dict[str, Any]:
        """Get stats about the collection"""
        try:
            count = self.collection.count()
            return {
                "collection_name": self.collection_name,
                "document_count": count,
                "persist_directory": self.persist_directory
            }
        except Exception as e:
            logging.error(f"Failed to get collection stats: {str(e)}")
            return {"error": str(e)}

## Step 5: Retrieval Layer
This step implements the `HaystackRetriever` class for hybrid retrieval. It combines semantic search using the vector store with keyword-based search using BM25.
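
When two ranked lists have to be merged, their raw scores are usually not comparable (BM25 scores and vector distances live on different scales). One common alternative to a hand-written merge is reciprocal rank fusion (RRF), which uses only the rank positions. The sketch below is that alternative technique, not the merge used by the class in this step. `reciprocal_rank_fusion` is a hypothetical helper and `k = 60` is a conventional constant chosen here as an assumption.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse several ranked lists of document IDs into one ranking."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            # Documents near the top of any list contribute the most.
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)


dense_ids = ["a", "b", "c"]   # hypothetical vector-search ranking
sparse_ids = ["b", "d"]       # hypothetical BM25 ranking
fused = reciprocal_rank_fusion([dense_ids, sparse_ids])
print(fused)
# ['b', 'a', 'd', 'c']
```

Document `b` wins because it appears in both lists, which is the behaviour a hybrid retriever typically wants: agreement between the semantic and keyword searches is rewarded.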

In [90]:

class HaystackRetriever:
    """Haystack-based document retriever for RAG"""
    
    def __init__(self, vector_store=None):
        """Initialize Haystack retriever
        
        Args:
            vector_store: Optional ChromaVectorStore to use for retrieval
        """
        self.vector_store = vector_store
        
        # In-memory document store for BM25
        self.document_store = InMemoryDocumentStore()
        self.bm25_retriever = InMemoryBM25Retriever(document_store=self.document_store)
    
    def add_documents(self, documents: List[Dict[str, Any]]) -> None:
        """Add documents to the retriever
        
        Args:
            documents: List of document dicts with 'content' and 'metadata'
        """
        haystack_docs = []
        
        for doc in documents:
            haystack_docs.append(
                Document(content=doc['content'], meta=doc['metadata'])
            )
        
        try:
            # Add to document store
            self.document_store.write_documents(haystack_docs)
            logging.info(f"Added {len(haystack_docs)} documents to Haystack document store")
        except Exception as e:
            logging.error(f"Failed to add documents to Haystack: {str(e)}")
    
    def retrieve(self, query: str, top_k: int = 5) -> List[Document]:
        """Retrieve relevant documents using BM25
        
        Args:
            query: Query string
            top_k: Number of documents to retrieve
            
        Returns:
            List of Haystack Document objects
        """
        try:
            results = self.bm25_retriever.run(query=query, top_k=top_k)
            documents = results["documents"]
            logging.info(f"Retrieved {len(documents)} documents using BM25 retriever")
            return documents
        except Exception as e:
            logging.error(f"Haystack retrieval error: {str(e)}")
            return []
    
    def hybrid_retrieve(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Hybrid retrieval using both BM25 and vector search
        
        Args:
            query: Query string
            top_k: Number of documents to retrieve
            
        Returns:
            List of document dicts
        """
        if self.vector_store is None:
            logging.warning("Vector store not available for hybrid search")
            return self._convert_to_dicts(self.retrieve(query, top_k))
        
        try:
            # Get sparse results (BM25)
            sparse_results = self.retrieve(query, top_k)
            sparse_docs = self._convert_to_dicts(sparse_results)
            
            # Get dense results (Vector)
            dense_docs = self.vector_store.query(query, top_k)
            
            # Tag each result with the retriever that produced it
            for doc in dense_docs:
                doc['retrieval_method'] = 'vector'
            for doc in sparse_docs:
                doc['retrieval_method'] = 'bm25'

            # Interleave the two ranked lists (dense first at each rank) so that
            # BM25 hits are not crowded out when the result is trimmed to top_k.
            seen_contents = set()
            hybrid_results = []
            for rank in range(max(len(dense_docs), len(sparse_docs))):
                for ranked in (dense_docs, sparse_docs):
                    if rank < len(ranked):
                        doc = ranked[rank]
                        if doc['content'] not in seen_contents:
                            seen_contents.add(doc['content'])
                            hybrid_results.append(doc)

            # Trim to top_k
            hybrid_results = hybrid_results[:top_k]
            
            logging.info(f"Retrieved {len(hybrid_results)} documents using hybrid search")
            return hybrid_results
        except Exception as e:
            logging.error(f"Hybrid retrieval error: {str(e)}")
            return self._convert_to_dicts(self.retrieve(query, top_k))
    
    @staticmethod
    def _convert_to_dicts(documents: List[Document]) -> List[Dict[str, Any]]:
        """Convert Haystack Document objects to dicts"""
        return [
            {
                'content': doc.content,
                'metadata': doc.meta,
                'id': doc.id,
                'score': getattr(doc, 'score', None)
            }
            for doc in documents
        ]

## Step 6: LLM Interface
This step implements the `OllamaLLM` class for interacting with the local LLM. It handles generating responses using the Ollama API.
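
The class below sends a JSON body with the keys `model`, `prompt`, `stream`, and `system` to `/api/generate` and reads the `response` field from the reply. The sketch below pulls those two steps into small functions that can be tested without a running server. `build_generate_payload` and `extract_response` are hypothetical helpers, and the sample reply is made up for illustration.

```python
from typing import Any, Dict, Optional


def build_generate_payload(
    model: str, prompt: str, system: Optional[str] = None, stream: bool = False
) -> Dict[str, Any]:
    """Assemble the request body for a non-streaming generate call."""
    payload: Dict[str, Any] = {"model": model, "prompt": prompt, "stream": stream}
    if system:
        payload["system"] = system
    return payload


def extract_response(body: Dict[str, Any]) -> str:
    """Pull the generated text out of a decoded reply, failing loudly if absent."""
    text = body.get("response")
    if not isinstance(text, str):
        raise ValueError("Reply has no 'response' field")
    return text


payload = build_generate_payload(
    "mistral:7b-instruct-v0.3-q4_1", "What is DevOps?", system="You are a tutor."
)
sample_reply = {"response": "DevOps combines development and operations."}  # made up
print(extract_response(sample_reply))
```

Keeping payload construction in one place also avoids repeating the same dictionary in both the RAG path and the plain-prompt path.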

In [91]:
class OllamaLLM:
    """Client for Ollama local LLM"""
    
    def __init__(self, 
                 model: str = "mistral:7b-instruct-v0.3-q4_1",
                 base_url: str = "http://localhost:11434",
                 system_prompt: Optional[str] = None):
        """Initialize Ollama client
        
        Args:
            model: Model name to use
            base_url: Ollama API base URL
            system_prompt: Optional system prompt to set context
        """
        self.model = model
        self.base_url = base_url.rstrip('/')
        self.api_url = f"{self.base_url}/api/generate"
        self.system_prompt = system_prompt or SYSTEM_PROMPT
        
        # Check if Ollama is available
        self._check_availability()
    
    def _check_availability(self) -> None:
        """Check if Ollama server is running"""
        try:
            # A timeout keeps the constructor from hanging on an unresponsive host
            response = requests.get(f"{self.base_url}/api/tags", timeout=5)
            if response.status_code == 200:
                models = response.json().get("models", [])
                model_names = [m.get("name") for m in models]
                
                if not model_names:
                    logging.warning("No models found in Ollama.")
                elif self.model not in model_names:
                    logging.warning(f"Model {self.model} not found. Available models: {', '.join(model_names)}")
                    logging.info(f"You can pull it using: ollama pull {self.model}")
                else:
                    logging.info(f"Connected to Ollama. Using model: {self.model}")
            else:
                logging.warning(f"Ollama returned status code {response.status_code}")
        except requests.exceptions.ConnectionError:
            logging.error(f"Cannot connect to Ollama at {self.base_url}. Is Ollama running?")
        except Exception as e:
            logging.error(f"Error checking Ollama availability: {str(e)}")
    
    def generate(self, prompt: str, context: Optional[List[Dict[str, Any]]] = None) -> str:
        """Generate text from the LLM
        
        Args:
            prompt: User prompt/question
            context: Optional list of context documents
            
        Returns:
            Generated text response
        """
        try:
            if not context:
                # Simple query without RAG context
                return self._simple_generate(prompt)
            
            # Generate RAG prompt
            rag_prompt = RAG_PROMPT_TEMPLATE.format(
                context="\n\n".join([doc['content'] for doc in context]),
                question=prompt
            )
            
            payload = {
                "model": self.model,
                "prompt": rag_prompt,
                "stream": False,
                "system": self.system_prompt
            }
            
            response = requests.post(self.api_url, json=payload)
            
            if response.status_code == 200:
                return response.json()["response"]
            else:
                logging.error(f"Ollama API error: {response.status_code} - {response.text}")
                return f"Error: Failed to generate response from Ollama (Status {response.status_code})"
        except requests.exceptions.ConnectionError:
            logging.error("Connection to Ollama failed. Is Ollama running?")
            return (
                "# Connection Error\n\n"
                "Cannot connect to Ollama. Please make sure the Ollama server is running.\n\n"
                "To start Ollama:\n"
                "1. Open a new terminal\n"
                "2. Run the Ollama application\n"
                "3. Try your question again"
            )
        except Exception as e:
            logging.error(f"Error generating from Ollama: {str(e)}")
            return f"# Error\n\n{str(e)}\n\nPlease try again or check the logs for more information."
    
    def _simple_generate(self, prompt: str) -> str:
        """Generate response for a simple prompt without RAG context
        
        Args:
            prompt: User prompt/question
            
        Returns:
            Generated text response
        """
        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
            "system": self.system_prompt
        }
        
        response = requests.post(self.api_url, json=payload)
        
        if response.status_code == 200:
            return response.json()["response"]
        else:
            logging.error(f"Ollama API error: {response.status_code} - {response.text}")
            return f"Error: Failed to generate response from Ollama (Status {response.status_code})"
    
    def list_models(self) -> List[str]:
        """List available models in Ollama"""
        try:
            response = requests.get(f"{self.base_url}/api/tags", timeout=5)
            if response.status_code == 200:
                return [model.get("name") for model in response.json().get("models", [])]
            else:
                logging.error(f"Failed to list models: {response.status_code}")
                return []
        except Exception as e:
            logging.error(f"Error listing models: {str(e)}")
            return []

## Step 7: Prompt Engineering
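This step implements helper functions that build prompts from the retrieved context: one fills the RAG template from Step 2, one formats context documents with their source metadata, one requests a specific output format, and one picks a format from the wording of the question.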
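
To make the context layout concrete, the sketch below renders two made-up chunks in the same `[DOCUMENT n]: source (Page/Slide: p)` layout that `format_context_documents` produces. `render_context` is a simplified stand-in written for this illustration, the file names and distances are hypothetical, and clamping the relevance at zero is an assumption of the sketch.

```python
from typing import Any, Dict, List


def render_context(chunks: List[Dict[str, Any]]) -> str:
    """Render retrieved chunks as numbered, source-labelled sections."""
    sections = []
    for i, doc in enumerate(chunks, start=1):
        meta = doc.get("metadata", {})
        header = (
            f"[DOCUMENT {i}]: {meta.get('source', 'Unknown Source')} "
            f"(Page/Slide: {meta.get('page_num', 'N/A')})"
        )
        if doc.get("distance") is not None:
            # Treat distance as "0 = identical" and clamp so relevance is never negative.
            relevance = max(0.0, 1.0 - float(doc["distance"]))
            header += f" [Relevance: {relevance:.2f}]"
        sections.append(f"{header}\n{doc.get('content', '').strip()}")
    return "\n\n".join(sections)


sample_chunks = [
    {
        "content": "DevOps shortens the feedback loop between teams.",
        "metadata": {"source": "lecture3.pdf", "page_num": 4},
        "distance": 0.25,
    },
    {"content": "Containers package an app with its dependencies.", "metadata": {}},
]
print(render_context(sample_chunks))
```

The numbered headers are what allow the model to cite a specific lecture and page in its answer.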

In [92]:

def get_rag_prompt(question: str, context: List[Dict[str, Any]]) -> str:
    """Generate an advanced RAG prompt with chain-of-thought reasoning
    
    Args:
        question: The user's question
        context: List of context documents
    
    Returns:
        Formatted prompt for the LLM
    """
    # Format context documents into a unified text
    context_text = format_context_documents(context)
    
    # Replace placeholders in the template
    prompt = RAG_PROMPT_TEMPLATE.format(
        context=context_text,
        question=question
    )
    
    return prompt

def format_context_documents(context: List[Dict[str, Any]]) -> str:
    """Format context documents with metadata for better reference
    
    Args:
        context: List of context documents
    
    Returns:
        Formatted context text
    """
    if not context:
        return "No relevant context information available."
    
    context_sections = []
    
    for i, doc in enumerate(context):
        # Extract metadata
        source = doc.get('metadata', {}).get('source', 'Unknown Source')
        page_num = doc.get('metadata', {}).get('page_num', 'N/A')
        content = doc.get('content', '').strip()
        
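        # Convert the vector-store distance into a rough relevance score.
        # This assumes a distance where 0 means identical; some metrics can
        # exceed 1, so the value is clamped to avoid a negative relevance.
        relevance = ""
        if doc.get('distance') is not None:
            similarity = max(0.0, 1.0 - float(doc['distance']))
            relevance = f" [Relevance: {similarity:.2f}]"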
        
        # Format document section
        section = f"[DOCUMENT {i+1}]: {source} (Page/Slide: {page_num}){relevance}\n{content}"
        context_sections.append(section)
    
    return "\n\n" + "\n\n".join(context_sections)

def get_structured_prompt(question: str, context: List[Dict[str, Any]], 
                         output_format: str = "default") -> str:
    """Generate a prompt requiring structured output in a specific format
    
    Args:
        question: The user's question
        context: List of context documents
        output_format: The desired output format (default, table, steps, comparison)
    
    Returns:
        Formatted prompt for the LLM
    """
    context_text = format_context_documents(context)
    
    # Define different output format instructions
    format_instructions = {
        "default": """
Format your response using appropriate Markdown with:
- Clear headings with ## and ### for sections
- Bullet points for lists
- **Bold** for important concepts
- `code blocks` for technical terms or code
- > blockquotes for definitions
        """,
        
        "table": """
Include a Markdown table in your response to summarize key points:
| Concept | Description | Example |
| ------- | ----------- | ------- |
| Concept 1 | Description 1 | Example 1 |
| ... | ... | ... |
        """,
        
        "steps": """
Format your response as a step-by-step guide using:
## Process Overview
Brief overview of the process

## Step 1: [Step Name]
Explanation of step 1

## Step 2: [Step Name]
Explanation of step 2

And so on, with clear numbered steps and explanations.
        """,
        
        "comparison": """
Format your response as a comparison between concepts:
## Concept A
- Key characteristics
- Advantages
- Disadvantages

## Concept B
- Key characteristics
- Advantages
- Disadvantages

## Comparison
| Aspect | Concept A | Concept B |
| ------ | --------- | --------- |
| Aspect 1 | Value for A | Value for B |
| ... | ... | ... |
        """
    }
    
    # Get the format instructions
    format_instruction = format_instructions.get(output_format, format_instructions["default"])
    
    # Create the structured prompt
    structured_prompt = f"""
You are a university-level educational assistant specialized in Current Trends in Software Engineering.

Based on the following context information from CTSE lecture notes:
{context_text}

Please answer this question:
{question}

{format_instruction}

Be concise but comprehensive, and ensure all information is accurate according to the provided context.
    """
    
    return structured_prompt

import re

def _detect_appropriate_format(question: str) -> str:
    """Detect the most appropriate output format based on the question
    
    Args:
        question: The user question
    
    Returns:
        Suggested output format
    """
    question_lower = question.lower()
    
    def mentions(phrases: List[str]) -> bool:
        # Match whole words/phrases so that "vs" does not fire on "canvas"
        # and "list" does not fire on "specialist"
        return any(re.search(rf"\b{re.escape(p)}\b", question_lower) for p in phrases)
    
    # Check for step-by-step requests
    if mentions(["steps", "process", "how to", "procedure", "workflow"]):
        return "steps"
    
    # Check for comparison requests
    if mentions(["compare", "difference between", "versus", "vs", "pros and cons"]):
        return "comparison"
    
    # Check for listing/table appropriate requests
    if mentions(["list", "summarize", "overview", "key aspects", "characteristics"]):
        return "table"
    
    # Default format for other questions
    return "default"

## Step 8: Terminal UI
This step implements the `TerminalUI` class for the Rich-powered terminal interface. It provides an interactive and visually appealing user interface for the chatbot.
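
The UI keeps at most `history_capacity` past exchanges. One simple way to enforce such a cap is a `collections.deque` with `maxlen`, which silently drops the oldest entry when a new one is appended. The sketch below illustrates that approach. `ChatHistory` is a hypothetical class, and this section does not show how the notebook's `TerminalUI` actually trims its list.

```python
from collections import deque
from typing import Deque, List, Tuple


class ChatHistory:
    """Fixed-capacity log of (question, answer) pairs."""

    def __init__(self, capacity: int = 10) -> None:
        self._items: Deque[Tuple[str, str]] = deque(maxlen=capacity)

    def add(self, question: str, answer: str) -> None:
        # When the deque is full, appending evicts the oldest pair.
        self._items.append((question, answer))

    def items(self) -> List[Tuple[str, str]]:
        return list(self._items)


history = ChatHistory(capacity=3)
for n in range(5):
    history.add(f"question {n}", f"answer {n}")

print([q for q, _ in history.items()])
# ['question 2', 'question 3', 'question 4']
```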

In [93]:

class TerminalUI:
    """Terminal UI using Rich for the chatbot interface"""
    
    def __init__(self, 
                 generate_fn: Callable[[str], str],
                 history_capacity: int = 10,
                 system_info: Optional[Dict[str, Any]] = None,
                 model_switch_fn: Optional[Callable[[str], bool]] = None):
        """Initialize the terminal UI
        
        Args:
            generate_fn: Function that takes a question and returns an answer
            history_capacity: Maximum number of conversations to keep in history
            system_info: Dictionary containing system information to display
            model_switch_fn: Function to switch between different models
        """
        self.console = Console()
        self.generate_fn = generate_fn
        self.history = []
        self.history_capacity = history_capacity
        self.system_info = system_info or {}
        self.model_switch_fn = model_switch_fn
        
        # Theme colors
        self.theme = {
            "accent": "blue",
            "secondary": "cyan",
            "success": "green",
            "warning": "yellow",
            "error": "red",
            "muted": "dim white"
        }
        
        # Clear screen on start
        os.system('cls' if os.name == 'nt' else 'clear')
    
    def show_splash_screen(self):
        """Show an animated splash screen"""
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]Initializing RagChitChat...[/]"),
            BarColumn(bar_width=40),
            TaskProgressColumn(),
            transient=True
        ) as progress:
            task = progress.add_task("[cyan]Loading components...", total=100)
            
            for i in range(101):
                time.sleep(0.015)  # Adjust for faster/slower animation
                progress.update(task, completed=i, 
                               description=f"[cyan]{self._get_loading_message(i)}[/]")
        
        self._display_logo()
    
    def _get_loading_message(self, progress):
        """Return different messages based on loading progress"""
        if progress < 20:
            return "Initializing components..."
        elif progress < 40:
            return "Loading document store..."
        elif progress < 60:
            return "Warming up embeddings..."
        elif progress < 80:
            return "Connecting to Ollama..."
        else:
            return "Almost ready..."
    
    def _display_logo(self):
        """Display the app logo"""
        logo = """
  _____              _____ _     _ _    _____ _           _   
 |  __ \\            / ____| |   (_) |  / ____| |         | |  
 | |__) |__ _  __ _| |    | |__  _| |_| |    | |__   __ _| |_ 
 |  _  // _` |/ _` | |    | '_ \\| | __| |    | '_ \\ / _` | __|
 | | \\ \\ (_| | (_| | |____| | | | | |_| |____| | | | (_| | |_ 
 |_|  \\_\\__,_|\\__, |\\_____|_| |_|_|\\__|\\_____|_| |_|\\__,_|\\__|
               __/ |                                          
              |___/                                           
        """
        
        self.console.print(Panel(f"[bold {self.theme['accent']}]{logo}[/]", 
                               border_style=self.theme["accent"],
                               subtitle="[white]Your AI assistant for CTSE Lecture Notes[/]"))
    
    def show_welcome(self):
        """Display welcome message and instructions"""
        self.show_splash_screen()
        
        # Show system info
        if self.system_info:
            self.show_system_info()
        
        self.console.print()
        self.console.print(Panel(
            "[bold]Ask me anything about Current Trends in Software Engineering![/]\n\n"
            "I can answer questions about CTSE lecture content, explain concepts,\n"
            "and help you understand course materials better.",
            title=f"[{self.theme['secondary']}]How to use RagChitChat[/]",
            border_style=self.theme["secondary"],
            expand=False
        ))
        
        self.console.print(Align(
            "\nType [bold cyan]/help[/bold cyan] for commands or [bold cyan]/exit[/bold cyan] to quit.\n", 
            align="center"
        ))
    
    def show_system_info(self):
        """Display system information"""
        table = Table(box=box.ROUNDED, title="System Information", border_style=self.theme["secondary"])
        
        table.add_column("Component", style=f"bold {self.theme['secondary']}")
        table.add_column("Details", style="white")
        
        # Add system info rows
        for key, value in self.system_info.items():
            # Format keys and values nicely
            key_display = key.replace('_', ' ').title()
            if isinstance(value, (list, tuple)):
                value_display = ', '.join(str(v) for v in value)
            elif isinstance(value, dict):
                value_display = ', '.join(f"{k}: {v}" for k, v in value.items())
            else:
                value_display = str(value)
                
            table.add_row(key_display, value_display)
            
        self.console.print(Align(table, align="center"))
    
    def show_help(self):
        """Display help information"""
        table = Table(box=box.ROUNDED, title="Available Commands", border_style=self.theme["accent"])
        
        table.add_column("Command", style=f"bold {self.theme['secondary']}")
        table.add_column("Description")
        
        table.add_row("/help", "Show this help message")
        table.add_row("/exit", "Exit the chatbot")
        table.add_row("/clear", "Clear the conversation history")
        table.add_row("/history", "Show conversation history")
        table.add_row("/info", "Show system information")
        table.add_row("/about", "About this application")
        table.add_row("/models", "List available models")
        table.add_row("/model <name>", "Switch to a different model")
        
        self.console.print(Align(table, align="center"))
        
        # Show example questions
        examples = Table(
            box=box.SIMPLE,
            title="Example Questions",
            show_header=False,
            border_style=self.theme["muted"]
        )
        
        examples.add_column("", style=self.theme["secondary"])
        
        examples.add_row("- What is continuous integration?")
        examples.add_row("- Explain the difference between DevOps and DevSecOps")
        examples.add_row("- What are the benefits of microservices architecture?")
        examples.add_row("- How does containerization improve software deployment?")
        
        self.console.print(Panel(examples, border_style=self.theme["muted"], expand=False))
    
    def show_about(self):
        """Display information about the application"""
        about_text = """
RagChitChat is a Retrieval-Augmented Generation (RAG) chatbot designed to help students
learn about Current Trends in Software Engineering. It uses:

• [bold]Local LLM[/bold] via [link=https://ollama.ai]Ollama[/link] for AI inference
• [bold]RAG Pipeline[/bold] with Haystack for intelligent document retrieval
• [bold]Vector Database[/bold] using ChromaDB for semantic search
• [bold]Rich UI[/bold] in the terminal for a pleasant user experience

This project demonstrates how generative AI can be applied to educational contexts
while maintaining privacy by keeping all operations local.

[dim italic]Created by Nadun for the CTSE module assignment[/dim italic]
        """
        
        self.console.print(Panel(
            about_text.strip(),  # Rich markup string; Markdown() would show the [bold]/[link] tags literally
            title="About RagChitChat",
            border_style=self.theme["accent"],
            padding=(1, 2)
        ))
    
    def show_history(self, page: int = 1, items_per_page: int = 5):
        """Display conversation history with pagination
        
        Args:
            page: Page number to display
            items_per_page: Number of conversations per page
        """
        if not self.history:
            self.console.print(Panel(
                "[italic]You haven't asked any questions yet.[/]", 
                border_style=self.theme["warning"],
                title="History Empty"
            ))
            return
        
        # Calculate pagination, clamping the page to the valid range
        total_pages = (len(self.history) + items_per_page - 1) // items_per_page
        page = max(1, min(page, total_pages))
        start_idx = (page - 1) * items_per_page
        end_idx = start_idx + items_per_page
        current_items = self.history[start_idx:end_idx]
        
        # Create paginated display
        self.console.print(Rule(f"[bold]Conversation History (Page {page}/{total_pages})[/]"))
        
        for i, (question, answer) in enumerate(current_items, start=start_idx+1):
            self.console.print()
            self.console.print(Panel(
                f"[bold {self.theme['secondary']}]Q: {question}[/]",
                border_style=self.theme["secondary"],
                padding=(0, 1)
            ))
            
            try:
                # Try to render as markdown
                self.console.print(Panel(
                    Markdown(answer),
                    border_style=self.theme["success"],
                    padding=(0, 1)
                ))
            except Exception:
                # Fallback to plain text
                self.console.print(Panel(
                    answer,
                    border_style=self.theme["success"],
                    padding=(0, 1)
                ))
        
        # Pagination controls
        self.console.print()
        if total_pages > 1:
            pagination = ""
            if page > 1:
                pagination += "[bold]/prev[/] "
            pagination += f"Page {page}/{total_pages}"
            if page < total_pages:
                pagination += " [bold]/next[/]"
                
            self.console.print(Align(pagination, align="center"))
    
    def handle_question(self, question: str) -> None:
        """Process a user question and display the response
        
        Args:
            question: User's question text
        """
        # Display the question banner
        self.console.print()
        self.console.print(Panel(
            f"[bold]{question}[/]", 
            border_style=self.theme["accent"],
            title="Question"
        ))
        
        # Show spinner while generating response
        with Progress(
            SpinnerColumn(),
            TextColumn("Thinking..."),
            BarColumn(),
            TextColumn("[bold cyan]{task.description}[/bold cyan]"),
            expand=True
        ) as progress:
            retrieval_task = progress.add_task("Retrieving context...", total=None)
            
            # Use closure to update progress
            def progress_callback(stage, details=None):
                if stage == "retrieval_complete":
                    progress.update(retrieval_task, description="Generating answer...")
            
            # Record time for response metrics
            start_time = time.time()
            
            # Generate response
            answer = self.generate_fn(question, progress_callback)
            
            # Calculate response time
            elapsed = time.time() - start_time
        
        # Store in history
        self.history.append((question, answer))
        if len(self.history) > self.history_capacity:
            self.history.pop(0)
        
        # Display the response
        self.console.print()
        self.console.print(f"[{self.theme['muted']}]Generated in {elapsed:.2f}s[/]")
        
        try:
            # Try to render as markdown
            self.console.print(Panel(
                Markdown(answer),
                border_style=self.theme["success"],
                title="Answer",
                padding=(1, 2)
            ))
        except Exception:
            # Fallback to plain text
            self.console.print(Panel(
                answer,
                border_style=self.theme["success"],
                title="Answer",
                padding=(1, 2)
            ))
    
    def list_available_models(self):
        """Display available models that can be selected"""
        if not self.model_switch_fn or not self.system_info or 'available_models' not in self.system_info:
            self.console.print(Panel(
                "[italic]Model switching is not available.[/]", 
                border_style=self.theme["warning"],
                title="Models"
            ))
            return
            
        # Get current model and available models
        current_model = self.system_info.get('current_model', 'unknown')
        models = self.system_info.get('available_models', [])
        
        if not models:
            self.console.print(Panel(
                "[italic]No models found in Ollama. You can pull models using the Ollama CLI:[/]\n" +
                "ollama pull mistral:7b-instruct-v0.3-q4_1", 
                border_style=self.theme["warning"],
                title="No Models Available"
            ))
            return
            
        # Create a table of models
        table = Table(title="Available Models", box=box.ROUNDED, border_style=self.theme["secondary"])
        table.add_column("Model", style=f"bold {self.theme['secondary']}")
        table.add_column("Status")
        
        for model in models:
            if model == current_model:
                table.add_row(model, f"[{self.theme['success']}]ACTIVE[/]")
            else:
                table.add_row(model, "")
                
        self.console.print(Panel(
            table,
            border_style=self.theme["secondary"],
            title="Available Models"
        ))
        
        self.console.print("\nTo switch models, use: [bold]/model model_name[/]")
        
    def switch_model(self, model_name: str):
        """Switch to a different model
        
        Args:
            model_name: Name of the model to switch to
        """
        if not self.model_switch_fn:
            self.console.print(Panel(
                "[italic]Model switching is not available.[/]", 
                border_style=self.theme["warning"],
                title="Models"
            ))
            return
            
        # Show loading indicator while switching model
        with Progress(
            SpinnerColumn(),
            TextColumn(f"[cyan]Switching to model {model_name}...[/cyan]"),
            transient=True
        ) as progress:
            task = progress.add_task("", total=None)
            
            # Try to switch the model
            success = self.model_switch_fn(model_name)
        
        # Show the result
        if success:
            # Update the current model in system info
            if self.system_info and 'current_model' in self.system_info:
                self.system_info['current_model'] = model_name
                
            self.console.print(Panel(
                f"[bold]Successfully switched to model: [green]{model_name}[/green][/]",
                border_style=self.theme["success"],
                title="Model Switched"
            ))
        else:
            self.console.print(Panel(
                f"[bold]Failed to switch to model: [red]{model_name}[/red][/]\n\n"
                f"Make sure the model is available in Ollama.\n"
                f"You can pull it using: ollama pull {model_name}",
                border_style=self.theme["error"],
                title="Error"
            ))
    
    def run(self) -> None:
        """Run the main chat loop"""
        self.show_welcome()
        
        # Main interaction loop
        current_history_page = 1
        while True:
            self.console.print()
            question = Prompt.ask(f"[bold {self.theme['accent']}]Ask a question (or type /help)[/]")
            
            # Handle commands
            if question.lower() == "/exit":
                if Confirm.ask("[yellow]Are you sure you want to exit?[/]"):
                    self.console.print("[yellow]Thank you for using RagChitChat! Goodbye![/]")
                    break
            elif question.lower() == "/help":
                self.show_help()
            elif question.lower() == "/clear":
                self.history = []
                os.system('cls' if os.name == 'nt' else 'clear')
                self.show_welcome()
            elif question.lower() == "/history":
                current_history_page = 1
                self.show_history(page=current_history_page)
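            elif question.lower() == "/next":
                # Clamp to the last page; show_history reports an empty history itself
                items_per_page = 5
                last_page = max(1, (len(self.history) + items_per_page - 1) // items_per_page)
                current_history_page = min(current_history_page + 1, last_page)
                self.show_history(page=current_history_page, items_per_page=items_per_page)
            elif question.lower() == "/prev":
                current_history_page = max(1, current_history_page - 1)
                self.show_history(page=current_history_page)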
            elif question.lower() == "/info":
                self.show_system_info()
            elif question.lower() == "/about":
                self.show_about()
            elif question.lower() == "/models":
                self.list_available_models()
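            elif question.lower() == "/model" or question.lower().startswith("/model "):
                # Also match a bare "/model" so it is not sent to the LLM as a question
                model_name = question[len("/model"):].strip()
                if model_name:
                    self.switch_model(model_name)
                else:
                    self.console.print("[yellow]Please specify a model name. Example: /model mistral:7b-instruct-v0.3-q4_1[/]")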
            elif question.strip():
                # Process regular questions
                self.handle_question(question)
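

The history view above pages through stored conversations with ceiling division and list slicing. The arithmetic can be sketched on its own as follows, assuming a hypothetical `paginate` helper (not part of the notebook) that also clamps out-of-range page numbers:

```python
from typing import List, Sequence, Tuple


def paginate(items: Sequence, page: int, per_page: int = 5) -> Tuple[List, int, int]:
    """Return (items on the page, clamped page number, total pages).

    paginate is a hypothetical helper used only to illustrate the arithmetic.
    """
    # Ceiling division; report at least one page even when there are no items
    total_pages = max(1, (len(items) + per_page - 1) // per_page)
    page = max(1, min(page, total_pages))
    start = (page - 1) * per_page
    return list(items[start:start + per_page]), page, total_pages


history = [f"q{i}" for i in range(1, 13)]  # 12 entries, so 3 pages of up to 5
print(paginate(history, 1))
print(paginate(history, 3))
print(paginate(history, 99))  # out-of-range request falls back to the last page
```

Returning the clamped page number lets the caller keep its own page counter in sync with what was actually displayed.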


## Step 9: Main Application Logic
This step implements the main application logic in the `RagChitChat` class. It wires together the vector store, the retriever, and the LLM, processes the lecture notes when needed, and launches the terminal UI. The class serves as the entry point for the application.
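
The UI and the application class communicate through a progress callback: the UI passes a function in, and the generation step calls it once retrieval has finished. A minimal sketch of that contract is shown here, with placeholder documents standing in for real retrieval. The `ProgressCallback` alias and the stand-in function body are assumptions for illustration only.

```python
from typing import Callable, List, Optional

# Assumed shape of the callback: (stage name, optional details dict) -> None
ProgressCallback = Callable[[str, Optional[dict]], None]


def generate_response(question: str,
                      progress_callback: Optional[ProgressCallback] = None) -> str:
    """Stand-in for retrieval plus generation that reports its stage to the caller."""
    docs = ["doc-1", "doc-2"]  # placeholder for retrieved context documents
    if progress_callback:
        progress_callback("retrieval_complete", {"num_docs": len(docs)})
    return f"Answer to {question!r} based on {len(docs)} documents"


stages: List[str] = []
answer = generate_response("What is CI?", lambda stage, details=None: stages.append(stage))
print(stages)
print(answer)
```

Keeping the callback optional means the same function can be called from a script or a test without any UI attached.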

In [94]:

# Logging is already configured in Step 1; reuse the module-level logger
logger = logging.getLogger(__name__)

class RagChitChat:
    """Main application class"""
    
    def __init__(self,
                 data_dir: str = "data",
                 processed_dir: str = "processed",
                 db_dir: str = "chroma_db",
                 model_name: str = "mistral:7b-instruct-v0.3-q4_1"):
        """Initialize the RAG chatbot"""
        self.data_dir = data_dir
        self.processed_dir = processed_dir
        self.db_dir = db_dir
        self.model_name = model_name
        
        # Initialize components
        self.vector_store = ChromaVectorStore(persist_directory=db_dir)
        self.retriever = HaystackRetriever(vector_store=self.vector_store)
        self.llm = OllamaLLM(model=model_name)
        
        # Ensure data is processed
        self._ensure_data_processed()
    
    def _ensure_data_processed(self) -> None:
        """Process lecture notes if needed"""
        os.makedirs(self.processed_dir, exist_ok=True)
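        # Count only supported source files so unrelated files do not force reprocessing
        data_files = [p for p in Path(self.data_dir).glob("**/*")
                      if p.suffix.lower() in ('.pdf', '.pptx')]
        processed_files = list(Path(self.processed_dir).glob("*.txt"))
        
        if not processed_files or len(data_files) > len(processed_files):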
            logger.info("Processing lecture notes...")
            self._process_lecture_notes()
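        else:
            # Caveat: this path only reuses what is persisted on disk. The run log
            # below reports 0 BM25 hits, which suggests the in-memory BM25 index
            # is not repopulated when processing is skipped.
            logger.info("Using existing processed lecture notes")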
    
    def _process_lecture_notes(self) -> None:
        """Process lecture notes from data directory"""
        data_files = list(Path(self.data_dir).glob("**/*.*"))
        all_documents = []
        
        for file_path in data_files:
            if file_path.suffix.lower() not in ['.pdf', '.pptx']:
                continue
            
            processor = DocumentProcessorFactory.get_processor(str(file_path))
            if processor:
                chunks = processor.process(str(file_path))
                output_filename = f"{file_path.stem}.txt"
                processor.save_chunks(chunks, self.processed_dir, output_filename)
                all_documents.extend(chunks)
        
        if all_documents:
            logger.info(f"Adding {len(all_documents)} document chunks to vector store")
            self.vector_store.add_documents(all_documents)
            self.retriever.add_documents(all_documents)
    
    def generate_response(self, question: str, progress_callback: Optional[Callable] = None) -> str:
        """Generate response for a question"""
        context_docs = self.retriever.hybrid_retrieve(question, top_k=5)
        if progress_callback:
            progress_callback("retrieval_complete", {"num_docs": len(context_docs)})
        return self.llm.generate(question, context_docs)
    
    def switch_model(self, model_name: str) -> bool:
        """Switch to a different Ollama model"""
        try:
            new_llm = OllamaLLM(model=model_name)
            available_models = new_llm.list_models()
            if model_name not in available_models:
                logger.warning(f"Model {model_name} not available in Ollama")
                return False
            self.llm = new_llm
            self.model_name = model_name
            logger.info(f"Successfully switched to model: {model_name}")
            return True
        except Exception as e:
            logger.error(f"Error switching model: {str(e)}")
            return False
    
    def get_system_info(self) -> Dict[str, Any]:
        """Get system information for the UI"""
        vector_store_stats = self.vector_store.get_collection_stats()
        available_models = self.llm.list_models()
        processed_files = [f.name for f in Path(self.processed_dir).glob("*.txt")]
        return {
            "current_model": self.model_name,
            "available_models": available_models,
            "document_count": vector_store_stats.get("document_count", 0),
            "processed_files": processed_files[:5] + (["..."] if len(processed_files) > 5 else []),
            "vector_db": "ChromaDB"
        }
    
    def run(self) -> None:
        """Run the chatbot interface"""
        system_info = self.get_system_info()
        ui = TerminalUI(
            generate_fn=self.generate_response,
            system_info=system_info,
            model_switch_fn=self.switch_model
        )
        ui.run()

if __name__ == "__main__":
    try:
        chatbot = RagChitChat()
        chatbot.run()
    except KeyboardInterrupt:
        print("\nExiting RagChitChat...")
    except Exception as e:
        logger.error(f"Error running RagChitChat: {str(e)}", exc_info=True)

2025-05-08 17:27:50,874 - root - INFO - ChromaDB initialized at chroma_db
2025-05-08 17:27:52,908 - root - INFO - Connected to Ollama. Using model: mistral:7b-instruct-v0.3-q4_1
2025-05-08 17:27:52,909 - __main__ - INFO - Using existing processed lecture notes


2025-05-08 17:28:08,792 - haystack.document_stores.in_memory.document_store - INFO - No documents found for BM25 retrieval. Returning empty list.
2025-05-08 17:28:08,793 - root - INFO - Retrieved 0 documents using BM25 retriever


2025-05-08 17:28:08,815 - root - INFO - Retrieved 5 documents using hybrid search
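
The log shows that existing processed notes were reused on this run. `_ensure_data_processed` makes that decision by comparing file counts. One alternative, sketched here under the assumption that each source file maps to a `<stem>.txt` output (as `_process_lecture_notes` does), is to compare file stems so that individual missing files can be identified. `find_unprocessed` and `SUPPORTED_SUFFIXES` are hypothetical names, not part of the notebook.

```python
import tempfile
from pathlib import Path
from typing import List

SUPPORTED_SUFFIXES = {".pdf", ".pptx"}  # assumed to match the processors in this notebook


def find_unprocessed(data_dir: Path, processed_dir: Path) -> List[Path]:
    """Return supported source files that have no matching <stem>.txt output."""
    done = {p.stem for p in processed_dir.glob("*.txt")}
    sources = (p for p in data_dir.rglob("*") if p.suffix.lower() in SUPPORTED_SUFFIXES)
    return sorted(p for p in sources if p.stem not in done)


# Demonstration on a throwaway directory tree
with tempfile.TemporaryDirectory() as tmp:
    data, processed = Path(tmp, "data"), Path(tmp, "processed")
    data.mkdir()
    processed.mkdir()
    for name in ("lecture1.pdf", "lecture2.pptx", "notes.md"):
        (data / name).touch()
    (processed / "lecture1.txt").touch()
    missing = [p.name for p in find_unprocessed(data, processed)]

print(missing)
```

A stem-based check ignores unrelated files in the data directory and would allow processing only the files that are actually missing, rather than reprocessing everything.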
