In [13]:
import numpy as np
from gym import spaces
from ray.rllib.env.multi_agent_env import MultiAgentEnv
import ray
from ray import tune
from ray.rllib.algorithms.ppo import PPO
from ray.tune.registry import register_env
from ray.tune.registry import _global_registry, ENV_CREATOR
# Shut down any Ray runtime started earlier in this kernel.
# NOTE(review): this cell only repeats imports from the next cell; it could be removed.
ray.shutdown() 

In [14]:
import numpy as np
from gym import spaces
from ray.rllib.env.multi_agent_env import MultiAgentEnv
import ray
from ray import tune
from ray.rllib.algorithms.ppo import PPO
from ray.tune.registry import register_env
from ray.tune.registry import _global_registry, ENV_CREATOR
import os
from ray.rllib.algorithms.callbacks import DefaultCallbacks

# Environment definition.
class MAEnvironment(MultiAgentEnv):
    """Event-triggered consensus environment for multiple scalar agents.

    Every agent holds a scalar position and is connected to every other agent.
    At each step an agent picks an action in ``{0, 1}``. On ``1`` (and on the
    very first step) it recomputes its control input from its neighbours' last
    broadcast positions and broadcasts its own new position. On ``0`` it keeps
    applying its previous control input. Rewards encourage reaching consensus
    (every neighbour pair within ``epsilon``) early while triggering rarely.

    NOTE(review): the recorded run crashed inside RLlib's
    ``convert_old_gym_space_to_gymnasium_space`` while building the policy's
    spaces. The spaces returned here come from the legacy ``gym`` package;
    recent RLlib versions expect ``gymnasium`` spaces, so switching the
    notebook import to ``from gymnasium import spaces`` is the likely fix.
    TODO confirm against the installed Ray version.
    """

    # Starting positions; their count fixes how many agents can be simulated.
    INITIAL_POSITIONS = (0.55, 0.4, -0.05, -0.1, -0.7)

    def __init__(self, num_agents=5, num_iterations=200, dt=0.1):
        """Create the environment.

        Args:
            num_agents: number of agents; must equal ``len(INITIAL_POSITIONS)``.
            num_iterations: episode length in steps.
            dt: integration time step of the position update.

        Raises:
            ValueError: if ``num_agents`` does not match the number of
                hard-coded initial positions. Any other value would only fail
                later with an ``IndexError`` in ``step``/``get_observation``.
        """
        super().__init__()
        if num_agents != len(self.INITIAL_POSITIONS):
            raise ValueError(
                f"num_agents must be {len(self.INITIAL_POSITIONS)} "
                f"(one per initial position), got {num_agents}"
            )
        self.num_agents = num_agents
        self.agents = ["agent_" + str(i) for i in range(num_agents)]
        self.agent_name_mapping = {agent: i for i, agent in enumerate(self.agents)}
        # Set of agent ids exposed to RLlib's MultiAgentEnv.
        self._agent_ids = set(self.agents)

        self.num_iterations = num_iterations
        self.dt = dt
        self.epsilon = 0.005  # consensus tolerance between neighbour positions

        self._reset_episode_state()
        self.max_obs_size = self.compute_max_obs_size()

    def _reset_episode_state(self):
        """(Re)create the agents and their links, and clear per-episode counters."""
        self.agent_objs = [self.Agent(pos, i) for i, pos in enumerate(self.INITIAL_POSITIONS)]
        self.init_neighbors()
        self.current_iteration = 0
        # Step at which the current stretch of consensus began; None while not in consensus.
        self.time_to_reach_epsilon = None
        self.epsilon_violated = True  # True until consensus is (re)entered
        self.all_within_epsilon = False
        self.total_trigger_count = 0
        self.time_to_reach_epsilon_changes = 0  # how many times consensus was entered

    def compute_max_obs_size(self):
        """Observation length: own position plus the largest neighbour count."""
        max_neighbors = max(len(agent.neighbors) for agent in self.agent_objs)
        return 1 + max_neighbors

    def init_neighbors(self):
        """Link every pair of agents (fully connected, undirected graph).

        Pairs are linked in ``(0, 1), (0, 2), ..., (n-2, n-1)`` order, which
        determines the order of neighbour positions in each observation.
        """
        for i, agent_i in enumerate(self.agent_objs):
            for agent_j in self.agent_objs[i + 1:]:
                agent_i.add_neighbor(agent_j)

    def reset(self, *, seed=None, options=None):
        """Restore the initial state.

        Returns:
            ``(observations, infos)``: per-agent observations and empty per-agent info dicts.
        """
        if seed is not None:
            # Nothing in this class draws random numbers; seeding is kept for API compatibility.
            np.random.seed(seed)
        self._reset_episode_state()
        observations = {agent: self.get_observation(agent) for agent in self.agents}
        infos = {agent: {} for agent in self.agents}
        return observations, infos

    def get_observation(self, agent):
        """Return ``[own position, neighbour positions...]`` as float32.

        The vector is zero-padded to ``max_obs_size`` so that every agent shares
        the same observation shape. Values are not clipped to the space bounds.
        """
        agent_obj = self.agent_objs[self.agent_name_mapping[agent]]
        obs = np.array(
            [agent_obj.position] + [neighbor.position for neighbor in agent_obj.neighbors],
            dtype=np.float32,
        )
        if len(obs) < self.max_obs_size:
            # Pad with float32 zeros; float64 padding would upcast obs and
            # break the dtype declared by observation_space.
            padding = np.zeros(self.max_obs_size - len(obs), dtype=np.float32)
            obs = np.concatenate([obs, padding])
        return obs

    def compute_average_position_difference(self):
        """Mean absolute position difference over all unordered agent pairs (0 if no pairs)."""
        positions = [agent.position for agent in self.agent_objs]
        differences = [
            abs(positions[i] - positions[j])
            for i in range(len(positions))
            for j in range(i + 1, len(positions))
        ]
        return sum(differences) / len(differences) if differences else 0

    def step(self, action_dict):
        """Advance the simulation by one step.

        Args:
            action_dict: maps agent id -> action (1 = trigger, 0 = no trigger);
                agents missing from the dict are treated as 0.

        Returns:
            ``(observations, rewards, terminateds, truncateds, infos)``.
        """
        triggers = np.array([action_dict.get(agent, 0) for agent in self.agents])
        self.total_trigger_count += int(np.sum(triggers))

        # NOTE(review): agents are updated one after another, so an agent that
        # triggers later in the loop already sees the new broadcast positions
        # of earlier agents in this same step — confirm this is intended.
        for agent_obj, trigger in zip(self.agent_objs, triggers):
            agent_obj.update_position(self.current_iteration, self.dt, trigger)

        # Consensus: every agent is within epsilon of each of its neighbours.
        self.all_within_epsilon = all(
            abs(agent_obj.position - neighbor.position) < self.epsilon
            for agent_obj in self.agent_objs
            for neighbor in agent_obj.neighbors
        )

        if self.all_within_epsilon:
            if self.epsilon_violated:
                # Entering consensus: remember when it happened.
                self.time_to_reach_epsilon = self.current_iteration
                self.epsilon_violated = False
                self.time_to_reach_epsilon_changes += 1
        else:
            self.epsilon_violated = True
            self.time_to_reach_epsilon = None

        self.current_iteration += 1
        terminated = self.current_iteration >= self.num_iterations

        # The first 25% of the episode uses a reward that penalises triggering less.
        early_phase = self.current_iteration <= self.num_iterations * 0.25

        if terminated:
            # Shared terminal reward: bonus minus the consensus time and the total
            # number of triggers, or a large penalty if the episode ends outside consensus.
            if self.time_to_reach_epsilon is not None:
                global_reward = 1250 - self.time_to_reach_epsilon - self.total_trigger_count
            else:
                global_reward = -10000
            rewards = {agent: global_reward for agent in self.agents}
        else:
            average_position_difference = self.compute_average_position_difference()
            rewards = {}
            for agent in self.agents:
                if early_phase:
                    # Early phase: focus on agreement, mild trigger cost.
                    if self.all_within_epsilon:
                        rewards[agent] = 2 if action_dict.get(agent, 0) == 0 else 1
                    else:
                        rewards[agent] = -1 - 10 * np.abs(average_position_difference)
                else:
                    # Remaining 75%: focus on triggering less.
                    if self.all_within_epsilon:
                        rewards[agent] = 3 if action_dict.get(agent, 0) == 0 else -0.5
                    else:
                        rewards[agent] = -1.5 - 10 * np.abs(average_position_difference)

        observations = {agent: self.get_observation(agent) for agent in self.agents}
        terminateds = {agent: terminated for agent in self.agents}
        terminateds["__all__"] = terminated
        truncateds = {agent: False for agent in self.agents}  # episodes are never truncated
        truncateds["__all__"] = False
        infos = {agent: {} for agent in self.agents}

        return observations, rewards, terminateds, truncateds, infos

    def render(self, mode='human'):
        """Print the current positions of all agents."""
        positions = [agent.position for agent in self.agent_objs]
        print(f"Positions: {positions}")

    def observation_space(self, agent):
        """Shared (padded) observation space, identical for every agent."""
        return spaces.Box(low=-100, high=100, shape=(self.max_obs_size,), dtype=np.float32)

    def action_space(self, agent):
        """Binary action: 0 = keep the current control input, 1 = trigger a broadcast."""
        return spaces.Discrete(2)

    class Agent:
        """A single agent: scalar position plus event-triggered control state."""

        def __init__(self, initial_position, index):
            self.position = initial_position
            self.index = index
            self.neighbors = []
            self.last_broadcast_position = self.position  # position neighbours currently use
            self.trigger_points = []  # (step, position) recorded at each broadcast
            self.u_i = 0  # control input applied between broadcasts

        def add_neighbor(self, neighbor):
            """Link ``self`` and ``neighbor`` in both directions (no-op if already in ``self.neighbors``)."""
            if neighbor not in self.neighbors:
                self.neighbors.append(neighbor)
                neighbor.neighbors.append(self)

        def update_position(self, t, dt, trigger):
            """Integrate one step; on a trigger (or at ``t == 0``) recompute and broadcast first."""
            if trigger == 1 or t == 0:
                self.u_i = -sum((self.last_broadcast_position - neighbor.last_broadcast_position) for neighbor in self.neighbors)
                self.position += self.u_i * dt
                self.last_broadcast_position = self.position
                self.trigger_points.append((t, self.position))
            else:
                self.position += self.u_i * dt

# Environment factory: builds an MAEnvironment from an RLlib env_config mapping.
def env_creator(config):
    """Create an ``MAEnvironment`` from an ``env_config`` mapping.

    Args:
        config: mapping with optional keys ``num_agents`` (default 5),
            ``num_iterations`` (default 200) and ``dt`` (default 0.1). The
            defaults match ``MAEnvironment``'s own defaults, so existing configs
            that only set ``num_agents`` behave exactly as before.

    Returns:
        A new ``MAEnvironment`` instance.
    """
    return MAEnvironment(
        num_agents=config.get("num_agents", 5),
        num_iterations=config.get("num_iterations", 200),
        dt=config.get("dt", 0.1),
    )

# Register the environment under the name "env" (referenced by config["env"]).
# Reuses env_creator so there is a single place mapping env_config to
# MAEnvironment arguments, instead of a duplicate lambda.
register_env("env", env_creator)
print("环境注册成功")


# Policy mapping: every agent uses the same policy (parameter sharing).
def shared_policy_mapping_fn(agent_id, *args, **kwargs):
    """Return the id of the single shared policy for any agent.

    Any extra positional or keyword arguments are accepted and ignored.
    """
    del agent_id, args, kwargs  # the mapping does not depend on its inputs
    return "shared_policy"

# Start Ray. Any runtime already started in this kernel is shut down first so
# the cell can be re-run; local_mode=False runs workers as separate processes.
ray.shutdown() 
ray.init(local_mode=False)




# Training configuration (legacy dict-style RLlib config).
# Single source of truth for the environment parameters, so the spaces used
# for the policy always match the environment RLlib will create.
env_config = {"num_agents": 5}

# Build one probe environment to read the shared policy's spaces, instead of
# constructing a separate environment for each space.
_spaces_probe_env = env_creator(env_config)

config = {
    "env": "env",  # name the environment was registered under
    "env_config": dict(env_config),  # passed to the environment factory
    "multiagent": {
        "policies": {
            # (policy_class, observation_space, action_space, policy_config);
            # None selects the algorithm's default policy class.
            "shared_policy": (
                None,
                _spaces_probe_env.observation_space("agent_0"),
                _spaces_probe_env.action_space("agent_0"),
                {},
            ),
        },
        "policy_mapping_fn": shared_policy_mapping_fn,  # every agent -> shared policy
    },
    "framework": "torch",  # "torch" or "tf"
    "num_workers": 4,  # number of rollout worker actors
    "num_envs_per_worker": 5,
    "train_batch_size": 4000,  # total samples used per training iteration
    "sgd_minibatch_size": 256,
    "lr": 0.0003,
    "num_sgd_iter": 20,
}
del _spaces_probe_env

print("开始训练")

class SaveOnMaxRewardCallback(DefaultCallbacks):
    """Save a single checkpoint the first time the best episode reward reaches a threshold."""

    def __init__(self, reward_threshold=5000, *args, **kwargs):
        """
        Args:
            reward_threshold: minimum ``episode_reward_max`` that triggers the save.
        """
        super().__init__(*args, **kwargs)
        self.reward_threshold = reward_threshold
        self.saved_checkpoint = False  # ensures the model is saved only once

    @staticmethod
    def _max_episode_reward(result):
        """Return ``episode_reward_max`` from a training result dict, or None if absent.

        NOTE(review): depending on the Ray version this metric is reported at
        the top level or nested under ``env_runners`` / ``sampler_results``,
        so each location is tried in turn. TODO confirm for the installed version.
        """
        for metrics in (result, result.get("env_runners"), result.get("sampler_results")):
            if isinstance(metrics, dict) and metrics.get("episode_reward_max") is not None:
                return metrics["episode_reward_max"]
        return None

    def on_train_result(self, *, algorithm, result, **kwargs):
        """After each training iteration, save once if the best episode reward reaches the threshold."""
        if self.saved_checkpoint:
            return
        max_reward = self._max_episode_reward(result)
        # No metric available -> nothing to compare. A NaN value compares False
        # against the threshold, so it also skips the save (as before).
        if max_reward is None or not (max_reward >= self.reward_threshold):
            return
        checkpoint_dir = algorithm.save()
        print(f"模型已保存，路径为：{checkpoint_dir}，最大奖励：{max_reward}")
        self.saved_checkpoint = True

# Attach the custom callback to the config (the class itself, not an instance).
config["callbacks"] = SaveOnMaxRewardCallback



# Run training: stop after 50 iterations, checkpoint every 5 iterations and at the end.
try:
    analysis = tune.run(
        PPO,
        config=config,
        stop={"training_iteration": 50},
        checkpoint_freq=5,
        checkpoint_at_end=True,
    )
finally:
    # Shut Ray down even when a trial fails: tune.run then raises TuneError,
    # and a plain statement after it would never run.
    ray.shutdown()

环境注册成功


2025-01-03 17:09:53,122	INFO worker.py:1752 -- Started a local Ray instance.
2025-01-03 17:09:53,579	INFO tune.py:613 -- [output] This uses the legacy output and progress reporter, as Jupyter notebooks are not supported by the new engine, yet. For more information, please see https://github.com/ray-project/ray/issues/36949


开始训练


0,1
Current time:,2025-01-03 17:10:00
Running for:,00:00:07.35
Memory:,11.0/16.0 GiB

Trial name,# failures,error file
PPO_env_7e859_00000,1,/tmp/ray/session_2025-01-03_17-09-51_600290_14960/artifacts/2025-01-03_17-09-53/PPO_2025-01-03_17-09-53/driver_artifacts/PPO_env_7e859_00000_0_2025-01-03_17-09-53/error.txt

Trial name,status,loc
PPO_env_7e859_00000,ERROR,


[36m(RolloutWorker pid=16101)[0m Exception raised in creation task: The actor died because of an error raised in its creation task, [36mray::RolloutWorker.__init__()[39m (pid=16101, ip=127.0.0.1, actor_id=1d77eae3269d50967b942d4901000000, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x3803a3760>)
[36m(RolloutWorker pid=16101)[0m   File "/Users/cyj/anaconda3/envs/py38/lib/python3.8/site-packages/ray/rllib/evaluation/rollout_worker.py", line 480, in __init__
[36m(RolloutWorker pid=16101)[0m     self.policy_dict, self.is_policy_to_train = self.config.get_multi_agent_setup(
[36m(RolloutWorker pid=16101)[0m   File "/Users/cyj/anaconda3/envs/py38/lib/python3.8/site-packages/ray/rllib/algorithms/algorithm_config.py", line 3129, in get_multi_agent_setup
[36m(RolloutWorker pid=16101)[0m     ].observation_space = convert_old_gym_space_to_gymnasium_space(
[36m(RolloutWorker pid=16101)[0m   File "/Users/cyj/anaconda3/envs/py38/lib/python3.8/site-packages/ray/rlli

Trial name
PPO_env_7e859_00000


2025-01-03 17:10:00,929	INFO tune.py:1016 -- Wrote the latest version of all result files and experiment state to '/Users/cyj/ray_results/PPO_2025-01-03_17-09-53' in 0.0020s.


TuneError: ('Trials did not complete', [PPO_env_7e859_00000])

[36m(PPO pid=16093)[0m Exception raised in creation task: The actor died because of an error raised in its creation task, [36mray::PPO.__init__()[39m (pid=16093, ip=127.0.0.1, actor_id=0caff997712070444c39035101000000, repr=PPO)
[36m(PPO pid=16093)[0m   File "/Users/cyj/anaconda3/envs/py38/lib/python3.8/site-packages/ray/rllib/evaluation/worker_set.py", line 229, in _setup
[36m(PPO pid=16093)[0m     self.add_workers(
[36m(PPO pid=16093)[0m   File "/Users/cyj/anaconda3/envs/py38/lib/python3.8/site-packages/ray/rllib/evaluation/worker_set.py", line 682, in add_workers
[36m(PPO pid=16093)[0m     raise result.get()
[36m(PPO pid=16093)[0m   File "/Users/cyj/anaconda3/envs/py38/lib/python3.8/site-packages/ray/rllib/utils/actor_manager.py", line 497, in _fetch_result
[36m(PPO pid=16093)[0m     result = ray.get(r)
[36m(PPO pid=16093)[0m ray.exceptions.RayActorError: The actor died because of an error raised in its creation task, [36mray::RolloutWorker.__init__()[39m (pid=16