In [1]:
import ray
import ray.rllib.algorithms.cql as cql
from ray.tune.logger import pretty_print

ray.init(ignore_reinit_error=True)

# Start from CQL's default configuration and point it at the offline dataset.
config = cql.DEFAULT_CONFIG.copy()
config["num_workers"] = 1
config["framework"] = "torch"
# Offline experiences are supplied through the `input` key. The `output` key
# (together with `output_max_file_size`) configures where sampled experiences
# are *written*, so setting only `output` never reads any dataset.
config["input"] = r"D:\Desktop\CQL\tmp\pendulum-out"  # directory holding the JSON Lines experience files

# The environment name is still passed to the constructor, as before.
algo = cql.CQL(config=config, env="Pendulum-v1")

# Training loop settings.
num_iterations = 10
checkpoint_every = 10  # save after every N completed iterations

try:
    for i in range(num_iterations):
        result = algo.train()
        print(pretty_print(result))

        # Count *completed* iterations (i + 1). Testing `i % 10 == 0` would fire
        # only at i == 0, i.e. right after the first iteration, and the final
        # trained state would never be checkpointed.
        if (i + 1) % checkpoint_every == 0:
            checkpoint = algo.save("save_model")
            print("checkpoint saved at", checkpoint)
finally:
    # Always release the local Ray instance, even if training raised.
    ray.shutdown()

Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
2025-03-18 09:39:48,971	INFO worker.py:1518 -- Started a local Ray instance.
2025-03-18 09:39:50,092	INFO simple_q.py:293 -- In multi-agent mode, policies will be optimized sequentially by the multi-GPU optimizer. Consider setting `simple_optimizer=True` if this doesn't work for you.
2025-03-18 09:39:50,093	INFO algorithm.py:351 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
[2m[36m(RolloutWorker pid=8812)[0m Implementing implicit namespace packages (as

agent_timesteps_total: 100
counters:
  last_target_update_ts: 100
  num_agent_steps_sampled: 100
  num_agent_steps_trained: 100
  num_env_steps_sampled: 100
  num_env_steps_trained: 100
  num_target_updates: 100
custom_metrics: {}
date: 2025-03-18_09-39-52
done: false
episode_len_mean: .nan
episode_media: {}
episode_reward_max: .nan
episode_reward_mean: .nan
episode_reward_min: .nan
episodes_this_iter: 0
episodes_total: 0
experiment_id: 7e19f2cbbaf44dab864898c6bb49c2d1
hostname: wei
info:
  last_target_update_ts: 100
  learner:
    default_policy:
      custom_metrics: {}
      learner_stats:
        actor_loss: -0.49013596773147583
        alpha_loss: 0.0
        alpha_value:
        - 1.0
        cql_loss: 20.608327865600586
        critic_loss: 62.7724723815918
        log_alpha_value:
        - 0.0
        max_q: 0.0021238019689917564
        mean_q: 0.0021238019689917564
        min_q: 0.0021238019689917564
        policy_t: 0.26833271980285645
        target_entropy:
        - -1

In [12]:
import gym
import numpy as np
from ray.rllib.evaluation.sample_batch_builder import SampleBatchBuilder
from ray.rllib.offline.json_writer import JsonWriter
import json
import os
import base64
import io

# Destination directory for the collected transitions; create it when missing.
output_dir = "D:\\Desktop\\CQL\\jsonwriter\\pendulum-out"
os.makedirs(output_dir, exist_ok=True)

# File that receives the collected samples.
output_file = os.path.join(output_dir, "data.json")

# Environment the samples are drawn from.
env = gym.make("Pendulum-v1")

# Sample-batch builder instance (not referenced by the code visible here).
batch_builder = SampleBatchBuilder()

def numpy_to_base64(arr):
    """Encode a NumPy array as a base64 text string.

    The array is written with ``np.save`` into an in-memory buffer and the
    resulting bytes are base64-encoded and decoded to a UTF-8 ``str``.

    Args:
        arr: Value to encode.

    Returns:
        The base64 string when ``arr`` is an ``np.ndarray``; otherwise ``arr``
        itself, unchanged.
    """
    # Anything that is not an ndarray passes straight through.
    if not isinstance(arr, np.ndarray):
        return arr

    with io.BytesIO() as stream:
        np.save(stream, arr)
        raw_bytes = stream.getvalue()
    return base64.b64encode(raw_bytes).decode('utf-8')

# Collect transitions using randomly sampled actions and write one compact
# JSON object per line to `output_file`.
# Uses the gym API in which `reset()` returns the observation and `step()`
# returns a 4-tuple (obs, reward, done, info).
with open(output_file, 'w') as f:
    for episode in range(100):
        obs = env.reset()
        done = False
        # Per-episode bookkeeping: step index plus the previous action/reward.
        # The previous action is filled with zeros on the first step, once the
        # action's shape is known.
        t = 0
        prev_action = None
        prev_reward = 0.0
        while not done:
            action = env.action_space.sample()
            new_obs, reward, done, info = env.step(action)
            if prev_action is None:
                prev_action = np.zeros_like(action)

            # Build one row; ndarray fields are base64-encoded.
            sample = {
                "type": "SampleBatch",
                "obs": numpy_to_base64(obs),
                "actions": action.tolist() if isinstance(action, np.ndarray) else action,
                "rewards": float(reward),
                "dones": bool(done),
                "infos": info,
                "new_obs": numpy_to_base64(new_obs),
                "t": t,  # step index within the episode (was a constant 0)
                "eps_id": episode,
                "agent_index": 0,
                "weights": 1.0,
                "action_prob": 1.0,  # constant placeholder, unchanged
                "action_logp": 0.0,  # constant placeholder, unchanged
                "prev_actions": numpy_to_base64(prev_action),  # zeros on the first step
                "prev_rewards": prev_reward  # 0.0 on the first step
            }

            # Write the row as compact JSON, one object per line.
            f.write(json.dumps(sample, separators=(',', ':')) + '\n')

            # Advance the per-episode state for the next step.
            obs = new_obs
            prev_action = np.asarray(action)  # keep it an ndarray so the encoding stays uniform
            prev_reward = float(reward)
            t += 1