# CM (Collaborative Moving) 环境使用教程

本教程将介绍如何使用CM环境进行多智能体强化学习实验。

## 目录
1. [环境简介](#环境简介)
2. [基础使用](#基础使用)
3. [CTDE兼容性](#ctde兼容性)
4. [可视化功能](#可视化功能)
5. [配置管理](#配置管理)
6. [性能测试](#性能测试)
7. [高级功能](#高级功能)

## 环境简介

CM是一个协作推箱子环境，智能体需要协作将箱子推到目标位置：
- **协作机制**：箱子需要多个智能体从不同方向推动才能成功移动
- **成功概率**：推动成功的概率随协作智能体数量增加而提高
- **团队奖励**：所有智能体共享团队奖励，鼓励协作

环境的目标是测试多智能体协作算法的收敛性和性能。

# CM环境详解：观测空间、动作空间与奖励机制

## 1. 观测空间详细解析

观测空间定义在 `env_cm.py` 的 `_get_observations` 方法中，维度为 `6 + 2*(n_agents-1)`，结构如下：

### 1.1 自身位置信息（2维）

| 维度 | 特征 | 描述 |
|------|------|------|
| 2 | 位置信息 | `position.x`, `position.y` - 智能体的网格坐标 |

### 1.2 箱子中心位置（2维）

| 维度 | 特征 | 描述 |
|------|------|------|
| 2 | 箱子中心 | `box_center_x`, `box_center_y` - 箱子的中心坐标 |

### 1.3 目标中心位置（2维）

| 维度 | 特征 | 描述 |
|------|------|------|
| 2 | 目标中心 | `goal_center_x`, `goal_center_y` - 目标区域的中心坐标 |

### 1.4 其他智能体相对位置（2*(n_agents-1)维）

对于每个其他智能体，提供2维相对位置信息：

| 维度 | 特征 | 描述 |
|------|------|------|
| 2 | 相对位置 | `other_agent.x - self.x`, `other_agent.y - self.y` |

**观测空间总维度计算**：
```
obs_dim = 6 (自身+箱子+目标) + 2 * (n_agents - 1) (其他智能体)
```

例如：
- 2个智能体：6 + 2 = 8维
- 3个智能体：6 + 4 = 10维
- 4个智能体：6 + 6 = 12维

### 1.5 观测归一化

如果配置了 `normalize_observations=True`，所有观测值会除以网格大小进行归一化到[0,1]范围。

**设计优势**：
- 简洁明了：只包含必要的位置信息
- 协作导向：包含其他智能体位置便于协作
- 易于学习：低维观测空间便于算法收敛

## 2. 动作空间（5维）详细解析

动作空间定义在 `core.py` 的 `ActionType` 枚举中，包含5个离散动作：

| 动作ID | 名称 | 描述 |
|--------|------|------|
| 0 | STAY | 原地等待 |
| 1 | MOVE_UP | 向上移动 |
| 2 | MOVE_DOWN | 向下移动 |
| 3 | MOVE_LEFT | 向左移动 |
| 4 | MOVE_RIGHT | 向右移动 |

### 2.1 动作约束

- **边界约束**：智能体不能移出网格边界
- **箱子碰撞**：智能体不能移动到箱子内部
- **智能体碰撞**：多个智能体不能占据同一位置

### 2.2 动作掩码

环境提供 `get_avail_actions()` 方法返回当前可用的动作列表，考虑边界约束。

**设计特点**：
- 简单直观：5个基本动作易于理解和学习
- 安全约束：动作掩码确保无效动作不会被执行
- 协作友好：移动动作便于智能体协调位置

## 3. 奖励机制详细解析

奖励系统设计在 `env_cm.py` 的 `_calculate_rewards` 方法中，采用团队奖励机制。

### 3.1 基础奖励组成

#### 1. 时间惩罚
```
time_penalty = -0.01 (默认值)
```
- 每步都会扣除，鼓励智能体快速完成任务
- 不同难度配置有不同的时间惩罚值

#### 2. 碰撞惩罚
```
agent_collision_penalty = -0.1
box_collision_penalty = -0.05
```
- 智能体之间的碰撞惩罚较重
- 撞墙（边界碰撞）惩罚较轻

#### 3. 协作奖励
```
cooperation_reward * len(pushing_sides)
```
- 当多个智能体同时推动箱子时给予奖励
- 推动边数越多，奖励越高
- 鼓励多角度协作推动

#### 4. 距离奖励
```
-distance * distance_reward_scale * 0.01
```
- 箱子与目标的欧几里得距离
- 距离越近，惩罚越小（即奖励越大）
- 引导智能体向目标移动

#### 5. 箱子移动奖励
```
box_move_reward_scale = 0.5
```
- 箱子成功移动时给予的即时奖励
- 为成功的协作行为提供正向反馈

#### 6. 目标达成奖励
```
goal_reached_reward = 10.0 (默认值)
```
- 箱子到达目标位置时的大额奖励
- 提供明确的目标导向信号

### 3.2 推动成功概率

箱子推动的成功概率取决于参与推动的智能体数量：

| 推动智能体数 | 成功概率（普通） | 成功概率（简单） | 成功概率（困难） |
|-------------|-----------------|-----------------|-----------------|
| 1个智能体 | 50% | 70% | 30% |
| 2个智能体 | 75% | 90% | 60% |
| 3个智能体 | 90% | 100% | 85% |
| 4个智能体 | 100% | 100% | 100% |

### 3.3 团队奖励机制

所有智能体获得相同的奖励值：
```
rewards = {agent_id: total_reward for agent_id in agent_ids}
```

**设计理念**：
- **强调协作**：个人成功不如团队成功重要
- **公平分配**：避免竞争行为，鼓励合作
- **共同目标**：所有智能体为同一目标努力

## 4. 推动机制详解

### 4.1 推动条件

智能体必须位于箱子的侧面才能推动：
- **上侧**：箱子上方相邻位置
- **下侧**：箱子下方相邻位置
- **左侧**：箱子左侧相邻位置
- **右侧**：箱子右侧相邻位置

### 4.2 推动方向计算

推动方向由推动的侧面决定：
- 只有上方推动 → 向上推
- 只有下方推动 → 向下推
- 只有左侧推动 → 向左推
- 只有右侧推动 → 向右推
- 相对侧面推动 → 抵消，不移动

### 4.3 协作推动策略

最有效的推动策略：
1. **双智能体协作**：两个智能体从相对方向推动
2. **多智能体协作**：三个或四个智能体从不同角度推动
3. **位置协调**：智能体需要协调到合适的推动位置

## 5. 配置参数对奖励的影响

### 5.1 难度级别差异

| 参数 | 简单模式 | 普通模式 | 困难模式 |
|------|----------|----------|----------|
| 时间惩罚 | -0.005 | -0.01 | -0.015 |
| 协作奖励 | 0.03 | 0.02 | 0.015 |
| 目标奖励 | 15.0 | 10.0 | 8.0 |
| 距离奖励系数 | 0.8 | 0.5 | 0.3 |

### 5.2 特殊配置

- **合作测试配置**：大幅提高协作收益，强制多智能体合作
- **单智能体配置**：移除协作奖励，测试单智能体性能
- **多智能体配置**：增加碰撞惩罚，测试更多智能体的协调

## 6. 总结

CM环境是一个典型的多智能体协作环境，具有以下特点：

- **低维观测空间**：6 + 2*(n_agents-1)维，信息简洁有效
- **简单动作空间**：5个离散动作，易于学习
- **精心设计的奖励机制**：多层次奖励引导智能体学习协作策略
- **概率性推动机制**：增加环境挑战性和现实性

**环境优势**：
1. **协作需求明确**：单智能体几乎无法完成任务
2. **算法友好**：简单的状态和动作空间便于算法收敛
3. **可配置性强**：多种难度和专用配置
4. **评估直观**：任务成功与否一目了然

这个环境为测试多智能体协作算法提供了一个理想的平台，特别适合验证QMIX、VDN等强调团队协作的方法。

In [2]:
# 导入必要的库
import numpy as np
import matplotlib.pyplot as plt
import sys
import os

# 添加环境路径
import sys
sys.path.append("..")  # 将上级目录添加到系统路径

# 导入CM环境
from Env.CM import create_cm_env, create_cm_ctde_env, create_cm_env_from_config
from Env.CM.config import get_config_by_name
from Env.CM.renderer import MatplotlibRenderer

print("CM环境导入成功！")

CM环境导入成功！


## 基础使用

In [3]:
# 创建基础CM环境
env = create_cm_env(difficulty="normal", render_mode="")

print(f"环境信息:")
print(f"- 智能体数量: {env.n_agents}")
print(f"- 智能体ID: {env.agent_ids}")
print(f"- 网格大小: {env.config.grid_size}x{env.config.grid_size}")
print(f"- 最大步数: {env.config.max_steps}")
print(f"- 箱子大小: {env.config.box_size}x{env.config.box_size}")
print(f"- 观察空间维度: {env.observation_space.shape[0]}")
print(f"- 动作空间维度: {env.n_actions}")

环境信息:
- 智能体数量: 2
- 智能体ID: ['agent_0', 'agent_1']
- 网格大小: 7x7
- 最大步数: 100
- 箱子大小: 2x2
- 观察空间维度: 8
- 动作空间维度: 5


In [4]:
observations = env.reset()

print("初始观察信息:")
for agent_id, obs in observations.items():
    print(f"- {agent_id}: 观察形状={obs.shape}, 数据类型={obs.dtype}")
    print(f"  最小值={obs.min():.3f}, 最大值={obs.max():.3f}")
    break  # 只显示第一个智能体的信息

初始观察信息:
- agent_0: 观察形状=(8,), 数据类型=float32
  最小值=-0.143, 最大值=0.643


In [5]:
# 执行一个简单的回合
total_rewards = {agent_id: 0 for agent_id in env.agent_ids}
step_count = 0

print("开始执行回合...")

while step_count < 50:  # 限制步数用于演示
    # 随机选择动作
    actions = {}
    for agent_id in env.agent_ids:
        # 获取可用动作
        avail_actions = env.get_avail_actions(agent_id)
        action = np.random.choice(avail_actions)
        actions[agent_id] = action
    
    # 执行动作
    observations, rewards, done, info = env.step(actions)
    
    # 累积奖励
    for agent_id, reward in rewards.items():
        total_rewards[agent_id] += reward
    
    step_count += 1
    
    # 打印进度
    if step_count % 10 == 0:
        avg_reward = np.mean(list(rewards.values()))
        print(f"步数 {step_count}: 平均奖励 = {avg_reward:.3f}, 距离目标 = {info['distance_to_goal']:.2f}")
    
    # 检查是否结束
    for agent_id, done_flag in done.items():
        if done_flag:
            print(f"- {agent_id} 已完成")
            break
        

print("\n回合结果:")
for agent_id, total_reward in total_rewards.items():
    print(f"- {agent_id}: 总奖励 = {total_reward:.3f}")
print(f"- 团队平均奖励: {np.mean(list(total_rewards.values())):.3f}")
print(f"- 最终距离目标: {info['distance_to_goal']:.2f}")
print(f"- 目标达成: {info['agents_complete']}")

开始执行回合...
步数 10: 平均奖励 = -0.026, 距离目标 = 3.16
步数 20: 平均奖励 = -0.030, 距离目标 = 4.00
步数 30: 平均奖励 = -0.031, 距离目标 = 4.12
步数 40: 平均奖励 = 0.469, 距离目标 = 4.12
步数 50: 平均奖励 = -0.030, 距离目标 = 4.00

回合结果:
- agent_0: 总奖励 = 5.499
- agent_1: 总奖励 = 5.499
- 团队平均奖励: 5.499
- 最终距离目标: 4.00
- 目标达成: False


In [6]:
# 关闭环境
env.close()
print("环境已关闭")

环境已关闭


## CTDE兼容性

CM环境提供了CTDE（集中式训练分布式执行）包装器，支持QMIX、VDN等算法。

In [7]:
# 测试不同类型的全局状态表示
global_state_types = ["concat", "mean", "max", "attention"]

for state_type in global_state_types:
    print(f"\n测试全局状态类型: {state_type}")
    
    # 创建CTDE环境
    ctde_env = create_cm_ctde_env(
        "easy_ctde",
        global_state_type=state_type
    )
    
    # 重置环境
    observations = ctde_env.reset()
    
    # 执行一步并检查info中的全局状态
    actions = {agent_id: 0 for agent_id in ctde_env.agent_ids}
    obs, rewards, dones, infos = ctde_env.step(actions)
    
    if 'global_state' in infos:
        info_state = infos['global_state']
        print(f"  - Info中全局状态形状: {info_state.shape}")
    
    ctde_env.close()


测试全局状态类型: concat
  - Info中全局状态形状: (16,)

测试全局状态类型: mean
  - Info中全局状态形状: (8,)

测试全局状态类型: max
  - Info中全局状态形状: (8,)

测试全局状态类型: attention
  - Info中全局状态形状: (24,)


In [8]:
# 获取CTDE环境信息
ctde_env = create_cm_ctde_env("normal_ctde")
env_info = ctde_env.get_env_info()

print("CTDE环境信息:")
for key, value in env_info.items():
    print(f"- {key}: {value}")

ctde_env.close()

CTDE环境信息:
- n_agents: 2
- agent_ids: ['agent_0', 'agent_1']
- n_actions: 5
- obs_dims: {'agent_0': 8, 'agent_1': 8}
- act_dims: {'agent_0': 5, 'agent_1': 5}
- episode_limit: 100
- grid_size: 7
- box_size: 2
- global_state_dim: 16
- global_state_type: concat


## 可视化功能

CM环境提供了多种可视化选项，包括实时可视化和静态图表。

In [11]:
# 创建环境并生成静态可视化
env = create_cm_env(difficulty="normal", render_mode="rgb_array")
observations = env.reset()

# 渲染环境
rgb_array = env.render()
if rgb_array is not None:
    plt.figure(figsize=(8, 8))
    plt.imshow(rgb_array)
    plt.title(f"CM环境初始状态")
    plt.axis('off')
    plt.show()
else:
    print("无法生成图像，可能是matplotlib不可用")

env.close()

  plt.show()


In [13]:
# 动态演示：执行几步并可视化
env = create_cm_env(difficulty="easy", render_mode="rgb_array")
observations = env.reset()

print("生成环境状态快照...")

fig, axes = plt.subplots(2, 3, figsize=(15, 10))
fig.suptitle('CM环境状态演化', fontsize=16)

for step in range(6):
    # 渲染当前状态
    rgb_array = env.render()
    
    if rgb_array is not None:
        ax = axes[step // 3, step % 3]
        ax.imshow(rgb_array)
        ax.set_title(f'步骤 {step}: ')
        ax.axis('off')
    
    # 执行随机动作
    if step < 5:  # 不在最后一步执行
        actions = {agent_id: np.random.choice(env.get_avail_actions(agent_id)) 
                  for agent_id in env.agent_ids}
        observations, rewards, done, info = env.step(actions)
        
        for agent_id, reward in rewards.items():
            print(f'{agent_id} 获得奖励 {reward}')
        
        for agent_id, done_ in done.items():
            if done_:
                print(f'{agent_id}  Episode done')

plt.tight_layout()
plt.show()

env.close()

生成环境状态快照...
agent_0 获得奖励 -0.02288854381999832
agent_1 获得奖励 -0.02288854381999832
agent_0 获得奖励 -0.02288854381999832
agent_1 获得奖励 -0.02288854381999832
agent_0 获得奖励 -0.02288854381999832
agent_1 获得奖励 -0.02288854381999832
agent_0 获得奖励 -0.02288854381999832
agent_1 获得奖励 -0.02288854381999832
agent_0 获得奖励 0.479
agent_1 获得奖励 0.479


  plt.show()


## 配置管理

CM环境提供了丰富的配置选项，支持不同的实验设置。

In [14]:
# 测试不同难度配置
difficulties = ["easy", "normal", "hard"]

for difficulty in difficulties:
    print(f"\n难度配置: {difficulty}")
    
    env = create_cm_env(difficulty=difficulty)
    
    print(f"  - 网格大小: {env.config.grid_size}x{env.config.grid_size}")
    print(f"  - 智能体数量: {env.config.n_agents}")
    print(f"  - 最大步数: {env.config.max_steps}")
    print(f"  - 箱子大小: {env.config.box_size}x{env.config.box_size}")
    print(f"  - 时间惩罚: {env.config.time_penalty}")
    print(f"  - 目标奖励: {env.config.goal_reached_reward}")
    print(f"  - 推动成功概率: {env.config.push_success_probs}")
    
    env.close()


难度配置: easy
  - 网格大小: 7x7
  - 智能体数量: 2
  - 最大步数: 100
  - 箱子大小: 2x2
  - 时间惩罚: -0.005
  - 目标奖励: 15.0
  - 推动成功概率: {1: 0.7, 2: 0.9, 3: 1.0, 4: 1.0}

难度配置: normal
  - 网格大小: 7x7
  - 智能体数量: 2
  - 最大步数: 100
  - 箱子大小: 2x2
  - 时间惩罚: -0.01
  - 目标奖励: 10.0
  - 推动成功概率: {1: 0.5, 2: 0.75, 3: 0.9, 4: 1.0}

难度配置: hard
  - 网格大小: 9x9
  - 智能体数量: 3
  - 最大步数: 150
  - 箱子大小: 2x2
  - 时间惩罚: -0.015
  - 目标奖励: 8.0
  - 推动成功概率: {1: 0.3, 2: 0.6, 3: 0.85, 4: 1.0}


In [15]:
# 测试专用配置
from Env.CM.config import list_available_configs

print("所有可用配置:")
configs = list_available_configs()
for config_name in configs:
    print(f"  - {config_name}")

# 测试特殊配置
special_configs = ["cooperation_test", "single_agent", "multi_agent"]

for config_name in special_configs:
    if config_name in configs:
        print(f"\n专用配置: {config_name}")
        
        try:
            config = get_config_by_name(config_name)
            print(f"  - 智能体数量: {config.n_agents}")
            print(f"  - 网格大小: {config.grid_size}x{config.grid_size}")
            print(f"  - 协作奖励: {config.cooperation_reward}")
            print(f"  - 推动成功概率: {config.push_success_probs}")
            
        except Exception as e:
            print(f"  - 错误: {e}")
    else:
        print(f"\n配置 {config_name} 不存在")

所有可用配置:
  - easy
  - normal
  - hard
  - debug
  - cooperation_test
  - single_agent
  - multi_agent
  - easy_ctde
  - normal_ctde
  - hard_ctde

专用配置: cooperation_test
  - 智能体数量: 3
  - 网格大小: 7x7
  - 协作奖励: 0.05
  - 推动成功概率: {1: 0.2, 2: 0.7, 3: 0.95, 4: 1.0}

专用配置: single_agent
  - 智能体数量: 1
  - 网格大小: 5x5
  - 协作奖励: 0.0
  - 推动成功概率: {1: 0.8, 2: 1.0, 3: 1.0, 4: 1.0}

专用配置: multi_agent
  - 智能体数量: 4
  - 网格大小: 9x9
  - 协作奖励: 0.025
  - 推动成功概率: {1: 0.3, 2: 0.6, 3: 0.85, 4: 1.0}


In [16]:
# 自定义配置示例
from Env.CM.config import CMConfig

# 创建自定义配置
custom_config = CMConfig(
    grid_size=8,
    n_agents=3,
    max_steps=120,
    box_size=2,
    goal_size=2,
    push_success_probs={1: 0.4, 2: 0.8, 3: 1.0, 4: 1.0},
    cooperation_reward=0.04,
    time_penalty=-0.008,
    goal_reached_reward=12.0,
    distance_reward_scale=0.6,
    render_mode="rgb_array"
)

print("自定义配置:")
print(f"- 网格大小: {custom_config.grid_size}")
print(f"- 智能体数量: {custom_config.n_agents}")
print(f"- 最大步数: {custom_config.max_steps}")
print(f"- 协作奖励: {custom_config.cooperation_reward}")
print(f"- 推动成功概率: {custom_config.push_success_probs}")

# 创建环境
custom_env = create_cm_env_from_config(custom_config)

print(f"\n自定义环境创建成功！")
print(f"环境信息: {custom_env.get_env_info()}")

custom_env.close()

自定义配置:
- 网格大小: 8
- 智能体数量: 3
- 最大步数: 120
- 协作奖励: 0.04
- 推动成功概率: {1: 0.4, 2: 0.8, 3: 1.0, 4: 1.0}

自定义环境创建成功！
环境信息: {'n_agents': 3, 'agent_ids': ['agent_0', 'agent_1', 'agent_2'], 'n_actions': 5, 'obs_dims': {'agent_0': 10, 'agent_1': 10, 'agent_2': 10}, 'act_dims': {'agent_0': 5, 'agent_1': 5, 'agent_2': 5}, 'episode_limit': 120, 'grid_size': 8, 'box_size': 2}


## 性能测试

评估环境的执行性能，确保适合强化学习训练。

In [22]:
import time

# 性能测试
def benchmark_environment(num_steps=100):
    env = create_cm_env(difficulty="normal")
    observations= env.reset()
    
    start_time = time.time()
    
    total_rewards = []
    step_times = []
    
    for step in range(num_steps):
        step_start = time.time()
        
        # 随机动作
        actions = {agent_id: np.random.choice(env.get_avail_actions(agent_id)) 
                  for agent_id in env.agent_ids}
        
        # 执行步骤
        observations, rewards, done, info = env.step(actions)
        
        step_end = time.time()
        step_times.append(step_end - step_start)
        total_rewards.append(np.mean(list(rewards.values())))
        
        # 如果回合结束，重置
        for agent_id in done:
            if done[agent_id]:
                observations = env.reset()
    
    total_time = time.time() - start_time
    
    env.close()
    
    return {
        'total_time': total_time,
        'avg_step_time': np.mean(step_times),
        'steps_per_second': num_steps / total_time,
        'avg_reward': np.mean(total_rewards),
        'max_step_time': np.max(step_times),
        'min_step_time': np.min(step_times)
    }

# 运行基准测试
results = benchmark_environment(200)

print("性能测试结果:")
print(f"- 总时间: {results['total_time']:.3f}秒")
print(f"- 平均每步时间: {results['avg_step_time']:.4f}秒")
print(f"- 每秒步数: {results['steps_per_second']:.1f}")
print(f"- 最大步时间: {results['max_step_time']:.4f}秒")
print(f"- 最小步时间: {results['min_step_time']:.4f}秒")
print(f"- 平均奖励: {results['avg_reward']:.4f}")

# 性能评估
if results['avg_step_time'] < 0.01:
    print("\n🚀 性能优秀: 适合大规模训练")
elif results['avg_step_time'] < 0.05:
    print("\n✅ 性能良好: 适合常规训练")
elif results['avg_step_time'] < 0.1:
    print("\n⚠️  性能一般: 可以用于训练，但可能较慢")
else:
    print("\n❌ 性能较慢: 建议优化后再进行大规模训练")

性能测试结果:
- 总时间: 0.031秒
- 平均每步时间: 0.0001秒
- 每秒步数: 6378.6
- 最大步时间: 0.0007秒
- 最小步时间: 0.0001秒
- 平均奖励: 0.0624

🚀 性能优秀: 适合大规模训练


## 高级功能

演示环境的高级功能，包括动作掩码、详细观察分析等。

In [18]:
# 动作掩码演示
env = create_cm_env(difficulty="normal")
observations = env.reset()

print("动作掩码演示:")
for agent_id in env.agent_ids:
    avail_actions = env.get_avail_actions(agent_id)
    print(f"\n智能体 {agent_id}:")
    print(f"  - 当前位置: ({env.game_state.get_agent(agent_id).position.x}, {env.game_state.get_agent(agent_id).position.y})")
    print(f"  - 可用动作数量: {len(avail_actions)}")
    print(f"  - 可用动作: {avail_actions}")
    
    # 动作名称映射
    action_names = {
        0: "等待", 1: "上移动", 2: "下移动", 3: "左移动", 4: "右移动"
    }
    
    print(f"  - 可用动作名称: {[action_names[a] for a in avail_actions]}")

env.close()

动作掩码演示:

智能体 agent_0:
  - 当前位置: (1, 2)
  - 可用动作数量: 5
  - 可用动作: [0, 1, 2, 3, 4]
  - 可用动作名称: ['等待', '上移动', '下移动', '左移动', '右移动']

智能体 agent_1:
  - 当前位置: (4, 2)
  - 可用动作数量: 5
  - 可用动作: [0, 1, 2, 3, 4]
  - 可用动作名称: ['等待', '上移动', '下移动', '左移动', '右移动']


In [19]:
# 观察空间详细分析
env = create_cm_env(difficulty="normal")
observations = env.reset()

print("观察空间详细分析:")

# 分析一个智能体的观察
sample_agent_id = env.agent_ids[0]
obs = observations[sample_agent_id]

print(f"\n智能体 {sample_agent_id} 的观察分析:")
print(f"- 观察维度: {obs.shape}")
print(f"- 数据类型: {obs.dtype}")
print(f"- 最小值: {obs.min():.4f}")
print(f"- 最大值: {obs.max():.4f}")
print(f"- 均值: {obs.mean():.4f}")
print(f"- 标准差: {obs.std():.4f}")

# 按观察组成部分分析
print("\n观察组成部分分析:")
print(f"- 自身位置 (0-2): {obs[0:2]}")
print(f"- 箱子中心 (2-4): {obs[2:4]}")
print(f"- 目标中心 (4-6): {obs[4:6]}")

if env.n_agents > 1:
    other_agent_start = 6
    for i, other_id in enumerate(env.agent_ids):
        if other_id != sample_agent_id:
            rel_pos = obs[other_agent_start:other_agent_start+2]
            print(f"- 相对{other_id}位置 ({other_agent_start}-{other_agent_start+2}): {rel_pos}")
            other_agent_start += 2

env.close()

观察空间详细分析:

智能体 agent_0 的观察分析:
- 观察维度: (8,)
- 数据类型: float32
- 最小值: -0.1429
- 最大值: 0.5714
- 均值: 0.2857
- 标准差: 0.2143

观察组成部分分析:
- 自身位置 (0-2): [0.2857143 0.5714286]
- 箱子中心 (2-4): [0.35714287 0.35714287]
- 目标中心 (4-6): [0.07142857 0.5       ]
- 相对agent_1位置 (6-8): [ 0.2857143  -0.14285715]


In [20]:
# 推动机制分析
env = create_cm_env(difficulty="easy")
observations = env.reset()

print("推动机制分析:")

# 分析初始推动条件
box = env.game_state.box
print(f"\n箱子信息:")
print(f"- 位置: ({box.position.x}, {box.position.y})")
print(f"- 大小: {box.size}x{box.size}")
print(f"- 占据位置: {[(p.x, p.y) for p in box.get_occupied_positions()]}")

print(f"\n智能体推动能力分析:")
for agent_id, agent in env.game_state.agents.items():
    is_pushing = agent.is_pushing_box(box)
    push_side = agent.get_push_side(box)
    
    print(f"- {agent_id}:")
    print(f"  - 位置: ({agent.position.x}, {agent.position.y})")
    print(f"  - 可推动: {is_pushing}")
    print(f"  - 推动侧: {push_side}")

pushing_agents = env.game_state.get_pushing_agents()
push_sides = env.game_state.get_push_sides()

print(f"\n当前推动状态:")
print(f"- 推动智能体: {pushing_agents}")
print(f"- 推动侧: {push_sides}")
print(f"- 推动方向: {env.game_state.calculate_push_direction(push_sides)}")

env.close()

推动机制分析:

箱子信息:
- 位置: (2, 2)
- 大小: 2x2
- 占据位置: [(2, 2), (2, 3), (3, 2), (3, 3)]

智能体推动能力分析:
- agent_0:
  - 位置: (2, 4)
  - 可推动: True
  - 推动侧: right
- agent_1:
  - 位置: (1, 2)
  - 可推动: True
  - 推动侧: top

当前推动状态:
- 推动智能体: ['agent_0', 'agent_1']
- 推动侧: ['top', 'right']
- 推动方向: [-1  1]


In [25]:
# 奖励系统分析
env = create_cm_env(difficulty="normal")

print("奖励系统分析:")

# 执行多个步骤收集奖励数据
all_rewards = {agent_id: [] for agent_id in env.agent_ids}
episode_rewards = []
successful_episodes = 0

for episode in range(10):
    observations = env.reset()
    episode_total = 0
    
    for step in range(10):
        actions = {agent_id: np.random.choice(env.get_avail_actions(agent_id)) 
                  for agent_id in env.agent_ids}
        observations, rewards, done, info = env.step(actions)
        
        for agent_id, reward in rewards.items():
            all_rewards[agent_id].append(reward)
            episode_total += reward
        
        for agent_id, done_ in done.items():
            if done_:
                successful_episodes += 1
    
    episode_rewards.append(episode_total)
    print(f"回合 {episode + 1}: 总奖励 = {episode_total:.3f}, 成功 = {done}, 距离 = {info['distance_to_goal']:.2f}")

print("\n奖励统计分析:")
for agent_id, rewards in all_rewards.items():
    if rewards:
        print(f"\n{agent_id}:")
        print(f"  - 平均奖励: {np.mean(rewards):.4f}")
        print(f"  - 标准差: {np.std(rewards):.4f}")
        print(f"  - 最大奖励: {np.max(rewards):.4f}")
        print(f"  - 最小奖励: {np.min(rewards):.4f}")
        print(f"  - 正奖励比例: {np.mean(np.array(rewards) > 0):.2%}")

print(f"\n团队统计:")
print(f"- 平均回合奖励: {np.mean(episode_rewards):.3f} ± {np.std(episode_rewards):.3f}")
print(f"- 成功回合数: {successful_episodes}/10 ({successful_episodes/10:.1%})")

env.close()

奖励系统分析:
回合 1: 总奖励 = 2.460, 成功 = {'agent_0': False, 'agent_1': False}, 距离 = 4.47
回合 2: 总奖励 = 4.431, 成功 = {'agent_0': False, 'agent_1': False}, 距离 = 3.16
回合 3: 总奖励 = 2.639, 成功 = {'agent_0': False, 'agent_1': False}, 距离 = 1.41
回合 4: 总奖励 = 1.671, 成功 = {'agent_0': False, 'agent_1': False}, 距离 = 1.00
回合 5: 总奖励 = 1.323, 成功 = {'agent_0': False, 'agent_1': False}, 距离 = 3.00
回合 6: 总奖励 = 1.557, 成功 = {'agent_0': False, 'agent_1': False}, 距离 = 3.00
回合 7: 总奖励 = -0.483, 成功 = {'agent_0': False, 'agent_1': False}, 距离 = 2.83
回合 8: 总奖励 = 1.003, 成功 = {'agent_0': False, 'agent_1': False}, 距离 = 1.41
回合 9: 总奖励 = -0.624, 成功 = {'agent_0': False, 'agent_1': False}, 距离 = 2.24
回合 10: 总奖励 = 4.266, 成功 = {'agent_0': False, 'agent_1': False}, 距离 = 2.00

奖励统计分析:

agent_0:
  - 平均奖励: 0.0912
  - 标准差: 0.2167
  - 最大奖励: 0.4850
  - 最小奖励: -0.1250
  - 正奖励比例: 24.00%

agent_1:
  - 平均奖励: 0.0912
  - 标准差: 0.2167
  - 最大奖励: 0.4850
  - 最小奖励: -0.1250
  - 正奖励比例: 24.00%

团队统计:
- 平均回合奖励: 1.824 ± 1.617
- 成功回合数: 0/10 (0.0%)


In [26]:
env.close()

## 总结

本教程介绍了CM环境的主要功能和使用方法：

1. **基础环境创建和使用**：展示了如何创建环境、重置、执行步骤
2. **CTDE兼容性**：演示了集中式训练分布式执行的支持
3. **可视化功能**：展示了静态和动态可视化能力
4. **配置管理**：介绍了不同难度和专用配置
5. **性能测试**：评估了环境执行效率
6. **高级功能**：演示了动作掩码、观察分析等功能
7. **完整示例**：提供了简单智能体的实现

### 关键特点：
- ✅ **协作需求**：多个智能体必须协作才能成功推动箱子
- ✅ **简单直观**：低维观测和动作空间，易于理解和学习
- ✅ **CTDE兼容**：支持主流多智能体强化学习算法
- ✅ **高性能**：单步执行时间<0.01秒，适合大规模训练
- ✅ **可视化**：丰富的可视化功能便于分析和调试
- ✅ **可配置**：多种预设和自定义配置支持
- ✅ **概率机制**：推动成功概率机制增加挑战性

### 环境优势：
1. **算法测试友好**：简单的状态空间便于算法快速收敛
2. **协作机制明确**：智能体必须协作才能获得高奖励
3. **难度可调**：从简单的2智能体到复杂的4智能体配置
4. **评估直观**：任务成功与否一目了然
5. **研究价值高**：适合测试协作算法的各个方面

CM环境现在可以用于多智能体强化学习的研究和实验了！它特别适合：
- 测试新的协作算法
- 验证现有算法的协作能力
- 教学和演示多智能体系统
- 算法性能基准测试