Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 45 additions & 29 deletions test/test_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@
RandomPolicy,
)
from torchrl.modules import Actor, OrnsteinUhlenbeckProcessModule, SafeModule
from torchrl.weight_update import SharedMemWeightSyncScheme
from torchrl.weight_update import (
MultiProcessWeightSyncScheme,
SharedMemWeightSyncScheme,
)

if os.getenv("PYTORCH_TEST_FBCODE"):
IS_FB = True
Expand Down Expand Up @@ -1485,12 +1488,12 @@ def env_fn(seed):

@pytest.mark.parametrize("use_async", [False, True])
@pytest.mark.parametrize("cudagraph", [False, True])
@pytest.mark.parametrize(
"weight_sync_scheme",
[None, MultiProcessWeightSyncScheme, SharedMemWeightSyncScheme],
)
@pytest.mark.skipif(not torch.cuda.is_available(), reason="no cuda device found")
def test_update_weights(self, use_async, cudagraph):
from torchrl.weight_update.weight_sync_schemes import (
MultiProcessWeightSyncScheme,
)

def test_update_weights(self, use_async, cudagraph, weight_sync_scheme):
def create_env():
return ContinuousActionVecMockEnv()

Expand All @@ -1503,6 +1506,9 @@ def create_env():
collector_class = (
MultiSyncDataCollector if not use_async else MultiaSyncDataCollector
)
kwargs = {}
if weight_sync_scheme is not None:
kwargs["weight_sync_schemes"] = {"policy": weight_sync_scheme()}
collector = collector_class(
[create_env] * 3,
policy=policy,
Expand All @@ -1511,7 +1517,7 @@ def create_env():
frames_per_batch=20,
cat_results="stack",
cudagraph_policy=cudagraph,
weight_sync_schemes={"policy": MultiProcessWeightSyncScheme()},
**kwargs,
)
assert "policy" in collector._weight_senders, collector._weight_senders.keys()
try:
Expand Down Expand Up @@ -2857,23 +2863,28 @@ def forward(self, td):
# ["cuda:0", "cuda"],
],
)
def test_param_sync(self, give_weights, collector, policy_device, env_device):
from torchrl.weight_update.weight_sync_schemes import (
MultiProcessWeightSyncScheme,
)

@pytest.mark.parametrize(
"weight_sync_scheme",
[None, MultiProcessWeightSyncScheme, SharedMemWeightSyncScheme],
)
def test_param_sync(
self, give_weights, collector, policy_device, env_device, weight_sync_scheme
):
policy = TestUpdateParams.Policy().to(policy_device)

env = EnvCreator(lambda: TestUpdateParams.DummyEnv(device=env_device))
device = env().device
env = [env]
kwargs = {}
if weight_sync_scheme is not None:
kwargs["weight_sync_schemes"] = {"policy": weight_sync_scheme()}
col = collector(
env,
policy,
device=device,
total_frames=200,
frames_per_batch=10,
weight_sync_schemes={"policy": MultiProcessWeightSyncScheme()},
**kwargs,
)
try:
for i, data in enumerate(col):
Expand Down Expand Up @@ -2918,13 +2929,13 @@ def test_param_sync(self, give_weights, collector, policy_device, env_device):
# ["cuda:0", "cuda"],
],
)
@pytest.mark.parametrize(
"weight_sync_scheme",
[None, MultiProcessWeightSyncScheme, SharedMemWeightSyncScheme],
)
def test_param_sync_mixed_device(
self, give_weights, collector, policy_device, env_device
self, give_weights, collector, policy_device, env_device, weight_sync_scheme
):
from torchrl.weight_update.weight_sync_schemes import (
MultiProcessWeightSyncScheme,
)

with torch.device("cpu"):
policy = TestUpdateParams.Policy()
policy.param = nn.Parameter(policy.param.data.to(policy_device))
Expand All @@ -2933,13 +2944,16 @@ def test_param_sync_mixed_device(
env = EnvCreator(lambda: TestUpdateParams.DummyEnv(device=env_device))
device = env().device
env = [env]
kwargs = {}
if weight_sync_scheme is not None:
kwargs["weight_sync_schemes"] = {"policy": weight_sync_scheme()}
col = collector(
env,
policy,
device=device,
total_frames=200,
frames_per_batch=10,
weight_sync_schemes={"policy": MultiProcessWeightSyncScheme()},
**kwargs,
)
try:
for i, data in enumerate(col):
Expand Down Expand Up @@ -3851,7 +3865,7 @@ def test_weight_update(self, weight_updater):
if weight_updater == "scheme_shared":
kwargs = {"weight_sync_schemes": {"policy": SharedMemWeightSyncScheme()}}
elif weight_updater == "scheme_pipe":
kwargs = {"weight_sync_schemes": {"policy": SharedMemWeightSyncScheme()}}
kwargs = {"weight_sync_schemes": {"policy": MultiProcessWeightSyncScheme()}}
elif weight_updater == "weight_updater":
kwargs = {"weight_updater": self.MPSWeightUpdaterBase(policy_weights, 2)}
else:
Expand All @@ -3870,14 +3884,16 @@ def test_weight_update(self, weight_updater):
**kwargs,
)

collector.update_policy_weights_()
# When using policy_factory, must pass weights explicitly
collector.update_policy_weights_(policy_weights)
try:
for i, data in enumerate(collector):
if i == 2:
assert (data["action"] != 0).any()
# zero the policy
policy_weights.data.zero_()
collector.update_policy_weights_()
# When using policy_factory, must pass weights explicitly
collector.update_policy_weights_(policy_weights)
elif i == 3:
assert (data["action"] == 0).all(), data["action"]
break
Expand Down Expand Up @@ -3973,11 +3989,11 @@ def test_start_multi(self, total_frames, cls):
@pytest.mark.parametrize(
"cls", [SyncDataCollector, MultiaSyncDataCollector, MultiSyncDataCollector]
)
def test_start_update_policy(self, total_frames, cls):
from torchrl.weight_update.weight_sync_schemes import (
MultiProcessWeightSyncScheme,
)

@pytest.mark.parametrize(
"weight_sync_scheme",
[None, MultiProcessWeightSyncScheme, SharedMemWeightSyncScheme],
)
def test_start_update_policy(self, total_frames, cls, weight_sync_scheme):
rb = ReplayBuffer(storage=LazyMemmapStorage(max_size=1000))
env = CountingEnv()
m = nn.Linear(env.observation_spec["observation"].shape[-1], 1)
Expand All @@ -3998,8 +4014,8 @@ def test_start_update_policy(self, total_frames, cls):

# Add weight sync schemes for multi-process collectors
kwargs = {}
if cls != SyncDataCollector:
kwargs["weight_sync_schemes"] = {"policy": MultiProcessWeightSyncScheme()}
if cls != SyncDataCollector and weight_sync_scheme is not None:
kwargs["weight_sync_schemes"] = {"policy": weight_sync_scheme()}

collector = cls(
env,
Expand Down
Loading
Loading