From b7abfe91a69bf124fdc454d4025e40110153be00 Mon Sep 17 00:00:00 2001
From: luyudong
Date: Sat, 7 Oct 2023 12:12:40 +0800
Subject: [PATCH 1/4] Add collector logging in new pipeline

---
 .../middleware/functional/collector.py | 111 ++++++++++++++++--
 1 file changed, 100 insertions(+), 11 deletions(-)

diff --git a/ding/framework/middleware/functional/collector.py b/ding/framework/middleware/functional/collector.py
index 62d183e6d8..f5be1f8127 100644
--- a/ding/framework/middleware/functional/collector.py
+++ b/ding/framework/middleware/functional/collector.py
@@ -1,6 +1,8 @@
-from typing import TYPE_CHECKING, Callable, List, Tuple, Any
+from typing import TYPE_CHECKING, Callable, List, Tuple, Any, Optional
 from functools import reduce
 import treetensor.torch as ttorch
+import numpy as np
+from ding.utils import EasyTimer, build_logger
 from ding.envs import BaseEnvManager
 from ding.policy import Policy
 from ding.torch_utils import to_ndarray, get_shape0
@@ -83,7 +85,7 @@ def _inference(ctx: "OnlineRLContext"):
     return _inference
 
 
-def rolloutor(policy: Policy, env: BaseEnvManager, transitions: TransitionList) -> Callable:
+def rolloutor(policy: Policy, env: BaseEnvManager, transitions: TransitionList, collect_print_freq=100, tb_logger: 'SummaryWriter' = None, exp_name: Optional[str] = 'default_experiment', instance_name: Optional[str] = 'collector') -> Callable:
     """
     Overview:
         The middleware that executes the transition process in the env.
@@ -98,6 +100,64 @@ def rolloutor(policy: Policy, env: BaseEnvManager, transitions: TransitionList)
     env_episode_id = [_ for _ in range(env.env_num)]
     current_id = env.env_num
+    timer = EasyTimer()
+    last_train_iter = 0
+    total_envstep_count = 0
+    total_episode_count = 0
+    total_duration = 0
+    total_train_sample_count = 0
+    env_info = {env_id: {'time': 0., 'step': 0, 'train_sample': 0} for env_id in range(env.env_num)}
+    episode_info = []
+
+    if tb_logger is not None:
+        logger, _ = build_logger(
+            path='./{}/log/{}'.format(exp_name, instance_name),
+            name=instance_name,
+            need_tb=False
+        )
+        tb_logger = tb_logger
+    else:
+        logger, tb_logger = build_logger(
+            path='./{}/log/{}'.format(exp_name, instance_name), name=instance_name
+        )
+
+    def output_log(train_iter: int) -> None:
+        """
+        Overview:
+            Print the output log information. You can refer to the docs of `Best Practice` to understand \
+                the training generated logs and tensorboards.
+        Arguments:
+            - train_iter (:obj:`int`): the number of training iteration.
+        """
+        nonlocal episode_info, timer, total_episode_count, total_duration, total_envstep_count, total_train_sample_count, last_train_iter
+        if (train_iter - last_train_iter) >= collect_print_freq and len(episode_info) > 0:
+            last_train_iter = train_iter
+            episode_count = len(episode_info)
+            envstep_count = sum([d['step'] for d in episode_info])
+            train_sample_count = sum([d['train_sample'] for d in episode_info])
+            duration = sum([d['time'] for d in episode_info])
+            episode_return = [d['reward'].item() for d in episode_info]
+            print(episode_return)
+            info = {
+                'episode_count': episode_count,
+                'envstep_count': envstep_count,
+                'train_sample_count': train_sample_count,
+                'avg_envstep_per_episode': envstep_count / episode_count,
+                'avg_sample_per_episode': train_sample_count / episode_count,
+                'avg_envstep_per_sec': envstep_count / duration,
+                'avg_train_sample_per_sec': train_sample_count / duration,
+                'avg_episode_per_sec': episode_count / duration,
+                'reward_mean': np.mean(episode_return),
+                'reward_std': np.std(episode_return),
+                'reward_max': np.max(episode_return),
+                'reward_min': np.min(episode_return),
+                'total_envstep_count': total_envstep_count,
+                'total_train_sample_count': total_train_sample_count,
+                'total_episode_count': total_episode_count,
+                # 'each_reward': episode_return,
+            }
+            episode_info.clear()
+            logger.info("collect end:\n{}".format('\n'.join(['{}: {}'.format(k, v) for k, v in info.items()])))
 
     def _rollout(ctx: "OnlineRLContext"):
         """
         Input of ctx:
@@ -113,22 +173,51 @@ def _rollout(ctx: "OnlineRLContext"):
             trajectory stops.
         """
 
-        nonlocal current_id
+        nonlocal current_id, env_info, episode_info, timer, total_episode_count, total_duration, total_envstep_count, total_train_sample_count, last_train_iter
         timesteps = env.step(ctx.action)
         ctx.env_step += len(timesteps)
         timesteps = [t.tensor() for t in timesteps]
         # TODO abnormal env step
+
+        collected_sample = 0
+        collected_step = 0
+        collected_episode = 0
+        interaction_duration = timer.value / len(timesteps)
         for i, timestep in enumerate(timesteps):
-            transition = policy.process_transition(ctx.obs[i], ctx.inference_output[i], timestep)
-            transition = ttorch.as_tensor(transition)  # TBD
-            transition.collect_train_iter = ttorch.as_tensor([ctx.train_iter])
-            transition.env_data_id = ttorch.as_tensor([env_episode_id[timestep.env_id]])
-            transitions.append(timestep.env_id, transition)
+            with timer:
+                transition = policy.process_transition(ctx.obs[i], ctx.inference_output[i], timestep)
+                transition = ttorch.as_tensor(transition)  # TBD
+                transition.collect_train_iter = ttorch.as_tensor([ctx.train_iter])
+                transition.env_data_id = ttorch.as_tensor([env_episode_id[timestep.env_id]])
+                transitions.append(timestep.env_id, transition)
+
+                collected_step += 1
+                collected_sample += len(transition.obs)
+                env_info[timestep.env_id.item()]['step'] += 1
+                env_info[timestep.env_id.item()]['train_sample'] += len(transition.obs)
+
+            env_info[timestep.env_id.item()]['time'] += timer.value + interaction_duration
             if timestep.done:
-                policy.reset([timestep.env_id])
-                env_episode_id[timestep.env_id] = current_id
+                info = {
+                    'reward': timestep.info['eval_episode_return'],
+                    'time': env_info[timestep.env_id.item()]['time'],
+                    'step': env_info[timestep.env_id.item()]['step'],
+                    'train_sample': env_info[timestep.env_id.item()]['train_sample'],
+                }
+
+                episode_info.append(info)
+                policy.reset([timestep.env_id.item()])
+                env_episode_id[timestep.env_id.item()] = current_id
+                collected_episode += 1
                 current_id += 1
                 ctx.env_episode += 1
-        # TODO log
+
+        collected_duration = sum([d['time'] for d in episode_info])
+        total_envstep_count += collected_step
+        total_episode_count += collected_episode
+        total_duration += collected_duration
+        total_train_sample_count += collected_sample
+
+        output_log(ctx.train_iter)
 
     return _rollout

From a0423c73760d28b4ad7bead4d074f03e3d397115 Mon Sep 17 00:00:00 2001
From: luyudong
Date: Sat, 7 Oct 2023 12:20:25 +0800
Subject: [PATCH 2/4] Reformat

---
 .../middleware/functional/collector.py | 26 ++++++++++---------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/ding/framework/middleware/functional/collector.py b/ding/framework/middleware/functional/collector.py
index f5be1f8127..873df0f055 100644
--- a/ding/framework/middleware/functional/collector.py
+++ b/ding/framework/middleware/functional/collector.py
@@ -85,7 +85,15 @@ def _inference(ctx: "OnlineRLContext"):
     return _inference
 
 
-def rolloutor(policy: Policy, env: BaseEnvManager, transitions: TransitionList, collect_print_freq=100, tb_logger: 'SummaryWriter' = None, exp_name: Optional[str] = 'default_experiment', instance_name: Optional[str] = 'collector') -> Callable:
+def rolloutor(
+        policy: Policy,
+        env: BaseEnvManager,
+        transitions: TransitionList,
+        collect_print_freq=100,
+        tb_logger: 'SummaryWriter' = None,
+        exp_name: Optional[str] = 'default_experiment',
+        instance_name: Optional[str] = 'collector'
+) -> Callable:
     """
     Overview:
         The middleware that executes the transition process in the env.
@@ -110,16 +118,10 @@ def rolloutor(
     episode_info = []
 
     if tb_logger is not None:
-        logger, _ = build_logger(
-            path='./{}/log/{}'.format(exp_name, instance_name),
-            name=instance_name,
-            need_tb=False
-        )
+        logger, _ = build_logger(path='./{}/log/{}'.format(exp_name, instance_name), name=instance_name, need_tb=False)
         tb_logger = tb_logger
     else:
-        logger, tb_logger = build_logger(
-            path='./{}/log/{}'.format(exp_name, instance_name), name=instance_name
-        )
+        logger, tb_logger = build_logger(path='./{}/log/{}'.format(exp_name, instance_name), name=instance_name)
 
     def output_log(train_iter: int) -> None:
         """
@@ -195,7 +197,7 @@ def _rollout(ctx: "OnlineRLContext"):
             collected_sample += len(transition.obs)
             env_info[timestep.env_id.item()]['step'] += 1
             env_info[timestep.env_id.item()]['train_sample'] += len(transition.obs)
-
+
             env_info[timestep.env_id.item()]['time'] += timer.value + interaction_duration
         if timestep.done:
             info = {
@@ -211,13 +213,13 @@ def _rollout(ctx: "OnlineRLContext"):
             collected_episode += 1
             current_id += 1
             ctx.env_episode += 1
-
+
         collected_duration = sum([d['time'] for d in episode_info])
         total_envstep_count += collected_step
         total_episode_count += collected_episode
         total_duration += collected_duration
         total_train_sample_count += collected_sample
-
+
         output_log(ctx.train_iter)
 
     return _rollout

From 6fe690b62bb1951004e512dd984b0264ad41a61f Mon Sep 17 00:00:00 2001
From: luyudong
Date: Sun, 8 Oct 2023 14:44:38 +0800
Subject: [PATCH 3/4] Reformat

---
 ding/framework/middleware/functional/collector.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/ding/framework/middleware/functional/collector.py b/ding/framework/middleware/functional/collector.py
index 873df0f055..e6659e3ea6 100644
--- a/ding/framework/middleware/functional/collector.py
+++ b/ding/framework/middleware/functional/collector.py
@@ -90,7 +90,7 @@ def rolloutor(
         env: BaseEnvManager,
         transitions: TransitionList,
         collect_print_freq=100,
-        tb_logger: 'SummaryWriter' = None,
+        tb_logger=None,
         exp_name: Optional[str] = 'default_experiment',
         instance_name: Optional[str] = 'collector'
 ) -> Callable:
@@ -131,7 +131,8 @@ def output_log(train_iter: int) -> None:
         Arguments:
             - train_iter (:obj:`int`): the number of training iteration.
         """
-        nonlocal episode_info, timer, total_episode_count, total_duration, total_envstep_count, total_train_sample_count, last_train_iter
+        nonlocal episode_info, timer, total_episode_count, total_duration, \
+            total_envstep_count, total_train_sample_count, last_train_iter
         if (train_iter - last_train_iter) >= collect_print_freq and len(episode_info) > 0:
             last_train_iter = train_iter
             episode_count = len(episode_info)
@@ -175,7 +176,8 @@ def _rollout(ctx: "OnlineRLContext"):
             trajectory stops.
         """
 
-        nonlocal current_id, env_info, episode_info, timer, total_episode_count, total_duration, total_envstep_count, total_train_sample_count, last_train_iter
+        nonlocal current_id, env_info, episode_info, timer, \
+            total_episode_count, total_duration, total_envstep_count, total_train_sample_count, last_train_iter
         timesteps = env.step(ctx.action)
         ctx.env_step += len(timesteps)
         timesteps = [t.tensor() for t in timesteps]

From 0ccab8386d19502389dfb4607527b93b0b7db33c Mon Sep 17 00:00:00 2001
From: luyudong
Date: Fri, 13 Oct 2023 14:01:02 +0800
Subject: [PATCH 4/4] Fix according to comment

---
 .../middleware/functional/collector.py | 100 ++++++++----------
 1 file changed, 43 insertions(+), 57 deletions(-)

diff --git a/ding/framework/middleware/functional/collector.py b/ding/framework/middleware/functional/collector.py
index e6659e3ea6..d2fb4483b9 100644
--- a/ding/framework/middleware/functional/collector.py
+++ b/ding/framework/middleware/functional/collector.py
@@ -1,8 +1,9 @@
-from typing import TYPE_CHECKING, Callable, List, Tuple, Any, Optional
+from typing import TYPE_CHECKING, Callable, List, Tuple, Any
 from functools import reduce
 import treetensor.torch as ttorch
 import numpy as np
-from ding.utils import EasyTimer, build_logger
+from ditk import logging
+from ding.utils import EasyTimer
 from ding.envs import BaseEnvManager
 from ding.policy import Policy
 from ding.torch_utils import to_ndarray, get_shape0
@@ -90,9 +91,6 @@ def rolloutor(
         env: BaseEnvManager,
         transitions: TransitionList,
         collect_print_freq=100,
-        tb_logger=None,
-        exp_name: Optional[str] = 'default_experiment',
-        instance_name: Optional[str] = 'collector'
 ) -> Callable:
     """
     Overview:
@@ -112,56 +110,10 @@ def rolloutor(
     last_train_iter = 0
     total_envstep_count = 0
     total_episode_count = 0
-    total_duration = 0
     total_train_sample_count = 0
     env_info = {env_id: {'time': 0., 'step': 0, 'train_sample': 0} for env_id in range(env.env_num)}
     episode_info = []
 
-    if tb_logger is not None:
-        logger, _ = build_logger(path='./{}/log/{}'.format(exp_name, instance_name), name=instance_name, need_tb=False)
-        tb_logger = tb_logger
-    else:
-        logger, tb_logger = build_logger(path='./{}/log/{}'.format(exp_name, instance_name), name=instance_name)
-
-    def output_log(train_iter: int) -> None:
-        """
-        Overview:
-            Print the output log information. You can refer to the docs of `Best Practice` to understand \
-                the training generated logs and tensorboards.
-        Arguments:
-            - train_iter (:obj:`int`): the number of training iteration.
-        """
-        nonlocal episode_info, timer, total_episode_count, total_duration, \
-            total_envstep_count, total_train_sample_count, last_train_iter
-        if (train_iter - last_train_iter) >= collect_print_freq and len(episode_info) > 0:
-            last_train_iter = train_iter
-            episode_count = len(episode_info)
-            envstep_count = sum([d['step'] for d in episode_info])
-            train_sample_count = sum([d['train_sample'] for d in episode_info])
-            duration = sum([d['time'] for d in episode_info])
-            episode_return = [d['reward'].item() for d in episode_info]
-            print(episode_return)
-            info = {
-                'episode_count': episode_count,
-                'envstep_count': envstep_count,
-                'train_sample_count': train_sample_count,
-                'avg_envstep_per_episode': envstep_count / episode_count,
-                'avg_sample_per_episode': train_sample_count / episode_count,
-                'avg_envstep_per_sec': envstep_count / duration,
-                'avg_train_sample_per_sec': train_sample_count / duration,
-                'avg_episode_per_sec': episode_count / duration,
-                'reward_mean': np.mean(episode_return),
-                'reward_std': np.std(episode_return),
-                'reward_max': np.max(episode_return),
-                'reward_min': np.min(episode_return),
-                'total_envstep_count': total_envstep_count,
-                'total_train_sample_count': total_train_sample_count,
-                'total_episode_count': total_episode_count,
-                # 'each_reward': episode_return,
-            }
-            episode_info.clear()
-            logger.info("collect end:\n{}".format('\n'.join(['{}: {}'.format(k, v) for k, v in info.items()])))
-
     def _rollout(ctx: "OnlineRLContext"):
         """
         Input of ctx:
@@ -175,11 +127,9 @@ def _rollout(ctx: "OnlineRLContext"):
         """
 
         nonlocal current_id, env_info, episode_info, timer, \
-            total_episode_count, total_duration, total_envstep_count, total_train_sample_count, last_train_iter
+            total_episode_count, total_envstep_count, total_train_sample_count, last_train_iter
         timesteps = env.step(ctx.action)
         ctx.env_step += len(timesteps)
         timesteps = [t.tensor() for t in timesteps]
-        # TODO abnormal env step
 
         collected_sample = 0
         collected_step = 0
@@ -190,7 +141,7 @@ def _rollout(ctx: "OnlineRLContext"):
         for i, timestep in enumerate(timesteps):
             with timer:
                 transition = policy.process_transition(ctx.obs[i], ctx.inference_output[i], timestep)
-                transition = ttorch.as_tensor(transition)  # TBD
+                transition = ttorch.as_tensor(transition)
                 transition.collect_train_iter = ttorch.as_tensor([ctx.train_iter])
                 transition.env_data_id = ttorch.as_tensor([env_episode_id[timestep.env_id]])
                 transitions.append(timestep.env_id, transition)
@@ -216,12 +167,47 @@ def _rollout(ctx: "OnlineRLContext"):
                 current_id += 1
                 ctx.env_episode += 1
 
-        collected_duration = sum([d['time'] for d in episode_info])
         total_envstep_count += collected_step
         total_episode_count += collected_episode
-        total_duration += collected_duration
         total_train_sample_count += collected_sample
 
-        output_log(ctx.train_iter)
+        if (ctx.train_iter - last_train_iter) >= collect_print_freq and len(episode_info) > 0:
+            output_log(episode_info, total_episode_count, total_envstep_count, total_train_sample_count)
+            last_train_iter = ctx.train_iter
 
     return _rollout
+
+
+def output_log(episode_info, total_episode_count, total_envstep_count, total_train_sample_count) -> None:
+    """
+    Overview:
+        Print the output log information. You can refer to the docs of `Best Practice` to understand \
+            the training generated logs and tensorboards.
+    Arguments:
+        - train_iter (:obj:`int`): the number of training iteration.
+    """
+    episode_count = len(episode_info)
+    envstep_count = sum([d['step'] for d in episode_info])
+    train_sample_count = sum([d['train_sample'] for d in episode_info])
+    duration = sum([d['time'] for d in episode_info])
+    episode_return = [d['reward'].item() for d in episode_info]
+    info = {
+        'episode_count': episode_count,
+        'envstep_count': envstep_count,
+        'train_sample_count': train_sample_count,
+        'avg_envstep_per_episode': envstep_count / episode_count,
+        'avg_sample_per_episode': train_sample_count / episode_count,
+        'avg_envstep_per_sec': envstep_count / duration,
+        'avg_train_sample_per_sec': train_sample_count / duration,
+        'avg_episode_per_sec': episode_count / duration,
+        'reward_mean': np.mean(episode_return),
+        'reward_std': np.std(episode_return),
+        'reward_max': np.max(episode_return),
+        'reward_min': np.min(episode_return),
+        'total_envstep_count': total_envstep_count,
+        'total_train_sample_count': total_train_sample_count,
+        'total_episode_count': total_episode_count,
+        # 'each_reward': episode_return,
+    }
+    episode_info.clear()
+    logging.info("collect end:\n{}".format('\n'.join(['{}: {}'.format(k, v) for k, v in info.items()])))
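
Note, not part of the patch series above: the final form of the collector logging (PATCH 4) aggregates the per-episode records that _rollout appends to episode_info and reports them through ditk.logging. Below is a minimal standalone sketch of that aggregation with synthetic episode records; it substitutes the standard-library logging module for ditk.logging, and all record values and totals are made up for illustration only.

# Standalone sketch of the aggregation performed by the module-level output_log
# added in PATCH 4. Assumptions: stdlib logging stands in for ditk.logging, and
# episode_info is fabricated here instead of being filled by _rollout on timestep.done.
import logging

import numpy as np
import torch

logging.basicConfig(level=logging.INFO)

# Each record mirrors the dict appended to episode_info when an episode finishes.
episode_info = [
    {'reward': torch.tensor(120.0), 'time': 0.8, 'step': 200, 'train_sample': 200},
    {'reward': torch.tensor(95.0), 'time': 0.7, 'step': 180, 'train_sample': 180},
]
total_episode_count, total_envstep_count, total_train_sample_count = 2, 380, 380

# Same per-report statistics as output_log computes from the buffered episodes.
episode_count = len(episode_info)
envstep_count = sum(d['step'] for d in episode_info)
train_sample_count = sum(d['train_sample'] for d in episode_info)
duration = sum(d['time'] for d in episode_info)
episode_return = [d['reward'].item() for d in episode_info]
info = {
    'episode_count': episode_count,
    'envstep_count': envstep_count,
    'avg_envstep_per_episode': envstep_count / episode_count,
    'avg_envstep_per_sec': envstep_count / duration,
    'avg_train_sample_per_sec': train_sample_count / duration,
    'reward_mean': np.mean(episode_return),
    'reward_std': np.std(episode_return),
    'reward_max': np.max(episode_return),
    'reward_min': np.min(episode_return),
    'total_envstep_count': total_envstep_count,
    'total_train_sample_count': total_train_sample_count,
    'total_episode_count': total_episode_count,
}
episode_info.clear()  # the middleware clears its buffer after each report
logging.info("collect end:\n{}".format('\n'.join('{}: {}'.format(k, v) for k, v in info.items())))

Passing the counters to output_log explicitly, instead of capturing them with nonlocal as in PATCH 1, is what allows the function to move to module level in PATCH 4 and be exercised independently of the rolloutor closure, as the sketch above does.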