In [1]:
from crewai import Agent, Task, Crew, Process


In [2]:
import os
import shutil
import time
import logging
from dotenv import load_dotenv
from typing import Iterator, List
# CrewAI imports

# LLM
from langchain_community.chat_models.ollama import ChatOllama
# from agent.utils.load_documents import covert_document
from agent.tools.retrieve_tool import RetrieveTool, IngestTool
from langchain_core.documents import Document
from langchain_core.document_loaders import BaseLoader
# from docling.document_converter import DocumentConverter
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, UnstructuredWordDocumentLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter


/home/ctai-datpd-l/anaconda3/envs/ai_agents/lib/python3.11/site-packages/pydantic/_internal/_config.py:323: PydanticDeprecatedSince20: Support for class-based `config` is deprecated, use ConfigDict instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  from tqdm.autonotebook import tqdm, trange


In [3]:
file_path = ["data/2501.07329v2.pdf",
             "data/ielts_listening_practice_test_pdf_1_1_1ae068b05d.pdf"]

In [4]:
# class DocumentPDFLoader(BaseLoader):
    
#     def __init__(self, filepath: List[str]) -> None: 
#         self._filepath = filepath if isinstance(filepath, list) else [filepath]
#         self._coverter = DocumentConverter()
    
#     def lazy_load (self)->Iterator[Document]:
#         for file in self._filepath:
#             dl = self._coverter.convert(file).document
#             text = dl.export_to_markdown()
#             yield Document(page_content=text)


In [5]:
# pdf_loader = DocumentPDFLoader(file_path)

class DocumentPDFLoader(BaseLoader):
    def __init__(self, filepath: List[str]) -> None: 
        self._filepath = filepath if isinstance(filepath, list) else [filepath]
        self._loaders = [PyPDFLoader(file) for file in self._filepath]
    
    def lazy_load (self)->Iterator[Document]:
        for loader in self._loaders:
            for doc in loader.load():
                yield doc


In [6]:
pdf_loader = DocumentPDFLoader(file_path)

In [7]:
chunker = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len
)


In [8]:
documents = pdf_loader.load()

In [9]:
text_chunks = chunker.split_documents(documents)

In [10]:
text_chunks[40].metadata

{'producer': 'Skia/PDF m109 Google Docs Renderer',
 'creator': 'PyPDF',
 'creationdate': '',
 'title': 'ielts-listening-practice-test-pdf-1',
 'source': 'data/ielts_listening_practice_test_pdf_1_1_1ae068b05d.pdf',
 'total_pages': 7,
 'page': 3,
 'page_label': '4'}

In [11]:
docs = [doc.page_content for doc in text_chunks]
metadata = [doc.metadata for doc in text_chunks]

In [12]:
load_dotenv() # Load environment variables if needed (e.g., API keys for other tools)
TEST_COLLECTION_NAME = "agent_test_docs"
EMBEDDING_MODEL = "all-MiniLM-L6-v2" # Ensure this model is available locally
PERSIST_DIR = "./_agent_test_chroma_db"
LLM_MODEL = "deepseek-r1:1.5b" # Or your preferred Ollama model
# llm = ChatOllama(model='deepseek-r1:1.5b', temperature=0.2, max_tokens=2000)


In [13]:
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


In [14]:
docs[0]

'Joint Automatic Speech Recognition And Structure\nLearning For Better Speech Understanding\nJiliang Hu1, Zuchao Li 2,*, Mengjia Shen 3, Haojun Ai 1, Sheng Li 4, Jun Zhang 3\n1Key Laboratory of Aerospace Information Security and Trusted Computing, Ministry of Education,\nSchool of Cyber Science and Engineering, Wuhan University, Wuhan, China,\n2School of Computer Science, Wuhan University, Wuhan, China,\n3Wuhan Second Ship Design and Research Institute, Wuhan, China,\n4National Institute of Information and Communications Technology, Japan.\nAbstract—Spoken language understanding (SLU) is a structure\nprediction task in the field of speech. Recently, many works\non SLU that treat it as a sequence-to-sequence task have\nachieved great success. However, This method is not suitable\nfor simultaneous speech recognition and understanding. In this\npaper, we propose a joint speech recognition and structure\nlearning framework (JSRSL), an end-to-end SLU model based'

In [15]:
def setup_environment():
    """Initializes LLM, Tools, and cleans up old DB."""
    logger.info("--- Setting up test environment ---")

    try:
        # Initialize LLM
        logger.info(f"Loading LLM: {LLM_MODEL}")
        llm = ChatOllama(model=LLM_MODEL, temperature=0.1)
        # Simple check if LLM is accessible (optional, Ollama might not have a direct check)
        # llm.invoke("Hi")
        logger.info("LLM loaded successfully.")

        # Initialize Tools
        logger.info("Initializing RAG Tools...")
        retriever_tool = RetrieveTool(
            embedding_model_name=EMBEDDING_MODEL,
            persist_dir=PERSIST_DIR
        )
        ingest_tool = IngestTool(retriever_tool=retriever_tool)
        logger.info("RAG Tools initialized successfully.")

        return llm, retriever_tool, ingest_tool

    except Exception as e:
        logger.error(f"Failed to set up environment: {e}", exc_info=True)
        raise


In [16]:
llm, retriever_tool, ingest_tool = setup_environment()

  llm = ChatOllama(model=LLM_MODEL, temperature=0.1)
  logger.warn(


In [17]:
tools = [retriever_tool, ingest_tool]

In [18]:
logger.info("--- Defining Knowledge Base Manager Agent ---")
kb_manager_agent = Agent(
    role='Knowledge Base Manager',
    goal=f"Efficiently manage and retrieve information from the company's knowledge base stored in ChromaDB. Use the provided tools to ingest new documents into specific collections and retrieve relevant information based on queries.",
    backstory=(
        "You are an expert AI assistant responsible for maintaining the accuracy and accessibility "
        "of the company's document knowledge base. You meticulously ingest new information using the "
        "'ChromaDB Document Ingest Tool' and expertly query the database using the "
        "'ChromaDB Retriever Tool' to answer questions. Always specify the correct collection name."
    ),
    llm=llm,
    tools=tools,
    verbose=True, # Set to True to see LLM reasoning and tool calls
    allow_delegation=False,
    
    # memory=True # Optional: Enable memory for conversation context if needed
)
logger.info(f"Agent '{kb_manager_agent.role}' created.")

2025-05-04 23:59:06,407 - 128333791036480 - llm.py-llm:187 - ERROR: Failed to get supported params: argument of type 'NoneType' is not iterable



[1;31mProvider List: https://docs.litellm.ai/docs/providers[0m



In [19]:
docs_repr = repr(docs)
metas_repr = repr(metadata)

In [20]:
task_ingest = Task(
        description=(
            f"Ingest the following set of documents into the '{TEST_COLLECTION_NAME}' collection "
            f"using the 'ChromaDB Document Ingest Tool'. Ensure you pass both the document texts "
            f"and their corresponding metadata.\n\n"
            f"Documents to ingest: {docs_repr}\n"
            f"Associated Metadatas: {metas_repr}"
        ),
        expected_output=(
            f"Confirmation that {len(docs)} documents were successfully ingested "
            f"into the '{TEST_COLLECTION_NAME}' collection."
        ),
        agent=kb_manager_agent,
        tools=[ingest_tool] # Optional: Limit tools for this specific task
    )
logger.info("Ingestion task defined.")

In [21]:
query = "What is the Joint Automatic Speech Recognition and Machine Translation (JASR-MT) task?"
task_retrieve = Task(
    description=(
        f"Search the '{TEST_COLLECTION_NAME}' collection using the 'ChromaDB Retriever Tool' to find information relevant to the following query: '{query}'. "
        f"Retrieve the top 3 most relevant documents using MMR for diversity. " # Explicitly guide MMR usage
        f"Present the content of the retrieved documents clearly."
    ),
    expected_output=(
        "A summary or list of the content from the top 3 relevant documents found in the "
        f"'{TEST_COLLECTION_NAME}' collection related to '{query}', retrieved using MMR."
    ),
    agent=kb_manager_agent,
    context=[task_ingest], # Make this task depend on the ingestion task
    tools=[retriever_tool] # Optional: Limit tools for this specific task
)
logger.info("Retrieval task defined.")

In [22]:
reflection_agent = Agent(
    role='Reflection Agent',
    goal="Reflect on the tasks and provide insights.",
    backstory=(
        "You are an AI assistant designed to reflect on the tasks performed by the Knowledge Base Manager. "
        "Your role is to analyze the ingestion and retrieval processes, ensuring they align with the company's goals."
    ),
    llm=llm,
    tools=[],
    verbose=True, # Set to True to see LLM reasoning and tool calls
    allow_delegation=False,
)
logger.info("Reflection agent created.")
reflection_task = Task(
    description=(
        "Reflect on the tasks performed by the Knowledge Base Manager. "
        "Analyze the ingestion and retrieval processes, ensuring they align with the company's goals."
    ),
    expected_output=(
        "Insights and analysis of the ingestion and retrieval processes, "
        "ensuring they align with the company's goals."
    ),
    agent=reflection_agent,
    context=[task_ingest, task_retrieve], # Make this task depend on both ingestion and retrieval tasks
    tools=[] # No tools needed for reflection
)
logger.info("Reflection task defined.")


2025-05-04 23:59:15,119 - 128333791036480 - llm.py-llm:187 - ERROR: Failed to get supported params: argument of type 'NoneType' is not iterable



[1;31mProvider List: https://docs.litellm.ai/docs/providers[0m



In [24]:
# 5. Create and Run Crew
logger.info("--- Creating and Running the Crew ---")
company_knowledge_crew = Crew(
    agents=[kb_manager_agent],
    tasks=[task_ingest, task_retrieve],
    process=Process.sequential, # Ensure tasks run in order: ingest then retrieve
    verbose=True # Use verbose=2 to see detailed LLM thoughts and tool calls
)

# result = company_knowledge_crew.kickoff()

# logger.info("--- Crew Execution Finished ---")
# print("\n\n===== Final Crew Result =====")
# print(result)
# print("============================")

2025-05-04 23:59:33,036 - 128333791036480 - llm.py-llm:187 - ERROR: Failed to get supported params: argument of type 'NoneType' is not iterable



[1;31mProvider List: https://docs.litellm.ai/docs/providers[0m



In [25]:
# print("Result: \n", result)

In [None]:
# import os
# from crewai import Agent, Task, Crew, Process
# from crewai_tools import BaseTool, SerperDevTool
# from langchain_community.chat_models.ollama import ChatOllama
# from agent.config.load_config import agents_config
# from agent.tools.nl2sql_tool import NL2SQLTool, ValidateSQLQueryTool
# from dotenv import load_dotenv
# from typing import List, Optional, Dict, Any
# import chromadb
# from os.path import dirname, join, abspath

# # Import the retrieval tools
# from agent.tools.retrieve_tool import RetrieveTool, IngestTool

# # Load environment variables
# env_path = join(dirname(dirname(abspath(__file__))), '.env')
# load_dotenv(env_path)

# class MultiAgentSystem:
#     """
#     A multi-agent system using CrewAI framework.
#     Integrates company, customer service, HR, and recommender agents.
#     """
    
#     def __init__(self, model_name: str = "deepseek-r1:1.5b"):
#         """
#         Initialize the multi-agent system with specified LLM model.
        
#         Args:
#             model_name: Name of the LLM model to use across agents
#         """
#         self.model_name = model_name
#         self.llm = self._load_llm(model_name)
        
#         # Initialize tools
#         self.tools = self._setup_tools()
        
#         # Initialize agents
#         self.agents = self._setup_agents()
        
#         # Create crew
#         self.crew = self._setup_crew()
    
#     def _load_llm(self, model_name: str):
#         """Load the language model."""
#         return ChatOllama(model=model_name, temperature=0.2, max_tokens=2000)
    
#     def _setup_tools(self) -> Dict[str, List[BaseTool]]:
#         """Set up tools for each agent."""
#         # Common tools
#         serper_api_key = os.environ.get('SERPER_API_KEY')
#         search_tool = SerperDevTool(api_key=serper_api_key) if serper_api_key else None
#         nl2sql_tool = NL2SQLTool()
#         validate_sql_tool = ValidateSQLQueryTool()
        
#         # Retrieval tools
#         retriever_tool = RetrieveTool(embedding_model_name="all-MiniLM-L6-v2")
#         ingest_tool = IngestTool(retriever_tool=retriever_tool)
        
#         # Define tool sets for each agent
#         return {
#             "company_agent": [tool for tool in [search_tool, nl2sql_tool, validate_sql_tool, retriever_tool, ingest_tool] if tool],
#             "customer_service_agent": [tool for tool in [search_tool, nl2sql_tool, validate_sql_tool, retriever_tool] if tool],
#             "hr_agent": [tool for tool in [search_tool, nl2sql_tool, retriever_tool] if tool],
#             "naive_agent": [tool for tool in [search_tool, retriever_tool] if tool],
#             "recommender_agent": [tool for tool in [search_tool, retriever_tool] if tool]
#         }
    
#     def _setup_agents(self) -> Dict[str, Agent]:
#         """Set up all agents with their tools and configurations."""
#         agents = {}
        
#         # Create each agent using their config
#         for agent_type in ["company_agent", "customer_service_agent", "hr_agent", "naive_agent", "recommender_agent"]:
#             config = agents_config.get(agent_type, {})
#             agents[agent_type] = Agent(
#                 role=config.get('role', f"{agent_type.replace('_', ' ').title()}"),
#                 goal=config.get('goal', "Help the company achieve its objectives"),
#                 backstory=config.get('backstory', "An experienced professional in the field"),
#                 verbose=config.get('verbose', True),
#                 allow_delegation=config.get('allow_delegation', False),
#                 tools=self.tools.get(agent_type, []),
#                 llm=self.llm
#             )
        
#         return agents
    
#     def _setup_crew(self) -> Crew:
#         """Set up the crew with all agents and their tasks."""
#         # Define tasks for each agent
#         company_task = Task(
#             description="Analyze company data and make strategic decisions",
#             agent=self.agents["company_agent"],
#             expected_output="Strategic analysis and recommendations for the company"
#         )
        
#         customer_task = Task(
#             description="Address customer inquiries and improve satisfaction",
#             agent=self.agents["customer_service_agent"],
#             expected_output="Customer service report and satisfaction improvement plan"
#         )
        
#         hr_task = Task(
#             description="Manage employee relations and recruitment",
#             agent=self.agents["hr_agent"],
#             expected_output="HR management report and recruitment strategy"
#         )
        
#         recommender_task = Task(
#             description="Provide personalized recommendations to customers",
#             agent=self.agents["recommender_agent"],
#             expected_output="Customer recommendation system plan and implementation strategy"
#         )
        
#         # Create crew with all agents and tasks
#         return Crew(
#             agents=list(self.agents.values()),
#             tasks=[company_task, customer_task, hr_task, recommender_task],
#             verbose=2,
#             process=Process.sequential  # Can be changed to Process.hierarchical if needed
#         )
    
#     def run(self, query: str) -> str:
#         """
#         Run the multi-agent system with a specific query.
        
#         Args:
#             query: User query to process across agents
            
#         Returns:
#             str: Results from the crew's execution
#         """
#         # You could customize the tasks or crew configuration based on the query
#         result = self.crew.kickoff(inputs={"query": query})
#         return result

# # Example usage
# if __name__ == "__main__":
#     # Initialize the multi-agent system
#     system = MultiAgentSystem(model_name="deepseek-r1:1.5b")
    
#     # Example document ingestion for testing the retrieval tool
#     documents = [
#         "Our company specializes in AI-powered solutions for businesses.",
#         "Customer satisfaction is our top priority, with 24/7 support available.",
#         "The HR department handles recruitment, onboarding, and employee relations.",
#         "Our recommendation engine uses machine learning to suggest products.",
#         "Company policies include flexible working hours and remote options."
#     ]
    
#     metadatas = [
#         {"category": "company_info", "department": "general"},
#         {"category": "customer_service", "department": "support"},
#         {"category": "hr", "department": "human_resources"},
#         {"category": "technology", "department": "engineering"},
#         {"category": "policy", "department": "human_resources"}
#     ]
    
#     # Access company agent and ingest sample documents
#     retriever_tool = next((t for t in system.tools["company_agent"] if isinstance(t, RetrieveTool)), None)
#     if retriever_tool:
#         retriever_tool.ingest_documents(
#             documents=documents,
#             collection_name="company_docs",
#             metadatas=metadatas
#         )
    
#     # Run system with a test query
#     result = system.run("What are our company's HR policies and how can we improve employee satisfaction?")
#     print(result)

In [26]:
from langchain_community.embeddings.ollama import OllamaEmbeddings

In [27]:
os.environ["SERPER_API_KEY"] = "a6ce4b5fc4ef3f9754144d519ecf9e418bc1c7bc"

In [28]:
import os
import json
from dotenv import load_dotenv
from typing import TypedDict, Annotated, List, Dict, Any, Optional
import operator

# from langchain_openai import ChatOpenAI
from langchain_community.chat_models.ollama import ChatOllama
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain.tools import Tool
# from crewai_tools import DuckDuckGoSearchRun # Re-use crewai tool directly
from crewai_tools import SerperDevTool
# For Customer Retriever (Example using FAISS)
# from langchain_community.vectorstores import FAISS
from langchain_community.vectorstores.chroma import Chroma
# from langchain_openai import OpenAIEmbeddings
from langchain.docstore.document import Document
from langchain_community.tools.vectorstore.tool import VectorStoreQATool

# from langgraph.graph import StateGraph, END
# from langgraph.checkpoint.sqlite import SqliteSaver # For state persistence/debugging if needed

# Load environment variables
load_dotenv()

# --- Initialize LLM ---
# Using a more capable model might be beneficial for classification and reflection
# llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)
llm = ChatOllama(model="deepseek-r1:1.5b", temperature=0.2, num_predict=200)

# --- Initialize Tools ---
search_tool = SerperDevTool(n_results=2)

# --- Mock Customer Database & Retriever Tool ---
# In a real scenario, connect to your actual database/vector store
customer_data = {
    "cust123": [
        Document(page_content="Customer John Doe. Premium plan member since 2022. Last interaction: support ticket #5678 resolved.", metadata={"customer_id": "cust123", "source": "crm"}),
        Document(page_content="John Doe purchased Product X in Jan 2023 and Product Y in Nov 2023.", metadata={"customer_id": "cust123", "source": "orders"}),
    ],
    "cust456": [
        Document(page_content="Customer Jane Smith. Basic plan member since 2023. No open support tickets.", metadata={"customer_id": "cust456", "source": "crm"}),
    ]
}
# Flatten documents for indexing
all_docs = [doc for docs in customer_data.values() for doc in docs]

if all_docs:
    # embeddings = OpenAIEmbeddings()
    embeddings = OllamaEmbeddings(model="hf.co/CompendiumLabs/bge-base-en-v1.5-gguf:latest")
    vector_store = Chroma.from_documents(documents=all_docs, embedding=embeddings, persist_directory=PERSIST_DIR, collection_name=TEST_COLLECTION_NAME)
    retriever = vector_store.as_retriever(search_kwargs={"k": 2}) # Retrieve top 2 docs

    customer_retriever_tool = VectorStoreQATool(
        name="customer_info_retriever",
        description="Searches and returns information about a specific customer from the customer database based on their ID.",
        vectorstore=vector_store,
        llm=llm # The tool itself can use an LLM for QA over retrieved docs
    )
    # Note: This tool needs the query to implicitly or explicitly contain the customer ID for filtering,
    # or the retriever needs more sophisticated filtering setup.
    # For simplicity here, we'll assume the query passed to the tool includes context like "for customer cust123".
else:
    # Create a dummy tool if no customer data exists
    def dummy_retriever_func(query: str):
        return "No customer data available for retrieval."
    customer_retriever_tool = Tool(
        name="customer_info_retriever",
        description="Searches and returns information about a specific customer. Currently, no data is loaded.",
        func=dummy_retriever_func
    )

print("Tools Initialized.")
# print(f"Customer Retriever Tool Ready: {customer_retriever_tool.name}")


For example, replace imports like: `from langchain_core.pydantic_v1 import BaseModel`
with: `from pydantic import BaseModel`
or the v1 compatibility namespace if you are working in a code base that has not been fully upgraded to pydantic 2 yet. 	from pydantic.v1 import BaseModel

  exec(code_obj, self.user_global_ns, self.user_ns)
  embeddings = OllamaEmbeddings(model="hf.co/CompendiumLabs/bge-base-en-v1.5-gguf:latest")


Tools Initialized.


/home/ctai-datpd-l/anaconda3/envs/ai_agents/lib/python3.11/site-packages/chromadb/types.py:144: PydanticDeprecatedSince211: Accessing the 'model_fields' attribute on the instance is deprecated. Instead, you should access this attribute from the model class. Deprecated in Pydantic V2.11 to be removed in V3.0.
  return self.model_fields  # pydantic 2.x


In [30]:
from langchain_community.tools.vectorstore.tool import VectorStoreQATool


In [29]:
from crewai import Agent as CrewAgent # Alias to avoid confusion with node names

# Agent 1: Company Agent (Definition)
company_agent_def = CrewAgent(
    role='Company Information Specialist',
    goal='Provide accurate information about the company (e.g., Google) including location, products, services, history, and mission, based on search results and internal knowledge.',
    backstory='You are an AI assistant dedicated to representing the company accurately and helpfully to external queries. You use search tools to find the latest public information.',
    tools=[search_tool],
    llm=llm,
    verbose=False
)

# Agent 2: Customer Agent (Definition)
customer_agent_def = CrewAgent(
    role='Customer Support Agent',
    goal='Answer customer-specific questions by retrieving their information from the customer database using their customer ID. Handle queries about plans, purchase history, support tickets etc.',
    backstory='You are a helpful customer support agent with access to the customer database. You must use the customer ID provided to retrieve relevant information using your specialized tool.',
    tools=[search_tool, customer_retriever_tool], # Has retriever and search
    llm=llm,
    verbose=False
)

# Agent 3: Naive Agent (Definition)
naive_agent_def = CrewAgent(
    role='General Knowledge Assistant',
    goal='Answer general knowledge questions or queries that do not fall under specific company or customer information categories. Use search tools to find relevant information online.',
    backstory='You are a general-purpose AI assistant capable of answering a wide range of topics by searching the internet.',
    tools=[search_tool],
    llm=llm,
    verbose=False
)

print("CrewAI Agent Definitions Ready (for conceptual reference).")

2025-05-05 00:00:06,199 - 128333791036480 - llm.py-llm:187 - ERROR: Failed to get supported params: argument of type 'NoneType' is not iterable



[1;31mProvider List: https://docs.litellm.ai/docs/providers[0m



ValidationError: 1 validation error for Agent
tools
  Value error, Invalid tool type: <class 'langchain_community.tools.vectorstore.tool.VectorStoreQATool'>. Tool must be an instance of BaseTool or an object with 'name', 'func', and 'description' attributes. [type=value_error, input_value=[SerperDevTool(name='Sear...=200, temperature=0.2))], input_type=list]
    For further information visit https://errors.pydantic.dev/2.11/v/value_error

In [None]:
from crewai import Task, Crew, Process
from typing import List, Dict, Any, Optional, Literal

In [None]:
class AgentState(TypedDict):
    original_query: str
    customer_id: Optional[str] # Allow optional customer ID
    rewritten_query: str
    classified_agent: Literal["Company", "Customer", "Naive", "Unknown"] # Agent types
    agent_response: str
    reflection: str
    is_final: bool
    error: Optional[str]
    retry_count: int

print("LangGraph State Defined.")

LangGraph State Defined.


In [None]:
from langchain_core.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate
from typing import Literal
# --- Node 1: Rewriter ---
class RewriterOutput(BaseModel):
    """Structured output for the Rewriter node."""
    rewritten_query: str = Field(description="The user's query, corrected for spelling and clarity.")
    agent_classification: Literal["Company", "Customer", "Naive", "Unknown"] = Field(description="The best agent type to handle the rewritten query.")
    extracted_customer_id: Optional[str] = Field(description="Customer ID extracted from the query, if relevant and present (e.g., 'cust123'). Null otherwise.")

def rewrite_query_node(state: AgentState):
    """Rewrites the user query and classifies which agent should handle it."""
    print("--- Node: Rewriter ---")
    query = state['original_query']

    system_prompt = """You are an expert query processor. Your tasks are:
1.  Correct any spelling mistakes in the user query.
2.  Rephrase the query for maximum clarity if necessary.
3.  Classify the query's intent to determine the best agent to handle it:
    - 'Company': For questions about a specific company's details (location, products, services, mission etc.). Assume the company is 'Google' if not specified.
    - 'Customer': For questions related to a specific customer's account, history, plan, support tickets etc. These queries MUST contain or imply a customer ID.
    - 'Naive': For general knowledge questions, queries unrelated to the company or a specific customer.
    - 'Unknown': If the query is ambiguous or cannot be clearly classified.
4. Extract any customer ID mentioned (like cust123, user45, id: 7890). Return null if no ID is found.
Provide your output in the specified JSON format."""

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "Process the following user query: {query}")
    ])

    # Use OpenAI Functions for structured output
    rewriter_chain = prompt | llm.bind_functions(functions=[RewriterOutput], function_call="RewriterOutput") | JsonOutputFunctionsParser()

    try:
        response: RewriterOutput = rewriter_chain.invoke({"query": query})
        print(f"Rewriter Output: {response}")
        return {
            "rewritten_query": response['rewritten_query'],
            "classified_agent": response['agent_classification'],
            "customer_id": response.get('extracted_customer_id') or state.get('customer_id'), # Prioritize extracted, fallback to state
             "error": None
        }
    except Exception as e:
        print(f"Error in Rewriter node: {e}")
        return {"error": f"Failed to rewrite/classify query: {e}"}

# --- Nodes 2, 3, 4: Specialized Agent Execution ---
# These nodes will simulate the execution of the conceptually defined CrewAI agents.

def execute_agent_node(state: AgentState, agent_def: CrewAgent, agent_name: str):
    """Generic function to execute the logic of a specific agent."""
    print(f"--- Node: Execute {agent_name} Agent ---")
    if state.get("error"): return {} # Don't run if prior error

    query = state['rewritten_query']
    customer_id = state.get('customer_id') # Relevant for Customer agent

    # Construct a prompt using the agent's definition
    prompt_messages = [
        SystemMessage(content=f"Role: {agent_def.role}\nGoal: {agent_def.goal}\nBackstory: {agent_def.backstory}"),
        HumanMessage(content=f"User Query: {query}")
    ]
    if agent_name == "Customer" and customer_id:
         prompt_messages.append(HumanMessage(content=f"Context: Apply this query to customer ID: {customer_id}."))
    elif agent_name == "Customer" and not customer_id:
         return {"agent_response": "Cannot answer customer question without a Customer ID.", "error": "Missing Customer ID"}


    # Simplified tool handling for LangGraph node
    # A more robust implementation would use LangChain's agent executors or tool calling
    available_tools = {tool.name: tool for tool in agent_def.tools}
    tool_response = ""

    # Basic tool check (can be expanded)
    # This is a placeholder - real tool use requires more complex agent logic (like ReAct or OpenAI Functions Agent)
    if "duckduckgo_search" in available_tools and agent_name != "Customer": # Example: Use search for non-customer
        try:
            tool_response = f"\nSearch Results: {search_tool.run(query)}"
            prompt_messages.append(AIMessage(content=f"Tool Used: duckduckgo_search\nResult: {tool_response[:500]}...")) # Add tool result snippet
        except Exception as e:
            print(f"Error using search tool: {e}")
            tool_response = "\nSearch tool failed."

    if "customer_info_retriever" in available_tools and agent_name == "Customer":
        try:
            # Pass query potentially enriched with customer ID context
            retriever_query = f"Info for customer {customer_id}: {query}"
            tool_response = f"\nCustomer DB Info: {customer_retriever_tool.run(retriever_query)}"
            prompt_messages.append(AIMessage(content=f"Tool Used: customer_info_retriever\nResult: {tool_response[:500]}..."))
        except Exception as e:
            print(f"Error using customer retriever tool: {e}")
            tool_response = "\nCustomer retrieval tool failed."

    # Final LLM call to generate response based on persona and potential tool output
    final_prompt = ChatPromptTemplate.from_messages(prompt_messages)
    chain = final_prompt | llm
    try:
        response = chain.invoke({}) # Query is already in messages
        print(f"{agent_name} Agent Response Snippet: {response.content[:200]}...")
        return {"agent_response": response.content, "error": None}
    except Exception as e:
        print(f"Error during {agent_name} agent LLM call: {e}")
        return {"error": f"{agent_name} agent failed during generation: {e}"}


# Specific node functions calling the generic executor
def company_agent_node(state: AgentState):
    return execute_agent_node(state, company_agent_def, "Company")

def customer_agent_node(state: AgentState):
    return execute_agent_node(state, customer_agent_def, "Customer")

def naive_agent_node(state: AgentState):
    return execute_agent_node(state, naive_agent_def, "Naive")

# --- Node 5: Reflection ---
class ReflectionOutput(BaseModel):
    """Structured output for the Reflection node."""
    feedback: str = Field(description="Constructive feedback on the response's relevance and correctness relative to the original query.")
    is_final_answer: bool = Field(description="True if the answer is satisfactory and directly addresses the original query, False otherwise.")

def reflection_node(state: AgentState):
    """Reflects on the generated answer, checking relevance and correctness."""
    print("--- Node: Reflection ---")
    if state.get("error"): return {"is_final": True} # If error occurred, end the loop

    original_query = state['original_query']
    agent_response = state['agent_response']
    # rewritten_query = state['rewritten_query'] # Could also be used for context

    if not agent_response:
         print("No agent response to reflect on.")
         return {"reflection": "No response generated.", "is_final": True, "error": "Reflection failed: No agent response found."}


    system_prompt = """You are a meticulous quality assurance reviewer. Your task is to evaluate an AI agent's response based on the user's original query.
                    Assess the following:
                    1.  **Relevance:** Does the response directly address the user's original question?
                    2.  **Correctness:** Is the information likely correct (based on general knowledge or provided context)? You don't need to verify external facts exhaustively, but check for obvious flaws or contradictions.
                    3.  **Completeness:** Does the response sufficiently answer the question?

                    Provide constructive feedback and determine if the answer is final (good enough) or needs revision/retry. Use the specified JSON format."""

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "Original Query: {original_query}\n\nAgent Response:\n{agent_response}\n\nPlease evaluate and provide feedback.")
    ])

    reflection_chain = prompt | llm.bind_functions(functions=[ReflectionOutput], function_call="ReflectionOutput") | JsonOutputFunctionsParser()

    try:
        response: ReflectionOutput = reflection_chain.invoke({
            "original_query": original_query,
            "agent_response": agent_response
        })
        print(f"Reflection Output: {response}")
        return {
            "reflection": response['feedback'],
            "is_final": response['is_final_answer'],
            "retry_count": state.get('retry_count', 0) + 1, # Increment retry count
            "error": None
        }
    except Exception as e:
        print(f"Error in Reflection node: {e}")
        # Decide how to handle reflection error - maybe treat as non-final?
        return {"error": f"Reflection failed: {e}", "is_final": False, "retry_count": state.get('retry_count', 0) + 1}

print("LangGraph Nodes Defined.")

LangGraph Nodes Defined.


In [None]:
from langgraph.graph import StateGraph, END

ImportError: cannot import name 'CONFIG_KEYS' from 'langchain_core.runnables.config' (/home/ctai-datpd-l/anaconda3/envs/ai_agents/lib/python3.11/site-packages/langchain_core/runnables/config.py)

In [None]:
# --- Conditional Edges ---

def route_query(state: AgentState):
    """Routes to the appropriate agent node based on classification."""
    print(f"--- Conditional Edge: Routing Query ---")
    if state.get("error"):
        print("Error detected before routing.")
        return "error_handler" # Or END directly

    classification = state['classified_agent']
    print(f"Routing based on classification: {classification}")
    if classification == "Company":
        return "execute_company"
    elif classification == "Customer":
        # Add check for customer ID existence
        if state.get("customer_id"):
             return "execute_customer"
        else:
             print("Customer classification but no ID found.")
             # Update state to reflect missing ID issue and go to reflection/end
             state["error"] = "Query classified as Customer, but no Customer ID was provided or found."
             state["agent_response"] = "I need a customer ID to answer that question."
             return "reflect" # Go to reflection to potentially end gracefully
    elif classification == "Naive":
        return "execute_naive"
    else: # Unknown or error during classification
        print("Classification is Unknown or invalid.")
        state["error"] = f"Could not determine the right agent for the query (classification: {classification})."
        state["agent_response"] = "I'm not sure how to handle that query. Could you please rephrase?"
        return "reflect" # Go to reflection

def decide_after_reflection(state: AgentState):
    """Decides whether to end the process or retry based on reflection."""
    print(f"--- Conditional Edge: After Reflection ---")
    is_final = state['is_final']
    retry_count = state['retry_count']
    max_retries = 2 # Set a limit for retries

    if state.get("error") and "Reflection failed" not in state["error"]: # Handle agent errors first
        print(f"Ending due to execution error: {state['error']}")
        return END # End directly on execution error

    print(f"Reflection result: is_final={is_final}, Retry count={retry_count}")

    if is_final:
        print("Reflection approved. Ending.")
        return END
    elif retry_count >= max_retries:
        print(f"Max retries ({max_retries}) reached. Ending.")
        # Optionally, provide the last reflection feedback
        state["agent_response"] += f"\n\n[System Note: Max retries reached. Last feedback: {state.get('reflection', 'N/A')}]"
        return END
    else:
        print("Reflection suggests retry. Looping back to Rewriter.")
        # Add reflection feedback to the original query for context in the next loop? Optional.
        # state['original_query'] = f"{state['original_query']}\n\n[Retry Context: Previous attempt failed. Feedback: {state.get('reflection', 'N/A')}]"
        return "rewrite_query" # Loop back to the start

# --- Build the Graph ---
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("rewrite_query", rewrite_query_node)
workflow.add_node("execute_company", company_agent_node)
workflow.add_node("execute_customer", customer_agent_node)
workflow.add_node("execute_naive", naive_agent_node)
workflow.add_node("reflect", reflection_node)
# Optional: Add a specific error handling node if needed
# workflow.add_node("error_handler", ...)

# Define edges
workflow.set_entry_point("rewrite_query")

# Routing from Rewriter
workflow.add_conditional_edges(
    "rewrite_query",
    route_query,
    {
        "execute_company": "execute_company",
        "execute_customer": "execute_customer",
        "execute_naive": "execute_naive",
        "reflect": "reflect", # Handle Unknown/Error cases by going directly to reflection
        # "error_handler": "error_handler" # Route explicit errors
    }
)

# Edges from execution nodes to reflection
workflow.add_edge("execute_company", "reflect")
workflow.add_edge("execute_customer", "reflect")
workflow.add_edge("execute_naive", "reflect")

# Conditional edge from Reflection (Loop or End)
workflow.add_conditional_edges(
    "reflect",
    decide_after_reflection,
    {
        "rewrite_query": "rewrite_query", # Loop back
        END: END
    }
)

# Compile the graph
# memory = SqliteSaver.from_conn_string(":memory:") # Optional: In-memory checkpointing
# app = workflow.compile(checkpointer=memory)
app = workflow.compile()


print("LangGraph Compiled.")

NameError: name 'StateGraph' is not defined

In [None]:
def run_agentic_system(query: str, cust_id: Optional[str] = None):
    """Runs the agentic system with a user query."""
    initial_state = AgentState(
        original_query=query,
        customer_id=cust_id,
        rewritten_query="",
        classified_agent="Unknown", # Start as Unknown
        agent_response="",
        reflection="",
        is_final=False,
        error=None,
        retry_count=0
    )

    print(f"\n🚀 Starting Agentic System for Query: '{query}'" + (f" (Customer ID: {cust_id})" if cust_id else ""))
    # config = {"configurable": {"thread_id": f"thread_{query[:10]}"}} # Example thread ID if using checkpointer

    try:
        # final_state = app.invoke(initial_state, config=config)
        final_state = app.invoke(initial_state, {"recursion_limit": 10})


        print("\n🏁 Agentic System Finished!")
        print("------ Final State ------")
        # Pretty print relevant parts of the final state
        print(f"Original Query: {final_state['original_query']}")
        if final_state.get('rewritten_query'): print(f"Rewritten Query: {final_state['rewritten_query']}")
        if final_state.get('classified_agent'): print(f"Agent Used: {final_state['classified_agent']}")
        if final_state.get('error'):
            print(f"Error Occurred: {final_state['error']}")
        print(f"\nFinal Response:\n{final_state['agent_response']}")
        if final_state.get('reflection') and not final_state.get('is_final') and final_state.get('retry_count') > 0:
             print(f"\nLast Reflection Feedback (Process Ended): {final_state['reflection']}")

        return final_state

    except Exception as e:
        print(f"\n💥 An unhandled error occurred during graph execution: {e}")
        import traceback
        traceback.print_exc()
        return None

# --- Example Queries ---
run_agentic_system("Where is the main Google office located?")
run_agentic_system("Tell me about my purchase history for cust123")
run_agentic_system("What was my last support ticket about?", cust_id="cust123") # Providing ID separately
run_agentic_system("can you tell me about the plan for customer cust456 please?")
run_agentic_system("Who invented the telephone?")
run_agentic_system("hwat is googles mission sttement?") # Test spelling correction
run_agentic_system("Tell me about cust999") # Test non-existent customer ID (if retriever handles it)
run_agentic_system("This query is confusing and makes no sense.") # Test unknown/retry


🚀 Starting Agentic System for Query: 'Where is the main Google office located?'

💥 An unhandled error occurred during graph execution: name 'app' is not defined

🚀 Starting Agentic System for Query: 'Tell me about my purchase history for cust123'

💥 An unhandled error occurred during graph execution: name 'app' is not defined

🚀 Starting Agentic System for Query: 'What was my last support ticket about?' (Customer ID: cust123)

💥 An unhandled error occurred during graph execution: name 'app' is not defined

🚀 Starting Agentic System for Query: 'can you tell me about the plan for customer cust456 please?'

💥 An unhandled error occurred during graph execution: name 'app' is not defined

🚀 Starting Agentic System for Query: 'Who invented the telephone?'

💥 An unhandled error occurred during graph execution: name 'app' is not defined

🚀 Starting Agentic System for Query: 'hwat is googles mission sttement?'

💥 An unhandled error occurred during graph execution: name 'app' is not defined

🚀 

Traceback (most recent call last):
  File "/tmp/ipykernel_975034/2447842949.py", line 20, in run_agentic_system
    final_state = app.invoke(initial_state, {"recursion_limit": 10})
                  ^^^
NameError: name 'app' is not defined
Traceback (most recent call last):
  File "/tmp/ipykernel_975034/2447842949.py", line 20, in run_agentic_system
    final_state = app.invoke(initial_state, {"recursion_limit": 10})
                  ^^^
NameError: name 'app' is not defined
Traceback (most recent call last):
  File "/tmp/ipykernel_975034/2447842949.py", line 20, in run_agentic_system
    final_state = app.invoke(initial_state, {"recursion_limit": 10})
                  ^^^
NameError: name 'app' is not defined
Traceback (most recent call last):
  File "/tmp/ipykernel_975034/2447842949.py", line 20, in run_agentic_system
    final_state = app.invoke(initial_state, {"recursion_limit": 10})
                  ^^^
NameError: name 'app' is not defined
Traceback (most recent call last):
  Fil