In [None]:
# Agentic RAG System Implementation
# This notebook creates an intelligent agent that selects the most relevant web source for answering queries

import asyncio
import google.generativeai as genai
import requests
from bs4 import BeautifulSoup
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output
import time
import re
from typing import Dict, List, Tuple, Optional
import json

# Configuration
class Config:
    def __init__(self):
        self.api_key = ""  # Replace with your actual API key
        self.embedding_model = "models/text-embedding-004"
        self.generative_model = "models/gemini-1.5-flash-latest"
        self.chunk_size = 800
        self.chunk_overlap = 100
        self.top_k_chunks = 3

config = Config()

# Initialize Google AI
genai.configure(api_key=config.api_key)

class WebContentFetcher:
    """Handles web content fetching and text extraction"""

    @staticmethod
    def fetch_web_content(url: str) -> Optional[str]:
        """Fetch and extract text content from a web URL"""
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()

            # Extract text
            text = soup.get_text()

            # Clean up text
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = ' '.join(chunk for chunk in chunks if chunk)

            return text

        except Exception as e:
            print(f"Error fetching content from {url}: {str(e)}")
            return None

class TextProcessor:
    """Handles text processing including chunking and embedding generation"""

    @staticmethod
    def chunk_text(text: str, chunk_size: int = 800, overlap: int = 100) -> List[str]:
        """Split text into overlapping chunks"""
        if len(text) <= chunk_size:
            return [text]

        chunks = []
        start = 0

        while start < len(text):
            end = start + chunk_size

            # Try to break at sentence boundary
            if end < len(text):
                # Look for sentence endings within the last 100 characters
                sentence_break = text.rfind('.', start, end)
                if sentence_break != -1 and sentence_break > start + chunk_size // 2:
                    end = sentence_break + 1

            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)

            start = end - overlap
            if start >= len(text):
                break

        return chunks

    @staticmethod
    def generate_embedding(text: str) -> np.ndarray:
        """Generate embedding for text using Google's embedding model"""
        try:
            result = genai.embed_content(
                model=config.embedding_model,
                content=text,
                task_type="retrieval_document"
            )
            return np.array(result['embedding'])
        except Exception as e:
            print(f"Error generating embedding: {str(e)}")
            return np.zeros(768)  # Default embedding size

    @staticmethod
    def calculate_similarity(embedding1: np.ndarray, embedding2: np.ndarray) -> float:
        """Calculate cosine similarity between two embeddings"""
        return cosine_similarity([embedding1], [embedding2])[0][0]

class DataStore:
    """In-memory storage for web content and embeddings"""

    def __init__(self):
        self.sources = {}  # URL -> source data

    def add_source(self, url: str, content: str):
        """Add a web source to the data store"""
        print(f"üìÑ Processing content from: {url}")

        # Create summary (first 1000 characters)
        summary = content[:1000] + "..." if len(content) > 1000 else content

        # Generate chunks
        chunks = TextProcessor.chunk_text(content, config.chunk_size, config.chunk_overlap)
        print(f"   Created {len(chunks)} chunks")

        # Generate embeddings
        print("   Generating embeddings...")
        summary_embedding = TextProcessor.generate_embedding(summary)
        chunk_embeddings = [TextProcessor.generate_embedding(chunk) for chunk in chunks]

        self.sources[url] = {
            'url': url,
            'content': content,
            'summary': summary,
            'chunks': chunks,
            'summary_embedding': summary_embedding,
            'chunk_embeddings': chunk_embeddings
        }

        print(f"   ‚úÖ Source processed successfully!")

    def get_source_names(self) -> List[str]:
        """Get list of loaded source URLs"""
        return list(self.sources.keys())

class AgentRouter:
    """The core agent that routes queries to the best source"""

    def __init__(self, data_store: DataStore):
        self.data_store = data_store

    def select_best_source(self, query: str) -> Tuple[str, float, Dict[str, float]]:
        """Select the best source for a given query"""
        query_embedding = TextProcessor.generate_embedding(query)

        scores = {}
        best_source = None
        best_score = -1

        print("ü§ñ Agent evaluating sources...")

        for url, source_data in self.data_store.sources.items():
            similarity = TextProcessor.calculate_similarity(
                query_embedding,
                source_data['summary_embedding']
            )
            scores[url] = similarity

            # Visual indication of evaluation
            print(f"   üîç {url}: {similarity:.3f} relevance")

            if similarity > best_score:
                best_score = similarity
                best_source = url

        if best_source:
            print(f"   üéØ Agent chose: {best_source} ({best_score:.3f} relevance)")

        return best_source, best_score, scores

class RetrievalModule:
    """Handles retrieval of relevant chunks from the selected source"""

    def __init__(self, data_store: DataStore):
        self.data_store = data_store

    def retrieve_chunks(self, query: str, source_url: str, top_k: int = 3) -> List[str]:
        """Retrieve top K chunks from the selected source"""
        if source_url not in self.data_store.sources:
            return []

        source_data = self.data_store.sources[source_url]
        query_embedding = TextProcessor.generate_embedding(query)

        # Calculate similarities with all chunks
        chunk_scores = []
        for i, chunk_embedding in enumerate(source_data['chunk_embeddings']):
            similarity = TextProcessor.calculate_similarity(query_embedding, chunk_embedding)
            chunk_scores.append((i, similarity))

        # Sort by similarity and get top K
        chunk_scores.sort(key=lambda x: x[1], reverse=True)
        top_chunks = [source_data['chunks'][i] for i, _ in chunk_scores[:top_k]]

        return top_chunks

class LLMGenerator:
    """Handles LLM interaction for generating responses"""

    @staticmethod
    def generate_response(query: str, context_chunks: List[str]) -> str:
        """Generate response using LLM with retrieved context"""
        context = "\n\n".join(context_chunks)

        prompt = f"""Based on the following context, answer the user's question. If the context doesn't contain enough information to answer the question, say so clearly.

Context:
{context}

Question: {query}

Answer:"""

        try:
            model = genai.GenerativeModel(config.generative_model)
            response = model.generate_content(prompt)
            return response.text
        except Exception as e:
            return f"Error generating response: {str(e)}"

class EvaluationMetrics:
    """Handles evaluation of agent performance"""

    def __init__(self):
        self.agent_choices = []
        self.correct_choices = []

    def add_evaluation(self, agent_choice: str, correct_choice: str):
        """Add an evaluation pair"""
        self.agent_choices.append(agent_choice)
        self.correct_choices.append(correct_choice)

    def calculate_metrics(self) -> Dict[str, float]:
        """Calculate evaluation metrics"""
        if not self.agent_choices:
            return {}

        # Accuracy
        correct_count = sum(1 for agent, correct in zip(self.agent_choices, self.correct_choices)
                           if agent == correct)
        accuracy = correct_count / len(self.agent_choices)

        # For multi-class classification metrics, we'll calculate macro averages
        sources = list(set(self.agent_choices + self.correct_choices))

        precisions = []
        recalls = []

        for source in sources:
            # True positives: agent chose this source and it was correct
            tp = sum(1 for agent, correct in zip(self.agent_choices, self.correct_choices)
                    if agent == source and correct == source)

            # False positives: agent chose this source but it was wrong
            fp = sum(1 for agent, correct in zip(self.agent_choices, self.correct_choices)
                    if agent == source and correct != source)

            # False negatives: agent didn't choose this source but should have
            fn = sum(1 for agent, correct in zip(self.agent_choices, self.correct_choices)
                    if agent != source and correct == source)

            # Calculate precision and recall for this source
            precision = tp / (tp + fp) if (tp + fp) > 0 else 0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0

            precisions.append(precision)
            recalls.append(recall)

        # Macro averages
        avg_precision = np.mean(precisions) if precisions else 0
        avg_recall = np.mean(recalls) if recalls else 0

        # F1-score
        f1_score = 2 * (avg_precision * avg_recall) / (avg_precision + avg_recall) if (avg_precision + avg_recall) > 0 else 0

        return {
            'accuracy': accuracy,
            'precision': avg_precision,
            'recall': avg_recall,
            'f1_score': f1_score
        }

class AgenticRAGSystem:
    """Main RAG system orchestrator"""

    def __init__(self):
        self.data_store = DataStore()
        self.agent_router = AgentRouter(self.data_store)
        self.retrieval_module = RetrievalModule(self.data_store)
        self.llm_generator = LLMGenerator()
        self.evaluation_metrics = EvaluationMetrics()

        # UI components
        self.setup_ui()

    def setup_ui(self):
        """Setup the interactive UI"""
        # URL input
        self.url_input = widgets.Textarea(
            value="https://example.com/page1,https://example.com/page2,https://example.com/page3",
            placeholder="Enter comma-separated URLs",
            description="Web URLs:",
            layout=widgets.Layout(width='100%', height='80px')
        )

        # Load button
        self.load_button = widgets.Button(
            description="Load Sources",
            button_style='success',
            layout=widgets.Layout(width='200px')
        )
        self.load_button.on_click(self.load_sources)

        # Query input
        self.query_input = widgets.Text(
            placeholder="Enter your question here",
            description="Query:",
            layout=widgets.Layout(width='100%')
        )
        self.query_input.on_submit(self.process_query)

        # Feedback input
        self.feedback_input = widgets.Dropdown(
            options=[],
            description="Correct source:",
            layout=widgets.Layout(width='100%')
        )

        # Feedback button
        self.feedback_button = widgets.Button(
            description="Submit Feedback",
            button_style='info',
            layout=widgets.Layout(width='200px')
        )
        self.feedback_button.on_click(self.submit_feedback)

        # Evaluation button
        self.eval_button = widgets.Button(
            description="Show Evaluation",
            button_style='warning',
            layout=widgets.Layout(width='200px')
        )
        self.eval_button.on_click(self.show_evaluation)

        # Output area
        self.output = widgets.Output()

        # Store current query context
        self.current_agent_choice = None
        self.current_query = None

    def display_ui(self):
        """Display the complete UI"""
        display(HTML("<h2>ü§ñ Agentic RAG System</h2>"))
        display(HTML("<p>Enter web URLs, load sources, then ask questions!</p>"))

        display(self.url_input)
        display(self.load_button)
        display(HTML("<hr>"))
        display(self.query_input)
        display(HTML("<hr>"))
        display(self.feedback_input)
        display(self.feedback_button)
        display(self.eval_button)
        display(HTML("<hr>"))
        display(self.output)

    def load_sources(self, button):
        """Load web sources from URLs"""
        with self.output:
            clear_output()
            print("üöÄ Loading sources...")

            urls = [url.strip() for url in self.url_input.value.split(',') if url.strip()]

            if not urls:
                print("‚ùå No valid URLs provided")
                return

            fetcher = WebContentFetcher()

            for url in urls:
                print(f"\nüåê Fetching content from: {url}")
                content = fetcher.fetch_web_content(url)

                if content:
                    self.data_store.add_source(url, content)
                else:
                    print(f"‚ùå Failed to fetch content from: {url}")

            # Update feedback dropdown
            self.feedback_input.options = self.data_store.get_source_names()

            print(f"\n‚úÖ Successfully loaded {len(self.data_store.sources)} sources")
            print("Ready to answer questions! üéØ")

    def process_query(self, text_widget):
        """Process user query"""
        query = text_widget.value.strip()

        if not query:
            return

        if query.lower() == 'evaluate':
            self.show_evaluation(None)
            return

        with self.output:
            clear_output()
            print(f"‚ùì Query: {query}")
            print()

            if not self.data_store.sources:
                print("‚ùå No sources loaded. Please load sources first.")
                return

            # Agent selects best source
            best_source, relevance_score, all_scores = self.agent_router.select_best_source(query)

            if not best_source:
                print("‚ùå No suitable source found")
                return

            print(f"\nüìä Relevance Score: {relevance_score:.1%}")

            # Retrieve relevant chunks
            print("\nüîç Retrieving relevant information...")
            chunks = self.retrieval_module.retrieve_chunks(query, best_source, config.top_k_chunks)

            if not chunks:
                print("‚ùå No relevant chunks found")
                return

            # Generate response
            print("\nüí≠ Generating response...")
            response = self.llm_generator.generate_response(query, chunks)

            print(f"\nü§ñ Answer:\n{response}")
            print(f"\nüìã Source: {best_source}")
            print(f"üìä Relevance: {relevance_score:.1%}")

            # Store for feedback
            self.current_agent_choice = best_source
            self.current_query = query

            print("\nüëá Please provide feedback below if the agent chose the wrong source!")

    def submit_feedback(self, button):
        """Submit feedback on agent's choice"""
        if not self.current_agent_choice or not self.feedback_input.value:
            with self.output:
                print("‚ùå No feedback to submit")
            return

        correct_choice = self.feedback_input.value

        self.evaluation_metrics.add_evaluation(self.current_agent_choice, correct_choice)

        with self.output:
            print(f"‚úÖ Feedback recorded!")
            print(f"   Agent chose: {self.current_agent_choice}")
            print(f"   Correct source: {correct_choice}")
            print(f"   Total evaluations: {len(self.evaluation_metrics.agent_choices)}")

        # Clear current context
        self.current_agent_choice = None
        self.current_query = None

    def show_evaluation(self, button):
        """Display evaluation metrics"""
        with self.output:
            clear_output()
            print("üìä Agent Performance Evaluation")
            print("=" * 40)

            metrics = self.evaluation_metrics.calculate_metrics()

            if not metrics:
                print("‚ùå No evaluation data available")
                print("Submit some queries and feedback first!")
                return

            print(f"üìà Accuracy:  {metrics['accuracy']:.3f} ({metrics['accuracy']:.1%})")
            print(f"üéØ Precision: {metrics['precision']:.3f} ({metrics['precision']:.1%})")
            print(f"üîç Recall:    {metrics['recall']:.3f} ({metrics['recall']:.1%})")
            print(f"‚öñÔ∏è  F1-Score:  {metrics['f1_score']:.3f} ({metrics['f1_score']:.1%})")

            print(f"\nüìä Total Evaluations: {len(self.evaluation_metrics.agent_choices)}")

            # Show detailed breakdown
            print("\nüìã Detailed Results:")
            for i, (agent, correct) in enumerate(zip(self.evaluation_metrics.agent_choices,
                                                   self.evaluation_metrics.correct_choices)):
                status = "‚úÖ" if agent == correct else "‚ùå"
                print(f"   {i+1}. {status} Agent: {agent[:50]}...")
                if agent != correct:
                    print(f"      Correct: {correct[:50]}...")

# Initialize and display the system
print("üöÄ Initializing Agentic RAG System...")
print("‚ö†Ô∏è  Please replace 'YOUR_API_KEY_HERE' with your actual Google AI Studio API key")
print()

rag_system = AgenticRAGSystem()
rag_system.display_ui()

üöÄ Initializing Agentic RAG System...
‚ö†Ô∏è  Please replace 'YOUR_API_KEY_HERE' with your actual Google AI Studio API key



Textarea(value='https://example.com/page1,https://example.com/page2,https://example.com/page3', description='W‚Ä¶

Button(button_style='success', description='Load Sources', layout=Layout(width='200px'), style=ButtonStyle())

Text(value='', description='Query:', layout=Layout(width='100%'), placeholder='Enter your question here')

Dropdown(description='Correct source:', layout=Layout(width='100%'), options=(), value=None)

Button(button_style='info', description='Submit Feedback', layout=Layout(width='200px'), style=ButtonStyle())



Output()