Skip to content

Commit

Permalink
ISPN-7064 RPC to leaver times out instead of finishing immediately
Browse files Browse the repository at this point in the history
* Set the GroupRequest listener before the request is sent.
* Send the state transfer cancel commands asynchronously.
  • Loading branch information
danberindei authored and slaskawi committed Oct 11, 2016
1 parent 3a22bc3 commit 97e76d5
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,11 @@ private CompletableFuture<Responses> processCalls(ReplicableCommand command, boo
opts = new RequestOptions(mode, timeout, true, filter);
}

GroupRequest<Response> request = cast(dests, msg, opts, false);
RspListFuture retval = new RspListFuture();
GroupRequest<Response> request = this.cast(dests, msg, opts, false, retval);
if (mode == ResponseMode.GET_NONE)
return null;

RspListFuture retval = new RspListFuture(request);
if (request == null) {
// cast() returns null when there no other nodes in the cluster
if (broadcast) {
Expand All @@ -431,6 +431,7 @@ private CompletableFuture<Responses> processCalls(ReplicableCommand command, boo
}
}
if (timeout > 0 && !retval.isDone()) {
retval.setRequest(request);
ScheduledFuture<?> timeoutFuture = timeoutExecutor.schedule(retval, timeout, TimeUnit.MILLISECONDS);
retval.setTimeoutFuture(timeoutFuture);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,48 @@

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.infinispan.remoting.responses.Response;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.util.FutureListener;
import org.jgroups.util.RspList;

/**
* @author Dan Berindei
* @since 8.0
*/
public class RspListFuture extends CompletableFuture<Responses> implements FutureListener<Responses>,
public class RspListFuture extends CompletableFuture<Responses> implements FutureListener<RspList<Response>>,
Callable<Void> {
private final GroupRequest<Response> request;
private volatile GroupRequest<Response> request;
private volatile Future<?> timeoutFuture = null;

RspListFuture(GroupRequest<Response> request) {
RspListFuture() {
}

/**
* Add a reference to the request.
*
* Must be called before scheduling the timeout task.
*/
public void setRequest(GroupRequest<Response> request) {
this.request = request;
if (request != null) {
request.setListener(this);
}
}

@Override
public void futureDone(Future<Responses> future) {
complete(new Responses(request.getResults()));
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
public void futureDone(Future<RspList<Response>> future) {
// The request field may not be set at this time
// The future may be a
RspList<Response> rspList;
try {
rspList = future.get();
complete(new Responses(rspList));
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
}
} catch (InterruptedException | ExecutionException e) {
completeExceptionally(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private void sendCancelCommand(Set<Integer> cancelledSegments) {
StateRequestCommand.Type.CANCEL_STATE_TRANSFER, rpcManager.getAddress(), topologyId,
cancelledSegments);
try {
rpcManager.invokeRemotely(Collections.singleton(source), cmd, rpcOptions);
rpcManager.invokeRemotely(Collections.singleton(source), cmd, rpcManager.getDefaultRpcOptions(false));
} catch (Exception e) {
// Ignore exceptions here, the worst that can happen is that the provider will send some extra state
log.debugf("Caught an exception while cancelling state transfer for segments %s from %s",
Expand Down

0 comments on commit 97e76d5

Please sign in to comment.