diff --git a/core/src/main/java/org/infinispan/remoting/transport/impl/SingleTargetRequest.java b/core/src/main/java/org/infinispan/remoting/transport/impl/SingleTargetRequest.java index 102568564aa6..7b2d069f70a9 100644 --- a/core/src/main/java/org/infinispan/remoting/transport/impl/SingleTargetRequest.java +++ b/core/src/main/java/org/infinispan/remoting/transport/impl/SingleTargetRequest.java @@ -2,6 +2,7 @@ import java.util.Set; +import net.jcip.annotations.GuardedBy; import org.infinispan.remoting.responses.CacheNotFoundResponse; import org.infinispan.remoting.responses.Response; import org.infinispan.remoting.transport.AbstractRequest; @@ -19,7 +20,8 @@ public class SingleTargetRequest extends AbstractRequest { private static final Log log = LogFactory.getLog(SingleTargetRequest.class); - private final Address target; + // Only changes from non-null to null + private Address target; public SingleTargetRequest(ResponseCollector wrapper, long requestId, RequestRepository repository, Address target) { super(requestId, wrapper, repository); @@ -28,36 +30,48 @@ public SingleTargetRequest(ResponseCollector wrapper, long requestId, Request @Override public void onResponse(Address sender, Response response) { - if (!target.equals(sender)) { - completeExceptionally( - new IllegalStateException("Received response from " + sender + ", but target was " + target)); + try { + T result; + synchronized (responseCollector) { + if (target != null && !target.equals(sender)) { + log.tracef("Received unexpected response to request %d from %s, target is %s", requestId, sender, target); + } + + result = addResponse(sender, response); + } + complete(result); + } catch (Exception e) { + completeExceptionally(e); } - receiveResponse(sender, response); } @Override public boolean onNewView(Set
members) { - boolean targetIsMissing = !members.contains(target); - if (targetIsMissing) { - receiveResponse(target, CacheNotFoundResponse.INSTANCE); - } - return targetIsMissing; - } - - private void receiveResponse(Address sender, Response response) { try { - // Ignore the return value, we won't receive another response T result; synchronized (responseCollector) { - result = responseCollector.addResponse(sender, response); - if (result == null) { - result = responseCollector.finish(); + boolean targetIsMissing = target != null && !members.contains(target); + if (!targetIsMissing) { + return false; } + + result = addResponse(target, CacheNotFoundResponse.INSTANCE); } complete(result); } catch (Exception e) { completeExceptionally(e); } + return true; + } + + @GuardedBy("responseCollector") + private T addResponse(Address sender, Response response) { + target = null; + T result = responseCollector.addResponse(sender, response); + if (result == null) { + result = responseCollector.finish(); + } + return result; } @Override