37 changes: 32 additions & 5 deletions torchrl/collectors/collectors.py
@@ -3,6 +3,7 @@
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import _pickle
import abc
import inspect
import os
@@ -708,7 +709,14 @@ def shutdown(self) -> None:
del self.env

def __del__(self):
self.shutdown() # make sure env is closed
try:
self.shutdown()
except Exception:
# an AttributeError will typically be raised if the collector is deleted when the program ends.
# In the future, insignificant changes to the close method may change the error type.
# We explicitly assume that any error raised during closure in
# __del__ will not affect the program.
pass

def state_dict(self) -> OrderedDict:
"""Returns the local state_dict of the data collector (environment and policy).
@@ -1016,7 +1024,19 @@ def _run_processes(self) -> None:
}
proc = mp.Process(target=_main_async_collector, kwargs=kwargs)
# proc.daemon can't be set as daemonic processes may be launched by the process itself
proc.start()
try:
proc.start()
except _pickle.PicklingError as err:
if "<lambda>" in str(err):
raise RuntimeError(
"""Can't open a process with doubly cloud-pickled lambda function.
This error is likely due to an attempt to use a ParallelEnv in a
multiprocessed data collector. To do this, consider wrapping your
lambda function in a `torchrl.envs.EnvCreator` wrapper as follows:
`env = ParallelEnv(N, EnvCreator(my_lambda_function))`.
This will not only ensure that your lambda function is cloud-pickled once, but
also that the state dict is synchronised across processes if needed."""
)
pipe_child.close()
self.procs.append(proc)
self.pipes.append(pipe_parent)
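For reference, a minimal sketch of the pattern the error message recommends. `GymEnv`, the environment id, the worker counts, and the `MultiSyncDataCollector` keyword arguments are assumptions; the exact constructor signature may differ across torchrl versions:

# Sketch: wrap the inner lambda in EnvCreator so it is cloud-pickled only once,
# then let each collector worker build its own ParallelEnv.
from torchrl.collectors.collectors import MultiSyncDataCollector
from torchrl.envs import EnvCreator, ParallelEnv
from torchrl.envs.libs.gym import GymEnv

make_base_env = EnvCreator(lambda: GymEnv("Pendulum-v1"))  # pickled once


def make_parallel_env():
    return ParallelEnv(2, make_base_env)  # built inside each worker


collector = MultiSyncDataCollector(
    [make_parallel_env, make_parallel_env],  # one constructor per worker
    policy=None,                             # assumption: random policy when None
    frames_per_batch=128,
    total_frames=512,
)
for batch in collector:
    pass  # consume the collected batch
collector.shutdown()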
@@ -1027,7 +1047,14 @@ def _run_processes(self) -> None:
self.closed = False

def __del__(self):
self.shutdown()
try:
self.shutdown()
except Exception:
# an AttributeError will typically be raised if the collector is deleted when the program ends.
# In the future, insignificant changes to the close method may change the error type.
# We explicitly assume that any error raised during closure in
# __del__ will not affect the program.
pass

def shutdown(self) -> None:
"""Shuts down all processes. This operation is irreversible."""
@@ -1624,8 +1651,8 @@ def _main_async_collector(
f"without receiving a command from main. Consider increasing the maximum idle count "
f"if this is expected via the environment variable MAX_IDLE_COUNT "
f"(current value is {_MAX_IDLE_COUNT})."
f"\nIf this occurs at the end of a function, it means that your collector has not been "
f"collected, consider calling `collector.shutdown()` or `del collector` at the end of the function."
f"\nIf this occurs at the end of a function or program, it means that your collector has not been "
f"collected, consider calling `collector.shutdown()` or `del collector` before ending the program."
)
continue
if msg in ("continue", "continue_random"):
9 changes: 8 additions & 1 deletion torchrl/envs/common.py
@@ -677,7 +677,14 @@ def __del__(self):
# if del occurs before env has been set up, we don't want a recursion
# error
if "is_closed" in self.__dict__ and not self.is_closed:
self.close()
try:
self.close()
except Exception:
# a TypeError will typically be raised if the env is deleted when the program ends.
# In the future, insignificant changes to the close method may change the error type.
# We explicitly assume that any error raised during closure in
# __del__ will not affect the program.
pass

def to(self, device: DEVICE_TYPING) -> EnvBase:
device = torch.device(device)
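As with the collectors, the safest pattern in user code remains an explicit `close()`, so the `__del__` guard above only has to deal with interpreter-shutdown corner cases. A minimal sketch, assuming a Gym-backed environment (the environment id is a placeholder):

# Sketch: close the environment deterministically instead of relying on __del__.
from torchrl.envs.libs.gym import GymEnv

env = GymEnv("Pendulum-v1")
try:
    tensordict = env.reset()
    tensordict = env.rand_step(tensordict)  # one random transition
finally:
    env.close()  # sets is_closed, so the __del__ guard becomes a no-op later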