diff --git a/core/src/main/java/org/infinispan/remoting/transport/jgroups/StaggeredRequest.java b/core/src/main/java/org/infinispan/remoting/transport/jgroups/StaggeredRequest.java index 43337205e2cc..207a2059e823 100644 --- a/core/src/main/java/org/infinispan/remoting/transport/jgroups/StaggeredRequest.java +++ b/core/src/main/java/org/infinispan/remoting/transport/jgroups/StaggeredRequest.java @@ -83,9 +83,13 @@ void sendNextMessage() { } if (target == null) { - // If the final targets were removed because they have left the cluster, - // onResponse() should have already completed the request. - throw new IllegalStateException("Request should have been completed already."); + // The final targets were removed because they have left the cluster, + // but the request is not yet complete because we're still waiting for a response + // from one of the other targets (i.e. we are being called from onTimeout). + // We don't need to send another message, just wait for the real timeout to expire. + long delayNanos = transport.getTimeService().remainingTime(deadline, TimeUnit.NANOSECONDS); + super.setTimeout(transport.getTimeoutExecutor(), delayNanos, TimeUnit.NANOSECONDS); + return; } isFinalTarget = targetIndex >= getTargetsSize(); diff --git a/core/src/test/java/org/infinispan/remoting/transport/jgroups/JGroupsTransportTest.java b/core/src/test/java/org/infinispan/remoting/transport/jgroups/JGroupsTransportTest.java index aed981aae59c..b757624957db 100644 --- a/core/src/test/java/org/infinispan/remoting/transport/jgroups/JGroupsTransportTest.java +++ b/core/src/test/java/org/infinispan/remoting/transport/jgroups/JGroupsTransportTest.java @@ -2,21 +2,29 @@ import static org.testng.AssertJUnit.assertEquals; +import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; import org.infinispan.commands.ReplicableCommand; import org.infinispan.commands.remote.ClusteredGetCommand; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.remoting.inboundhandler.DeliverOrder; +import org.infinispan.remoting.inboundhandler.InboundInvocationHandler; +import org.infinispan.remoting.inboundhandler.Reply; import org.infinispan.remoting.responses.CacheNotFoundResponse; import org.infinispan.remoting.responses.Response; import org.infinispan.remoting.rpc.ResponseMode; import org.infinispan.remoting.transport.Address; +import org.infinispan.remoting.transport.impl.MapResponseCollector; import org.infinispan.test.MultipleCacheManagersTest; +import org.infinispan.test.TestingUtil; import org.infinispan.util.ByteString; +import org.infinispan.xsite.XSiteReplicateCommand; import org.jgroups.util.UUID; import org.testng.annotations.Test; @@ -26,6 +34,9 @@ */ @Test public class JGroupsTransportTest extends MultipleCacheManagersTest { + + public static final ByteString CACHE_NAME = ByteString.fromString("cache"); + @Override protected void createCacheManagers() throws Throwable { ConfigurationBuilder configurationBuilder = new ConfigurationBuilder(); @@ -39,11 +50,77 @@ public void testSynchronousIgnoreLeaversInvocationToNonMembers() throws Exceptio JGroupsTransport transport = (JGroupsTransport) manager(0).getTransport(); long initialMessages = transport.getChannel().getSentMessages(); - ReplicableCommand command = new ClusteredGetCommand("key", ByteString.fromString("cache"), 0); + ReplicableCommand command = new ClusteredGetCommand("key", CACHE_NAME, 0); CompletableFuture> future = transport .invokeRemotelyAsync(Collections.singletonList(randomAddress), command, ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, 1, null, DeliverOrder.NONE, true); assertEquals(CacheNotFoundResponse.INSTANCE, future.get().get(randomAddress)); assertEquals(initialMessages, transport.getChannel().getSentMessages()); } + + public void testInvokeCommandStaggeredToNonMember() throws Exception { + UUID randomUuid = UUID.randomUUID(); + Address randomAddress = JGroupsAddressCache.fromJGroupsAddress(randomUuid); + + // Send message only to non-member + JGroupsTransport transport = (JGroupsTransport) manager(0).getTransport(); + ReplicableCommand command = new ClusteredGetCommand("key", ByteString.fromString("cache"), 0); + CompletionStage> future = + transport.invokeCommandStaggered(Collections.singletonList(randomAddress), command, + MapResponseCollector.ignoreLeavers(), DeliverOrder.NONE, 5, + TimeUnit.SECONDS); + assertEquals(Collections.singletonMap(randomAddress, CacheNotFoundResponse.INSTANCE), + future.toCompletableFuture().get()); + + // Send message to view member that doesn't have the cache and to non-member + CompletionStage> future2 = + transport.invokeCommandStaggered(Arrays.asList(address(1), randomAddress), command, + MapResponseCollector.ignoreLeavers(), DeliverOrder.NONE, 5, + TimeUnit.SECONDS); + Map expected = TestingUtil.mapOf(address(1), CacheNotFoundResponse.INSTANCE, + randomAddress, CacheNotFoundResponse.INSTANCE); + assertEquals(expected, future2.toCompletableFuture().get()); + + // Send message to view member that doesn't have the cache and to non-member + // and block the response from the view member + CompletableFuture blocker = blockRemoteGets(); + try { + CompletionStage> future3 = + transport.invokeCommandStaggered(Arrays.asList(address(1), randomAddress), command, + MapResponseCollector.ignoreLeavers(), DeliverOrder.NONE, 5, + TimeUnit.SECONDS); + // Wait for the stagger timeout (5s / 10 / 2) to expire before sending a reply back + Thread.sleep(500); + blocker.complete(null); + assertEquals(expected, future3.toCompletableFuture().get()); + } finally { + blocker.complete(null); + } + } + + private CompletableFuture blockRemoteGets() { + CompletableFuture blocker = new CompletableFuture<>(); + InboundInvocationHandler oldInvocationHandler = TestingUtil.extractGlobalComponent(manager(1), + InboundInvocationHandler + .class); + InboundInvocationHandler blockingInvocationHandler = new InboundInvocationHandler() { + @Override + public void handleFromCluster(Address origin, ReplicableCommand command, Reply reply, DeliverOrder order) { + if (command instanceof ClusteredGetCommand) { + log.tracef("Blocking clustered get"); + blocker.thenRun(() -> oldInvocationHandler.handleFromCluster(origin, command, reply, order)); + } else { + oldInvocationHandler.handleFromCluster(origin, command, reply, order); + } + } + + @Override + public void handleFromRemoteSite(String origin, XSiteReplicateCommand command, Reply reply, + DeliverOrder order) { + oldInvocationHandler.handleFromRemoteSite(origin, command, reply, order); + } + }; + TestingUtil.replaceComponent(manager(1), InboundInvocationHandler.class, blockingInvocationHandler, true); + return blocker; + } }