## 智能路由

本节通过一个客诉核查 Agent，来实践智能路由模式。

想象我们是一家电商平台，有位用户发起了客诉，我们用 Agent 核查用户投诉的内容是否属实。这个 Agent 的功能是，根据预设的客诉类型，将客诉转入对应的流程。我们希望待决策问题不要过于简单，必须是 `if else` 无法实现的，否则 Agent 将变成画蛇添足的产物。

```mermaid
graph LR
    A[客诉] --> B[智能路由]
    B --> C[假货]
    B --> D[物流]
    B --> E[搬单]

    C --> F[是否存在未提及的异常]
    D --> F
    E --> F

    F --> G[回复]
```

客诉以 json 形式传入：

```json
{
    "uid": 103,
    "route": "after_sales",
    "content": "我等了很久，没收到货",
    "time": "2025-06-01 12:03:55"
}
```

Agent 接到这条消息，立即判断属于哪种客诉类型，并交给对应的 MCP 核查。核查需要包含最终结果和数据库中的数据作为证据。在没有核查到对应问题的情况下，需要额外检查用户订单状态是否有其他异常。如果有，也应该作为上下文返回给运营同学。

### 1. 构造样本数据

第一步是喜闻乐见的编数据环节。我们要编两张 PostgreSQL 表：订单表、物流表。

下面是建表语句：

```sql
-- 创建数据库
CREATE DATABASE ecommerce_orders;

-- 创建新用户
CREATE USER admin WITH ENCRYPTED PASSWORD 'admin-password';

-- 授予用户权限
GRANT ALL PRIVILEGES ON DATABASE ecommerce_orders TO admin;

-- 切换数据库
\c ecommerce_orders

-- 订单表
CREATE TABLE orders (
    order_id INTEGER PRIMARY KEY,
    uid INTEGER NOT NULL,
    mall_id INTEGER NOT NULL,
    goods_id INTEGER NOT NULL,
    status VARCHAR(20) NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT valid_status CHECK (status IN ('ordered', 'cancelled'))
);

-- 物流表
CREATE TABLE logistics (
    order_id INTEGER PRIMARY KEY,
    status VARCHAR(20) NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT valid_status CHECK (status IN ('pending', 'in_transit', 'delivered', 'cancelled'))
);

-- 假货表
CREATE TABLE fake_goods (
    goods_id INTEGER NOT NULL,
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 赋予表权限
GRANT SELECT ON orders TO admin;
GRANT SELECT ON logistics TO admin;
GRANT SELECT ON fake_goods TO admin;

-- 列出所有表
\dt
```

上面是两张无历史记录的状态表。实际业务中，历史记录一般存在 Hive 表，PostgreSQL 存当前状态就好了。

为了让 Agent 理解如何使用这两张表，我们为它添加注释。


```sql
-- 订单表注释
COMMENT ON TABLE orders IS '用户订单信息表';

-- 订单表字段注释
COMMENT ON COLUMN orders.order_id IS '唯一订单ID（主键）';
COMMENT ON COLUMN orders.uid IS '用户ID';
COMMENT ON COLUMN orders.mall_id IS '商城ID';
COMMENT ON COLUMN orders.goods_id IS '商品ID';
COMMENT ON COLUMN orders.status IS '订单状态: ordered(已下单)/cancelled(已取消)';
COMMENT ON COLUMN orders.timestamp IS '订单状态更新时间';

-- 物流表注释
COMMENT ON TABLE logistics IS '订单物流信息表';

-- 物流表字段注释
COMMENT ON COLUMN logistics.order_id IS '关联的订单ID（主键）';
COMMENT ON COLUMN logistics.status IS '物流状态: pending(待处理)/in_transit(运输中)/delivered(已送达)/cancelled(已取消)';
COMMENT ON COLUMN logistics.timestamp IS '物流状态更新时间';

-- 假货表注释
COMMENT ON TABLE fake_goods IS '假货商品记录表';

-- 假货表字段注释
COMMENT ON COLUMN fake_goods.goods_id IS '商品ID';
COMMENT ON COLUMN fake_goods.create_time IS '记录创建时间';
```

让 DeepSeek 帮我造一些订单：

```sql
-- 插入订单数据
INSERT INTO orders (order_id, uid, mall_id, goods_id, status, timestamp) VALUES
(1001, 101, 1, 5001, 'ordered', '2025-05-01 10:00:00'),  -- 正常下单待发货
(1002, 102, 2, 6002, 'ordered', '2025-05-02 14:30:00'),  -- 运输中订单
(1003, 103, 1, 5003, 'ordered', '2025-05-03 09:15:00'),  -- 已送达订单
(1004, 104, 3, 7004, 'cancelled', '2025-05-04 16:45:00'), -- 发货前取消
(1005, 105, 2, 6005, 'cancelled', '2025-05-05 11:20:00'); -- 运输中取消

-- 插入物流数据
INSERT INTO logistics (order_id, status, timestamp) VALUES
(1001, 'pending', '2025-05-01 10:05:00'),     -- 待发货状态
(1002, 'in_transit', '2025-05-02 15:00:00'),   -- 运输中状态
(1003, 'delivered', '2025-05-03 17:30:00'),    -- 已送达状态
(1004, 'cancelled', '2025-05-04 16:50:00'),    -- 发货前取消
(1005, 'in_transit', '2025-05-05 11:30:00');   -- 取消时已在运输中

-- 插入假货数据
INSERT INTO fake_goods (goods_id, create_time) VALUES
(5003, '2025-05-02 12:30:00'),
(6003, '2025-05-06 15:45:00');

```

> PostgreSQL 数据库的安装过程见 [postgresql_bot.ipynb](test_qwen_agent/3.postgresql_bot.ipynb)

### 2. 使用 Python 连接 Postgres

检查能否获取 PostgreSQL 中的数据。

In [1]:
import psycopg2

conn = psycopg2.connect(
    host="localhost",
    port="5432",
    database="ecommerce_orders",
    user="admin",
    password="admin-password"
)

也可以从 `.env` 中加载数据库配置，参考本项目仓库的 [load_env_variables.ipynb](./load_env_variables.ipynb)。

In [2]:
import os
from dotenv import load_dotenv

load_dotenv()

# 从环境变量中获取账号密码
host = os.getenv('DB_HOST')
port = os.getenv('DB_PORT')
database = os.getenv('DB_NAME')
user = os.getenv('DB_USER')
password = os.getenv('DB_PASSWORD')

config = {
    "host": host,
    "port": port,
    "database": database,
    "user": user,
    "password": password
}

config

{'host': 'localhost',
 'port': '5432',
 'database': 'ecommerce_orders',
 'user': 'admin',
 'password': 'admin-password'}

尝试连接数据库。

In [3]:
cursor = conn.cursor()
cursor.execute("SELECT version();")
record = cursor.fetchone()
record

('PostgreSQL 16.9 (Ubuntu 16.9-0ubuntu0.24.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0, 64-bit',)

In [4]:
with conn.cursor() as cursor:
    cursor.execute("SELECT * FROM orders;")

    # 获取所有结果
    records = cursor.fetchall()

    # 输出查询结果
    for row in records:
        print(row)

(1001, 101, 1, 5001, 'ordered', datetime.datetime(2025, 5, 1, 10, 0))
(1002, 102, 2, 6002, 'ordered', datetime.datetime(2025, 5, 2, 14, 30))
(1003, 103, 1, 5003, 'ordered', datetime.datetime(2025, 5, 3, 9, 15))
(1004, 104, 3, 7004, 'cancelled', datetime.datetime(2025, 5, 4, 16, 45))
(1005, 105, 2, 6005, 'cancelled', datetime.datetime(2025, 5, 5, 11, 20))


In [5]:
with conn.cursor() as cursor:
    cursor.execute("SELECT * FROM logistics;")

    # 获取所有结果
    records = cursor.fetchall()

    # 输出查询结果
    for row in records:
        print(row)

(1001, 'pending', datetime.datetime(2025, 5, 1, 10, 5))
(1002, 'in_transit', datetime.datetime(2025, 5, 2, 15, 0))
(1003, 'delivered', datetime.datetime(2025, 5, 3, 17, 30))
(1004, 'cancelled', datetime.datetime(2025, 5, 4, 16, 50))
(1005, 'in_transit', datetime.datetime(2025, 5, 5, 11, 30))


In [6]:
if conn:
    conn.close()
    print("数据库连接已关闭")

数据库连接已关闭


### 3. 使用 MCP 查询 Postgres

来到 `test_qwen3` 目录，启动 vLLM 服务：

```bash
cd test_qwen3
bash vllm_server.sh
```

In [13]:
import os

from qwen_agent.agents import Assistant
from qwen_agent.gui import WebUI

初始化配置了 Postgres MCP Server 的 Assistant Agent。

In [14]:
# llm 配置
llm_cfg = {
    'model': 'Qwen3-0.6B-FP8',
    'model_server': 'http://localhost:8000/v1',
    'api_key': 'token-kcgyrk',
    'generate_cfg': {
        'top_p': 0.95,
        'temperature': 0.6,
    }
}

# 配置 Postgres MCP Server
tools = [{
  "mcpServers": {
    "postgres": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-postgres",
        "postgresql://admin:admin-password@localhost:5432/ecommerce_orders",
        "--introspect"  # 自动读取数据库模式
      ]
    }
  }
}]

# 初始化 Agent
bot = Assistant(
    llm=llm_cfg,
    name='Postgres 数据库助手',
    description='查询 Postgres 数据库',
    system_message='',
    function_list=tools,
)

2025-06-15 17:09:51,747 - mcp_manager.py - 110 - INFO - Initializing MCP tools from mcp servers: ['postgres']
2025-06-15 17:09:51,757 - mcp_manager.py - 245 - INFO - Initializing a MCP stdio_client, if this takes forever, please check the config of this mcp server: postgres


我们查询订单表试试，看看这个 Agent 能否查询到 Postgres 中的数据。

In [15]:
query = '请在订单表中查询uid为102的用户的所有信息'
messages = [{'role': 'user', 'content': query}]

# 输出
response = bot.run_nonstream(messages)
# print('bot response:', response)

In [16]:
print('result:', response[-1]['content'].strip())

result: 查询结果如下：

- `order_id`：1002
- `uid`：102
- `mall_id`：2
- `goods_id`：6002
- `status`：ordered
- `timestamp`：2025-05-02T06:30:00.000Z

该订单由uid为102的用户在mall_id为2的店铺中购买goods_id为6002，状态为已下单。


### 4. 开发客诉核查 Workflow

现在我们来开发客诉核查 Agent。

**1）开发规划**

具体规划是这样的：

1. 先用一个 **护栏 Agent** 判断客诉是否在我们的处理范围内，如不在直接返回
2. 对于我们希望核实的每个客诉类型，开发对应的工具 `tools`（使用 Function Calling 或 MCP Server）
3. 开发一个 **核查 Agent**，接入上面开发的 tools，让它调用工具进行研判

**2）具体实现**

我开发了一个 Python 包实现这个 Workflow。它的代码太长，这里就不放了，大家去 GitHub 仓库 [luochang212/agent-project](https://github.com/luochang212/agent-project/tree/main/intelligent_routing) 看。

它的目录结构如下：

```
.
├── __init__.py  # 导入需要暴露的类和方法
├── agent.py  # 护栏 Agent + 研判 Agent（由它调用 tools）
├── check_fake_goods.py  # 用于核查假货问题的路由（tool）
└── check_overdue.py  # 用于核查物流逾期的路由（tool）
```

我开发了两个 Function Calling：

|类|工具函数|描述|
| -- | -- | -- |
|`FakeGoodsCheckerTool`|`check_fake_goods`|用于核查假货客诉的工具函数|
|`OverdueCheckerTool`|`check_overdue`|用于核查物流逾期的工具函数|

它们分别用于核查用户关于物流逾期和假货两种类型的投诉是否属实。

接下来展示 `intelligent_routing` 包的用法。

In [17]:
from intelligent_routing import workflow

下面是一条客诉示例，我们的客诉核查 Agent 将给出研判结论和对应的证据。

In [18]:
# 客诉内容
complaint = {
    "time": "2025-06-10",
    "uid": 103,
    "content": "等了很久，没收到货"
}

res = workflow(complaint, llm_cfg)

护栏 Agent：从客诉内容判断是物流逾期问题。


In [19]:
print(res)

客诉类型：物流逾期  
研判结论：订单未逾期  
支持证据：订单时间戳与物流时间戳均为下单当天17:30，说明物流时间在下单后7天内完成，未逾期。
