In [1]:
# pip install langchain-core langchain-google-genai langgraph pydantic requests

In [2]:
# Read the OpenAI key from its local file. The context manager closes the
# handle (the bare open() leaked it) and strip() drops the trailing newline
# that editors usually append, which would otherwise corrupt the key.
# NOTE(review): no visible cell reads OPENAI_API_KEY -- confirm it is still needed.
with open('keys/openai_api_key.txt') as f:
    OPENAI_API_KEY = f.read().strip()

In [3]:
# Read the Gemini key from its local file. The context manager closes the
# handle (the bare open() leaked it) and strip() removes surrounding
# whitespace such as a trailing newline.
with open("keys/gemini.txt") as f:
    GOOGLE_API_KEY = f.read().strip()

In [4]:
# Read the LangSmith key from its local file. The context manager closes the
# handle (the bare open() leaked it) and strip() removes surrounding
# whitespace such as a trailing newline.
with open("keys/langsmith_api_key.txt") as f:
    LANGCHAIN_API_KEY = f.read().strip()

In [5]:
# LangSmith tracing settings kept as plain Python names. The project name is
# spelled exactly as in the os.environ["LANGCHAIN_PROJECT"] assignment made in
# the environment-setup cell, so both refer to the same project.
LANGCHAIN_TRACING_V2 = 'true'
LANGCHAIN_PROJECT = 'CustomerSupportAgent-LangGraph'

## Create a Knowledge Base

### kb_data.txt content
- Technical Issue: The 'Server Timeout' error (code 504) is usually caused by heavy network traffic. Try clearing your cache and cookies.
- Technical Issue: For 'Login Failed' errors (code 401), reset your password using the 'Forgot Password' link. If that fails, contact human support.
- General Info: Our business hours are Monday-Friday, 9 AM to 5 PM EST.

## 01. Environment Setup and State Definition

In [6]:
import os
import sys
import requests
import json
from typing import TypedDict, List, Annotated
from operator import add

# LangChain and LangGraph imports
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage, ToolMessage
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import StateGraph, END

# --- 0. ENVIRONMENT SETUP & STATE DEFINITION ---

# 0.1 SECURE KEY LOADING & LLM INITIALIZATION
# This section MUST set the keys as environment variables
# Export every secret as an environment variable. The files are read in the
# listed order, so a missing Gemini key is reported before a missing
# LangSmith key.
try:
    for env_name, key_path in (
        ("GOOGLE_API_KEY", "keys/gemini.txt"),
        ("LANGCHAIN_API_KEY", "keys/langsmith_api_key.txt"),
    ):
        with open(key_path, "r") as f:
            os.environ[env_name] = f.read().strip()

    # LangSmith tracing switches, also exported through the environment.
    os.environ.update(
        LANGCHAIN_TRACING_V2="true",
        LANGCHAIN_PROJECT="CustomerSupportAgent-LangGraph",
    )
except FileNotFoundError as e:
    # Without the key files nothing below can work, so stop right here.
    print(f"FATAL ERROR: Required key file not found: {e.filename}. Please create the file.")
    sys.exit(1)

# Shared chat model used by every node; no API key is passed explicitly here.
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.0)

# 0.2 Define the Agent State
class AgentState(TypedDict):
    """State dictionary shared by every node of the support graph."""
    # Conversation history. The `add` (operator.add) annotation is what
    # LangGraph uses as the reducer for this key, so the message lists
    # returned by nodes are concatenated onto the history, not substituted.
    messages: Annotated[List[BaseMessage], add]
    # Intent keyword written by triage_agent and read by route_agent.
    intent: str
    # Set by nodes to say whether the request is considered handled; the
    # routing functions return END when it is True.
    issue_resolved: bool

## 02. Real-time data tools

In [7]:
@tool
def check_billing_status(user_id: str) -> str:
    """
    Fetches real-time billing and subscription status by calling a local REST API.
    API URL: http://127.0.0.1:8000/api/billing
    """
    # The docstring above is what the @tool decorator exposes to the LLM as the
    # tool description, so it is kept short; the rest is documented here.
    #
    # user_id: account identifier chosen by the model from the user's message.
    # Returns: a human-readable status string in every case (success, unknown
    #          user, HTTP error, malformed body, service unreachable or slow),
    #          so the calling agent always has text to hand back to the LLM.
    API_ENDPOINT = "http://127.0.0.1:8000/api/billing"
    REQUEST_TIMEOUT_SECONDS = 5  # without a timeout requests can block forever
    print(f"\n--- TOOL: Calling Real-Time Billing API at {API_ENDPOINT} ---")

    try:
        # `params=` lets requests URL-encode the value; user_id comes from the
        # model and may contain characters that would break a hand-built query.
        response = requests.get(
            API_ENDPOINT,
            params={"user_id": user_id},
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
    except requests.exceptions.ConnectionError:
        return "API Connection Error: The local billing service (billing_api.py) is offline. Please run it."
    except requests.exceptions.Timeout:
        return (
            "API Connection Error: The local billing service did not respond "
            f"within {REQUEST_TIMEOUT_SECONDS} seconds."
        )

    try:
        if response.status_code == 200:
            return f"API Status: SUCCESS. Data: {json.dumps(response.json())}"
        if response.status_code == 404:
            return f"API Status: User Not Found. Detail: {response.json().get('detail', 'N/A')}"
    except ValueError:
        # response.json() raises a ValueError subclass when the body is not JSON.
        return f"API Status: ERROR {response.status_code}. Response body was not valid JSON."

    return f"API Status: ERROR {response.status_code}."

@tool
def search_web_for_solution(query: str) -> str:
    """Performs a simulated real-time web search for technical articles."""
    # The docstring is left verbatim because @tool uses it as the tool
    # description. No network call happens here: the "search" is a keyword
    # lookup over the lower-cased query that returns canned advisory text.
    print(f"\n--- TOOL: Performing Real-Time Web Search for: {query} ---")
    lowered = query.lower()

    def mentions(*needles: str) -> bool:
        # True when any of the given substrings occurs in the query.
        return any(needle in lowered for needle in needles)

    if mentions("timeout error", "504"):
        return "Search Result: Official advisory on 'Server Timeout' (504): Issue is server-side congestion. Check status page."
    if mentions("login failed", "401"):
        return "Search Result: Official notice 'Login Failed (401)': Use two-factor authentication for bypass."
    # Fallback for anything that matches neither known error.
    return f"Search Result: Found 3 relevant articles for '{query}' from today's date."

## 03. Agent Nodes

### 03.1. Node: Triage Agent (The Router)

In [19]:
# 03.1 Node: Triage Agent (the router)
def triage_agent(state: AgentState) -> dict:
    """Classify the intent of the latest message to determine the routing path.

    Args:
        state: Current graph state. Only the last entry of
            ``state['messages']`` is sent to the LLM.

    Returns:
        A partial state update: ``intent`` is one of ``billing``,
        ``technical``, ``general``, ``close`` or ``escalate`` (``escalate``
        when the reply cannot be mapped to exactly one keyword), and
        ``issue_resolved`` is reset to ``False``.
    """
    print("\n--- NODE: Triage Agent (Routing) ---")

    valid_intents = {'billing', 'technical', 'general', 'close', 'escalate'}

    # 1. System prompt. The keywords are listed explicitly: the examples alone
    #    never told the model what the full set of allowed answers is.
    system_instruction = (
        "You are the Triage Agent. Your sole task is to classify the user's request. "
        "Respond with ONLY one of the classification keywords: "
        "billing, technical, general, close, escalate."
        "\n\n"
        "--- EXAMPLES ---"
        "\n"
        "User Query: 'My invoice is wrong.' -> Response: billing"
        "\n"
        "User Query: 'I can't log in.' -> Response: technical"
        "\n"
        "User Query: 'What are your hours?' -> Response: general"
        "\n"
        "User Query: 'Thanks so much!' -> Response: close"
        "\n"
        "User Query: 'bye' -> Response: close"
        "\n"
        "User Query: 'I need to integrate 500 systems...' -> Response: escalate"
    )

    # 2. Only the most recent message is classified.
    user_message = state['messages'][-1]

    # 3. Send the system instruction together with that message.
    messages_to_send = [
        SystemMessage(content=system_instruction),
        user_message
    ]

    # 4. Invoke the LLM and normalise its reply.
    response = llm.invoke(messages_to_send).content.strip().lower()

    # 5. Map the reply onto a keyword. An exact comparison sent replies such as
    #    "Response: billing" or "billing." to 'escalate'; instead, strip
    #    punctuation from each word and accept the reply when exactly one
    #    distinct keyword appears in it. Ambiguous replies still escalate.
    words = {word.strip(".,:;!?'\"`*()[]") for word in response.split()}
    matched = words & valid_intents
    intent = matched.pop() if len(matched) == 1 else 'escalate'

    print(f"--- Intent Classified: {intent} ---")

    # Every new classification starts as unresolved.
    return {"intent": intent, "issue_resolved": False}

### 03.2. Generic Specialized Agent Logic

In [9]:
def run_specialized_agent(state: AgentState, tool_function, system_prompt: str) -> dict:
    """Run one tool-using specialist turn: ask, execute the tool, answer.

    Args:
        state: Current graph state; only the last message is forwarded.
        tool_function: The tool object to bind to the LLM. Its ``name``
            attribute is used for logging and its ``invoke`` method runs it.
        system_prompt: System instruction for this specialist.

    Returns:
        A partial state update. If the LLM made no tool call, its direct reply
        is returned with ``issue_resolved`` False. Otherwise the final LLM
        answer produced after the tool results is returned with
        ``issue_resolved`` True.
    """
    # Log which agent is running, identified by its tool's name.
    agent_name = tool_function.name
    print(f"\n--- NODE: {agent_name} ---")

    # 1. Bind the specific tool to the LLM.
    llm_with_tool = llm.bind_tools([tool_function])

    # 2. Message history for this agent: system prompt plus latest user query.
    history = [
        SystemMessage(content=system_prompt),
        state['messages'][-1]
    ]

    # 3. First LLM call (decides whether to use the tool).
    response_ai = llm_with_tool.invoke(history)

    # 4. No tool call means the agent did not follow its instructions.
    if not response_ai.tool_calls:
        print("--- AGENT FAILED: Did not call the required tool. ---")
        return {"messages": [response_ai], "issue_resolved": False}

    # 5. Execute EVERY requested tool call. Answering only the first one would
    #    leave the others in the AI message without a ToolMessage, so the
    #    history sent in step 6 would contain unanswered tool calls. Only one
    #    tool is bound, so every call targets `tool_function`.
    history.append(response_ai)
    for tool_call in response_ai.tool_calls:
        print(f"--- AGENT: Calling tool {tool_call['name']}... ---")
        tool_output = tool_function.invoke(tool_call['args'])
        print("--- TOOL: Output received. ---")
        history.append(
            ToolMessage(content=str(tool_output), tool_call_id=tool_call['id'])
        )

    # 6. Second LLM call, *without* tools bound, to force a text answer.
    final_response = llm.invoke(history)

    print("--- AGENT SUCCESS: Generated final answer. ---")

    # 7. Completing the tool loop counts as resolved, whatever the tool returned.
    return {
        "messages": [final_response],
        "issue_resolved": True
    }

### 03.3. Specific Agent Nodes

In [10]:
def technical_agent(state: AgentState) -> dict:
    """Graph node for technical issues; delegates to run_specialized_agent
    with the search_web_for_solution tool and the technical-support prompt."""
    prompt = (
        "You are a Technical Support Agent. You MUST use `search_web_for_solution` "
        "to find the most current solution. Provide the answer directly."
    )
    return run_specialized_agent(state, search_web_for_solution, prompt)

def billing_agent(state: AgentState) -> dict:
    """Graph node for billing issues; delegates to run_specialized_agent
    with the check_billing_status tool and the billing-specialist prompt."""
    prompt = (
        "You are a Billing Specialist. You MUST use `check_billing_status` "
        "to fetch real-time account details. Provide a clear, actionable resolution."
    )
    return run_specialized_agent(state, check_billing_status, prompt)

### 03.4. Node: Human Escalation

In [11]:
def human_handoff(state: AgentState) -> dict:
    """Graph node that tells the user the ticket was escalated to a human.

    The incoming state is not inspected; the node always appends the same
    apology message and marks the issue as resolved.
    """
    print("\n--- NODE: Human Handoff ---")
    escalation_notice = AIMessage(
        content="I apologize, but this request requires specialized knowledge. Your ticket has been **escalated to a human specialist** for immediate follow-up."
    )
    return dict(messages=[escalation_notice], issue_resolved=True)

In [15]:
def thank_you_and_end(state: AgentState) -> dict:
    """Graph node that says goodbye and flags the conversation as finished.

    The incoming state is not inspected; the node always appends the same
    closing message and sets ``issue_resolved`` to True.
    """
    print("\n--- NODE: Thank You & End ---")
    farewell = AIMessage(
        content="Thank you for using our automated support assistant! I'm glad I could help. Goodbye."
    )
    return dict(messages=[farewell], issue_resolved=True)

## 04. Conditional Routing Function & Graph Builder

### 04.1. Routing Function

In [12]:
def route_agent(state: AgentState) -> str:
    """Choose the node that follows triage, based on the classified intent.

    Args:
        state: Graph state; reads ``issue_resolved`` and ``intent``.

    Returns:
        ``END`` when the issue is already resolved; otherwise
        ``'billing_agent'``, ``'technical_agent'`` or ``'thank_you_and_end'``
        for the ``billing``, ``technical`` and ``close`` intents, and
        ``'human_handoff'`` for everything else (including ``general`` and
        ``escalate``).
    """
    if state['issue_resolved']:
        return END

    # 'close' used to fall through to human_handoff, which left the
    # thank_you_and_end node unreachable even though the triage path map
    # already lists it as a destination.
    destinations = {
        'billing': 'billing_agent',
        'technical': 'technical_agent',
        'close': 'thank_you_and_end',
    }
    return destinations.get(state['intent'], 'human_handoff')

# Router applied after a specialist node (technical_agent / billing_agent) has run.
def route_after_specialist(state: AgentState) -> str:
    """Decide where to go once a specialist node has finished.

    Returns ``END`` when the specialist marked the issue as resolved and
    ``'human_handoff'`` otherwise.
    """
    return END if state['issue_resolved'] else 'human_handoff'

### 04.2. Build the Graph

In [17]:
workflow = StateGraph(AgentState)

# Register the nodes; triage is the entry point.
for node_name, node_fn in (
    ("triage_agent", triage_agent),
    ("technical_agent", technical_agent),
    ("billing_agent", billing_agent),
    ("human_handoff", human_handoff),
    ("thank_you_and_end", thank_you_and_end),
):
    workflow.add_node(node_name, node_fn)
workflow.set_entry_point("triage_agent")

# 1. After triage, route_agent picks the destination. Each key it may return
#    maps to the node of the same name (END maps to END).
triage_targets = ('billing_agent', 'technical_agent', 'human_handoff', 'thank_you_and_end', END)
workflow.add_conditional_edges(
    "triage_agent",
    route_agent,
    {target: target for target in triage_targets},
)

# 2. After a specialist, route_after_specialist either ends the run or hands
#    the conversation to a human.
for specialist in ("technical_agent", "billing_agent"):
    workflow.add_conditional_edges(
        specialist,
        route_after_specialist,
        {END: END, 'human_handoff': 'human_handoff'},
    )

# 3. Both closing nodes always finish the run.
for closing_node in ('human_handoff', 'thank_you_and_end'):
    workflow.add_edge(closing_node, END)

app = workflow.compile()

## 05. Test Function

In [14]:
def run_support_agent(query: str):
    """Send one user query through the compiled graph and print the result.

    Prints a banner with the query, invokes ``app`` with a state holding that
    single human message, then prints the content of the last message in the
    resulting state. Nothing is returned.
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    print("\n" + heavy_rule)
    print(f"| USER QUERY: {query}")
    print(heavy_rule)

    result = app.invoke({"messages": [HumanMessage(content=query)]})
    final_message = result['messages'][-1].content

    print("\n" + light_rule)
    print("| FINAL AGENT RESPONSE:")
    print(f"| {final_message}")
    print(light_rule + "\n")

if __name__ == "__main__":
    print("\n\n*** IMPORTANT: Ensure 'python billing_api.py' is running in a separate terminal. ***\n")
    # Demo queries: a billing question, a technical question, and a question
    # about the product roadmap.
    demo_queries = (
        "I need help with my payment status, my ID is 12345. Is my invoice overdue?",
        "I am getting a 'Server Timeout' error. What is the latest fix?",
        "What is the long-term vision for your product roadmap?",
    )
    for demo_query in demo_queries:
        run_support_agent(demo_query)



*** IMPORTANT: Ensure 'python billing_api.py' is running in a separate terminal. ***


| USER QUERY: I need help with my payment status, my ID is 12345. Is my invoice overdue?

--- NODE: Triage Agent (Routing) ---
--- Intent Classified: billing ---

--- NODE: check_billing_status ---
--- AGENT: Calling tool check_billing_status... ---

--- TOOL: Calling Real-Time Billing API at http://127.0.0.1:8000/api/billing ---
--- TOOL: Output received. ---
--- AGENT SUCCESS: Generated final answer. ---

--------------------------------------------------------------------------------
| FINAL AGENT RESPONSE:
| I apologize, but I'm unable to check your payment status or whether your invoice is overdue at this moment. The billing service is currently offline.

Please try again later, or contact support if the issue persists.
--------------------------------------------------------------------------------


| USER QUERY: I am getting a 'Server Timeout' error. What is the latest fix?

--- NODE: Triag