# Agent沙箱 文档

## 描述

Agent沙箱是一个安全的代码执行环境，为AI Agent提供安全、隔离的代码运行空间。沙箱支持多种编程语言，包括Python、JavaScript、TypeScript、Java、R语言，并提供了完整的文件系统操作、终端命令执行等功能。

### 主要特性

- **生命周期管理**: 支持沙箱的创建、列表、关闭等完整生命周期管理
- **多语言支持**: 支持Python、JavaScript、TypeScript、Java、R语言、Bash等多种编程语言
- **代码执行隔离**: 通过上下文机制实现不同代码执行环境的隔离
- **文件系统操作**: 支持文件上传、下载、创建、删除、重命名等完整文件操作
- **终端命令执行**: 支持前台和后台命令执行
- **流式输出**: 支持代码执行结果的流式输出，实时获取执行状态
- **文件监控**: 支持文件系统变化监控，获取文件操作事件


### 使用场景

- AI Agent代码执行环境
- 代码测试和调试
- 数据处理和分析
- 自动化脚本执行
- 交互式编程环境


## 安装依赖

In [None]:
%pip install e2b_code_interpreter

## 沙箱生命周期操作
### 创建沙箱

In [None]:
import os
from e2b_code_interpreter import Sandbox
# 在Agent Sandbox控制台中获取API Key以及可用域名
# 可通过环境变量预先设置，或在此处直接修改
if not os.getenv("E2B_DOMAIN"):
    os.environ["E2B_DOMAIN"] = "tencentags.com"
if not os.getenv("E2B_API_KEY"):
    os.environ["E2B_API_KEY"] = "your_api_key"

# 使用沙箱服务前，需先在控制台中创建沙箱工具，如"code-interpreter-v1"
# 创建代码沙箱，默认运行5分钟，使用timeout参数指定运行时间，此处保持运行3600s(1h)
sandbox = Sandbox.create(template="code-interpreter-v1",timeout=3600)

### 列出所有沙箱

In [None]:
# 获取沙箱列表分页器,每页5个沙箱
paginator = Sandbox.list(limit=(5))
while paginator.has_next:
    # 打印当前沙箱列表页中的沙箱
    for sandbox_info in paginator.next_items():
        print(sandbox_info)
    print("next 5 sandbox")

### 重新设置过期时间

In [None]:
print(sandbox.get_info().started_at, sandbox.get_info().end_at)
sandbox.set_timeout(600)
print(sandbox.get_info().started_at, sandbox.get_info().end_at)

### 连接现有沙箱

In [None]:
sandbox_2 = Sandbox.connect(sandbox.sandbox_id)
print(sandbox.get_host(8080), sandbox_2.get_host(8080))

### 关闭沙箱

In [None]:
# 立即关闭沙箱
sandbox.kill()

## 沙箱代码执行操作

In [None]:
import os
import asyncio
from e2b_code_interpreter import Sandbox

# 可通过环境变量预先设置，或在此处直接修改
if not os.getenv("E2B_DOMAIN"):
    os.environ["E2B_DOMAIN"] = "tencentags.com"
if not os.getenv("E2B_API_KEY"):
    os.environ["E2B_API_KEY"] = "your_api_key"

sandbox = Sandbox.create(template="code-interpreter-v1",timeout=3600)

### 执行python代码
代码执行支持流式输出结果

In [None]:
python_code = """
import time
print("hello python")
time.sleep(2)
print("hello python after 2 sec")
"""
# 执行上述python代码，流式获取代码输出并打印，代码执行超时时间为600秒
print(sandbox.run_code(python_code,on_stdout=lambda data:print(data),timeout=600))

### 执行JavaScript & TypeScript代码

In [None]:
js_code = """
console.log("hello javascript")
"""
print(sandbox.run_code(js_code,"js"))
ts_code = """
var text:string = "hello typescript"
console.log(text)
"""
print(sandbox.run_code(ts_code,"ts"))

### 执行Java代码

In [None]:
java_code = """
System.out.println("hello java");
"""
print(sandbox.run_code(java_code,"java"))

### 执行R语言代码

In [None]:
r_code = """
print("hello r")
"""
print(sandbox.run_code(r_code,"r"))

### 执行bash代码

In [None]:
bash_code = """
echo hello bash
"""
print(sandbox.run_code(bash_code,"bash"))

### 新建context上下文
上下文用于代码执行隔离，若不指定上下文，则同一语言的代码在同一上下文执行，共享变量等内容）

In [None]:
ctx = sandbox.create_code_context(language="python")
# 在默认context下执行代码，将变量x赋值为1
sandbox.run_code("x=1")
# 在ctx context下执行代码，将变量x赋值为2
sandbox.run_code("x=2",context=ctx)
# 打印默认context下变量x的值
print(sandbox.run_code("print(x)").logs.stdout[0],end="")
# 打印ctx context下变量的值
print(sandbox.run_code("print(x)",context=ctx).logs.stdout[0],end="")


## 沙箱文件系统操作

### 上传文件

In [None]:
import tempfile
# 创建临时文件，内容为"test file"
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_file:
    temp_file.write("test file")
    temp_file_path = temp_file.name

# 读取临时文件并上传到sandbox
with open(temp_file_path, "r") as f:
    sandbox.files.write("temp.txt", f)
print("文件已上传到sandbox: temp.txt")
# 打印当前沙箱内temp.txt文件的值
print(sandbox.files.read("temp.txt"))
sandbox.files.remove("temp.txt")

### 下载文件

In [None]:
# 写入temp file到临时文件temp.txt中
sandbox.files.write("temp.txt","temp file")
# 将沙箱内temp.txt的内容写入到本地temp.txt中
# 写入后可以在本地当前文件夹看到temp.txt
with open("temp.txt", "wb") as local_file:
    local_file.write(sandbox.files.read("temp.txt",format="bytes"))
print("文件已下载到本地: temp.txt")
sandbox.files.remove("temp.txt")

### 检查文件是否存在

In [None]:
# 当前temp.txt文件不存在
print(sandbox.files.exists("temp.txt"))
# 创建并写入temp.txt文件
sandbox.files.write("temp.txt","temp file")
# 当前temp.txt文件存在
print(sandbox.files.exists("temp.txt"))
sandbox.files.remove("temp.txt")


### 重命名文件

In [None]:
# 在test目录下创建一个名为temp.txt的文件
sandbox.files.write("./test/temp.txt","temp file")
# 打印test目录下的所有文件
for item in sandbox.files.list("./test"):
    print(item.path)
# 将temp.txt文件改名为tempfile.txt
response = sandbox.files.rename("./test/temp.txt","./test/tempfile.txt")
# 打印test目录下的所有文件
for item in sandbox.files.list("./test"):
    print(item.path)
sandbox.files.remove("./test")

### 创建文件夹

In [None]:
# 打印家目录下所有文件
for item in sandbox.files.list("."):
    print(item.name,end=" ")
# 创建一个名为test的文件夹
response = sandbox.files.make_dir("./test")
print()
# 打印家目录下所有文件
for item in sandbox.files.list("."):
    print(item.name,end=" ")
sandbox.files.remove("./test")

### 监控文件变化

In [None]:
# 创建一个文件监控器
watch_handle = sandbox.files.watch_dir(".")
# 创建一个tempfile.txt并写入内容
sandbox.files.write("tempfile.txt","temp file")
# 打印这段时间的所有事件
for event in watch_handle.get_new_events():
    print(event)
# 停止文件监控器
watch_handle.stop()
sandbox.files.remove("tempfile.txt")
    

## 沙箱PTY终端命令执行

### 执行终端命令

In [None]:
# 使用终端执行echo hello
hello_result = sandbox.commands.run("echo hello")
print(hello_result.stdout)

### 后台执行命令

In [None]:
# 使用终端打印1 2 3，每次打印间隔1秒
echo_code = """
for i in {1..3}; do
    echo -n $i
    sleep 1
done 
echo done
"""
# 后台执行指令
echo_handler = sandbox.commands.run(echo_code,background=True)

### 列出后台正在执行的命令

In [None]:
command_list = sandbox.commands.list()
print(command_list)

### 等待后台执行的命令完成并流式输出结果

In [None]:
response = echo_handler.wait(on_stdout=lambda data:print(data,end=" "),on_stderr=lambda data:print(data,end=" "))

### 向后台运行的命令发送输入

In [None]:
# 使用终端运行代码，等待用户输入
read_code = """
echo wait for user input
read test
echo $test
"""

read_handler = sandbox.commands.run(read_code,background=True,timeout=5)


# 创建一个任务来等待命令完成并显示输出
wait_task = asyncio.create_task(
    asyncio.to_thread(
        read_handler.wait,
        on_stdout=lambda data: print(data, end=""),
        on_stderr=lambda data: print(data, end="")
    )
)

# 延迟1秒后发送stdin
await asyncio.sleep(2)
sandbox.commands.send_stdin(read_handler.pid, "input after 2 seconds\n")

# 等待命令完成
response = await wait_task


### 后台执行命令，并通过kill结束掉命令

In [None]:
# 使用终端后台执行命令，每个命令都是暂停10秒
sandbox.commands.run("sleep 10",background=True)
sandbox.commands.run("sleep 10",background=True)
# 打印当前正在运行的命令
command_list = sandbox.commands.list()
print(command_list)
# 结束所有正在运行的命令
for command in command_list:
    sandbox.commands.kill(command.pid)
# 打印当前正在运行的命令
command_list = sandbox.commands.list()
print(command_list)

### 连接到正在运行的命令

In [None]:
read_code = """
echo wait for user input
read test
echo $test
"""
# 使用终端后台执行命令2
sandbox.commands.run(read_code,background=True,timeout=5)
# 获取最后一个执行的命令的pid
connect_pid = sandbox.commands.list()[0].pid
# 连接到最后一个执行的命令
connect_handler = sandbox.commands.connect(connect_pid)
# 向正在运行的命令发送输入
sandbox.commands.send_stdin(connect_pid,"test_data\n")
# 打印命令输出(连接之后的输出)
print(connect_handler.wait().stdout)


## 简单数据分析场景演示

### 生成并上传测试数据
本地模拟生成销售数据，并通过文件系统操作上传到沙箱中

In [None]:
import pandas as pd
import numpy as np
import io

# 随机生成5000条销售数据
np.random.seed(42)
data = {
    'product': np.random.choice(['A', 'B', 'C', 'D', 'E'], 5000),
    'sales': np.random.normal(150, 50, 5000).round(2),
    'region': np.random.choice(['North', 'South', 'East', 'West'], 5000, p=[0.4, 0.3, 0.2, 0.1]),
    'date': pd.date_range('2024-01-01', periods=5000, freq='h')
}
df = pd.DataFrame(data)

# 上传到沙箱
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
sandbox.files.write("sales_data.csv", csv_buffer.getvalue())



### 在沙箱中执行代码分析数据
模拟大模型生成代码，在沙箱中对数据进行分析

In [None]:
analysis_code = """
import pandas as pd
import matplotlib.pyplot as plt

df = pd.read_csv('sales_data.csv')
print(f"数据量: {len(df)} 条")

# 统计分析
product_stats = df.groupby('product')['sales'].agg(['count', 'sum', 'mean']).round(2)
print("\\n产品销售统计:")
print(product_stats)

# 生成图表
plt.figure(figsize=(12, 4))
plt.subplot(1, 3, 1)
product_stats['sum'].plot(kind='bar')
plt.title('各产品总销售额')

plt.subplot(1, 3, 2)
df.groupby('region')['sales'].sum().plot(kind='pie', autopct='%1.1f%%')
plt.title('各地区销售占比')

plt.subplot(1, 3, 3)
df['sales'].hist(bins=30, alpha=0.7)
plt.title('销售额分布')
plt.xlabel('销售额')

plt.tight_layout()
plt.savefig('analysis_result.png', dpi=100, bbox_inches='tight')
"""

result = sandbox.run_code(analysis_code)
print(result.logs.stdout[0])


### 读取可视化分析结果

In [None]:
from IPython.display import Image, display

image_data = sandbox.files.read("analysis_result.png", format="bytes")
display(Image(image_data))

### 销毁沙箱
沙箱中的临时文件会自动销毁

In [None]:
sandbox.kill()