In [1]:
import os
os.environ["RAY_DEDUP_LOGS"] = "0"
import numpy as np
import gymnasium as gym
import ale_py
gym.register_envs(ale_py)
import ray

print("ray 版本:", ray.__version__)

ray 版本: 2.40.0


In [2]:
# PPO的训练流程(简化版)
def training_step(self):
    # 1. 收集完整的train_batch
    train_batch = synchronous_parallel_sample(
        max_agent_steps=self.config.total_train_batch_size
    )
    
    # 2. 对整个batch进行多个epoch的训练
    for epoch in range(self.config.num_epochs):
        # 3. 将batch分成多个minibatch
        for minibatch in minibatches:
            # 4. 计算梯度
            gradients = compute_gradients(minibatch)
            # 5. 应用梯度
            self.apply_gradients(gradients)

In [3]:
from ray.rllib.algorithms.ppo.ppo import PPO 

In [4]:
import copy, pickle
from ray.rllib.core import COMPONENT_RL_MODULE
from ray.rllib.algorithms.ppo.ppo import PPOConfig
from ray.rllib.algorithms.ppo.torch.ppo_torch_learner import PPOTorchLearner
from ray.rllib.core.learner.learner_group import LearnerGroup
from ray.rllib.utils.deprecation import (
    Deprecated,
    DEPRECATED_VALUE,
    deprecation_warning,
)
from typing import Dict, Any
import sys
sys.path.append("..")
from easy_helper import simplify_rllib_metrics

class DebugLearner(PPOTorchLearner):

    def set_learner_id(self, _id):
        self._id = _id

    def after_gradient_based_update(
        self,
        *,
        timesteps: Dict[str, Any],
    ) -> None:
        if hasattr(self, "_after_gradient_based_update_count"):
            self._after_gradient_based_update_count += 1
        else:
            self._after_gradient_based_update_count = 0
        print("after_gradient_based_update", self._after_gradient_based_update_count)
        return super().after_gradient_based_update(timesteps=timesteps)

    def compute_gradients(self, *args, **kwargs):
        gradients_dict = super().compute_gradients(*args, **kwargs)

        if hasattr(self, "_compute_gradients_count"):
            self._compute_gradients_count += 1
        else:
            if not hasattr(self, "_id"):
                self._id = 0
            self._compute_gradients_count = 0

        if self._compute_gradients_count == 0:
            pickle.dump(gradients_dict, open(f"gradients_dict_{self._id}.pkl", "wb"))
        print(f"[{self._id}] compute_gradients", self._compute_gradients_count, gradients_dict!={})

        return gradients_dict


    def _update(self, *args, **kwargs):
        # if hasattr(self, "_update_count"):
        #     self._update_count += 1
        # else:
        #     self._update_count = 0
        # print("_update", self._update_count)
        return super()._update(*args, **kwargs)

config = (
    PPOConfig()
    .api_stack(
        enable_rl_module_and_learner=True,
        enable_env_runner_and_connector_v2=True,
    )
    .environment("CartPole-v1")
    .env_runners(num_env_runners=0)
    # .learners(
    #     num_learners=2,
    #     # num_gpus_per_learner=1,
    #     num_cpus_per_learner=0.3,
    # )
    .training(
        learner_class=DebugLearner,# 调试学习者
        train_batch_size_per_learner=4000,  # 固定每次收集的数据量
        minibatch_size=256,     # 固定每个minibatch的大小
        num_epochs=3            # 固定每批数据训练的轮数
    )
)

"""
for i in range(iteration):
    for n in range(num_epochs):
        for j in range(minibatch):
            _update
            apply_gradients

    after_gradient_based_update
"""
algo = config.build()
remote_actor_ids = algo.learner_group._worker_manager.actor_ids()[1:]
res = algo.learner_group.foreach_learner(lambda learner: learner.set_learner_id(1), remote_actor_ids = remote_actor_ids)

mean_return = []
iteration = 3
for i in range(iteration):
    print(f"\nTraining iteration {i+1}/{iteration}")
    result = algo.train()
    important_metrics = simplify_rllib_metrics(result)
    mean_return.append(important_metrics["环境运行器"]["episode平均回报"])

import matplotlib.pyplot as plt
plt.plot(mean_return)
plt.show()

`UnifiedLogger` will be removed in Ray 2.7.
  return UnifiedLogger(config, logdir, loggers=None)
The `JsonLogger interface is deprecated in favor of the `ray.tune.json.JsonLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `CSVLogger interface is deprecated in favor of the `ray.tune.csv.CSVLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `TBXLogger interface is deprecated in favor of the `ray.tune.tensorboardx.TBXLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
2025-01-11 19:42:15,461	INFO trainable.py:161 -- Trainable.setup took 333.870 seconds. If your trainable is slow to initialize, consider setting reuse_actors=True to reduce actor creation overheads.


AttributeError: 'NoneType' object has no attribute 'actor_ids'

In [5]:
# 打印关键配置参数
print(f"train_batch_size_per_learner: {algo.config['train_batch_size_per_learner']}")  
print(f"total_train_batch_size: {algo.config['total_train_batch_size']}")
print(f"num_learners: {algo.config['num_learners']}")
print(f"minibatch_size: {algo.config['minibatch_size']}")
print(f"num_epochs: {algo.config['num_epochs']}")

train_batch_size_per_learner: 4000
total_train_batch_size: 8000
num_learners: 2
minibatch_size: 256
num_epochs: 3


In [6]:
# 一个学习者
# 实际 148 次
3 * 3 * (4000 / 256)

140.625

In [7]:
# 两个学习者
# 实际 136*2 = 272 次
3 * 3 * (4000 / 256) * 2


281.25

In [5]:
# 对比两个学习者的 第一个梯度 数据
import pickle
gradients_dict_0 = pickle.load(open("gradients_dict_0.pkl", "rb"))
gradients_dict_1 = pickle.load(open("gradients_dict_1.pkl", "rb"))
gradients_dict_1

{Parameter containing:
 tensor([[ 0.3320, -0.0146,  0.3627, -0.1575],
         [-0.3050,  0.0369, -0.1579,  0.1162],
         [ 0.2566,  0.4253,  0.2776,  0.1951],
         ...,
         [ 0.3585, -0.0988,  0.1650, -0.3770],
         [-0.3650,  0.0427,  0.2870,  0.2597],
         [ 0.3104,  0.1398,  0.3945, -0.4483]], requires_grad=True): tensor([[-9.7154e-05, -7.2634e-04,  1.4180e-04,  1.2316e-03],
         [-9.0691e-05, -7.3899e-04,  1.2667e-04,  1.2149e-03],
         [ 7.7691e-05,  5.6295e-04, -1.0584e-04, -9.3534e-04],
         ...,
         [-1.3965e-04, -1.0518e-03,  1.9679e-04,  1.7514e-03],
         [ 9.9927e-05,  7.9875e-04, -1.3969e-04, -1.3140e-03],
         [-1.3854e-05, -6.5653e-05,  2.8614e-05,  1.5400e-04]]),
 Parameter containing:
 tensor([ 1.2129e-01, -4.3348e-01, -3.8890e-01,  2.5107e-01, -2.3253e-01,
         -8.1101e-02,  4.0101e-01, -3.4625e-02, -1.2649e-01,  1.8479e-01,
         -3.9184e-01,  9.2401e-02,  4.3433e-01, -8.2526e-02,  1.8612e-01,
          9.0426e-04,

In [11]:
gradients_dict_0

{Parameter containing:
 tensor([[ 0.3320, -0.0146,  0.3627, -0.1575],
         [-0.3050,  0.0369, -0.1579,  0.1162],
         [ 0.2566,  0.4253,  0.2776,  0.1951],
         ...,
         [ 0.3585, -0.0988,  0.1650, -0.3770],
         [-0.3650,  0.0427,  0.2870,  0.2597],
         [ 0.3104,  0.1398,  0.3945, -0.4483]], requires_grad=True): tensor([[-9.7154e-05, -7.2634e-04,  1.4180e-04,  1.2316e-03],
         [-9.0691e-05, -7.3899e-04,  1.2667e-04,  1.2149e-03],
         [ 7.7691e-05,  5.6295e-04, -1.0584e-04, -9.3534e-04],
         ...,
         [-1.3965e-04, -1.0518e-03,  1.9679e-04,  1.7514e-03],
         [ 9.9927e-05,  7.9875e-04, -1.3969e-04, -1.3140e-03],
         [-1.3854e-05, -6.5653e-05,  2.8614e-05,  1.5400e-04]]),
 Parameter containing:
 tensor([ 1.2129e-01, -4.3348e-01, -3.8890e-01,  2.5107e-01, -2.3253e-01,
         -8.1101e-02,  4.0101e-01, -3.4625e-02, -1.2649e-01,  1.8479e-01,
         -3.9184e-01,  9.2401e-02,  4.3433e-01, -8.2526e-02,  1.8612e-01,
          9.0426e-04,

In [7]:
import torch
# 对比两个梯度字典中的每个参数的梯度是否完全相同
all_equal = True
for g0, g1 in zip(gradients_dict_0.values(), gradients_dict_1.values()):
    if not torch.allclose(g0, g1):
        all_equal = False
        break
        
print(f"两个梯度是否完全相同: {all_equal}")

两个梯度是否完全相同: True
