## 初始化大模型，定义大模型操作本地路径

In [None]:
import os
from dotenv import load_dotenv

# langchain核心组件导入
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
from langchain.agents import create_agent


#加载环境
load_dotenv()

 # 初始化 LLM (确保使用较新的模型以支持良好的 Tool Calling)
llm = ChatOpenAI(
    model="deepseek-chat",
    temperature=0.5,
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url=os.getenv("DASHSCOPE_BASE_URL")
)

# 定义文件操作的安全根目录(防止AI随意修改系统文件)
WORK_DIR = "D:\\Agent资料"
if not os.path.exists(WORK_DIR):
    os.makedirs(WORK_DIR)



## 定义数据库操作工具

In [None]:
import pymysql
from pymysql.cursors import DictCursor
from typing import List, Optional, Dict

'''
employee_performance:
    - id:主键
    - employee_id:员工唯一标识
    - name:员工姓名
    - department:部门
    - position:岗位
    - hire_date:入职日期 YYYY-MM-DD
    - review_start:绩效开始日期
    - review_end:绩效结束日期
    - kpi_score:KPI得分
    - goals_met:目标是否达成(1/0)
    - rating:绩效评级
    - bonus_amount:奖金金额
    - manager_comments:主管评语
'''

# 数据库连接函数
def get_connection():
    return pymysql.connect(
        host="localhost",
        user="root",
        password="123456",
        database="emp",
        charset="utf8mb4",
        cursorclass=DictCursor
    )

# ----------------------------------------
# 1. 根据部门查询员工绩效
# ----------------------------------------
@tool
def query_by_department(
    department: str,
    min_score: Optional[int] = None,
    max_score: Optional[int] = None,
    limit: int = 50
) -> List[Dict]:
    """
    查询指定部门的员工绩效，可筛选 KPI 得分范围
    
    参数：
    - department: 部门名称
    - min_score: 最小 KPI 得分，可选
    - max_score: 最大 KPI 得分，可选
    - limit: 返回记录数量限制
    
    返回：
    - 员工绩效记录列表，每条记录为 dict
    """
    sql = "SELECT * FROM employee_performance WHERE department = %s"
    params = [department]
    
    if min_score is not None:
        sql += " AND kpi_score >= %s"
        params.append(min_score)
    if max_score is not None:
        sql += " AND kpi_score <= %s"
        params.append(max_score)
    
    sql += " ORDER BY kpi_score DESC LIMIT %s"
    params.append(limit)
    
    conn = get_connection()
    cursor = conn.cursor()
    cursor.execute(sql, params)
    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows

# ----------------------------------------
# 2. 根据绩效得分区间查询员工
# ----------------------------------------
@tool
def query_by_score(
    min_score: int,
    max_score: int,
    department: Optional[str] = None,
    limit: int = 50
) -> List[Dict]:
    """
    查询 KPI 得分在指定区间的员工，可指定部门
    
    参数：
    - min_score: 最小得分
    - max_score: 最大得分
    - department: 部门名称，可选
    - limit: 返回记录数量限制
    
    返回：
    - 员工绩效记录列表
    """
    sql = "SELECT * FROM employee_performance WHERE kpi_score BETWEEN %s AND %s"
    params = [min_score, max_score]
    
    if department:
        sql += " AND department = %s"
        params.append(department)
    
    sql += " ORDER BY kpi_score DESC LIMIT %s"
    params.append(limit)
    
    conn = get_connection()
    cursor = conn.cursor()
    cursor.execute(sql, params)
    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows

# ----------------------------------------
# 3. 查询指定绩效评级员工
# ----------------------------------------
@tool
def query_by_rating(
    rating: str,
    department: Optional[str] = None,
    limit: int = 50
) -> List[Dict]:
    """
    查询指定绩效评级(rating)的员工,可指定部门
    
    参数：
    - rating: 绩效评级，例如 "优秀", "良好", "合格"
    - department: 部门名称，可选
    - limit: 返回记录数
    
    返回：
    - 员工绩效记录列表
    """
    sql = "SELECT * FROM employee_performance WHERE rating = %s"
    params = [rating]
    
    if department:
        sql += " AND department = %s"
        params.append(department)
    
    sql += " ORDER BY kpi_score DESC LIMIT %s"
    params.append(limit)
    
    conn = get_connection()
    cursor = conn.cursor()
    cursor.execute(sql, params)
    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows

# ----------------------------------------
# 4. 综合查询函数（部门 + 分数区间 + 绩效评级）
# ----------------------------------------
@tool
def query_employee_performance(
    department: Optional[str] = None,
    min_score: Optional[int] = None,
    max_score: Optional[int] = None,
    rating: Optional[str] = None,
    limit: int = 50
) -> List[Dict]:
    """
    综合查询员工绩效,可根据部门、得分区间和绩效评级筛选
    
    参数：
    - department: 部门名称,可选
    - min_score: 最小 KPI 得分,可选
    - max_score: 最大 KPI 得分,可选
    - rating: 绩效评级,可选
    - limit: 返回记录数量限制
    """
    sql = "SELECT * FROM employee_performance WHERE 1=1"
    params = []
    
    if department:
        sql += " AND department = %s"
        params.append(department)
    if min_score is not None:
        sql += " AND kpi_score >= %s"
        params.append(min_score)
    if max_score is not None:
        sql += " AND kpi_score <= %s"
        params.append(max_score)
    if rating:
        sql += " AND rating = %s"
        params.append(rating)
    
    sql += " ORDER BY kpi_score DESC LIMIT %s"
    params.append(limit)
    
    conn = get_connection()
    cursor = conn.cursor()
    cursor.execute(sql, params)
    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows


@tool
def file_manager(action: str, filename: str, content: Optional[str] = None) -> str:
    """
    对指定路径的文件进行读写操作。
    
    Args:
        action (str): 操作类型，必须是 'read' (读取) 或 'write' (写入)。
        filename (str): 文件名（例如 'notes.txt'）。
        content (str, optional): 当 action 为 'write' 时，需要写入的内容。
    """
    filepath = os.path.join(WORK_DIR, filename)
    
    try:
        if action == "write":
            if content is None:
                return "错误: 写入模式需要提供 content 内容。"
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(content)
            return f"成功: 已将内容写入 {filename}。"
            
        elif action == "read":
            if not os.path.exists(filepath):
                return f"错误: 文件 {filename} 不存在。"
            with open(filepath, "r", encoding="utf-8") as f:
                return f.read()
                
        else:
            return "错误: 不支持的操作。请使用 'read' 或 'write'。"
            
    except Exception as e:
        return f"发生异常: {str(e)}"

## 大模型调用数据库工具，文件读写工具工作

In [None]:
from threading import Thread
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import json
import asyncio
import nest_asyncio
import uvicorn


# 工具列表
tools = [file_manager,query_by_department,query_by_score,query_by_rating,query_employee_performance]

# 创建 Agent
agent_executor = create_agent(llm, tools)

nest_asyncio.apply()
app = FastAPI(title="Agent Backend Service")

# 允许跨域请求,生产环境建议将 "*" 换成具体的前端域名
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/")
def root():
    return {"message": "Agent 服务已启动"}

@app.post("/query")
async def query_agent(request: Request):
    """
    接收 JSON:
    {
        "query": "用户的输入内容"
    }
    """
    data = await request.json()
    user_query = data.get("query", "")

    async def event_generator():
        # 使用 stream 模式执行
        events = agent_executor.stream(
            {"messages": [HumanMessage(content=user_query)]},
            stream_mode="values"
        )
         # event["messages"] 包含了当前的对话历史
        for event in events:
            last_message = event["messages"][-1]

            output_data = {}
            if last_message.type == "ai":
                # 如果有 tool_calls，说明正在调用工具
                if hasattr(last_message, "tool_calls") and len(last_message.tool_calls) > 0:
                    output_data["tool_call"] = last_message.tool_calls[0]["name"]
                else:
                    output_data["answer"] = last_message.content

            elif last_message.type == "tool":
                output_data["tool_output"] = last_message.content

            # 每次产生一个事件就 yield JSON 字符串
            yield f"data: {json.dumps(output_data,ensure_ascii=False)}"
            await asyncio.sleep(0.01)  # 给 async 流式一个小等待

    return StreamingResponse(event_generator(), media_type="text/event-stream")

# 在 Jupyter 中启动服务的函数
def run_app():
    uvicorn.run(app, host="127.0.0.1", port=8000, log_level="info")

# 使用线程启动
thread = Thread(target=run_app, daemon=True)
thread.start()

print("FastAPI 服务已在 http://127.0.0.1:8000 启动")

In [None]:

# 工具列表
tools = [file_manager,query_by_department,query_by_score,query_by_rating,query_employee_performance]

    
# 创建Agent
agent_executor = create_agent(llm, tools)

print("------ Agent 已启动 ------")

# 用户输入
query = "查询销售部门绩效得分在85分以上的员工"
    
print(f"\n用户: {query}")
print("Agent 正在思考...\n")

# stream 流式输出
events = agent_executor.stream(
    {"messages": [HumanMessage(content=query)]},
    stream_mode="values"
)

for event in events:
    # event["messages"] 包含了当前的对话历史
    last_message = event["messages"][-1]
        
    # 打印 AI 的回复
    if last_message.type == "ai":
        # 如果有 tool_calls，说明正在调用工具
        if hasattr(last_message, "tool_calls") and len(last_message.tool_calls) > 0:
            print(f"Executing Tool: {last_message.tool_calls[0]['name']}")
        else:
            print(f"Final Answer: {last_message.content}")
        
    # 打印 Tool 的执行结果
    elif last_message.type == "tool":
        print(f"Tool Output: {last_message.content}")
