In [None]:
import google.generativeai as genai
import os
from dotenv import load_dotenv
import os
import PyPDF2
import wikipedia
import requests
from typing import List, Dict, Any, Optional
from langchain.agents import AgentExecutor, initialize_agent, AgentType
from langchain.tools import Tool
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import WebBaseLoader
from tavily import TavilyClient
import warnings
warnings.filterwarnings("ignore")


In [None]:
# Pull variables from a local .env file (if any) into the process environment.
load_dotenv()
api_key = os.getenv('GOOGLE_API_KEY')

# Fail fast with an actionable message instead of failing later inside an SDK call.
if not api_key:
    raise RuntimeError(
        "GOOGLE_API_KEY is not set; add it to your environment or to a .env file."
    )

# Deliberately do NOT end the cell with the bare `api_key` expression: notebook
# outputs are saved with the file, so echoing the key would leak the secret.
print("GOOGLE_API_KEY loaded.")

In [None]:
# Hand the key read from GOOGLE_API_KEY to the google.generativeai SDK.
genai.configure(api_key=api_key)

In [None]:
# Chat model shared by the agent and by the direct `gemini_pro.invoke(...)` calls
# in AdvancedAgenticRAG. It has to be the LangChain wrapper: the class passes it to
# `initialize_agent(llm=...)` and calls `.invoke([HumanMessage(...)])`, neither of
# which works with a raw `google.generativeai.GenerativeModel`. The name is assigned
# exactly once so a later statement cannot silently swap the object type.
gemini_pro = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash",
    temperature=0.3
)

# Embedding model handed to the Chroma vector store.
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")

# Chunking parameters applied to every PDF / Wikipedia text before indexing.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)

In [None]:
class AdvancedAgenticRAG:
    """Agentic retrieval-augmented generation helper.

    Combines a Chroma vector store, a Tavily web-search client and a LangChain
    agent whose tools are this class's own methods. The class relies on the
    module-level ``embeddings``, ``text_splitter`` and ``gemini_pro`` objects
    being defined before it is instantiated or used.
    """

    def __init__(self, db_directory="./chroma_db"):
        """
        Initialize the Advanced Agentic RAG system.

        Args:
            db_directory (str): Path to store the vector database

        Raises:
            KeyError: If the ``TAVILY_API_KEY`` environment variable is missing.
        """
        self.db_directory = db_directory
        self.tavily_client = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])

        # Checked *before* constructing the store so the status message reflects
        # whether something was already on disk. The store is built exactly once;
        # a genuine construction error propagates instead of being retried blindly.
        store_already_on_disk = os.path.isdir(db_directory)
        self.vector_store = Chroma(
            persist_directory=db_directory,
            embedding_function=embeddings
        )
        if store_already_on_disk:
            print(f"Vector database loaded from {db_directory}")
        else:
            print(f"New vector database created at {db_directory}")

        self.setup_tools()
        self.setup_agent()

    def setup_tools(self):
        """Define the tools the agent can use (each one wraps a method of this class)."""
        self.tools = [
            Tool(
                name="SearchDatabase",
                func=self.search_vector_db,
                description="Search the vector database for relevant information"
            ),
            Tool(
                name="ProcessPDF",
                func=self.process_pdf,
                description="Extract text from a PDF file and store it in the vector database"
            ),
            Tool(
                name="WebSearch",
                func=self.web_search,
                description="Perform web search using Tavily Search API to fetch recent information"
            ),
            Tool(
                name="ProcessWikipedia",
                func=self.process_wikipedia,
                description="Retrieve content from a Wikipedia article and store it in the vector database"
            ),
            Tool(
                name="QueryRefiner",
                func=self.refine_query,
                description="Improve a search query for better retrieval results"
            )
        ]

    def setup_agent(self):
        """Build the agent executor (zero-shot ReAct agent type) over ``self.tools``.

        Uses the module-level ``gemini_pro`` object as the LLM.
        """
        self.agent_executor = initialize_agent(
            tools=self.tools,
            llm=gemini_pro,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True,
            handle_parsing_errors=True,
            max_iterations=5
        )

    def _read_pdf_text(self, pdf_path):
        """Return the text of every page joined by newlines; raises on any failure."""
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            # Treat a missing/empty extraction result for a page as an empty string
            # so one text-less page cannot abort the whole document.
            return "".join(
                (page.extract_text() or "") + "\n" for page in pdf_reader.pages
            )

    def _store_chunks(self, chunks, metadata):
        """Wrap each chunk in a Document carrying ``metadata`` and add it to the store."""
        from langchain_core.documents import Document

        documents = [
            Document(page_content=chunk, metadata=dict(metadata)) for chunk in chunks
        ]
        self.vector_store.add_documents(documents)

        # NOTE(review): `persist()` does not seem to exist on every Chroma wrapper
        # version - confirm. Call it only when present so a missing method cannot
        # turn a successful insert into an error message.
        persist = getattr(self.vector_store, "persist", None)
        if callable(persist):
            persist()

    @staticmethod
    def _wikipedia_title(title_or_url: str) -> str:
        """Turn a Wikipedia URL (or a plain title) into an article title."""
        from urllib.parse import parse_qs, unquote, urlparse

        candidate = title_or_url.strip()
        if not candidate.startswith("http"):
            return candidate

        parsed = urlparse(candidate)
        # `.../w/index.php?title=Foo` style links carry the title in the query string.
        from_query = parse_qs(parsed.query).get("title")
        slug = from_query[0] if from_query else parsed.path.rstrip("/").split("/")[-1]
        # Decode percent-escapes (e.g. C%2B%2B -> C++) before undoing the underscores.
        return unquote(slug).replace("_", " ")

    @staticmethod
    def _extract_pdf_path(user_input: str) -> Optional[str]:
        """Find a PDF path inside free text; return None when there is none."""
        import re

        text = user_input.strip()

        # 1. An explicitly quoted path may contain spaces.
        quoted = re.search(r"""["']([^"']+\.pdf)["']""", text, flags=re.IGNORECASE)
        if quoted:
            return quoted.group(1).strip()

        # 2. Unquoted path with spaces: take the longest suffix of the input that
        #    names an existing file.
        if text.lower().endswith('.pdf'):
            for token in re.finditer(r"\S+", text):
                candidate = text[token.start():]
                if os.path.isfile(candidate):
                    return candidate

        # 3. Fall back to the last whitespace-separated word.
        words = text.split()
        if words and words[-1].lower().endswith('.pdf'):
            return words[-1]
        return None

    def extract_text_from_pdf(self, pdf_path):
        """Extract text content from a PDF file.

        Returns:
            str: The extracted text, or a message starting with
            "Error extracting text from PDF:" if reading fails.
        """
        try:
            return self._read_pdf_text(pdf_path)
        except Exception as e:
            return f"Error extracting text from PDF: {str(e)}"

    def process_pdf(self, pdf_path):
        """Extract text from a PDF and store it in the vector DB.

        Returns:
            str: A human-readable status message (success, nothing to index, or error).
        """
        try:
            # Agent tool inputs frequently arrive wrapped in quotes or whitespace.
            pdf_path = str(pdf_path).strip().strip("\"'")

            # Use the raising helper here: extract_text_from_pdf() reports failures
            # as a string, which would otherwise be chunked and indexed as content.
            text = self._read_pdf_text(pdf_path)
            if not text.strip():
                return f"No extractable text found in '{pdf_path}'; nothing was added to the vector database."

            chunks = text_splitter.split_text(text)
            self._store_chunks(chunks, {"source": pdf_path, "type": "pdf"})

            return f"{len(chunks)} text chunks from '{pdf_path}' loaded into the vector database."
        except Exception as e:
            return f"Error processing PDF: {str(e)}"

    def web_search(self, query: str) -> str:
        """Perform a web search using the Tavily Search API.

        Returns:
            str: The answer followed by up to three sources, or an error message.
        """
        try:
            search_result = self.tavily_client.search(
                query=query,
                search_depth="advanced",
                include_answer=True,
                include_domains=[]
            )

            # `.get(key, default)` does not help when the key is present with a
            # None value, so normalise explicitly before concatenating/slicing.
            answer = search_result.get('answer') or ''
            hits = search_result.get('results') or []

            result_text = answer + "\n\nSources:\n"
            for i, result in enumerate(hits[:3]):
                snippet = (result.get('content') or '')[:200]
                result_text += f"{i+1}. {result.get('title')}: {result.get('url')}\n"
                result_text += f"   {snippet}...\n\n"
            return result_text
        except Exception as e:
            return f"Error during web search: {str(e)}"

    def process_wikipedia(self, title_or_url: str) -> str:
        """Process a Wikipedia article and store it in the vector database.

        Args:
            title_or_url (str): An article title or a URL pointing at the article.

        Returns:
            str: A human-readable status message (success or error).
        """
        try:
            title = self._wikipedia_title(title_or_url)

            wikipedia.set_lang("en")
            try:
                page = wikipedia.page(title)
            except wikipedia.DisambiguationError as e:
                # Ambiguous title: fall back to the first suggested option.
                page = wikipedia.page(e.options[0])

            chunks = text_splitter.split_text(page.content)
            self._store_chunks(
                chunks,
                {"source": page.url, "title": page.title, "type": "wikipedia"}
            )

            return f"{len(chunks)} chunks from Wikipedia article '{page.title}' added to the vector database."
        except Exception as e:
            return f"Error processing Wikipedia article: {str(e)}"

    def search_vector_db(self, query: str) -> str:
        """Search the vector database for relevant documents.

        Returns:
            str: Up to four matching documents with their source, a "nothing found"
            message, or an error message.
        """
        try:
            docs = self.vector_store.similarity_search(query, k=4)

            if not docs:
                return "No relevant information found in the database."

            results = []
            for i, doc in enumerate(docs):
                source_info = f"Source: {doc.metadata.get('source', 'Unknown')}"
                if 'title' in doc.metadata:
                    source_info += f" (Title: {doc.metadata['title']})"

                results.append(f"Document {i+1}:\n{doc.page_content}\n{source_info}\n")

            return "\n".join(results)
        except Exception as e:
            return f"Error searching the vector database: {str(e)}"

    def refine_query(self, original_query: str) -> str:
        """Improve the user's search query.

        Returns:
            str: The model's improved query, or ``original_query`` unchanged if the
            model call fails (so the caller can still search with something useful).
        """
        prompt = f"""
        Original Query: "{original_query}"
        
        Please improve this query to be more specific and effective for retrieving relevant information.
        
        The improved query should:
        1. Be more precise and less ambiguous
        2. Include relevant keywords
        3. Be phrased in a search-friendly way
        
        Improved Query:
        """
        try:
            response = gemini_pro.invoke([HumanMessage(content=prompt)])
            refined = (response.content or "").strip()
        except Exception as e:
            # The sibling tools never raise into the agent loop; match that here.
            print(f"Query refinement failed, using the original query: {str(e)}")
            return original_query

        return refined or original_query

    def process_user_input(self, user_input: str) -> Dict:
        """
        Analyze user input to determine the appropriate action.

        Returns:
            Dict: One of
                {"action": "process_pdf", "path": ...},
                {"action": "process_wikipedia", "url": ...} or
                {"action": "question", "query": ...}.
        """
        lowered = user_input.lower()

        # Parenthesised to make the intended precedence explicit: either the input
        # ends in ".pdf", or it mentions both "pdf" and "process".
        if lowered.strip().endswith('.pdf') or ("pdf" in lowered and "process" in lowered):
            pdf_path = self._extract_pdf_path(user_input)
            if pdf_path:
                return {"action": "process_pdf", "path": pdf_path}

        if "wikipedia" in lowered:
            for word in user_input.split():
                token = word.strip("\"'<>")
                if token.lower().startswith(("https://", "http://")) and "wikipedia" in token.lower():
                    return {"action": "process_wikipedia", "url": token}

            # No Wikipedia link and no https URL at all: ask the model for the title.
            # An https URL that is not a Wikipedia link falls through to "question".
            if "https://" not in user_input:
                prompt = f"""
                Extract the Wikipedia article title from the following text:
                {user_input}
                
                Only return the title, nothing else:
                """
                response = gemini_pro.invoke([HumanMessage(content=prompt)])
                return {"action": "process_wikipedia", "url": response.content.strip()}

        return {"action": "question", "query": user_input}

    def run(self, query: str, chat_history: Optional[List[Dict]] = None):
        """Run the Agentic RAG system to handle a user query.

        Args:
            query (str): The user's message.
            chat_history (list[dict] | None): Earlier turns as dicts with "role" and
                "content" keys.

        Returns:
            dict: Contains an "output" entry; for questions this is whatever the
            agent executor returns.
        """
        if chat_history is None:
            chat_history = []

        input_analysis = self.process_user_input(query)
        action = input_analysis["action"]

        if action == "process_pdf":
            return {"output": self.process_pdf(input_analysis["path"])}

        if action == "process_wikipedia":
            return {"output": self.process_wikipedia(input_analysis["url"])}

        formatted_chat_history = ""
        for message in chat_history:
            role = str(message.get("role") or "")
            content = message.get("content", "")
            formatted_chat_history += f"{role.capitalize()}: {content}\n"

        # NOTE(review): the zero-shot ReAct prompt appears to expose only an `input`
        # slot, so a separate `chat_history` key would never reach the model -
        # confirm. Earlier turns are therefore folded into the input text; the key
        # is still passed so existing consumers of the payload keep working.
        agent_input = query
        if formatted_chat_history:
            agent_input = (
                f"Conversation so far:\n{formatted_chat_history}\n"
                f"Current question: {query}"
            )

        return self.agent_executor.invoke({
            "input": agent_input,
            "chat_history": formatted_chat_history
        })


In [None]:
if __name__ == "__main__":
    # Demo: build the system, ingest one PDF, then ask a question about it.
    try:
        print("Initializing Advanced Agentic RAG System...")
        rag_system = AdvancedAgenticRAG(db_directory="./chroma_db")
        print("System initialized successfully!")

        # The document can be overridden with the RAG_PDF_PATH environment variable
        # so the notebook runs on other machines without editing this cell; the
        # original machine-specific location stays as the default.
        pdf_path = os.getenv(
            "RAG_PDF_PATH",
            r"D:\AI\AI projects\main\OpenAI Agents Practical Guide .pdf",
        )
        print(f"\nProcessing PDF: {pdf_path}")
        # process_pdf reports failures as a message rather than raising.
        result = rag_system.process_pdf(pdf_path)
        print(result)

        query = "What are the key components of an OpenAI agent system according to the guide?"
        response = rag_system.run(query)
        print("\nResponse:", response.get("output", response))

    except Exception as e:
        # Top-level demo guard: show the message and the full traceback.
        print(f"An error occurred: {str(e)}")
        import traceback
        traceback.print_exc()
