[RLlib] New ConnectorV2 API #02: SingleAgentEpisode enhancements. #41075

Merged
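As the env_runner.py diff below shows, this PR renames the SingleAgentEpisode methods used during sampling: add_initial_observation() becomes add_env_reset(), add_timestep() becomes add_env_step(), and create_successor() becomes cut(); model states are also no longer stored on the episode itself. The following is a minimal sketch of the new calling pattern, pieced together from the call sites in the diff; the import path, keyword defaults, and the no-argument constructor are assumptions, not guaranteed by this PR.

# Hedged sketch of the renamed SingleAgentEpisode API; signatures are inferred
# from the call sites in this PR and may differ in detail.
import gymnasium as gym
from ray.rllib.env.single_agent_episode import SingleAgentEpisode

env = gym.make("CartPole-v1")
episode = SingleAgentEpisode()  # assumed: empty construction is allowed

obs, info = env.reset()
episode.add_env_reset(observation=obs)  # was: add_initial_observation(initial_observation=...)

terminated = truncated = False
while not (terminated or truncated):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    episode.add_env_step(  # was: add_timestep(obs, action, reward, ...)
        observation=obs,
        action=action,
        reward=reward,
        is_terminated=terminated,
        is_truncated=truncated,
    )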
Commits (45 total; changes shown from 38 commits)
9ac9a41  wip  (sven1977, Nov 10, 2023)
fc6670d  Merge branch 'master' into env_runner_support_connectors_02_sa_episod…  (sven1977, Nov 17, 2023)
8d85c54  wip  (sven1977, Nov 17, 2023)
b77350d  Merge remote-tracking branch 'origin/env_runner_support_connectors_02…  (sven1977, Nov 17, 2023)
7a2e6a7  LINT  (sven1977, Nov 17, 2023)
89d5847  LINT  (sven1977, Nov 17, 2023)
21c084b  wip  (sven1977, Nov 17, 2023)
6fa977f  wip  (sven1977, Nov 17, 2023)
a31fb51  wip  (sven1977, Nov 20, 2023)
5b53cf7  wip  (sven1977, Nov 20, 2023)
f90bf46  wip  (sven1977, Nov 20, 2023)
46699cf  wip  (sven1977, Nov 20, 2023)
75d2968  wip  (sven1977, Nov 20, 2023)
f397563  LINT  (sven1977, Nov 20, 2023)
93f9d28  Merge branch 'master' of https://github.com/ray-project/ray into env_…  (sven1977, Nov 20, 2023)
bb538d0  wip  (sven1977, Nov 20, 2023)
9341f8f  wip  (sven1977, Nov 20, 2023)
7935b8f  wip  (sven1977, Nov 21, 2023)
900bcfc  wip  (sven1977, Nov 21, 2023)
397c2a6  wip  (sven1977, Nov 22, 2023)
46779d9  wip  (sven1977, Nov 22, 2023)
6d609da  wip  (sven1977, Nov 22, 2023)
5a68444  Merge branch 'master' of https://github.com/ray-project/ray into env_…  (sven1977, Nov 23, 2023)
12ab11c  wip  (sven1977, Nov 23, 2023)
b3f222c  wip  (sven1977, Nov 24, 2023)
a91d399  wip  (sven1977, Nov 25, 2023)
7c3df95  wip  (sven1977, Nov 25, 2023)
02aeece  Merge branch 'master' of https://github.com/ray-project/ray into env_…  (sven1977, Nov 25, 2023)
432aff3  wip  (sven1977, Nov 27, 2023)
4575e00  wip  (sven1977, Nov 27, 2023)
84a9495  Merge branch 'master' of https://github.com/ray-project/ray into env_…  (sven1977, Nov 27, 2023)
dcf38a7  wip  (sven1977, Nov 27, 2023)
4cd7d81  Merge branch 'master' of https://github.com/ray-project/ray into env_…  (sven1977, Nov 28, 2023)
2645c87  wip  (sven1977, Nov 28, 2023)
1c9b331  wip  (sven1977, Nov 28, 2023)
a74eaec  LINT  (sven1977, Nov 28, 2023)
486569d  wip  (sven1977, Nov 28, 2023)
05752c9  Merge branch 'master' of https://github.com/ray-project/ray into env_…  (sven1977, Nov 29, 2023)
64be6fd  wip  (sven1977, Nov 29, 2023)
648f30f  wip  (sven1977, Nov 29, 2023)
9b2d5ad  wip  (sven1977, Nov 29, 2023)
22e5181  wip  (sven1977, Nov 29, 2023)
c3d8ea5  wip  (sven1977, Nov 30, 2023)
07ec6b1  Merge branch 'master' of https://github.com/ray-project/ray into env_…  (sven1977, Nov 30, 2023)
a8e9bff  learning CartPole w/ EnvRunner  (sven1977, Nov 30, 2023)
24 changes: 20 additions & 4 deletions rllib/BUILD
@@ -683,6 +683,16 @@ py_test(
args = ["--dir=tuned_examples/ppo"]
)

py_test(
name = "test_memory_leak_ppo_new_stack",
tags = ["team:rllib", "memory_leak_tests"],
main = "utils/tests/run_memory_leak_tests.py",
size = "large",
srcs = ["utils/tests/run_memory_leak_tests.py"],
data = ["tuned_examples/ppo/memory-leak-test-ppo-new-stack.py"],
args = ["--dir=tuned_examples/ppo", "--to-check=rollout_worker"]
)

py_test(
name = "test_memory_leak_sac",
tags = ["team:rllib", "memory_leak_tests"],
@@ -818,19 +828,26 @@ sh_test(
# )

py_test(
name = "env/tests/test_single_agent_gym_env_runner",
name = "env/tests/test_single_agent_env_runner",
tags = ["team:rllib", "env"],
size = "medium",
srcs = ["env/tests/test_single_agent_gym_env_runner.py"]
srcs = ["env/tests/test_single_agent_env_runner.py"]
)

py_test(
name = "env/tests/test_single_agent_episode",
tags = ["team:rllib", "env"],
size = "medium",
size = "small",
srcs = ["env/tests/test_single_agent_episode.py"]
)

py_test(
name = "env/tests/test_lookback_buffer",
tags = ["team:rllib", "env"],
size = "small",
srcs = ["env/tests/test_lookback_buffer.py"]
)

py_test(
name = "env/wrappers/tests/test_exception_wrapper",
tags = ["team:rllib", "env"],
@@ -1332,7 +1349,6 @@ py_test(
# Tag: utils
# --------------------------------------------------------------------

# Checkpoint Utils
py_test(
name = "test_checkpoint_utils",
tags = ["team:rllib", "utils"],
68 changes: 37 additions & 31 deletions rllib/algorithms/dreamerv3/utils/env_runner.py
@@ -32,6 +32,9 @@
_, tf, _ = try_import_tf()


# TODO (sven): Use SingleAgentEnvRunner instead of this as soon as we have the new
# ConnectorV2 example classes to make Atari work properly with these (w/o requiring the
# classes at the bottom of this file here, e.g. `ActionClip`).
class DreamerV3EnvRunner(EnvRunner):
"""An environment runner to collect data from vectorized gymnasium environments."""

@@ -144,6 +147,7 @@ def __init__(

self._needs_initial_reset = True
self._episodes = [None for _ in range(self.num_envs)]
self._states = [None for _ in range(self.num_envs)]

# TODO (sven): Move metrics temp storage and collection out of EnvRunner
# and RolloutWorkers. These classes should not continue tracking some data
@@ -254,10 +258,8 @@ def _sample_timesteps(

# Set initial obs and states in the episodes.
for i in range(self.num_envs):
self._episodes[i].add_initial_observation(
initial_observation=obs[i],
initial_state={k: s[i] for k, s in states.items()},
)
self._episodes[i].add_env_reset(observation=obs[i])
self._states[i] = {k: s[i] for k, s in states.items()}
# Don't reset existing envs; continue in already started episodes.
else:
# Pick up stored observations and states from previous timesteps.
@@ -268,7 +270,9 @@
states = {
k: np.stack(
[
initial_states[k][i] if eps.states is None else eps.states[k]
initial_states[k][i]
if self._states[i] is None
else self._states[i][k]
for i, eps in enumerate(self._episodes)
]
)
@@ -278,7 +282,7 @@
# to 1.0, otherwise 0.0.
is_first = np.zeros((self.num_envs,))
for i, eps in enumerate(self._episodes):
if eps.states is None:
if len(eps) == 0:
is_first[i] = 1.0

# Loop through env for n timesteps.
@@ -319,37 +323,39 @@
if terminateds[i] or truncateds[i]:
# Finish the episode with the actual terminal observation stored in
# the info dict.
self._episodes[i].add_timestep(
infos["final_observation"][i],
actions[i],
rewards[i],
state=s,
self._episodes[i].add_env_step(
observation=infos["final_observation"][i],
action=actions[i],
reward=rewards[i],
is_terminated=terminateds[i],
is_truncated=truncateds[i],
)
self._states[i] = s
# Reset h-states to the model's initial ones b/c we are starting a
# new episode.
for k, v in self.module.get_initial_state().items():
states[k][i] = v.numpy()
is_first[i] = True
done_episodes_to_return.append(self._episodes[i])
# Create a new episode object.
self._episodes[i] = SingleAgentEpisode(
observations=[obs[i]], states=s
)
self._episodes[i] = SingleAgentEpisode(observations=[obs[i]])
else:
self._episodes[i].add_timestep(
obs[i], actions[i], rewards[i], state=s
self._episodes[i].add_env_step(
observation=obs[i],
action=actions[i],
reward=rewards[i],
)
is_first[i] = False

self._states[i] = s

# Return done episodes ...
self._done_episodes_for_metrics.extend(done_episodes_to_return)
# ... and all ongoing episode chunks. Also, make sure, we return
# a copy and start new chunks so that callers of this function
# don't alter our ongoing and returned Episode objects.
ongoing_episodes = self._episodes
self._episodes = [eps.create_successor() for eps in self._episodes]
self._episodes = [eps.cut() for eps in self._episodes]
for eps in ongoing_episodes:
self._ongoing_episodes_for_metrics[eps.id_].append(eps)
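For context on the create_successor() to cut() rename in the hunk above: the returned object is a fresh chunk that continues the same episode, which is why the runner can hand the already-collected chunk back to the caller without risking further mutation. A tiny hedged illustration follows; the attribute and method names match the diff, while the exact semantics of cut() are assumed.

# Hedged illustration of episode chunking via cut() (formerly create_successor()).
# `episode` is an ongoing SingleAgentEpisode that has collected some steps.
collected_chunk = episode      # hand this chunk back to the caller
episode = episode.cut()        # continue sampling into a fresh chunk of the same episode
assert episode.id_ == collected_chunk.id_  # cut() keeps the episode id (assumed, as implied above)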

@@ -385,11 +391,11 @@ def _sample_episodes(
render_images = [e.render() for e in self.env.envs]

for i in range(self.num_envs):
episodes[i].add_initial_observation(
initial_observation=obs[i],
initial_state={k: s[i] for k, s in states.items()},
initial_render_image=render_images[i],
episodes[i].add_env_reset(
observation=obs[i],
render_image=render_images[i],
)
self._states[i] = {k: s[i] for k, s in states.items()}

eps = 0
while eps < num_episodes:
Expand Down Expand Up @@ -425,14 +431,14 @@ def _sample_episodes(
if terminateds[i] or truncateds[i]:
eps += 1

episodes[i].add_timestep(
infos["final_observation"][i],
actions[i],
rewards[i],
state=s,
episodes[i].add_env_step(
observation=infos["final_observation"][i],
action=actions[i],
reward=rewards[i],
is_terminated=terminateds[i],
is_truncated=truncateds[i],
)
self._states[i] = s
done_episodes_to_return.append(episodes[i])

# Also early-out if we reach the number of episodes within this
@@ -452,13 +458,13 @@
render_images=[render_images[i]],
)
else:
episodes[i].add_timestep(
obs[i],
actions[i],
rewards[i],
state=s,
episodes[i].add_env_step(
observation=obs[i],
action=actions[i],
reward=rewards[i],
render_image=render_images[i],
)
self._states[i] = s
is_first[i] = False

self._done_episodes_for_metrics.extend(done_episodes_to_return)
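Both sampling paths above finish an episode with infos["final_observation"][i] rather than obs[i]. That follows the gymnasium vector-env auto-reset convention: a terminated or truncated sub-env is reset immediately, so obs[i] already belongs to the next episode and the true terminal observation is only reachable through the info dict. A small standalone illustration of that convention follows; the environment id and step count are arbitrary, and the behavior shown is that of pre-1.0 gymnasium vector envs.

# Standalone illustration of the gymnasium vector-env auto-reset convention
# relied on above (pre-1.0 gymnasium behavior).
import gymnasium as gym

env = gym.vector.make("CartPole-v1", num_envs=2)
obs, infos = env.reset(seed=0)
for _ in range(500):
    obs, rewards, terminateds, truncateds, infos = env.step(env.action_space.sample())
    for i in range(env.num_envs):
        if terminateds[i] or truncateds[i]:
            # True last observation of the finished episode:
            terminal_obs = infos["final_observation"][i]
            # obs[i], by contrast, is already the reset observation of the
            # next episode in this sub-env.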