# DataAnalysisAgent - 智能数据分析助手

## 第1部分：环境配置

In [1]:
# 导入库和配置
from hello_agents import SimpleAgent, HelloAgentsLLM
from hello_agents.tools import Tool, ToolParameter
from typing import Dict, Any, List
import ast
import os

# 配置LLM参数
os.environ["LLM_MODEL_ID"] = "Qwen/Qwen3-8B"
os.environ["LLM_API_KEY"] = "ms-9382e20f-96c2-456a-b609-af5c81201066"
os.environ["LLM_BASE_URL"] = "https://api-inference.modelscope.cn/v1/"
os.environ["LLM_TIMEOUT"] = "60"

print("✅ 库导入和配置完成")

✅ 库导入和配置完成


## 第2部分：定义数据分析工具

In [2]:
import json
import pandas as pd
from typing import Dict, Any, List

class DataCleaningTool(Tool):
    """数据清洗工具 - 基于用户指定规则清洗表格数据"""

    def __init__(self):
        super().__init__(
            name="data_cleaner",
            description="对传入的表格数据执行清洗操作，包括去空值、列筛选等"
        )

    def run(self, parameters: Dict[str, Any]) -> str:
        """
        执行数据清洗
        parameters 应包含：
          - data_json: str，来自 excel_reader 的 JSON 字符串（必须）
          - drop_na: bool，是否删除含空值的行（默认 False）
          - columns_to_keep: List[str]，保留的列名列表（可选）
        """
        data_json = parameters.get("data_json")
        if not data_json:
            return "错误：缺少原始数据（data_json 不能为空）"

        try:
            # 解析原始数据
            raw_data = json.loads(data_json)
            records = raw_data.get("完整数据", [])
            if not records:
                return "警告：原始数据为空，无法清洗"

            df = pd.DataFrame(records)

            # 1. 列筛选
            columns_to_keep = parameters.get("columns_to_keep")
            if columns_to_keep:
                missing_cols = [col for col in columns_to_keep if col not in df.columns]
                if missing_cols:
                    return f"错误：指定保留的列不存在：{missing_cols}"
                df = df[columns_to_keep]


            # 2. 删除空值行
            if parameters.get("drop_na", False):
                original_len = len(df)
                df = df.dropna()
                dropped = original_len - len(df)
                if dropped > 0:
                    pass
            
            df = df.fillna(0)
            # 构建清洗后结果
            cleaned_records = df.where(pd.notnull(df), None).to_dict(orient='records')
            result = {
                "clean_data": cleaned_records
            }

            return json.dumps(result, ensure_ascii=False, indent=2)

        except json.JSONDecodeError:
            return "错误：data_json 不是有效的 JSON 格式"
        except Exception as e:
            return f"清洗过程中出错：{str(e)}"

    def get_parameters(self) -> List[ToolParameter]:
        return [
            ToolParameter(
                name="data_json",
                type="string",
                description="原始数据的 JSON 字符串",
                required=True
            ),
            ToolParameter(
                name="drop_na",
                type="boolean",
                description="是否删除包含空值的行",
                required=False
            ),
            ToolParameter(
                name="columns_to_keep",
                type="array",
                description="要保留的列名列表",
                required=False
            ),
        ]

print("✅ DataCleaningTool 定义完成")

✅ DataCleaningTool 定义完成


In [3]:
class DataStatisticsTool(Tool):
    """数据统计工具 - 提供描述性统计分析"""

    def __init__(self):
        super().__init__(
            name="data_statistics",
            description="对数据进行描述性统计分析，包括均值、中位数、标准差等"
        )

    def run(self, parameters: Dict[str, Any]) -> str:
        data_json = parameters.get("data_json")
        if not data_json:
            return "错误：缺少数据"

        try:
            raw_data = json.loads(data_json)
            records = raw_data.get("clean_data", [])
            df = pd.DataFrame(records)
            
            # 数值型列的统计
            numeric_stats = {}
            for col in df.select_dtypes(include=[np.number]).columns:
                numeric_stats[col] = {
                    "count": int(df[col].count()),
                    "mean": float(df[col].mean()),
                    "median": float(df[col].median()),
                    "std": float(df[col].std()),
                    "min": float(df[col].min()),
                    "max": float(df[col].max()),
                    "q25": float(df[col].quantile(0.25)),
                    "q75": float(df[col].quantile(0.75))
                }
            
            # 分类型列的统计
            categorical_stats = {}
            for col in df.select_dtypes(include=['object']).columns:
                value_counts = df[col].value_counts().head(10).to_dict()
                categorical_stats[col] = {
                    "unique_count": int(df[col].nunique()),
                    "top_values": value_counts
                }
            
            result = {
                "shape": f"{len(df)} 行, {len(df.columns)} 列",
                "numeric_stats": numeric_stats,
                "categorical_stats": categorical_stats,
            }
            
            return json.dumps(result, ensure_ascii=False, indent=2)
            
        except Exception as e:
            return f"统计分析出错：{str(e)}"

    def get_parameters(self) -> List[ToolParameter]:
        return [
            ToolParameter(
                name="data_json",
                type="string",
                description="数据的 JSON 字符串",
                required=True
            )
        ]

print("✅ DataStatisticsTool 定义完成")


✅ DataStatisticsTool 定义完成


## 第3部分：创建智能体

In [4]:
# 创建工具注册表和智能体
from hello_agents import ToolRegistry

# 创建工具注册表
tool_registry = ToolRegistry()
tool_registry.register_tool(DataCleaningTool())

system_prompt = """你是一名数据分析师,你的任务是:
    1. 使用data_cleaner工具清洗数据
    2. 使用data_statistics工具统计数据
    3. 选择合适的图表，用echarts代码绘制图表，例如：
    option = {
        xAxis: {
            type: 'category',
            data: ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        },
        yAxis: {
            type: 'value'
        },
        series: [
            {
            data: [120, 200, 150, 80, 70, 110, 130],
            type: 'bar'
            }
        ]
    };
    3、不要对代码分析，不要输出html，只输出echarts代码
    4、最后基于数据，提供详细的数据分析报告
    
    数据分析报告应包括：
    - 分析背景与目标
    - 关键的发现
    - 进行统计计算、趋势识别、异常检测或对比分析
    避免主观臆断，结论需基于数据，
    请以Markdown格式输出报告。
    """
# 创建智能体
agent = SimpleAgent(
    name="数据分析助手",
    llm=HelloAgentsLLM(),
    system_prompt=system_prompt,
    tool_registry=tool_registry
)

print("✅ 智能体创建完成")
print(f"✅ 可用工具: {list(tool_registry._tools.keys())}")

✅ 工具 'data_cleaner' 已注册。
✅ 智能体创建完成
✅ 可用工具: ['data_cleaner']


## 第4部分：读取示例数据表格

In [None]:
file_path = "./data/simple_data.xls"

try:
    df = pd.read_excel(file_path)
    # ⚠️ 不做清洗！保留原始 NaN（pandas 会自动将 Excel 空单元格转为 NaN）
    data_records = df.to_dict(orient='records') 

    # 构造符合 DataCleaningTool 要求的输入格式
    clean_input = {
        "完整数据": data_records
    }
    sample_data = json.dumps(clean_input, ensure_ascii=False, indent=2)

except FileNotFoundError:
    sample_data = json.dumps({"error": f"Excel 文件不存在: {file_path}"}, ensure_ascii=False)
except Exception as e:
    sample_data = json.dumps({"error": f"读取 Excel 文件失败: {str(e)}"}, ensure_ascii=False)

print(sample_data)

{
  "完整数据": [
    {
      "指标": "居民消费价格指数(上年同月=100)",
      "2025年10月": 100.2,
      "2025年9月": 99.7,
      "2025年8月": 99.6,
      "2025年7月": 100.0,
      "2025年6月": 100.1
    },
    {
      "指标": "食品烟酒类居民消费价格指数(上年同月=100)",
      "2025年10月": 98.4,
      "2025年9月": 97.4,
      "2025年8月": 97.5,
      "2025年7月": 99.2,
      "2025年6月": 100.1
    },
    {
      "指标": "衣着类居民消费价格指数(上年同月=100)",
      "2025年10月": 101.7,
      "2025年9月": 101.7,
      "2025年8月": 101.8,
      "2025年7月": 101.7,
      "2025年6月": 101.6
    },
    {
      "指标": "居住类居民消费价格指数(上年同月=100)",
      "2025年10月": 100.1,
      "2025年9月": 100.1,
      "2025年8月": 100.1,
      "2025年7月": 100.1,
      "2025年6月": 100.1
    },
    {
      "指标": "生活用品及服务类居民消费价格指数(上年同月=100)",
      "2025年10月": 101.9,
      "2025年9月": 102.2,
      "2025年8月": 101.8,
      "2025年7月": 101.2,
      "2025年6月": 100.7
    },
    {
      "指标": "交通通信类居民消费价格指数(上年同月=100)",
      "2025年10月": 98.5,
      "2025年9月": 98.0,
      "2025年8月": 97.6,
      "2025年7月": 96.9,


## 第5部分：执行数据分析

In [6]:
# 执行数据分析
print("=== 开始数据分析 ===")
result = agent.run(f"对以下数据绘制图表和数据分析\n\n{sample_data}\n")

print(result)

=== 开始数据分析 ===
option = {
    xAxis: {
        type: 'category',
        data: ['2025年6月', '2025年7月', '2025年8月', '2025年9月', '2025年10月']
    },
    yAxis: {
        type: 'value'
    },
    series: [
        {
            name: '居民消费价格指数',
            data: [100.1, 100.0, 99.6, 99.7, 100.2],
            type: 'line'
        },
        {
            name: '食品烟酒类居民消费价格指数',
            data: [100.1, 99.2, 97.5, 97.4, 98.4],
            type: 'line'
        },
        {
            name: '衣着类居民消费价格指数',
            data: [101.6, 101.7, 101.8, 101.7, 101.7],
            type: 'line'
        },
        {
            name: '居住类居民消费价格指数',
            data: [100.1, 100.1, 100.1, 100.1, 100.1],
            type: 'line'
        },
        {
            name: '生活用品及服务类居民消费价格指数',
            data: [100.7, 101.2, 101.8, 102.2, 101.9],
            type: 'line'
        },
        {
            name: '交通通信类居民消费价格指数',
            data: [96.3, 96.9, 97.6, 98.0, 98.5],
            type: 'line'
        },
  

## 第6部分：保存分析报告和图表

In [14]:
import re
import os

echarts_match = re.search(r"option\s*=\s*(\{[\s\S]*?\});", result)
if echarts_match:
    echarts_code = echarts_match.group(1)
    print(echarts_code)
else:
    print("未找到 ECharts 代码")

{
    xAxis: {
        type: 'category',
        data: ['2025年6月', '2025年7月', '2025年8月', '2025年9月', '2025年10月']
    },
    yAxis: {
        type: 'value'
    },
    series: [
        {
            name: '居民消费价格指数',
            data: [100.1, 100.0, 99.6, 99.7, 100.2],
            type: 'line'
        },
        {
            name: '食品烟酒类居民消费价格指数',
            data: [100.1, 99.2, 97.5, 97.4, 98.4],
            type: 'line'
        },
        {
            name: '衣着类居民消费价格指数',
            data: [101.6, 101.7, 101.8, 101.7, 101.7],
            type: 'line'
        },
        {
            name: '居住类居民消费价格指数',
            data: [100.1, 100.1, 100.1, 100.1, 100.1],
            type: 'line'
        },
        {
            name: '生活用品及服务类居民消费价格指数',
            data: [100.7, 101.2, 101.8, 102.2, 101.9],
            type: 'line'
        },
        {
            name: '交通通信类居民消费价格指数',
            data: [96.3, 96.9, 97.6, 98.0, 98.5],
            type: 'line'
        },
        {
            name

In [8]:
report_match = re.search(r"(# 数据分析报告[\s\S]*)", result)

markdown_report = report_match.group(1).strip()
print("提取到 Markdown 报告")


# ==============================
# 3. 保存 Markdown 报告到文件
# ==============================
output_dir = "./output"
os.makedirs(output_dir, exist_ok=True)
md_path = os.path.join(output_dir, "report.md")

with open(md_path, "w", encoding="utf-8") as f:
    f.write(markdown_report)

print(f"\nMarkdown 报告已保存至: {md_path}")

提取到 Markdown 报告

Markdown 报告已保存至: ./output\report.md


In [13]:
from IPython.display import HTML

html_code = f'''
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>第一个 ECharts 实例</title>
    <!-- 引入 echarts.js -->
    <script src="https://cdn.staticfile.org/echarts/4.3.0/echarts.min.js"></script>
</head>
<body>
    <!-- 为ECharts准备一个具备大小（宽高）的Dom -->
    <div id="main" style="width: 600px;height:400px;"></div>
    <script type="text/javascript">
        // 基于准备好的dom，初始化echarts实例
        var myChart = echarts.init(document.getElementById('main'));
 
        // 指定图表的配置项和数据
        var option = {echarts_code}
        
        // 使用刚指定的配置项和数据显示图表。
        myChart.setOption(option);
    </script>
</body>
</html>
'''

In [15]:
from IPython.display import IFrame

# 将 HTML 代码保存为文件
with open('./output/echarts.html', 'w', encoding='utf-8') as f:
    f.write(html_code)
