In [1]:
from agents import Agent
from agents.mcp.server import MCPServerStdio, MCPServerStdioParams

import os
from dotenv import load_dotenv
load_dotenv(override=True)

from openai import AsyncOpenAI
from agents import OpenAIChatCompletionsModel

from agents import Runner, set_tracing_disabled
set_tracing_disabled(True)

from agents import function_tool, RunContextWrapper
from dataclasses import dataclass

import pandas as pd
import numpy as np
from typing import Dict, Any, List


In [2]:
# Model configuration: endpoint, credential and model id are read from the
# environment variables of the same names.
BASE_URL, API_KEY, MODEL = (
    os.getenv(env_name) for env_name in ("BASE_URL", "API_KEY", "MODEL")
)

# Async OpenAI-compatible client bound to the configured endpoint.
client = AsyncOpenAI(api_key=API_KEY, base_url=BASE_URL)

# Chat-completions model wrapper built on top of that client.
model = OpenAIChatCompletionsModel(model=MODEL, openai_client=client)

In [3]:
params = MCPServerStdioParams(
    command="python",
    args=["./server/extract_data.py"]
)
server = MCPServerStdio(
    name="extract_data",
    params=params
)
await server.connect()

In [4]:
@dataclass
class Data:
    """Mutable holder with a single `data` slot."""

    # Arbitrary payload; no type is enforced.
    data: Any


# Module-level instance that starts out empty; the bare name displays it.
data = Data(None)
data

Data(data=None)

In [5]:
@function_tool
async def extract_data(
    context: RunContextWrapper,    # implicit parameter: per the original note, not written into the JSON schema
    file_path: str                 # explicit parameter: per the original note, written into the JSON schema
):
    """
    读取指定路径的数据
    :param file_path: 文件路径
    """
    # NOTE(review): the docstring above is kept verbatim because function_tool
    # presumably derives the tool description from it — confirm before editing.
    # Read the CSV and store the resulting frame on the run context object.
    context.context.data = pd.read_csv(file_path)
    return f'已从{file_path}读取数据'

In [6]:
# Extraction agent that uses the local `extract_data` function tool.
agent1 = Agent(
    model=model,
    name="Extractor",
    tools=[extract_data],
    instructions="根据用户需求，从指定路径或数据库提取数据",
)

In [7]:
result = await Runner.run(
    starting_agent=agent1,
    input='读取test.csv',
    context=data
)

In [8]:
# Show the run as an input list (user message, tool call, tool output, reply).
result.to_input_list()

[{'content': '读取test.csv', 'role': 'user'},
 {'arguments': '{"file_path":"test.csv"}',
  'call_id': 'call_yl4mefl7',
  'name': 'extract_data',
  'type': 'function_call',
  'id': '__fake_id__'},
 {'call_id': 'call_yl4mefl7',
  'output': '已从test.csv读取数据',
  'type': 'function_call_output'},
 {'id': '__fake_id__',
  'content': [{'annotations': [],
    'text': '<think>\n好的，用户让我读取test.csv文件，我需要先确认这个文件的位置。通常，如果用户没有指定具体路径，可能默认是在当前工作目录下。我需要调用extract_data函数，参数是file_path为"test.csv"。然后系统会处理读取操作，返回结果。用户可能接下来需要查看数据内容或者进行进一步的分析，我应该准备好后续步骤。不过现在只需要确认文件是否成功读取，如果有错误可能需要处理异常，但根据回复显示已成功读取，所以下一步可以询问用户是否需要查看数据或进行其他操作。\n</think>\n\n已成功从test.csv读取数据。您是否需要查看数据内容或进行进一步的操作？',
    'type': 'output_text'}],
  'role': 'assistant',
  'status': 'completed',
  'type': 'message'}]

In [11]:
# Display the payload currently stored on the shared `data` object.
data.data

Unnamed: 0,col1,col2
0,1,4
1,2,5
2,3,6


In [26]:
# IPython help lookup for Runner (development aid; shows signature and source file).
Runner?

[0;31mInit signature:[0m [0mRunner[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m      <no docstring>
[0;31mFile:[0m           ~/anaconda3/envs/LLM/lib/python3.12/site-packages/agents/run.py
[0;31mType:[0m           type
[0;31mSubclasses:[0m     

In [40]:
# Show the input list again; the recorded output comes from a later run (new call_id).
result.to_input_list()

[{'content': '读取test.csv', 'role': 'user'},
 {'arguments': '{"file_path":"test.csv"}',
  'call_id': 'call_7qfroykb',
  'name': 'extract_data',
  'type': 'function_call',
  'id': '__fake_id__'},
 {'call_id': 'call_7qfroykb',
  'output': '已从test.csv读取数据',
  'type': 'function_call_output'},
 {'id': '__fake_id__',
  'content': [{'annotations': [],
    'text': '<think>\n好的，用户让我读取test.csv文件，我已经调用了extract_data函数，并且得到了响应，说已经成功读取数据。现在需要根据这个结果给出回答。首先，我应该确认数据读取是否成功，根据工具返回的信息，是成功的。接下来，用户可能想知道数据的内容或者接下来要做什么。但用户当前的指令只是读取文件，所以应该先反馈读取成功，并询问是否有进一步的操作需求。比如是否需要查看数据内容、分析数据或者处理数据等。这样既确认了当前操作的结果，又引导用户提出下一步的要求，确保对话的连贯性和效率。同时，要保持回答简洁明了，避免冗余信息。需要检查是否有其他可能的意图，但根据用户当前的提问，没有更多细节，所以保持基本的确认和询问即可。\n</think>\n\n已成功从 `test.csv` 文件中读取数据。您是否需要查看数据内容、进行分析或执行其他操作？',
    'type': 'output_text'}],
  'role': 'assistant',
  'status': 'completed',
  'type': 'message'}]

In [4]:
def _make_mcp_extractor():
    """Build one extraction agent that is given the MCP server `server`."""
    return Agent(
        model=model,
        mcp_servers=[server],
        name="Extractor",
        instructions="根据用户需求，从指定路径或数据库提取数据",
    )


# Two separately constructed agents with identical configuration.
agent1, agent2 = _make_mcp_extractor(), _make_mcp_extractor()

In [14]:
@dataclass
class Data:
    """Dataset container: the payload plus a flag saying whether it was cleaned."""

    # Data content.
    data: Any

    # Whether the data has been cleaned.
    washed: bool = False

@dataclass
class AnalysisResult:
    """One textual analysis result, identified by an id."""

    # Identifier of the analysis.
    id: str

    # Text of the analysis result.
    text: str

@dataclass
class PictureResult:
    title: str
    "图片名称"

    path: str
    "图片路径"

    description: str = None
    "图片描述"


@dataclass
class AnalysisContext:
    """Aggregate of a dataset, its analysis results and its picture results."""

    # The dataset wrapper.
    data: Data
    # Analysis results stored under string keys.
    result: Dict[str, AnalysisResult]
    # Picture results stored under string keys.
    pics: Dict[str, PictureResult]

In [25]:
from types.types import Data

ModuleNotFoundError: No module named 'types.types'; 'types' is not a package

In [12]:
from agent import extract_data_agent

ModuleNotFoundError: No module named 'types.types'; 'types' is not a package

In [15]:
# Build a hand-written AnalysisContext fixture.
import pandas as pd
import numpy as np
from datetime import datetime

# Seed NumPy's global RNG (left over from the random-data version of this cell).
np.random.seed(42)

# The synthetic DataFrame once generated here (columns A and B drawn from
# normal distributions, C from three categories, D a 100-day date range
# starting 2023-01-01) is disabled, so the payload is left empty.
data = None

# Wrap the empty payload and flag it as already cleaned.
data_obj = Data(washed=True, data=data)

# Sample analysis results, keyed by their own ids.
results = {
    analysis.id: analysis
    for analysis in (
        AnalysisResult(
            id='desc_analysis',
            text='描述性统计分析显示变量A的均值为0.1，标准差为1.2',
        ),
        AnalysisResult(
            id='corr_analysis',
            text='相关性分析显示变量A和B的相关系数为0.3',
        ),
    )
}

# Sample picture results.
pictures = dict(
    histogram=PictureResult(
        title='变量A的直方图',
        path='./output/histogram.png',
        description='展示了变量A的分布情况',
    ),
    scatter=PictureResult(
        title='散点图',
        path='./output/scatter.png',
        description='展示了变量A和B的关系',
    ),
)

# Assemble the complete context from the three parts above.
context = AnalysisContext(pics=pictures, result=results, data=data_obj)


In [16]:
# Display the assembled AnalysisContext.
context

AnalysisContext(data=Data(data=None, washed=True), result={'desc_analysis': AnalysisResult(id='desc_analysis', text='描述性统计分析显示变量A的均值为0.1，标准差为1.2'), 'corr_analysis': AnalysisResult(id='corr_analysis', text='相关性分析显示变量A和B的相关系数为0.3')}, pics={'histogram': PictureResult(title='变量A的直方图', path='./output/histogram.png', description='展示了变量A的分布情况'), 'scatter': PictureResult(title='散点图', path='./output/scatter.png', description='展示了变量A和B的关系')})

In [22]:
@function_tool
async def extract_data(
    context: RunContextWrapper, 
    path: str
) -> str:
    """Load a CSV or Excel file and store its contents on the run context.

    Args:
        path: Path of the file to load; `.csv`, `.xlsx` and `.xls` are supported.

    Returns:
        A status message: success, unsupported file type, or the read error.
    """
    # Map each supported extension to the pandas reader that handles it.
    readers = {'csv': pd.read_csv, 'xlsx': pd.read_excel, 'xls': pd.read_excel}

    try:
        # The file type is taken from the text after the last dot.
        file_extension = path.split('.')[-1].lower()

        reader = readers.get(file_extension)
        if reader is None:
            return f"不支持的文件类型: {file_extension}"

        # Store the loaded frame on the context. Previously the CSV branch
        # stored a placeholder string and the Excel branch discarded the frame.
        context.context.data.data = reader(path)
        return f"已从{path}提取数据"

    except Exception as e:
        # Report the failure as text instead of raising out of the tool.
        return f"读取文件时发生错误: {str(e)}"

In [None]:
# This cell was left as an incomplete assignment (`agent = `), which is a
# SyntaxError. The recorded run that follows appears to return an MCP-style
# tool result, so bind the MCP-backed extractor agent here.
# NOTE(review): confirm this is the agent that was intended.
agent = agent1

In [29]:
result = await Runner.run(starting_agent=agent,input="请从./data/data.csv文件中提取数据")

In [30]:
# Show the run as an input list (messages, tool call and tool output).
result.to_input_list()

[{'content': '请从./data/data.csv文件中提取数据', 'role': 'user'},
 {'id': '__fake_id__',
  'content': [{'annotations': [], 'text': '\n\n', 'type': 'output_text'}],
  'role': 'assistant',
  'status': 'completed',
  'type': 'message'},
 {'arguments': ' {"data_path": "./data/data.csv"}',
  'call_id': '0196c8cee693ac159785c7e1014e108c',
  'name': 'extract_data',
  'type': 'function_call',
  'id': '__fake_id__'},
 {'call_id': '0196c8cee693ac159785c7e1014e108c',
  'output': '{"type":"text","text":"已从./data/data.csv提取数据","annotations":null}',
  'type': 'function_call_output'},
 {'id': '__fake_id__',
  'content': [{'annotations': [],
    'text': '\n\n数据已成功从指定路径 ./data/data.csv 提取。如果需要查看数据内容或进行后续处理，请告诉我具体需求！',
    'type': 'output_text'}],
  'role': 'assistant',
  'status': 'completed',
  'type': 'message'}]

# Output type

In [106]:
# Display the configured model id.
MODEL

'deepseek-chat'

In [72]:
from pydantic import BaseModel

# Description of a single variable: its name and a description string.
# NOTE(review): the docstring below is presumably surfaced in the model's JSON
# schema (and so to the LLM) — confirm before rewording or translating it.
class VariableAnalysis(BaseModel):
    """
    对变量的描述
    :param variable_name: 变量名
    :param variable_description: 变量描述
    """
    variable_name: str
    variable_description: str


# Variable-analysis report: a list of VariableAnalysis entries.
# NOTE(review): the docstring below is presumably surfaced in the model's JSON
# schema (and so to the LLM) — confirm before rewording or translating it.
class Report(BaseModel):
    """
    变量分析报告
    :param variable_analysis: 变量分析,属性为列表, 由若干个VariableAnalysis组成
    """
    variable_analysis: list[VariableAnalysis]

In [115]:
analysis_agent = Agent(
    name="Analysis",
    instructions="描述数据。Your response should be in the form of JSON with the following schema: {'response': 'your response here'}. Do not include any other text and do not wrap the response in a code block - just provide the raw json output.",
    model=model,
    output_type=Report
)

result = await Runner.run(starting_agent=analysis_agent,input='A=[1,2,3,4,5]和B=[6,7,8,9,10]')

In [60]:
# Cast to Report, the type that owns `variable_analysis`; the agent's
# output_type is Report, not VariableAnalysis.
result.final_output_as(Report).variable_analysis

[VariableAnalysis(variable_name='A', variable_description='提供了一组简单的数据来展示数组.'),
 VariableAnalysis(variable_name='B', variable_description='提供了一组简单的数据来展示数组.')]

In [33]:
from __future__ import annotations

import asyncio

from pydantic import BaseModel

from agents import (
    Agent,
    GuardrailFunctionOutput,
    InputGuardrailTripwireTriggered,
    RunContextWrapper,
    Runner,
    TResponseInputItem,
    input_guardrail,
)


In [34]:

### 1. An agent-based guardrail that is triggered if the user is asking to do math homework
# Structured output type that the guardrail checker agent is configured to return.
class MathHomeworkOutput(BaseModel):
    # Free-text reasoning field.
    reasoning: str
    # Boolean verdict; the guardrail uses it as the tripwire flag.
    is_math_homework: bool


# Checker agent whose structured output is a MathHomeworkOutput.
guardrail_agent = Agent(
    model=model,
    output_type=MathHomeworkOutput,
    name="Guardrail check",
    instructions="Check if the user is asking you to do their math homework.",
)


@input_guardrail
async def math_guardrail(
    context: RunContextWrapper[None], agent: Agent, input: str | list[TResponseInputItem]
) -> GuardrailFunctionOutput:
    """Input guardrail that delegates the math-homework check to `guardrail_agent`.

    The incoming input is forwarded unchanged to the checker agent, and the
    tripwire is set to the checker's `is_math_homework` value.
    """
    check_run = await Runner.run(guardrail_agent, input, context=context.context)
    verdict = check_run.final_output_as(MathHomeworkOutput)

    return GuardrailFunctionOutput(
        tripwire_triggered=verdict.is_math_homework,
        output_info=verdict,
    )

In [35]:
# Customer-support agent; every input first passes through `math_guardrail`.
agent = Agent(
    model=model,
    input_guardrails=[math_guardrail],
    name="Customer support agent",
    instructions="You are a customer support agent. You help customers with their questions.",
)

In [None]:
input_data = []
user_input = '1+1=?'
input_data.append(
    {
        "role": "user",
        "content": user_input,
    }
)

try:
    result = await Runner.run(agent, input_data)
    print(result.final_output)
    # If the guardrail didn't trigger, we use the result as the input for the next run
    input_data = result.to_input_list()
except InputGuardrailTripwireTriggered:
    # If the guardrail triggered, we instead add a refusal message to the input
    message = "Sorry, I can't help you with your math homework."
    print(message)
    input_data.append(
        {
            "role": "assistant",
            "content": message,
        }
    )




**1 + 1 = 2**  

Is there anything else I can help you with? 😊


In [46]:
# final_output_as needs the target type as its argument; this agent has no
# output_type and the recorded output is plain text, so cast to str.
result.final_output_as(str)

'\n\n**1 + 1 = 2**  \n\nIs there anything else I can help you with? 😊'