In [None]:
# !uv add langgraph

!pip install -U langgraph
!pip install -U langchain
!pip install langchain-openai openai
!pip install langchain-deepseek

In [127]:
from typing import TypedDict, Literal

# Define the structure for email classification
class EmailClassification(TypedDict):
    """Structured result of classifying a single customer email."""

    # Kind of request the customer is making
    intent: Literal[
        "question",
        "bug",
        "billing",
        "feature",
        "complex",
    ]
    # How quickly the email needs attention
    urgency: Literal[
        "low",
        "medium",
        "high",
        "critical",
    ]
    # Short phrase naming the subject of the email
    topic: str
    # One-sentence summary of the email
    summary: str

class EmailAgentState(TypedDict):
    """Shared state passed between the nodes of the email agent graph.

    Only the raw email fields are required up front; every other field starts
    out missing/None and is filled in by the node that produces it.
    """

    # Raw email data
    email_content: str
    sender_email: str
    email_id: str

    # Classification result
    classification: EmailClassification | None

    # Raw search/API results
    search_results: list[str] | None  # List of raw document chunks
    customer_history: dict | None  # Raw customer data from CRM

    # Generated content
    draft_response: str | None
    # read_email stores message objects (HumanMessage) here, not plain strings,
    # so the element type is intentionally left open.
    messages: list | None

In [126]:
import os
from langchain.tools import tool, ToolRuntime
from langchain_openai import ChatOpenAI


# DeepSeek API key. Prefer a key exported in the environment: setdefault only
# fills in the placeholder when nothing is set, so a real key is never
# overwritten and never has to be pasted into the notebook.
os.environ.setdefault('DEEPSEEK_API_KEY', 'your_deepseek_api_key_here')

# langchain-openai still looks up OPENAI_API_KEY, so mirror the DeepSeek key into it.
os.environ["OPENAI_API_KEY"] = os.environ['DEEPSEEK_API_KEY']

# Key point: send requests to DeepSeek's API base URL instead of OpenAI's.
DEEPSEEK_BASE_URL = "https://api.deepseek.com/v1"

model = ChatOpenAI(
    model="deepseek-chat",  # DeepSeek model name
    openai_api_base=DEEPSEEK_BASE_URL,  # DeepSeek endpoint
    temperature=0.7
)

In [181]:
from typing import Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command, RetryPolicy
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

def read_email(state: EmailAgentState) -> dict:
    """Record the incoming email as the first message of the run.

    Returns a state update whose "messages" entry holds a single HumanMessage
    wrapping the email body.
    """
    # In production, this is where the email service would be queried
    intake_message = HumanMessage(content=f"Processing email: {state['email_content']}")
    return {"messages": [intake_message]}

def extract_json(text: str) -> str:
    """Return the JSON object embedded in ``text``.

    Model replies often wrap the JSON in prose or markdown fences. Each "{" is
    tried in order and the first span that decodes as a complete JSON object
    is returned, so trailing text (even text containing "}") is ignored.

    Args:
        text: Raw text that may contain a JSON object.

    Returns:
        The substring holding the first decodable JSON object. If nothing
        decodes, the span from the first "{" to the last "}" when both exist
        in that order; otherwise ``text`` unchanged.
    """
    import json  # local import: the helper must not depend on cell execution order

    decoder = json.JSONDecoder()
    first_brace = text.find("{")

    start = first_brace
    while start != -1:
        try:
            _, stop = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            # Not valid JSON from this brace; try the next candidate.
            start = text.find("{", start + 1)
        else:
            return text[start:stop]

    # Nothing decoded: fall back to the widest brace-delimited span.
    last_brace = text.rfind("}")
    if first_brace != -1 and last_brace > first_brace:
        return text[first_brace:last_brace + 1]
    return text

# Values the classifier may return; anything else is replaced by a safe default.
_ALLOWED_INTENTS = ("question", "bug", "billing", "feature", "complex")
_ALLOWED_URGENCIES = ("low", "medium", "high", "critical")

# Next node for each intent; intents not listed here go to "draft_response".
_INTENT_ROUTES = {
    "billing": "human_review",
    "question": "search_documentation",
    "feature": "search_documentation",
    "bug": "bug_tracking",
}


def _parse_classification(raw_content: str) -> dict:
    """Turn the model's raw reply into a validated classification dict."""
    import json  # local import: must not depend on a later cell having run

    try:
        parsed = json.loads(extract_json(raw_content))
        if not isinstance(parsed, dict):
            raise ValueError(f"expected a JSON object, got {type(parsed).__name__}")
    except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
        # Fallback: treat unparseable replies as a complex, medium-urgency email
        print(f"JSON parse failed. Raw: {raw_content}")
        print(f"Failed to parse classification: {e}")
        return {
            "intent": "complex",
            "urgency": "medium",
            "topic": "unknown",
            "summary": "Failed to classify"
        }

    # Validate the enumerated fields
    if parsed.get("intent") not in _ALLOWED_INTENTS:
        parsed["intent"] = "complex"
    if parsed.get("urgency") not in _ALLOWED_URGENCIES:
        parsed["urgency"] = "medium"
    # Guarantee the free-text fields exist so the dict matches EmailClassification
    parsed.setdefault("topic", "unknown")
    parsed.setdefault("summary", "")
    return parsed


def classify_intent(state: EmailAgentState) -> Command[Literal["search_documentation", "human_review", "draft_response", "bug_tracking"]]:
    """Classify the email with the LLM and route to the matching node.

    Args:
        state: Agent state; reads ``email_content`` and ``sender_email``.

    Returns:
        A Command that stores the classification (a plain dict) under
        ``classification`` and jumps to: ``human_review`` for billing,
        ``search_documentation`` for question/feature, ``bug_tracking`` for
        bug, and ``draft_response`` for everything else.
    """
    classification_prompt = f"""
    Analyze this customer email and classify it. Respond ONLY with a valid JSON object with the following keys:
    - "intent": one of "question", "bug", "billing", "feature", "complex"
    - "urgency": one of "low", "medium", "high", "critical"
    - "topic": a short phrase describing the topic
    - "summary": a one-sentence summary

    Email: {state['email_content']}
    From: {state['sender_email']}
    """

    response = model.invoke(classification_prompt)
    classification_dict = _parse_classification(response.content.strip())

    # Routing
    goto = _INTENT_ROUTES.get(classification_dict["intent"], "draft_response")

    return Command(
        update={"classification": classification_dict},  # stored as a plain dict
        goto=goto
    )

In [182]:
class SearchAPIError(Exception):
    """Recoverable failure of the documentation search backend.

    Placeholder: replace with the exception type your search client raises.
    """


def _search_knowledge_base(query: str) -> list[str]:
    """Return raw documentation chunks for ``query`` (canned placeholder data)."""
    # Implement your search logic here; raise SearchAPIError on recoverable failures.
    return [
        "Reset password via Settings > Security > Change Password",
        "Password must be at least 12 characters",
        "Include uppercase, lowercase, numbers, and symbols"
    ]


def search_documentation(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """Search knowledge base for relevant information.

    Args:
        state: Agent state; reads ``classification`` to build the query.

    Returns:
        A Command that stores the raw search results (or a single message
        describing the search failure) under ``search_results`` and jumps to
        ``draft_response``.
    """
    # Build search query from classification; `or {}` also covers an explicit None
    classification = state.get('classification') or {}
    query = f"{classification.get('intent', '')} {classification.get('topic', '')}"

    try:
        # Store raw search results, not formatted text
        search_results = _search_knowledge_base(query)
    except SearchAPIError as e:
        # For recoverable search errors, store error and continue
        search_results = [f"Search temporarily unavailable: {str(e)}"]

    return Command(
        update={"search_results": search_results},  # Store raw results or error
        goto="draft_response"
    )

def bug_tracking(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """Create or update bug tracking ticket.

    Returns:
        A Command that records the ticket reference under ``search_results``
        (so the drafting step can mention it) and jumps to ``draft_response``.
    """

    # Create ticket in your bug tracking system
    ticket_id = "BUG-12345"  # Would be created via API

    # Only keys declared on EmailAgentState are written; tracking the current
    # step would need a dedicated field in the state schema.
    return Command(
        update={"search_results": [f"Bug ticket {ticket_id} created"]},
        goto="draft_response"
    )

In [183]:
def draft_response(state: EmailAgentState) -> Command[Literal["human_review", "send_reply"]]:
    """Ask the model for a reply draft and decide whether a human must review it.

    The prompt contains the email, its classification and any available
    context (search results, customer tier). The draft is stored under
    ``draft_response``; high/critical urgency or a "complex" intent routes to
    ``human_review``, everything else goes to ``send_reply``.
    """
    print("✅ Draft draft_prompt: draft_response")

    classification = state.get('classification', {})  # a plain dict

    # Gather the optional context sections
    context_sections = []
    docs = state.get('search_results')
    if docs:
        bullet_list = "\n".join(f"- {doc}" for doc in docs)
        context_sections.append(f"Relevant documentation:\n{bullet_list}")
    history = state.get('customer_history')
    if history:
        context_sections.append(f"Customer tier: {history.get('tier', 'standard')}")
    context_block = "\n".join(context_sections)

    draft_prompt = f"""
    Draft a response to this customer email:
    {state['email_content']}

    Email intent: {classification.get('intent', 'unknown')}
    Urgency: {classification.get('urgency', 'medium')}

    {context_block}

    Guidelines:
    - Be professional and helpful
    - Address their specific concern
    - Use the provided documentation when relevant
    """

    response = model.invoke(draft_prompt)

    # Urgent or complex emails always get a human look before sending
    if classification.get('urgency') in ('high', 'critical'):
        goto = "human_review"
    elif classification.get('intent') == 'complex':
        goto = "human_review"
    else:
        goto = "send_reply"

    return Command(update={"draft_response": response.content}, goto=goto)

def human_review(state: EmailAgentState) -> Command[Literal["send_reply", END]]:
    """Pause the graph so a human can approve, edit or reject the draft.

    The interrupt payload carries the email, the current draft and its
    classification. On resume, an approved decision continues to
    ``send_reply`` (using ``edited_response`` when supplied, otherwise the
    existing draft); anything else ends the run.
    """
    classification = state.get('classification', {})

    review_request = {
        "email_id": state.get('email_id', ''),
        "original_email": state.get('email_content', ''),
        "draft_response": state.get('draft_response', ''),
        "urgency": classification.get('urgency', 'medium'),
        "intent": classification.get('intent', 'unknown'),
        "action": "Please review and approve/edit this response"
    }
    human_decision = interrupt(review_request)

    # Rejected: stop without sending anything
    if not human_decision.get("approved"):
        return Command(update={}, goto=END)

    final_text = human_decision.get("edited_response", state.get('draft_response', ''))
    return Command(update={"draft_response": final_text}, goto="send_reply")

In [184]:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import RetryPolicy
from langgraph.graph import StateGraph, START, END

def send_reply(state: EmailAgentState) -> dict:
    """Send the approved response (placeholder: prints a preview of it).

    Defined here because the graph registers a "send_reply" node and no other
    cell in this notebook provides the function.
    """
    # Integrate with your email service here
    preview = (state.get('draft_response') or '')[:100]
    print(f"Sending reply: {preview}...")
    return {}


# Create the graph
workflow = StateGraph(EmailAgentState)

# Add nodes with appropriate error handling
workflow.add_node("read_email", read_email)
workflow.add_node("classify_intent", classify_intent)

# Add retry policy for nodes that might have transient failures
workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3)
)
workflow.add_node("bug_tracking", bug_tracking)
workflow.add_node("draft_response", draft_response)
workflow.add_node("human_review", human_review)
workflow.add_node("send_reply", send_reply)

# Add only the essential edges; the remaining routing is done by the nodes
# themselves through the Command objects they return.
workflow.add_edge(START, "read_email")
workflow.add_edge("read_email", "classify_intent")
workflow.add_edge("send_reply", END)

# Compile with a checkpointer for persistence.
# When running the graph with the local server, compile without a checkpointer.
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

In [None]:
import re
import json

# Test with an urgent billing issue
initial_state = {
    "email_content": "I was charged twice for my subscription! This is urgent!",
    "sender_email": "customer@example.com",
    "email_id": "email_123",
    "messages": []
}

# Run with a thread_id for persistence
config = {"configurable": {"thread_id": "customer_123"}}
result = app.invoke(initial_state, config)
print(result)

# The graph will pause at human_review. Billing emails are routed there
# straight from classification, so a draft may not exist yet: read it
# defensively instead of indexing the key directly.
draft_preview = (result.get('draft_response') or '')[:100]
print(f"Draft ready for review: {draft_preview}")

# When ready, provide human input to resume
from langgraph.types import Command

human_response = Command(
    resume={
        "approved": True,
        "edited_response": "We sincerely apologize for the double charge. I've initiated an immediate refund..."
    }
)

# Resume execution from the human_review interrupt
final_result = app.invoke(human_response, config)
print("Email sent successfully!")