Skip to content

Commit

Permalink
ISPN-2154 Use anycast instead of FutureCollator
Browse files Browse the repository at this point in the history
  • Loading branch information
danberindei committed Jun 1, 2015
1 parent 0b302fc commit 430957b
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.partitionhandling.AvailabilityException;
import org.infinispan.remoting.RemoteException;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
Expand Down Expand Up @@ -65,7 +66,7 @@ protected final boolean parseResponseAndAddToResponseList(Object responseObject,
invalidResponse = false;
if (exception != null) {
log.tracef(exception, "Unexpected exception from %s", sender);
throw new CacheException("Remote (" + sender + ") failed unexpectedly", exception);
throw log.remoteException(sender, exception);
}

if (checkResponse(responseObject, sender)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.jgroups.Message;
import org.jgroups.SuspectedException;
import org.jgroups.UpHandler;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RequestCorrelator;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.blocks.RequestOptions;
Expand All @@ -62,6 +63,7 @@
* @since 4.0
*/
public class CommandAwareRpcDispatcher extends RpcDispatcher {
public static final RspList<Object> EMPTY_RESPONSES_LIST = new RspList<>();

private final ExecutorService asyncExecutor;
private static final Log log = LogFactory.getLog(CommandAwareRpcDispatcher.class);
Expand Down Expand Up @@ -130,7 +132,7 @@ public RspList<Object> invokeRemoteCommands(final List<Address> recipients, fina
}

/**
* @param recipients Guaranteed not to be null. Must <b>not</b> contain self.
* @param recipients Must <b>not</b> contain self.
*/
public RspList<Object> invokeRemoteCommands(final List<Address> recipients, final ReplicableCommand command,
final ResponseMode mode, final long timeout, final RspFilter filter,
Expand Down Expand Up @@ -244,7 +246,7 @@ public RspList<Object> broadcastRemoteCommands(ReplicableCommand command, Respon

private boolean containsOnlyNulls(RspList<Object> l) {
for (Rsp<Object> r : l.values()) {
if (r.getValue() != null || !r.wasReceived() || r.wasSuspected()) return false;
if (r.getValue() != null || r.hasException() || !r.wasReceived() || r.wasSuspected()) return false;
}
return true;
}
Expand Down Expand Up @@ -430,58 +432,31 @@ private static RspList<Object> processCalls(ReplicableCommand command, boolean b

//Only the commands in total order must be received...
//For correctness, ispn doesn't need their own message, so add own address to exclusion list
opts.setExclusionList(card.getChannel().getAddress());
opts.setExclusionList(card.local_addr);

retval = card.castMessage(dests, constructMessage(buf, null, mode, rsvp, deliverOrder),opts);
Message message = constructMessage(buf, null, mode, rsvp, deliverOrder);
retval = card.castMessage(dests, message,opts);
} else {
RequestOptions opts = new RequestOptions(mode, timeout);
RequestOptions opts = new RequestOptions(mode, timeout, true, filter);

//Only the commands in total order must be received...
opts.setExclusionList(card.getChannel().getAddress());
//For correctness, ispn doesn't need their own message, so remove it from the dests collection
if (dests.contains(card.local_addr)) {
throw new IllegalArgumentException("Local address is not allowed in the recipients list at this point");
}

if (dests.isEmpty()) return new RspList<>();
if (dests.isEmpty()) return EMPTY_RESPONSES_LIST;
buf = marshallCall(marshaller, command);
Message msg = constructMessage(buf, null, mode, rsvp, deliverOrder);

// if at all possible, try not to use JGroups' ANYCAST for now. Multiple (parallel) UNICASTs are much faster.
if (filter != null) {
// This is possibly a remote GET.
// These UNICASTs happen in parallel using sendMessageWithFuture. Each future has a listener attached
// (see FutureCollator) and the first successful response is used.
FutureCollator futureCollator = new FutureCollator(filter, dests.size(), timeout, card.timeService);
for (Address a : dests) {
NotifyingFuture<Object> f = card.sendMessageWithFuture(constructMessage(buf, a, mode, rsvp, deliverOrder), opts);
futureCollator.watchFuture(f, a);
}
retval = futureCollator.getResponseList();
} else if (mode == ResponseMode.GET_ALL) {
// A SYNC call that needs to go everywhere
Map<Address, Future<Object>> futures = new HashMap<>(dests.size());

for (Address dest : dests)
futures.put(dest, card.sendMessageWithFuture(constructMessage(buf, dest, mode, rsvp, deliverOrder), opts));

retval = new RspList<>();

// a get() on each future will block till that call completes.
for (Map.Entry<Address, Future<Object>> entry : futures.entrySet()) {
Address target = entry.getKey();
try {
retval.addRsp(target, entry.getValue().get(timeout, MILLISECONDS));
} catch (java.util.concurrent.TimeoutException te) {
throw new TimeoutException(formatString("Timed out after %s waiting for a response from %s",
prettyPrintTime(timeout), target));
} catch (ExecutionException e) {
if (ignoreLeavers && e.getCause() instanceof SuspectedException) {
log.tracef(formatString("Ignoring node %s that left during the remote call", target));
retval.addRsp(target, CacheNotFoundResponse.INSTANCE);
} else {
throw wrapThrowableInException(e.getCause());
}
}
}
} else if (mode == ResponseMode.GET_NONE) {
if (mode != ResponseMode.GET_NONE) {
// A SYNC call that needs to go everywhere (with or without a filter)
GroupRequest<Object> request = card.cast(dests, msg, opts, true);
retval = request != null ? request.get() : EMPTY_RESPONSES_LIST;
} else {
// An ASYNC call. We don't care about responses.
for (Address dest : dests) card.sendMessage(constructMessage(buf, dest, mode, rsvp, deliverOrder), opts);
card.cast(dests, msg, opts, false);
retval = EMPTY_RESPONSES_LIST;
}
}

Expand All @@ -506,7 +481,7 @@ private static RspList<Object> processCalls(Map<Address, ReplicableCommand> comm
boolean oob, boolean ignoreLeavers) throws Exception {
if (trace) log.tracef("Replication task sending %s with response mode %s", commands, mode);

if (commands.isEmpty()) return new RspList<>();
if (commands.isEmpty()) return EMPTY_RESPONSES_LIST;

RequestOptions opts = new RequestOptions(mode, timeout);
//opts.setExclusionList(card.getChannel().getAddress());
Expand Down Expand Up @@ -554,122 +529,5 @@ private static boolean isRsvpCommand(ReplicableCommand command) {
return command instanceof FlagAffectedCommand
&& ((FlagAffectedCommand) command).hasFlag(Flag.GUARANTEED_DELIVERY);
}

static class SenderContainer {
final Address address;
volatile boolean processed = false;

SenderContainer(Address address) {
this.address = address;
}

@Override
public String toString() {
return "Sender{" +
"address=" + address +
", responded=" + processed +
'}';
}
}

final static class FutureCollator implements FutureListener<Object> {
final RspFilter filter;
final Map<Future<Object>, SenderContainer> futures = new HashMap<>(4);
final long timeout;
@GuardedBy("this")
private RspList<Object> retval;
@GuardedBy("this")
private Exception exception;
@GuardedBy("this")
private int expectedResponses;
private final TimeService timeService;

FutureCollator(RspFilter filter, int expectedResponses, long timeout, TimeService timeService) {
this.filter = filter;
this.expectedResponses = expectedResponses;
this.timeout = timeout;
this.timeService = timeService;
}

public void watchFuture(NotifyingFuture<Object> f, Address address) {
futures.put(f, new SenderContainer(address));
f.setListener(this);
}

public synchronized RspList<Object> getResponseList() throws Exception {
long expectedEndTime = timeService.expectedEndTime(timeout, MILLISECONDS);
long waitingTime;
while (expectedResponses > 0 && retval == null &&
(waitingTime = timeService.remainingTime(expectedEndTime, MILLISECONDS)) > 0) {
try {
this.wait(waitingTime);
} catch (InterruptedException e) {
// reset interruption flag
Thread.currentThread().interrupt();
expectedResponses = -1;
}
}
// Now we either have the response we need or aren't expecting any more responses - or have run out of time.
if (retval != null)
return retval;
else if (exception != null)
throw exception;
else if (expectedResponses == 0)
throw new RpcException(format("No more valid responses. Received invalid responses from all of %s", futures.values()));
else
throw new TimeoutException(format("Timed out waiting for %s for valid responses from any of %s.", Util.prettyPrintTime(timeout), futures.values()));
}

@Override
@SuppressWarnings("unchecked")
public synchronized void futureDone(Future<Object> objectFuture) {
SenderContainer sc = futures.get(objectFuture);
if (sc.processed) {
// This can happen - it is a race condition in JGroups' NotifyingFuture.setListener() where a listener
// could be notified twice.
if (trace) log.tracef("Not processing callback; already processed callback for sender %s", sc.address);
} else {
sc.processed = true;
Address sender = sc.address;
boolean done = false;
try {
if (retval == null) {
Object response = objectFuture.get();
if (trace) log.tracef("Received response: %s from %s", response, sender);
filter.isAcceptable(response, sender);
if (!filter.needMoreResponses()) {
retval = new RspList(Collections.singleton(new Rsp(sender, response)));
done = true;
//TODO cancel other tasks?
}
} else {
if (trace) log.tracef("Skipping response from %s since a valid response for this request has already been received", sender);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof org.jgroups.SuspectedException) {
// Do not set the exception field, RpcException should be thrown if there is no other valid response
return;
} else if (cause instanceof org.jgroups.TimeoutException) {
exception = new TimeoutException("Timeout!", e);
} else if (cause instanceof Exception) {
exception = (Exception) cause;
} else {
exception = new CacheException("Caught a throwable", cause);
}

if (log.isDebugEnabled())
log.debugf("Caught exception from sender %s: %s", sender, exception);
} finally {
expectedResponses--;
if (expectedResponses == 0 || done) {
this.notify(); //make sure to awake waiting thread, but avoid unnecessary wakeups!
}
}
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.infinispan.factories.annotations.Inject;
import org.infinispan.jmx.JmxUtil;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.remoting.RpcException;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.InboundInvocationHandler;
import org.infinispan.remoting.responses.ExceptionResponse;
Expand Down Expand Up @@ -586,8 +587,24 @@ public Map<Address, Response> invokeRemotely(Collection<Address> recipients, Rep
responseFilter != null, ignoreLeavers);
}

if (noValidResponses)
throw new TimeoutException("Timed out waiting for valid responses!");
if (noValidResponses) {
// PartitionHandlingInterceptor relies on receiving a RpcException if there are only invalid responses
// But we still need to throw a TimeoutException if there are no responses at all.
boolean throwRpcException = false;
if (responseFilter != null) {
for (Rsp<Object> rsp : rsps.values()) {
if (rsp.wasReceived()) {
throwRpcException = true;
break;
}
}
}
if (throwRpcException) {
throw new RpcException(String.format("Received invalid responses from all of %s", recipients));
} else {
throw new TimeoutException("Timed out waiting for valid responses!");
}
}
responses = retval;
}
return responses;
Expand Down Expand Up @@ -685,9 +702,8 @@ private static org.jgroups.blocks.ResponseMode toJGroupsMode(ResponseMode mode)
return org.jgroups.blocks.ResponseMode.GET_NONE;
case SYNCHRONOUS:
case SYNCHRONOUS_IGNORE_LEAVERS:
return org.jgroups.blocks.ResponseMode.GET_ALL;
case WAIT_FOR_VALID_RESPONSE:
return org.jgroups.blocks.ResponseMode.GET_FIRST;
return org.jgroups.blocks.ResponseMode.GET_ALL;
}
throw new CacheException("Unknown response mode " + mode);
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/infinispan/util/logging/Log.java
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ void asyncStoreShutdownTimeoutTooHigh(long configuredAsyncStopTimeout,
void entriesMigrated(long count, String name, String prettyTime);

@Message(value = "Received exception from %s, see cause for remote stack trace", id = 217)
RemoteException remoteException(Address sender, @Cause Exception e);
RemoteException remoteException(Address sender, @Cause Throwable t);

@LogMessage(level = INFO)
@Message(value = "Timeout while waiting for the transaction validation. The command will not be processed. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.infinispan.commons.marshall.NotSerializableException;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.RemoteException;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.concurrent.IsolationLevel;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -68,7 +69,7 @@ public void testClearOnKeyOwner() {
ownerCache.clear();
}

public void testRetrieveNonSerializableKeyFromNonOwner() {
public void testRetrieveNonSerializableValueFromNonOwner() {
Cache[] owners = getOwners("yourkey", 1);
Cache[] nonOwners = getNonOwners("yourkey", 1);
assert owners.length == 1;
Expand All @@ -79,7 +80,8 @@ public void testRetrieveNonSerializableKeyFromNonOwner() {
try {
nonOwnerCache.get("yourkey");
fail("Should have failed with a org.infinispan.marshall.NotSerializableException");
} catch (NotSerializableException e) {
} catch (RemoteException e) {
assertTrue(e.getCause() instanceof NotSerializableException);
}
}

Expand Down

0 comments on commit 430957b

Please sign in to comment.