# COSMOS: 多机器人编队导航演示

**COSMOS** (COordinated Safety On Manifold for multi-agent Systems) + RMPflow + MAPPO

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/YOUR_REPO/blob/main/formation_nav/COSMOS_Demo.ipynb)

---

## 1. 安装依赖

In [None]:
# 安装依赖 (Colab 已预装 torch, numpy, matplotlib)
!pip install gymnasium -q

# 验证安装
import torch
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt

print(f"PyTorch: {torch.__version__}")
print(f"NumPy: {np.__version__}")
print(f"Gymnasium: {gym.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")

## 2. 克隆代码库

In [None]:
# 如果在 Colab 运行，克隆代码库
import os

if 'COLAB_GPU' in os.environ:
    !git clone https://github.com/YOUR_USERNAME/safe-rl-manifold-suite.git
    %cd safe-rl-manifold-suite
else:
    # 本地运行，确保在项目根目录
    %cd /content/safe-rl-manifold-suite  # 或你的本地路径

## 3. 导入模块

In [None]:
import sys
sys.path.insert(0, '.')

import numpy as np
import torch
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from IPython.display import HTML, display
from matplotlib.animation import FuncAnimation
import time

from formation_nav.config import Config
from formation_nav.env.formation_env import FormationNavEnv
from formation_nav.env.formations import FormationTopology
from formation_nav.safety import COSMOS, COSMOSMode
from formation_nav.algo.mappo import MAPPO
from formation_nav.algo.buffer import RolloutBuffer

print("模块导入成功!")

## 4. 配置参数

In [None]:
# @title 训练参数 { run: "auto" }

NUM_EPISODES = 200  # @param {type: "integer"}
NUM_AGENTS = 4  # @param [3, 4, 5, 6] {type: "raw"}
FORMATION = "square"  # @param ["square", "triangle", "circle", "line", "hexagon"]
SEED = 42  # @param {type: "integer"}

# 设置随机种子
np.random.seed(SEED)
torch.manual_seed(SEED)

# 创建配置
cfg = Config()
cfg.env.num_agents = NUM_AGENTS
cfg.env.formation_shape = FORMATION
cfg.train.seed = SEED

print(f"智能体数量: {cfg.env.num_agents}")
print(f"编队形状: {cfg.env.formation_shape}")
print(f"训练轮数: {NUM_EPISODES}")
print(f"安全半径: {cfg.safety.safety_radius}")

## 5. 创建环境和模型

In [None]:
# 环境
env = FormationNavEnv(cfg.env, cfg.reward)
topology = FormationTopology(cfg.env.num_agents, "complete")

# COSMOS 安全滤波器
cosmos = COSMOS(
    env_cfg=cfg.env,
    safety_cfg=cfg.safety,
    desired_distances=env.desired_distances,
    topology_edges=topology.edges(),
    obstacle_positions=env.obstacles,
    mode=COSMOSMode.DECENTRALIZED
)

# MAPPO
obs_dim = env.observation_space.shape[0]
share_obs_dim = env.share_observation_space.shape[0]
device = "cuda" if torch.cuda.is_available() else "cpu"

mappo = MAPPO(obs_dim, share_obs_dim, act_dim=2, cfg=cfg.algo, device=device)

# Buffer
buffer = RolloutBuffer(
    episode_length=cfg.env.max_steps,
    num_agents=cfg.env.num_agents,
    obs_dim=obs_dim,
    share_obs_dim=share_obs_dim,
    act_dim=2,
    gamma=cfg.algo.gamma,
    gae_lambda=cfg.algo.gae_lambda,
    device=device
)

print(f"\n设备: {device}")
print(f"观测维度: {obs_dim}")
print(f"COSMOS 模式: {COSMOSMode.DECENTRALIZED.value}")
print("\n初始化完成!")

## 6. 训练

In [None]:
# 训练记录
metrics = {'episode': [], 'reward': [], 'cost': [], 'formation_error': [], 'min_dist': [], 'collisions': []}

print("开始训练...")
print("=" * 70)

start_time = time.time()
total_steps = 0

for episode in range(NUM_EPISODES):
    obs, share_obs, _ = env.reset(seed=cfg.train.seed + episode)
    cosmos.update_obstacles(env.obstacles)
    cosmos.reset(env.positions)
    buffer.set_first_obs(obs, share_obs)
    
    ep_reward, ep_cost, ep_form_err, ep_min_dist, ep_collisions = 0.0, 0.0, 0.0, float('inf'), 0
    
    for step in range(cfg.env.max_steps):
        alphas, log_probs = mappo.get_actions(obs)
        values = mappo.get_values(share_obs)
        safe_actions = cosmos.project(alphas, env.positions, env.velocities, dt=cfg.env.dt)
        next_obs, next_share_obs, rewards, costs, dones, infos, _ = env.step(safe_actions)
        
        masks = (~dones).astype(np.float32).reshape(-1, 1)
        buffer.insert(next_obs, next_share_obs, alphas, log_probs, values, rewards, costs, masks)
        
        obs, share_obs = next_obs, next_share_obs
        total_steps += cfg.env.num_agents
        
        ep_reward += rewards[0, 0]
        ep_cost += costs[0, 0]
        ep_form_err += infos[0]["formation_error"]
        ep_min_dist = min(ep_min_dist, infos[0]["min_inter_dist"])
        ep_collisions += infos[0]["collisions"]
        
        if dones.all():
            break
    
    # PPO 更新
    last_values = mappo.get_values(share_obs)
    buffer.compute_returns_and_advantages(last_values)
    mappo.update(buffer)
    buffer.after_update()
    
    # 记录
    avg_form_err = ep_form_err / (step + 1)
    metrics['episode'].append(episode + 1)
    metrics['reward'].append(ep_reward)
    metrics['cost'].append(ep_cost)
    metrics['formation_error'].append(avg_form_err)
    metrics['min_dist'].append(ep_min_dist)
    metrics['collisions'].append(ep_collisions)
    
    if (episode + 1) % 20 == 0:
        fps = total_steps / (time.time() - start_time)
        print(f"Ep {episode+1:4d}/{NUM_EPISODES} | R={ep_reward:7.2f} | Cost={ep_cost:4.0f} | "
              f"FormErr={avg_form_err:.4f} | MinDist={ep_min_dist:.3f} | Coll={ep_collisions:2d} | FPS={fps:.0f}")

print("=" * 70)
print(f"训练完成! 总碰撞: {sum(metrics['collisions'])}")

## 7. 训练曲线

In [None]:
fig, axes = plt.subplots(2, 3, figsize=(15, 8))

axes[0, 0].plot(metrics['episode'], metrics['reward'], 'b-', alpha=0.7)
axes[0, 0].set_title('Episode Reward')
axes[0, 0].set_xlabel('Episode')
axes[0, 0].grid(True, alpha=0.3)

axes[0, 1].plot(metrics['episode'], metrics['cost'], 'r-', alpha=0.7)
axes[0, 1].set_title('Episode Cost')
axes[0, 1].set_xlabel('Episode')
axes[0, 1].grid(True, alpha=0.3)

axes[0, 2].plot(metrics['episode'], metrics['formation_error'], 'g-', alpha=0.7)
axes[0, 2].set_title('Formation Error')
axes[0, 2].set_xlabel('Episode')
axes[0, 2].grid(True, alpha=0.3)

axes[1, 0].plot(metrics['episode'], metrics['min_dist'], 'm-', alpha=0.7)
axes[1, 0].axhline(y=0.5, color='r', linestyle='--', label='Safety Radius')
axes[1, 0].set_title('Min Inter-Agent Distance')
axes[1, 0].set_xlabel('Episode')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

axes[1, 1].plot(metrics['episode'], metrics['collisions'], 'c-', alpha=0.7)
axes[1, 1].set_title('Collisions per Episode')
axes[1, 1].set_xlabel('Episode')
axes[1, 1].grid(True, alpha=0.3)

# Smoothed reward
window = 20
smoothed = np.convolve(metrics['reward'], np.ones(window)/window, mode='valid')
axes[1, 2].plot(metrics['episode'][window-1:], smoothed, 'b-', linewidth=2)
axes[1, 2].set_title(f'Smoothed Reward (window={window})')
axes[1, 2].set_xlabel('Episode')
axes[1, 2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 8. 评估与可视化

In [None]:
# 评估一个 episode
obs, share_obs, _ = env.reset(seed=SEED + 1000)
cosmos.update_obstacles(env.obstacles)
cosmos.reset(env.positions)

trajectory = [env.positions.copy()]
obstacles_pos = env.obstacles.copy()
goal_pos = env.goal.copy()

for step in range(cfg.env.max_steps):
    alphas, _ = mappo.get_actions(obs, deterministic=True)
    safe_actions = cosmos.project(alphas, env.positions, env.velocities, dt=cfg.env.dt)
    obs, share_obs, _, _, dones, infos, _ = env.step(safe_actions)
    trajectory.append(env.positions.copy())
    if dones.all():
        break

trajectory = np.array(trajectory)
print(f"轨迹长度: {len(trajectory)} 步")
print(f"最终编队误差: {infos[0]['formation_error']:.4f}")
print(f"碰撞次数: {infos[0]['collisions']}")

In [None]:
# 绘制轨迹
fig, ax = plt.subplots(figsize=(10, 10))
colors = plt.cm.tab10(np.linspace(0, 1, cfg.env.num_agents))

# 场地边界
rect = patches.Rectangle((-cfg.env.arena_size, -cfg.env.arena_size), 
                          2*cfg.env.arena_size, 2*cfg.env.arena_size,
                          linewidth=2, edgecolor='black', facecolor='none', linestyle='--')
ax.add_patch(rect)

# 障碍物
for obs in obstacles_pos:
    circle = patches.Circle((obs[0], obs[1]), obs[2], facecolor='gray', edgecolor='black', alpha=0.5)
    ax.add_patch(circle)

# 目标
ax.plot(goal_pos[0], goal_pos[1], 'r*', markersize=25, label='Goal')

# 轨迹
for i in range(cfg.env.num_agents):
    traj = trajectory[:, i, :]
    ax.plot(traj[:, 0], traj[:, 1], '-', color=colors[i], alpha=0.7, linewidth=2)
    ax.plot(traj[0, 0], traj[0, 1], 'o', color=colors[i], markersize=12, label=f'Agent {i}')
    ax.plot(traj[-1, 0], traj[-1, 1], 's', color=colors[i], markersize=12)

# 最终编队
final_pos = trajectory[-1]
for (i, j) in topology.edges():
    ax.plot([final_pos[i, 0], final_pos[j, 0]], [final_pos[i, 1], final_pos[j, 1]], 'k--', alpha=0.5)

ax.set_xlim(-cfg.env.arena_size * 1.1, cfg.env.arena_size * 1.1)
ax.set_ylim(-cfg.env.arena_size * 1.1, cfg.env.arena_size * 1.1)
ax.set_aspect('equal')
ax.set_title('COSMOS + RMPflow + MAPPO Formation Navigation', fontsize=14)
ax.legend(loc='upper right')
ax.grid(True, alpha=0.3)
plt.show()

## 9. 动画

In [None]:
%%capture
# 创建动画
fig, ax = plt.subplots(figsize=(8, 8))
colors = plt.cm.tab10(np.linspace(0, 1, cfg.env.num_agents))
T = len(trajectory)

def init():
    ax.clear()
    rect = patches.Rectangle((-cfg.env.arena_size, -cfg.env.arena_size), 
                              2*cfg.env.arena_size, 2*cfg.env.arena_size,
                              linewidth=2, edgecolor='black', facecolor='none', linestyle='--')
    ax.add_patch(rect)
    for obs in obstacles_pos:
        circle = patches.Circle((obs[0], obs[1]), obs[2], facecolor='gray', edgecolor='black', alpha=0.5)
        ax.add_patch(circle)
    ax.plot(goal_pos[0], goal_pos[1], 'r*', markersize=20)
    ax.set_xlim(-cfg.env.arena_size * 1.1, cfg.env.arena_size * 1.1)
    ax.set_ylim(-cfg.env.arena_size * 1.1, cfg.env.arena_size * 1.1)
    ax.set_aspect('equal')
    return []

def update(frame):
    ax.clear()
    init()
    pos = trajectory[frame]
    trail_start = max(0, frame - 30)
    for i in range(cfg.env.num_agents):
        trail = trajectory[trail_start:frame+1, i, :]
        ax.plot(trail[:, 0], trail[:, 1], '-', color=colors[i], alpha=0.5, linewidth=2)
        ax.plot(pos[i, 0], pos[i, 1], 'o', color=colors[i], markersize=12)
    for (i, j) in topology.edges():
        ax.plot([pos[i, 0], pos[j, 0]], [pos[i, 1], pos[j, 1]], 'k-', alpha=0.3)
    ax.set_title(f'Step {frame}/{T-1}')
    return []

frames = list(range(0, T, max(1, T // 100)))
anim = FuncAnimation(fig, update, init_func=init, frames=frames, blit=False, interval=50)

In [None]:
# 显示动画
from IPython.display import HTML
HTML(anim.to_jshtml())

## 10. 安全统计

In [None]:
total_collisions = sum(metrics['collisions'])
safe_episodes = sum(1 for c in metrics['collisions'] if c == 0)

print("=" * 50)
print("COSMOS 安全统计")
print("=" * 50)
print(f"总碰撞次数:       {total_collisions}")
print(f"零碰撞轮数:       {safe_episodes}/{NUM_EPISODES} ({100*safe_episodes/NUM_EPISODES:.1f}%)")
print(f"最终编队误差:     {metrics['formation_error'][-1]:.4f}")
print(f"最终最小距离:     {metrics['min_dist'][-1]:.3f}")
print(f"安全半径:         {cfg.safety.safety_radius}")
print("=" * 50)