## 启动服务

In [1]:
import mag
import time
import json
from IPython.display import display, Markdown
import os
import webbrowser
import subprocess
from IPython.display import Markdown, HTML, display

In [2]:
# Start the service.
# The recorded output below shows a "server started" message with a PID and
# log-file path, followed by the return value True.
mag.start()

MAG服务器已启动 (PID: 10484), 日志: C:\Users\keta\AppData\Local\Temp\mag_sdk_logs\mag_server.log


True

In [None]:
# Shut down the service.
mag.shutdown()

In [None]:
# Ask mag whether the service is running and print the answer.
is_running = mag.is_running()
print(f"服务运行状态: {is_running}")

## 模型管理

#### （1）新增模型

In [None]:
# Model definition passed to mag.add_model: a display name, the API base URL,
# the API key (left empty in this demo) and the model identifier.
new_model = dict(
    name="deepseek-chat",
    base_url="https://api.deepseek.com",
    api_key="",
    model="deepseek-chat",
)
mag.add_model(new_model)

#### （2）列出模型

In [None]:
# Fetch the configured models, print how many there are, then print the
# "name" and "base_url" fields of each one.
models = mag.list_models()
print(f"找到 {len(models)} 个模型")
for model in models:
    print(f"\n模型名称：{model['name']} \nBase_url：{model['base_url']}\n")

#### （3）更新模型

In [None]:
# Update a model: the first argument of mag.update_model is the name of the
# model to update, the second is its new definition.
updated_model = dict(
    name="deepseek-chat",
    base_url="https://api.deepseek.com",
    api_key="",
    model="deepseek-chat",
)
result = mag.update_model("deepseek-chat", updated_model)
print(result)

#### （4）删除模型

In [None]:
# Delete the model; "deepseek-chat" is the name used when it was added above.
mag.delete_model("deepseek-chat")

## MCP管理

#### （1）增加/删除server

In [None]:
# Add a server.
# The payload is a {"mcpServers": {<server name>: <settings>}} mapping; here a
# single "fetch" server launched through `uvx mcp-server-fetch` over stdio.
servers = {
    "mcpServers": {
        "fetch": {
            "autoApprove": ["fetch"],
            "timeout": 60,
            "command": "uvx",
            "args": ["mcp-server-fetch"],
            "transportType": "stdio",
        },
    },
}
result = mag.add_server(servers=servers)
print(json.dumps(result, indent=4, ensure_ascii=False))

In [None]:
# Remove servers: pass the list of server names to remove, then print the
# result as indented JSON (ensure_ascii=False keeps non-ASCII text readable).
result = mag.remove_server(["fetch","memory"])
print(json.dumps(result, indent=4, ensure_ascii=False))

#### （2）更新server配置文件（适合全面更新）

In [None]:
# Update the MCP server configuration.
# This replaces the previous configuration file as a whole.
config = {
    "mcpServers": {
        # "memory" server: started via `docker run` with a named volume.
        "memory": {
            "autoApprove": [],
            "timeout": 60,
            "command": "docker",
            "args": ["run", "-i", "-v", "claude-memory:/app/dist", "--rm", "mcp/memory"],
            "transportType": "stdio",
        },
        # "fetch" server: started via `uvx mcp-server-fetch`.
        "fetch": {
            "autoApprove": ["fetch"],
            "timeout": 60,
            "command": "uvx",
            "args": ["mcp-server-fetch"],
            "transportType": "stdio",
        },
    },
}
result = mag.update_mcp_config(config=config)
print(json.dumps(result, indent=4, ensure_ascii=False))

#### （3）连接mcp server

In [None]:
# Connect a single server, identified by its name.
mag.connect_mcp("fetch")

In [None]:
# Connect all servers by passing "all" instead of a server name, then print
# the result as indented JSON.
result = mag.connect_mcp("all")
print(json.dumps(result, indent=2, ensure_ascii=False))

#### （4）获取MCP配置

In [None]:
# Fetch the MCP configuration and pretty-print it as JSON.
mcp_config = mag.get_mcp_config()
print(json.dumps(mcp_config, indent=8, ensure_ascii=False))

#### （6）获取MCP状态/查看工具

In [None]:
# Fetch the per-server MCP status. For every server print whether its
# "connected" flag is set and what its "tools" entry holds, then dump the
# complete status mapping as JSON.
mcp_status = mag.get_mcp_status()
for server, status in mcp_status.items():
    print(f"\n{server}: {'已连接' if status.get('connected') else '未连接'}")
    print(f"{server}工具：{status.get('tools')}")
print(json.dumps(mcp_status, indent=4, ensure_ascii=False))

#### （7）获取mcp工具参数

In [None]:
# Fetch the MCP tool descriptions and pretty-print them as JSON.
mcp_tools = mag.get_tools()
print(json.dumps(mcp_tools, indent=2, ensure_ascii=False))

## 图管理

#### （1）导入一个图

In [None]:
# Import from a package (.zip archive)
graph = mag.import_graph("easy_search.zip")
print(json.dumps(graph, indent=2, ensure_ascii=False))

# Import from a config file (.json)
graph = mag.import_graph(r"news_processing_workflow.json")
print(json.dumps(graph, indent=2, ensure_ascii=False))

#### （2）导出一个图

In [None]:
# Export a graph by name and print the result as JSON. The recorded output
# below shows a result with "status", "message" and "file_path" fields.
graph = mag.export_graph("news_processing_workflow")
print(json.dumps(graph, indent=2, ensure_ascii=False))

{
  "status": "success",
  "message": "图 'easy_search' 导出成功",
  "file_path": "C:\\Users\\keta\\.mag\\exports\\easy_search.zip"
}


#### （3）列出所有图

In [None]:
# List all graphs and print how many were found together with the list itself.
print("\n列出所有图:")
graphs = mag.list_graphs()
print(f"找到 {len(graphs)} 个图: {graphs}")

#### （4）查看一个图配置文件

In [None]:
# Fetch one graph by name and pretty-print what mag returns as JSON.
result = mag.get_graph('news_processing_workflow')
print(json.dumps(result, indent=2, ensure_ascii=False))

#### （5）绘图

In [None]:
# Look up the graph's details and show its README inline.
graph_detail = mag.get_graph_detail("news_processing_workflow")

# An empty/falsy "readme" entry means there is nothing to render.
if not graph_detail['readme']:
    print("未找到README文件")
else:
    display(Markdown(graph_detail['readme']))

#### （6）将图导出为mcp server 脚本

In [None]:
# Generate the MCP server scripts for the graph and save each variant to disk.
mcp = mag.generate_mcp_script("news_processing_workflow")

# (output file, key in the generated result), written in this order.
script_targets = (
    ("news_workflow_parallel.py", "parallel_script"),      # parallel execution
    ("news_workflow_sequential.py", "sequential_script"),  # sequential execution
)
for target_path, script_key in script_targets:
    with open(target_path, "w", encoding="utf-8") as f:
        f.write(mcp[script_key])

#### （7） 运行图

In [None]:
# Run the graph by name with "." as the input text and parallel=False, then
# print whatever mag.run returns.
result = mag.run("news_processing_workflow",input_text=".",parallel=False)
print(result)

## 对话管理

#### （1）列出所有对话

In [None]:
# List all conversations. The bare `result` expression on the last line makes
# the notebook display the value as the cell output.
result = mag.list_conversations()
result

#### （2）获取对话结果

In [None]:
# Fetch one conversation and render its "output" field as Markdown.
# NOTE(review): the argument is an empty string here; presumably a real
# conversation id (e.g. one returned by mag.list_conversations()) must be
# filled in before running this cell - confirm against the mag API.
result = mag.get_conversation('')
display(Markdown(result.get("output")))

In [None]:
# Pull the "attachments" field out of the conversation result and pretty-print
# it as JSON.
# NOTE(review): the variable name is misspelled ("attactments"); a later cell
# reads it under this exact spelling, so both must be renamed together.
attactments = result.get("attachments")
print(json.dumps(attactments, indent=2, ensure_ascii=False))

In [None]:
def display_attachments(attachments):
    """Show conversation attachments from inside the notebook.

    This is a demo renderer only. Markdown files are rendered inline with
    IPython's ``Markdown`` display, and HTML files are opened in the default
    web browser. DOCX and PDF attachments are listed with their path but are
    not previewed. Every other file type is skipped silently; extend this
    function, or open the files directly from their directory, to handle
    more formats.

    Args:
        attachments: List of attachment dicts. Each entry must provide the
            keys ``"type"`` (file extension, matched case-insensitively),
            ``"path"`` and ``"name"``.

    Returns:
        None. Everything is printed or displayed as a side effect.
    """
    # Local import: pathlib is not among the notebook's top-level imports.
    from pathlib import Path

    if not attachments:
        print("没有可显示的附件")
        return

    for attachment in attachments:
        file_type = attachment["type"].lower()
        file_path = attachment["path"]
        file_name = attachment["name"]

        # Skip unsupported file types.
        if file_type not in ("md", "html", "docx", "pdf"):
            continue

        print(f"\n📄 {file_name} ({file_type.upper()}):")

        try:
            if file_type == "md":
                # Markdown: render inline through IPython's display machinery.
                with open(file_path, "r", encoding="utf-8") as f:
                    md_content = f.read()
                display(Markdown(md_content))

            elif file_type == "html":
                # HTML: open in the default browser. webbrowser only
                # documents URL support, so a plain filesystem path is
                # converted to a file:// URI first.
                path_text = str(file_path)
                if "://" in path_text:
                    target = path_text
                else:
                    target = Path(path_text).resolve().as_uri()

                # webbrowser.open() reports failure through its return value,
                # so the success message must depend on it.
                if webbrowser.open(target):
                    print("已在浏览器中打开 HTML 文件")
                else:
                    print(f"无法在浏览器中打开 HTML 文件，请手动打开: {file_path}")

            else:
                # DOCX / PDF pass the type filter but have no inline preview;
                # say so explicitly instead of printing a bare header.
                print(
                    f"暂不支持在 notebook 中预览 {file_type.upper()} 文件，"
                    f"请直接打开: {file_path}"
                )

        except Exception as e:
            # Best effort: one unreadable attachment must not stop the rest.
            print(f"无法显示文件 {file_name}: {str(e)}")

# Render the attachments of the conversation fetched earlier; `attactments`
# (sic) is the variable assigned from result.get("attachments").
display_attachments(attactments)