Skip to content

Commit

Permalink
ISPN-9099 StaggeredRequest throws IllegalStateException
Browse files Browse the repository at this point in the history
  • Loading branch information
danberindei authored and ryanemerson committed Apr 24, 2018
1 parent 20938b4 commit e4e4f7b
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 4 deletions.
Expand Up @@ -83,9 +83,13 @@ void sendNextMessage() {
}

if (target == null) {
// If the final targets were removed because they have left the cluster,
// onResponse() should have already completed the request.
throw new IllegalStateException("Request should have been completed already.");
// The final targets were removed because they have left the cluster,
// but the request is not yet complete because we're still waiting for a response
// from one of the other targets (i.e. we are being called from onTimeout).
// We don't need to send another message, just wait for the real timeout to expire.
long delayNanos = transport.getTimeService().remainingTime(deadline, TimeUnit.NANOSECONDS);
super.setTimeout(transport.getTimeoutExecutor(), delayNanos, TimeUnit.NANOSECONDS);
return;
}

isFinalTarget = targetIndex >= getTargetsSize();
Expand Down
Expand Up @@ -2,21 +2,29 @@

import static org.testng.AssertJUnit.assertEquals;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.InboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.ByteString;
import org.infinispan.xsite.XSiteReplicateCommand;
import org.jgroups.util.UUID;
import org.testng.annotations.Test;

Expand All @@ -26,6 +34,9 @@
*/
@Test
public class JGroupsTransportTest extends MultipleCacheManagersTest {

public static final ByteString CACHE_NAME = ByteString.fromString("cache");

@Override
protected void createCacheManagers() throws Throwable {
ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
Expand All @@ -39,11 +50,77 @@ public void testSynchronousIgnoreLeaversInvocationToNonMembers() throws Exceptio

JGroupsTransport transport = (JGroupsTransport) manager(0).getTransport();
long initialMessages = transport.getChannel().getSentMessages();
ReplicableCommand command = new ClusteredGetCommand("key", ByteString.fromString("cache"), 0);
ReplicableCommand command = new ClusteredGetCommand("key", CACHE_NAME, 0);
CompletableFuture<Map<Address, Response>> future = transport
.invokeRemotelyAsync(Collections.singletonList(randomAddress), command,
ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, 1, null, DeliverOrder.NONE, true);
assertEquals(CacheNotFoundResponse.INSTANCE, future.get().get(randomAddress));
assertEquals(initialMessages, transport.getChannel().getSentMessages());
}

public void testInvokeCommandStaggeredToNonMember() throws Exception {
UUID randomUuid = UUID.randomUUID();
Address randomAddress = JGroupsAddressCache.fromJGroupsAddress(randomUuid);

// Send message only to non-member
JGroupsTransport transport = (JGroupsTransport) manager(0).getTransport();
ReplicableCommand command = new ClusteredGetCommand("key", ByteString.fromString("cache"), 0);
CompletionStage<Map<Address, Response>> future =
transport.invokeCommandStaggered(Collections.singletonList(randomAddress), command,
MapResponseCollector.ignoreLeavers(), DeliverOrder.NONE, 5,
TimeUnit.SECONDS);
assertEquals(Collections.singletonMap(randomAddress, CacheNotFoundResponse.INSTANCE),
future.toCompletableFuture().get());

// Send message to view member that doesn't have the cache and to non-member
CompletionStage<Map<Address, Response>> future2 =
transport.invokeCommandStaggered(Arrays.asList(address(1), randomAddress), command,
MapResponseCollector.ignoreLeavers(), DeliverOrder.NONE, 5,
TimeUnit.SECONDS);
Map<Object, Object> expected = TestingUtil.mapOf(address(1), CacheNotFoundResponse.INSTANCE,
randomAddress, CacheNotFoundResponse.INSTANCE);
assertEquals(expected, future2.toCompletableFuture().get());

// Send message to view member that doesn't have the cache and to non-member
// and block the response from the view member
CompletableFuture<Void> blocker = blockRemoteGets();
try {
CompletionStage<Map<Address, Response>> future3 =
transport.invokeCommandStaggered(Arrays.asList(address(1), randomAddress), command,
MapResponseCollector.ignoreLeavers(), DeliverOrder.NONE, 5,
TimeUnit.SECONDS);
// Wait for the stagger timeout (5s / 10 / 2) to expire before sending a reply back
Thread.sleep(500);
blocker.complete(null);
assertEquals(expected, future3.toCompletableFuture().get());
} finally {
blocker.complete(null);
}
}

private CompletableFuture<Void> blockRemoteGets() {
CompletableFuture<Void> blocker = new CompletableFuture<>();
InboundInvocationHandler oldInvocationHandler = TestingUtil.extractGlobalComponent(manager(1),
InboundInvocationHandler
.class);
InboundInvocationHandler blockingInvocationHandler = new InboundInvocationHandler() {
@Override
public void handleFromCluster(Address origin, ReplicableCommand command, Reply reply, DeliverOrder order) {
if (command instanceof ClusteredGetCommand) {
log.tracef("Blocking clustered get");
blocker.thenRun(() -> oldInvocationHandler.handleFromCluster(origin, command, reply, order));
} else {
oldInvocationHandler.handleFromCluster(origin, command, reply, order);
}
}

@Override
public void handleFromRemoteSite(String origin, XSiteReplicateCommand command, Reply reply,
DeliverOrder order) {
oldInvocationHandler.handleFromRemoteSite(origin, command, reply, order);
}
};
TestingUtil.replaceComponent(manager(1), InboundInvocationHandler.class, blockingInvocationHandler, true);
return blocker;
}
}

0 comments on commit e4e4f7b

Please sign in to comment.