In [1]:
!pip install openai pinecone-client==3.0.0



In [2]:
!pip install openai pinecone-client==3.0.0 langchain tiktoken numpy pandas
!pip install sentence-transformers faiss-cpu python-dotenv


Collecting faiss-cpu
  Downloading faiss_cpu-1.11.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.8 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_6

In [6]:
import json
import logging
import os

import openai
try:
    from pinecone import Pinecone, ServerlessSpec
    PINECONE_AVAILABLE = True
except ImportError:
    try:
        import pinecone
        PINECONE_AVAILABLE = True
        print("Using legacy Pinecone import")
    except ImportError:
        PINECONE_AVAILABLE = False
        print("Pinecone not available - using FAISS as fallback")
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from sentence_transformers import SentenceTransformer


import faiss


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [26]:
from dataclasses import dataclass, field

@dataclass
class RAGConfig:
    """Configuration class for RAG system.

    Every field has a default, so ``RAGConfig()`` yields a usable (keyless)
    configuration. The two API-key fields are excluded from the generated
    ``repr`` so that displaying the config object in a notebook cell or a log
    line does not reveal credentials; they remain ordinary attributes and
    still take part in ``__init__`` and equality.
    """
    # Credentials: hidden from repr() to avoid leaking them in cell output.
    openai_api_key: str = field(default="", repr=False)
    pinecone_api_key: str = field(default="", repr=False)
    # Passed as `environment` to the legacy Pinecone client and as the
    # serverless `region` to the newer client.
    pinecone_environment: str = "us-east-1"
    index_name: str = "qa-bot-index"
    embedding_model: str = "text-embedding-3-large"
    chat_model: str = "gpt-4-turbo-preview"
    # Chunking parameters handed to the text splitter (measured in characters).
    chunk_size: int = 1000
    chunk_overlap: int = 200
    # NOTE(review): not read by any code visible in this notebook; confirm intent.
    max_tokens: int = 150000
    temperature: float = 0.1
    # Number of neighbours requested from the vector store per query.
    top_k: int = 5
    # Retrieved results scoring below this value are discarded.
    similarity_threshold: float = 0.7

#initialize configuration
config = RAGConfig()
def setup_api_keys():
    """Load API keys from the environment into the module-level ``config``.

    ``OPENAI_API_KEY`` and ``PINECONE_API_KEY`` are read first. For each key
    that is missing, setup instructions are printed and the corresponding
    config field is set to the sentinel string "demo-mode".

    Returns:
        True when the keys were processed, False if an unexpected error was
        caught (the error is logged).
    """
    # Personal keys are intentionally not stored in the notebook; these are
    # the instructions shown when a key cannot be found.
    openai_instructions = (
        "openAI API key not found!",
        "To use OpenAI embeddings and chat completion:",
        "1. Get your API key from https://platform.openai.com/api-keys",
        "2. Set it as: config.openai_api_key = 'your-api-key-here'",
        "3. Or set environment variable: OPENAI_API_KEY",
        "\nFor demo purposes, we'll use a local embedding model.",
    )
    pinecone_instructions = (
        "pinecone API key not found!",
        "To use Pinecone vector database:",
        "1. Get your API key from https://app.pinecone.io/",
        "2. Set it as: config.pinecone_api_key = 'your-api-key-here'",
        "3. Or set environment variable: PINECONE_API_KEY",
        "\nFor demo purposes, we'll use FAISS as local vector database.",
    )

    try:
        # Environment first; an unset variable yields the empty string.
        config.openai_api_key = os.environ.get('OPENAI_API_KEY', '')
        config.pinecone_api_key = os.environ.get('PINECONE_API_KEY', '')

        if not config.openai_api_key:
            for line in openai_instructions:
                print(line)
            config.openai_api_key = "demo-mode"

        if not config.pinecone_api_key:
            for line in pinecone_instructions:
                print(line)
            config.pinecone_api_key = "demo-mode"
    except Exception as e:
        logger.error(f"Error setting up API keys: {e}")
        return False

    return True

In [27]:
from typing import List
from datetime import datetime
import tiktoken
import os

class DocumentProcessor:
    """Split raw documents into metadata-tagged chunks and extract keywords."""

    # Common words that extract_keywords() ignores.
    STOP_WORDS = frozenset({
        'the', 'is', 'at', 'which', 'on', 'and', 'a', 'to', 'are', 'as',
        'was', 'for', 'with', 'by'
    })
    # Characters trimmed from both ends of a token before it is considered a
    # keyword, so "policies?" and "(pricing)" become "policies" and "pricing".
    _TOKEN_STRIP_CHARS = ".,;:!?\"'()[]{}<>"

    def __init__(self, config: RAGConfig):
        """Build the text splitter from ``config`` and load the GPT-4 tokenizer."""
        self.config = config
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=config.chunk_size,
            chunk_overlap=config.chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", ". ", " ", ""]
        )
        # Used only to report a token count in each chunk's metadata.
        self.encoding = tiktoken.encoding_for_model("gpt-4")

    def process_documents(self, documents: List[str]) -> List[Document]:
        """Process and chunk documents with metadata.

        Args:
            documents: Raw document texts. ``None`` or blank entries are
                skipped; they still consume their position, so ``doc_id``
                always equals the entry's index in ``documents``.

        Returns:
            One ``Document`` per chunk whose metadata holds ``doc_id``,
            ``chunk_id``, ``chunk_size`` (characters), ``token_count`` and an
            ISO-format ``timestamp``.
        """
        processed_docs = []

        for doc_index, doc_content in enumerate(documents):
            # Nothing to chunk; also avoids passing None to the splitter.
            if not doc_content or not doc_content.strip():
                continue

            chunks = self.text_splitter.split_text(doc_content)

            for chunk_index, chunk in enumerate(chunks):
                metadata = {
                    "doc_id": doc_index,
                    "chunk_id": chunk_index,
                    "chunk_size": len(chunk),
                    "token_count": len(self.encoding.encode(chunk)),
                    "timestamp": datetime.now().isoformat()
                }

                processed_docs.append(Document(
                    page_content=chunk,
                    metadata=metadata
                ))

        return processed_docs

    def extract_keywords(self, text: str) -> List[str]:
        """Extract keywords from text for enhanced retrieval.

        The text is lower-cased and split on whitespace. Surrounding
        punctuation and a trailing possessive "'s" are removed from each
        token; stop words and tokens of three characters or fewer are
        dropped.

        Returns:
            Unique keywords in order of first appearance (empty list for
            empty or ``None`` input).
        """
        if not text:
            return []

        # A dict keeps insertion order, giving a deterministic de-duplicated result.
        ordered_unique = {}
        for raw_token in text.lower().split():
            token = raw_token.strip(self._TOKEN_STRIP_CHARS)
            if token.endswith("'s"):
                token = token[:-2]
            if len(token) > 3 and token not in self.STOP_WORDS:
                ordered_unique.setdefault(token, None)

        return list(ordered_unique)

In [28]:
class EmbeddingGenerator:
    """Generate embeddings using the OpenAI API with a local-model fallback.

    Every vector returned by this class is zero-padded or truncated to
    ``EMBEDDING_DIM`` entries so that OpenAI and local embeddings have the
    same width.
    """

    # Width that all returned vectors are fitted to.
    EMBEDDING_DIM = 3072
    # sentence-transformers model used when OpenAI is unavailable.
    LOCAL_MODEL_NAME = 'all-MiniLM-L6-v2'
    # Number of texts sent per OpenAI embeddings request.
    OPENAI_BATCH_SIZE = 50
    # Key values that mean "no usable OpenAI credentials".
    _KEYLESS_VALUES = ("", "demo-mode")

    def __init__(self, config: RAGConfig):
        """Create the OpenAI client; the local model is loaded lazily."""
        self.config = config
        self.client = openai.OpenAI(api_key=config.openai_api_key)
        #fallback embedding model
        self.local_model = None

    def _is_keyless(self) -> bool:
        """Return True when no real OpenAI key is configured."""
        return self.config.openai_api_key in self._KEYLESS_VALUES

    def initialize_local_model(self):
        """Initialize local embedding model as fallback.

        Failures are logged and leave ``self.local_model`` as ``None``.
        """
        try:
            self.local_model = SentenceTransformer(self.LOCAL_MODEL_NAME)
            logger.info("Local embedding model initialized")
        except Exception as e:
            logger.error(f"Failed to initialize local model: {e}")

    def generate_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings, falling back to the local model on failure.

        Args:
            texts: Strings to embed.

        Returns:
            One ``EMBEDDING_DIM``-long list of floats per input text (an empty
            list for empty input).

        Raises:
            RuntimeError: If the local model is needed but cannot be loaded.
        """
        if not texts:
            return []

        # Without a usable key, go straight to the local model so a local
        # failure is not misreported as an OpenAI failure and retried.
        if self._is_keyless():
            return self._generate_local_embeddings(texts)

        try:
            #primary: OpenAI embeddings
            return self._generate_openai_embeddings(texts)
        except Exception as e:
            logger.warning(f"OpenAI embedding failed: {e}")
            # NOTE(review): vectors from the local model are not comparable
            # with OpenAI vectors already stored in an index.
            #fallback: Local model
            return self._generate_local_embeddings(texts)

    def _generate_openai_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings using OpenAI API (local model when keyless)."""
        if self._is_keyless():
            #using local model instead
            return self._generate_local_embeddings(texts)

        embeddings = []

        #processing in batches to handle rate limits
        for start in range(0, len(texts), self.OPENAI_BATCH_SIZE):
            batch = texts[start:start + self.OPENAI_BATCH_SIZE]

            response = self.client.embeddings.create(
                model=self.config.embedding_model,
                input=batch
            )

            embeddings.extend(self._fit_dimension(item.embedding) for item in response.data)

        return embeddings

    def _generate_local_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings using local model.

        Raises:
            RuntimeError: If the local model could not be initialized.
        """
        if self.local_model is None:
            self.initialize_local_model()
        if self.local_model is None:
            # initialize_local_model() only logs; fail here with a clear message
            # instead of an AttributeError on None.
            raise RuntimeError(
                "Local embedding model is unavailable; cannot generate embeddings"
            )

        embeddings = self.local_model.encode(texts)
        return [self._fit_dimension(emb) for emb in embeddings]

    def _fit_dimension(self, vector) -> List[float]:
        """Return ``vector`` as a list zero-padded or truncated to EMBEDDING_DIM."""
        values = vector.tolist() if hasattr(vector, "tolist") else list(vector)
        missing = self.EMBEDDING_DIM - len(values)
        if missing > 0:
            #pad with zeros
            values = values + [0.0] * missing
        return values[:self.EMBEDDING_DIM]

In [29]:
from typing import List, Dict
from pinecone import Pinecone, ServerlessSpec
import pinecone
import faiss
import numpy as np


class VectorDatabaseManager:
    """Unified vector database manager with Pinecone and FAISS support.

    Pinecone is used when the client library is importable and a real API key
    is configured; otherwise (or if Pinecone setup fails) an in-memory FAISS
    inner-product index is used, with chunk text and metadata kept in
    ``document_store`` keyed by FAISS row id.
    """

    # Dimension of the vectors stored in either backend.
    EMBEDDING_DIM = 3072
    # Number of vectors sent per Pinecone upsert request.
    PINECONE_BATCH_SIZE = 100

    def __init__(self, config: RAGConfig):
        """Select and initialize the backend described by ``config``."""
        self.config = config
        # An empty key can never authenticate, so treat it like demo mode
        # instead of attempting (and failing) a Pinecone connection.
        has_real_key = config.pinecone_api_key not in ("", "demo-mode")
        self.use_pinecone = PINECONE_AVAILABLE and has_real_key
        self.pc = None
        self.index = None
        self.faiss_index = None
        self.document_store = {}
        # Next numeric suffix for Pinecone vector ids created by this instance.
        self._next_pinecone_id = 0

        if self.use_pinecone:
            self._initialize_pinecone()
        else:
            self._initialize_faiss()

    def _initialize_pinecone(self):
        """Initialize Pinecone vector database, falling back to FAISS on error."""
        try:
            # Function-scope import so an ImportError is handled by the
            # FAISS fallback below.
            import pinecone as pinecone_module

            client_cls = getattr(pinecone_module, 'Pinecone', None)

            if client_cls is not None:
                # Class-based client (pinecone-client 3.x+). Checked first:
                # 3.x still exposes a top-level `init` attribute, but it only
                # raises, so testing for `init` would pick the wrong path.
                self.pc = client_cls(api_key=self.config.pinecone_api_key)

                if self.config.index_name not in self.pc.list_indexes().names():
                    self.pc.create_index(
                        name=self.config.index_name,
                        dimension=self.EMBEDDING_DIM,
                        metric='cosine',
                        spec=pinecone_module.ServerlessSpec(
                            cloud='aws',
                            region=self.config.pinecone_environment
                        )
                    )
                    logger.info(f"Created Pinecone index: {self.config.index_name}")

                self.index = self.pc.Index(self.config.index_name)

            else:
                #this is the legacy version (module-level init)
                pinecone_module.init(
                    api_key=self.config.pinecone_api_key,
                    environment=self.config.pinecone_environment
                )

                #checking if index exists
                if self.config.index_name not in pinecone_module.list_indexes():
                    pinecone_module.create_index(
                        name=self.config.index_name,
                        dimension=self.EMBEDDING_DIM,
                        metric='cosine'
                    )
                    logger.info(f"Created Pinecone index: {self.config.index_name}")

                self.index = pinecone_module.Index(self.config.index_name)

            logger.info("Pinecone initialized successfully")

        except Exception as e:
            logger.error(f"Failed to initialize Pinecone: {e}")
            logger.info("Falling back to FAISS")
            self.use_pinecone = False
            self._initialize_faiss()

    def _initialize_faiss(self):
        """Initialize FAISS vector database as fallback.

        Raises:
            Exception: Re-raises whatever FAISS raised, after logging it.
        """
        try:
            #creating FAISS index
            self.faiss_index = faiss.IndexFlatIP(self.EMBEDDING_DIM)
            logger.info("FAISS index initialized successfully")

        except Exception as e:
            logger.error(f"Failed to initialize FAISS: {e}")
            raise

    def upsert_documents(self, documents: List[Document], embeddings: List[List[float]]):
        """Upsert documents with embeddings.

        Args:
            documents: Chunks to store.
            embeddings: One vector per document, in the same order.

        Raises:
            ValueError: If the two lists differ in length (or, on the FAISS
                path, if the vectors are not ``EMBEDDING_DIM`` wide).
            Exception: Any backend error, re-raised after logging.
        """
        try:
            if len(documents) != len(embeddings):
                raise ValueError(
                    f"Got {len(documents)} documents but {len(embeddings)} embeddings"
                )
            if not documents:
                logger.info("No documents to upsert")
                return

            if self.use_pinecone:
                self._upsert_to_pinecone(documents, embeddings)
            else:
                self._upsert_to_faiss(documents, embeddings)

        except Exception as e:
            logger.error(f"Error upserting documents: {e}")
            raise

    def _upsert_to_pinecone(self, documents: List[Document], embeddings: List[List[float]]):
        """Upsert to Pinecone.

        Ids continue from the previous call on this instance, so a second
        upsert does not overwrite vectors written by the first one.
        """
        first_id = self._next_pinecone_id
        vectors = [
            {
                "id": f"doc_{first_id + offset}",
                "values": embedding,
                "metadata": {
                    "text": doc.page_content,
                    **doc.metadata
                }
            }
            for offset, (doc, embedding) in enumerate(zip(documents, embeddings))
        ]

        #upsert in batches
        for start in range(0, len(vectors), self.PINECONE_BATCH_SIZE):
            self.index.upsert(vectors=vectors[start:start + self.PINECONE_BATCH_SIZE])

        # Advance only after every batch was accepted.
        self._next_pinecone_id = first_id + len(vectors)
        logger.info(f"Upserted {len(vectors)} documents to Pinecone")

    def _upsert_to_faiss(self, documents: List[Document], embeddings: List[List[float]]):
        """Upsert to FAISS."""
        # np.array always copies, so normalising in place below never mutates
        # the caller's data.
        embeddings_array = np.array(embeddings, dtype='float32')
        if embeddings_array.ndim != 2 or embeddings_array.shape[1] != self.EMBEDDING_DIM:
            raise ValueError(
                f"Expected embeddings of shape (n, {self.EMBEDDING_DIM}), "
                f"got {embeddings_array.shape}"
            )
        #normalizing vectors so inner product equals cosine similarity
        faiss.normalize_L2(embeddings_array)

        #adding to FAISS index; new rows get consecutive ids starting at ntotal
        start_id = self.faiss_index.ntotal
        self.faiss_index.add(embeddings_array)

        #storing metadata
        for offset, doc in enumerate(documents):
            self.document_store[start_id + offset] = {
                "text": doc.page_content,
                **doc.metadata
            }

        logger.info(f"Upserted {len(documents)} documents to FAISS")

    def search_similar(self, query_embedding: List[float], top_k: int = 5) -> List[Dict]:
        """Search for similar documents.

        Returns:
            A list of dicts with ``id``, ``score``, ``text`` and ``metadata``.
            Errors are logged and produce an empty list.
        """
        try:
            if self.use_pinecone:
                return self._search_pinecone(query_embedding, top_k)
            else:
                return self._search_faiss(query_embedding, top_k)

        except Exception as e:
            logger.error(f"Error searching similar documents: {e}")
            return []

    def _search_pinecone(self, query_embedding: List[float], top_k: int) -> List[Dict]:
        """Search in Pinecone"""
        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True
        )

        return [
            {
                "id": match.id,
                "score": match.score,
                "text": match.metadata.get("text", ""),
                "metadata": match.metadata
            }
            for match in results.matches
        ]

    def _search_faiss(self, query_embedding: List[float], top_k: int) -> List[Dict]:
        """Search in FAISS"""
        # Never ask for more neighbours than the index holds.
        neighbours = min(top_k, self.faiss_index.ntotal)
        if neighbours <= 0:
            return []

        #normalizing query vector
        query_vector = np.array([query_embedding], dtype='float32')
        faiss.normalize_L2(query_vector)

        #performing the search
        scores, indices = self.faiss_index.search(query_vector, neighbours)

        results = []
        for score, raw_idx in zip(scores[0], indices[0]):
            idx = int(raw_idx)
            if idx != -1 and idx in self.document_store:  # -1 indicates no result
                stored = self.document_store[idx]
                results.append({
                    "id": f"doc_{idx}",
                    "score": float(score),
                    "text": stored["text"],
                    # Copy so callers cannot mutate the internal store.
                    "metadata": dict(stored)
                })

        return results

In [30]:
from typing import List, Dict, Any
from datetime import datetime
import tiktoken
import os
import openai
import logging

class RAGSystem:
    """Retrieval-augmented QA pipeline: ingest, retrieve, generate.

    Wires together a DocumentProcessor, an EmbeddingGenerator and a
    VectorDatabaseManager built from one shared config. When no real OpenAI
    key is configured, answers come from a simple rule-based demo responder
    instead of the chat completion API.
    """

    # Completion length limit sent with each chat request.
    MAX_ANSWER_TOKENS = 1000
    # Key values that mean "no usable OpenAI credentials".
    _KEYLESS_VALUES = ("", "demo-mode")

    def __init__(self, config: RAGConfig):
        """Build all components from ``config`` and log the chosen backend."""
        self.config = config
        self.doc_processor = DocumentProcessor(config)
        self.embedding_generator = EmbeddingGenerator(config)
        self.vector_db = VectorDatabaseManager(config)
        self.client = openai.OpenAI(api_key=config.openai_api_key)

        self.initialize_system()

    def initialize_system(self):
        """Log which vector store the system is using."""
        try:
            logger.info("RAG system initialized successfully")
            if self.vector_db.use_pinecone:
                logger.info("Using Pinecone for vector storage")
            else:
                logger.info("Using FAISS for vector storage")
        except Exception as e:
            logger.error(f"Error initializing RAG system: {e}")
            raise

    def ingest_documents(self, documents: List[str]):
        """Chunk, embed and store ``documents``.

        Raises:
            Exception: Any processing, embedding or storage error, re-raised
                after logging.
        """
        try:
            processed_docs = self.doc_processor.process_documents(documents)

            # Nothing to embed or store (empty input or only blank documents).
            if not processed_docs:
                logger.warning("No document chunks to ingest")
                return

            #generating embeddings
            texts = [doc.page_content for doc in processed_docs]
            embeddings = self.embedding_generator.generate_embeddings(texts)

            #storing in vector database
            self.vector_db.upsert_documents(processed_docs, embeddings)

            logger.info(f"Successfully ingested {len(processed_docs)} document chunks")

        except Exception as e:
            logger.error(f"Error ingesting documents: {e}")
            raise

    def retrieve_relevant_context(self, query: str) -> List[Dict]:
        """Retrieve relevant context for a query.

        Returns:
            Search results whose score is at least
            ``config.similarity_threshold``. Blank queries and any error
            (which is logged) yield an empty list.
        """
        try:
            # A blank query carries nothing to match against.
            if not query or not query.strip():
                return []

            #generating query embedding
            query_embedding = self.embedding_generator.generate_embeddings([query])[0]

            #searching similar documents
            results = self.vector_db.search_similar(
                query_embedding,
                top_k=self.config.top_k
            )

            #filtering by similarity threshold
            return [
                result for result in results
                if result["score"] >= self.config.similarity_threshold
            ]

        except Exception as e:
            logger.error(f"Error retrieving context: {e}")
            return []

    def generate_response(self, query: str, context: List[Dict]) -> str:
        """Generate response using retrieved context.

        Uses the rule-based demo responder when no real OpenAI key is
        configured; otherwise sends the context and question to the chat
        model. Errors are logged and produce an apology string.
        """
        try:
            #checking if we have a real OpenAI API key (an empty key cannot authenticate)
            if self.config.openai_api_key in self._KEYLESS_VALUES:
                #generate a simple rule-based response for demo
                return self._generate_demo_response(query, context)

            #preparing context text
            context_text = "\n\n".join([
                f"[Document {i+1}] {item['text']}"
                for i, item in enumerate(context)
            ])

            #preparing prompt
            prompt = f"""
            You are an intelligent QA assistant for a business. Use the provided context to answer the question accurately and comprehensively.

            Context:
            {context_text}

            Question: {query}

            Instructions:
            1. Provide a clear, accurate answer based on the context
            2. If the context doesn't contain enough information, say so
            3. Be concise but comprehensive
            4. Use a professional, helpful tone

            Answer:
            """

            #generating response
            response = self.client.chat.completions.create(
                model=self.config.chat_model,
                messages=[
                    {"role": "system", "content": "You are a helpful business QA assistant."},
                    {"role": "user", "content": prompt}
                ],
                temperature=self.config.temperature,
                max_tokens=self.MAX_ANSWER_TOKENS
            )

            # The message content can be None; keep the declared str return type.
            return response.choices[0].message.content or ""

        except Exception as e:
            logger.error(f"Error generating response: {e}")
            return "I apologize, but I encountered an error while generating a response."

    def _generate_demo_response(self, query: str, context: List[Dict]) -> str:
        """Generate a demo response without OpenAI API.

        Picks a lead-in phrase from keywords in the query and appends the
        first 300 characters of the top two context texts.
        """
        if not context:
            return "I couldn't find relevant information to answer your question. Please try rephrasing your query."

        #simple keyword matching for demo
        query_lower = query.lower()

        #extracting relevant text from the top 2 results
        relevant_text = "".join(item['text'] + " " for item in context[:2])
        excerpt = relevant_text[:300]

        #generating simple response based on keywords
        if any(word in query_lower for word in ['hours', 'working', 'time']):
            return f"Based on the available information: {excerpt}..."
        elif any(word in query_lower for word in ['cost', 'price', 'pricing']):
            return f"Regarding pricing: {excerpt}..."
        elif any(word in query_lower for word in ['support', 'help', 'contact']):
            return f"For support information: {excerpt}..."
        else:
            return f"Based on the context: {excerpt}..."

    def ask_question(self, query: str) -> Dict[str, Any]:
        """Complete QA pipeline.

        Returns:
            A dict with ``question``, ``answer``, ``context_sources``,
            ``retrieved_context`` and ``timestamp``. On failure the same keys
            are present (with zero sources) plus an ``error`` message, so
            callers can index the result without checking for errors first.
        """
        try:
            #retrieve relevant context
            context = self.retrieve_relevant_context(query)

            #generate response
            answer = self.generate_response(query, context)

            return {
                "question": query,
                "answer": answer,
                "context_sources": len(context),
                "retrieved_context": context,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"Error in QA pipeline: {e}")
            return {
                "question": query,
                "answer": "I apologize, but I encountered an error while processing your question.",
                "context_sources": 0,
                "retrieved_context": [],
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

In [32]:
def create_sample_business_data():
    """Return the five hard-coded sample business documents used by the demos.

    The documents cover, in order: company overview, HR policies and
    benefits, product information, sales and support, and financial
    information.
    """

    company_overview = """
        Company Overview:
        TechCorp Solutions is a leading technology company specializing in cloud computing,
        artificial intelligence, and digital transformation services. Founded in 2020, we have
        grown to serve over 500 enterprise clients across various industries including healthcare,
        finance, and retail. Our mission is to empower businesses through innovative technology
        solutions that drive growth and efficiency.

        Our core services include:
        - Cloud migration and management
        - AI/ML consulting and implementation
        - Digital transformation strategy
        - Cybersecurity solutions
        - Data analytics and business intelligence
        """

    hr_policies = """
        HR Policies and Benefits:

        Working Hours: Standard working hours are 9 AM to 6 PM, Monday through Friday.
        We offer flexible working arrangements including remote work options and flexible hours.

        Leave Policy: Employees are entitled to 25 days of paid vacation annually,
        10 sick days, and 3 personal days. We also observe all major holidays.

        Benefits Package:
        - Comprehensive health insurance (medical, dental, vision)
        - 401(k) retirement plan with company matching
        - Professional development budget of $2,000 per year
        - Wellness programs including gym membership reimbursement
        - Life and disability insurance
        - Parental leave: 12 weeks paid leave for new parents
        """

    product_information = """
        Product Information:

        CloudSync Pro: Our flagship cloud management platform that helps businesses
        migrate, monitor, and optimize their cloud infrastructure. Features include
        automated scaling, cost optimization, and security monitoring. Pricing starts
        at $500/month for small businesses and scales based on usage.

        AI Assistant Suite: A comprehensive AI toolkit that includes chatbots,
        document processing, and predictive analytics. Perfect for businesses looking
        to automate routine tasks and gain insights from their data. Custom pricing
        based on requirements.

        SecureGuard: Enterprise-grade cybersecurity solution providing real-time
        threat detection, vulnerability assessment, and incident response.
        Pricing: $50 per user per month.
        """

    sales_and_support = """
        Sales and Support:

        Sales Process: Our sales team follows a consultative approach, starting with
        a needs assessment, followed by a custom proposal, and implementation planning.
        Average sales cycle is 30-45 days for enterprise clients.

        Support Channels:
        - 24/7 phone support for premium clients
        - Email support with 4-hour response time
        - Live chat during business hours
        - Comprehensive knowledge base and documentation
        - Dedicated account managers for enterprise clients

        Training and Onboarding: We provide comprehensive training programs for all
        our products, including online courses, live workshops, and one-on-one sessions.
        """

    financial_information = """
        Financial Information:

        Pricing Structure: We offer flexible pricing models including subscription-based,
        usage-based, and enterprise licensing. Volume discounts are available for
        large organizations.

        Payment Terms: Standard payment terms are Net 30 days. We accept various
        payment methods including credit cards, wire transfers, and ACH.

        Refund Policy: We offer a 30-day money-back guarantee for all new customers.
        Refunds are processed within 5-7 business days.

        Company Growth: Year-over-year revenue growth of 150% in 2024. We're
        expanding into new markets and planning to open offices in Europe and Asia.
        """

    # Order matters: a document's position becomes its doc_id during ingestion.
    return [
        company_overview,
        hr_policies,
        product_information,
        sales_and_support,
        financial_information,
    ]

def run_demo():
    """Run the scripted end-to-end demo.

    Steps: resolve API keys, build the RAG system, ingest the sample
    documents, then ask a fixed list of questions and print each answer with
    its source count. The function returns early (after printing a message)
    if key setup, initialization or ingestion fails.
    """
    heavy_rule = "=" * 60
    light_rule = "-" * 50

    print(heavy_rule)
    print("RAG MODEL QA BOT - DEMONSTRATION")
    print(heavy_rule)

    # Step 1: API keys
    print("\n1. Setting up API keys...")
    keys_ready = setup_api_keys()
    if not keys_ready:
        print("Please configure your API keys to run the demo")
        return

    # Step 2: system construction
    print("\n2. Initializing RAG system...")
    try:
        rag_system = RAGSystem(config)
        print("✓ RAG system initialized successfully")
    except Exception as init_error:
        print(f"✗ Error initializing RAG system: {init_error}")
        return

    # Step 3: sample data ingestion
    print("\n3. Ingesting sample business documents...")
    try:
        sample_docs = create_sample_business_data()
        rag_system.ingest_documents(sample_docs)
        print(f"✓ Successfully ingested {len(sample_docs)} documents")
    except Exception as ingest_error:
        print(f"✗ Error ingesting documents: {ingest_error}")
        return

    # Step 4: fixed question set
    demo_questions = (
        "What are the company's working hours and leave policies?",
        "How much does CloudSync Pro cost?",
        "What support channels are available?",
        "What is the refund policy?",
        "What are the company's core services?",
    )

    print("\n4. Testing QA capabilities...")
    print(heavy_rule)

    for number, question in enumerate(demo_questions, start=1):
        print(f"\nQuestion {number}: {question}")
        print(light_rule)

        try:
            result = rag_system.ask_question(question)
            print(f"Answer: {result['answer']}")
            print(f"Sources used: {result['context_sources']}")

        except Exception as qa_error:
            print(f"Error: {qa_error}")

        print(light_rule)

    print("\n5. Demo completed successfully!")
    print(heavy_rule)

In [33]:
class AdvancedRAGFeatures:
    """Optional add-ons around a RAGSystem: hybrid search, query expansion,
    answer validation and simple analytics."""

    # Model used for query expansion.
    QUERY_EXPANSION_MODEL = "gpt-3.5-turbo"
    # Characters trimmed from keyword edges before matching against chunk text.
    _KEYWORD_STRIP_CHARS = ".,;:!?\"'()[]{}<>"

    def __init__(self, rag_system: RAGSystem):
        """Wrap ``rag_system`` and start with empty history and zeroed metrics."""
        self.rag_system = rag_system
        self.query_history = []
        self.performance_metrics = {
            "total_queries": 0,
            "avg_response_time": 0,
            "successful_responses": 0
        }

    def hybrid_search(self, query: str, keyword_weight: float = 0.3) -> List[Dict]:
        """Combine vector and keyword search.

        Vector results are re-ranked by a blend of their similarity score and
        the fraction of query keywords found in the chunk text.

        Args:
            query: The user question.
            keyword_weight: Share of the blended score given to keyword
                overlap; clamped to the range 0..1.

        Returns:
            Copies of the vector results, each with added ``keyword_score``
            and ``hybrid_score`` entries, sorted by ``hybrid_score``
            descending (ties keep vector order). If there are no results or
            no usable keywords, the vector results are returned unchanged.
        """
        #performing vector search
        vector_results = self.rag_system.retrieve_relevant_context(query)

        #keyword extraction; trim punctuation so "policies?" can match "policies"
        raw_keywords = self.rag_system.doc_processor.extract_keywords(query)
        keywords = {kw.strip(self._KEYWORD_STRIP_CHARS) for kw in raw_keywords}
        keywords.discard("")

        if not vector_results or not keywords:
            return vector_results

        weight = min(max(keyword_weight, 0.0), 1.0)

        #combining and ranking results
        combined_results = []
        for result in vector_results:
            text_lower = result.get("text", "").lower()
            hits = sum(1 for kw in keywords if kw in text_lower)
            keyword_score = hits / len(keywords)
            hybrid_score = (1 - weight) * result.get("score", 0.0) + weight * keyword_score
            combined_results.append({
                **result,
                "keyword_score": keyword_score,
                "hybrid_score": hybrid_score
            })

        # list.sort is stable, so equal scores keep their vector ranking.
        combined_results.sort(key=lambda item: item["hybrid_score"], reverse=True)
        return combined_results

    def query_expansion(self, query: str) -> str:
        """Expand query with related terms.

        Asks the chat model for alternative phrasings and appends them to the
        query. Returns the query unchanged when no real OpenAI key is
        configured, when the model returns no text, or on any error (which is
        logged).
        """
        # Without credentials the request cannot succeed; skip the call.
        if self.rag_system.config.openai_api_key in ("", "demo-mode"):
            return query

        #openAI to generate query variations
        try:
            response = self.rag_system.client.chat.completions.create(
                model=self.QUERY_EXPANSION_MODEL,
                messages=[
                    {"role": "system", "content": "Generate 3 alternative ways to ask the same question."},
                    {"role": "user", "content": f"Original question: {query}"}
                ],
                temperature=0.3,
                max_tokens=200
            )

            expanded_query = response.choices[0].message.content
            # Content can be None; avoid appending the literal text "None".
            if not expanded_query:
                return query
            return f"{query} {expanded_query}"

        except Exception as e:
            logger.error(f"Error expanding query: {e}")
            return query

    def answer_validation(self, answer: str, context: List[Dict]) -> Dict[str, Any]:
        """Validate answer quality and relevance.

        NOTE: ``confidence_score`` and ``factual_consistency`` are fixed
        placeholder values, not measurements. ``context_relevance`` only
        reports whether any context was supplied, and ``answer_completeness``
        only whether the answer is longer than 50 characters.
        """
        validation_metrics = {
            "confidence_score": 0.8,
            "context_relevance": len(context) > 0,
            "answer_completeness": len(answer) > 50,
            "factual_consistency": True
        }

        return validation_metrics

    def get_analytics(self) -> Dict[str, Any]:
        """Get system analytics and performance metrics.

        Returns the live ``performance_metrics`` dict, the number of recorded
        queries, and a constant ``system_health`` string.
        """
        return {
            "performance_metrics": self.performance_metrics,
            "query_history_count": len(self.query_history),
            "system_health": "operational"
        }

In [34]:
def interactive_demo():
    """Interactive demo interface.

    Resolves API keys, builds a RAG system over the sample documents and then
    answers questions typed by the user until 'exit', Ctrl-C or end of input.
    The 'help' command lists commands and 'analytics' prints the counters
    that this loop updates after every answered question.
    """

    print("\n" + "=" * 60)
    print("INTERACTIVE RAG QA BOT")
    print("=" * 60)
    print("Type 'exit' to quit, 'help' for commands")

    try:
        # Resolve missing keys first (as run_demo does); otherwise empty keys
        # are sent to the real APIs and every answer becomes an error message.
        if not (config.openai_api_key and config.pinecone_api_key):
            # setup_api_keys() overwrites both fields from the environment, so
            # remember any key that was assigned directly on config.
            preset_openai_key = config.openai_api_key
            preset_pinecone_key = config.pinecone_api_key
            if not setup_api_keys():
                print("Please configure your API keys to run the demo")
                return
            if preset_openai_key:
                config.openai_api_key = preset_openai_key
            if preset_pinecone_key:
                config.pinecone_api_key = preset_pinecone_key

        rag_system = RAGSystem(config)
        sample_docs = create_sample_business_data()
        rag_system.ingest_documents(sample_docs)
        advanced_features = AdvancedRAGFeatures(rag_system)

        print("✓ System ready!")

    except Exception as e:
        print(f"✗ Error initializing system: {e}")
        return

    while True:
        try:
            user_input = input("\nYour question: ").strip()
            command = user_input.lower()

            if command == 'exit':
                print("Goodbye!")
                break
            elif command == 'help':
                print("\nAvailable commands:")
                print("- Ask any question about the business")
                print("- 'analytics' - View system analytics")
                print("- 'exit' - Quit the demo")
                continue
            elif command == 'analytics':
                analytics = advanced_features.get_analytics()
                print(f"\nSystem Analytics: {json.dumps(analytics, indent=2)}")
                continue
            elif not user_input:
                continue

            start_time = datetime.now()
            response = rag_system.ask_question(user_input)
            end_time = datetime.now()

            response_time = (end_time - start_time).total_seconds()

            # Keep the counters shown by the 'analytics' command up to date.
            metrics = advanced_features.performance_metrics
            metrics["total_queries"] += 1
            if "error" not in response:
                metrics["successful_responses"] += 1
            # Incremental mean avoids storing every response time.
            metrics["avg_response_time"] += (
                (response_time - metrics["avg_response_time"]) / metrics["total_queries"]
            )
            advanced_features.query_history.append({
                "question": user_input,
                "response_time": response_time
            })

            print(f"\nAnswer: {response['answer']}")
            print(f"Response time: {response_time:.2f} seconds")
            # .get: the error response may not carry a source count.
            print(f"Sources: {response.get('context_sources', 0)}")

        except (KeyboardInterrupt, EOFError):
            # EOFError: input stream closed; retrying would loop forever.
            print("\nGoodbye!")
            break
        except Exception as e:
            print(f"Error: {e}")


In [40]:
if __name__ == "__main__":
    # Banner followed by a summary of the active configuration.
    for line in (
        "RAG MODEL QA BOT - BUSINESS IMPLEMENTATION",
        "=" * 60,
        "Configuration:",
        f"- Embedding Model: {config.embedding_model}",
        f"- Chat Model: {config.chat_model}",
        f"- Chunk Size: {config.chunk_size}",
        f"- Top K Results: {config.top_k}",
    ):
        print(line)

    # Ask which demo to run; any answer other than 1-3 runs the automated demo.
    menu = "\nChoose demo mode:\n1. Automated demo\n2. Interactive demo\n3. Setup only\nChoice (1-3): "
    choice = input(menu).strip()

    if choice == "3":
        print("Setup completed. You can now use the RAG system.")
    elif choice == "2":
        interactive_demo()
    else:
        if choice != "1":
            print("Running automated demo...")
        run_demo()


RAG MODEL QA BOT - BUSINESS IMPLEMENTATION
Configuration:
- Embedding Model: text-embedding-3-large
- Chat Model: gpt-4-turbo-preview
- Chunk Size: 1000
- Top K Results: 5

Choose demo mode:
1. Automated demo
2. Interactive demo
3. Setup only
Choice (1-3): 3
Setup completed. You can now use the RAG system.


In [41]:
class RAGEvaluation:
    """Evaluation framework for RAG system.

    Wraps a RAG system object and derives simple timing / success metrics by
    calling its ``retrieve_relevant_context`` and ``ask_question`` methods.
    """

    # The annotation is quoted so it is not evaluated when the class is defined.
    def __init__(self, rag_system: "RAGSystem"):
        """Store the RAG system that the evaluation methods will query."""
        self.rag_system = rag_system

    def evaluate_retrieval_quality(self, test_queries: List[str]) -> Dict[str, float]:
        """Evaluate retrieval quality metrics.

        Args:
            test_queries: Queries passed one by one to
                ``rag_system.retrieve_relevant_context``.

        Returns:
            A dict with three keys:
            - ``avg_retrieval_time``: mean wall-clock seconds per retrieval call.
            - ``retrieval_success_rate``: fraction of queries whose returned
              context was truthy.
            - ``avg_context_relevance``: placeholder; it is never computed here
              and is always 0.
            All values are 0 when ``test_queries`` is empty.
        """
        metrics = {
            "avg_retrieval_time": 0.0,
            "avg_context_relevance": 0.0,
            "retrieval_success_rate": 0.0
        }

        query_count = len(test_queries)
        if query_count == 0:
            # Nothing to measure; returning here avoids dividing by zero below.
            return metrics

        total_time = 0.0
        successful_retrievals = 0

        for query in test_queries:
            start_time = datetime.now()
            context = self.rag_system.retrieve_relevant_context(query)
            end_time = datetime.now()

            total_time += (end_time - start_time).total_seconds()

            # Any truthy result counts as a successful retrieval.
            if context:
                successful_retrievals += 1

        metrics["avg_retrieval_time"] = total_time / query_count
        metrics["retrieval_success_rate"] = successful_retrievals / query_count

        return metrics

    def evaluate_answer_quality(self, test_qa_pairs: List[Dict]) -> Dict[str, float]:
        """Evaluate answer quality metrics.

        A fuller evaluation would include metrics like BLEU or ROUGE; for demo
        purposes only simple length-based heuristics are used.

        Args:
            test_qa_pairs: Dicts that each provide a ``"question"`` key. Any
                other keys (for example ``"expected"``) are not read here.

        Returns:
            A dict with two keys:
            - ``avg_answer_length``: mean character length of the answers.
            - ``response_completeness``: fraction of answers longer than 50
              characters that do not contain "sorry" (case-insensitive).
            Both values are 0 when ``test_qa_pairs`` is empty.
        """
        metrics = {
            "avg_answer_length": 0.0,
            "response_completeness": 0.0
        }

        pair_count = len(test_qa_pairs)
        if pair_count == 0:
            # Nothing to measure; returning here avoids dividing by zero below.
            return metrics

        total_length = 0
        complete_responses = 0

        for qa_pair in test_qa_pairs:
            response = self.rag_system.ask_question(qa_pair["question"])
            answer = response["answer"]

            total_length += len(answer)

            # Heuristic: a reasonably long answer without an apology is "complete".
            if len(answer) > 50 and "sorry" not in answer.lower():
                complete_responses += 1

        metrics["avg_answer_length"] = total_length / pair_count
        metrics["response_completeness"] = complete_responses / pair_count

        return metrics

#testing the evaluation
def run_evaluation():
    """Build a RAG system on the sample business data and print its evaluation metrics."""
    rule = "=" * 60
    print("\n" + rule)
    print("RAG SYSTEM EVALUATION")
    print(rule)

    # Initialize the system and ingest the sample documents.
    system = RAGSystem(config)
    documents = create_sample_business_data()
    system.ingest_documents(documents)

    evaluator = RAGEvaluation(system)

    # Test queries for the retrieval evaluation.
    retrieval_queries = [
        "What are the working hours?",
        "How much does CloudSync Pro cost?",
        "What is the refund policy?",
        "What support options are available?",
        "What are the core services?"
    ]

    # Question/expected pairs for the answer-quality evaluation.
    qa_pairs = [
        {"question": "What are the working hours?", "expected": "9 AM to 6 PM"},
        {"question": "What is the refund policy?", "expected": "30-day money-back guarantee"}
    ]

    # Run both evaluations before printing anything.
    retrieval_results = evaluator.evaluate_retrieval_quality(retrieval_queries)
    answer_results = evaluator.evaluate_answer_quality(qa_pairs)

    # Report each metric group under its heading, three decimals per value.
    report_sections = (
        ("\nRetrieval Metrics:", retrieval_results),
        ("\nAnswer Quality Metrics:", answer_results),
    )
    for heading, results in report_sections:
        print(heading)
        for metric, value in results.items():
            print(f"- {metric}: {value:.3f}")

    print("\n" + rule)

In [42]:
def export_system_config():
    """Export system configuration for deployment"""
    deployment_config = {
        "model_settings": {
            "embedding_model": config.embedding_model,
            "chat_model": config.chat_model,
            "temperature": config.temperature,
            "chunk_size": config.chunk_size
        },
        "infrastructure": {
            "pinecone_index": config.index_name,
            "vector_dimension": 3072,
            "similarity_threshold": config.similarity_threshold
        },
        "performance": {
            "max_tokens": config.max_tokens,
            "top_k": config.top_k
        }
    }
