Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-3315 Retry remote get after topology change if all the targets are ... #2204

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -23,8 +23,10 @@
import org.infinispan.remoting.rpc.ResponseFilter;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.rpc.RpcOptionsBuilder;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
Expand Down Expand Up @@ -53,6 +55,7 @@ public abstract class BaseDistributionInterceptor extends ClusteringInterceptor
protected RemoteValueRetrievedListener rvrl;

private static final Log log = LogFactory.getLog(BaseDistributionInterceptor.class);
private static final boolean trace = log.isTraceEnabled();

@Override
protected Log getLog() {
Expand All @@ -73,24 +76,70 @@ protected final InternalCacheEntry retrieveFromRemoteSource(Object key, Invocati
ClusteredGetCommand get = cf.buildClusteredGetCommand(key, command.getFlags(), acquireRemoteLock, gtx);
get.setWrite(isWrite);

List<Address> targets = new ArrayList<Address>(stateTransferManager.getCacheTopology().getReadConsistentHash().locateOwners(key));
// if any of the recipients has left the cluster since the command was issued, just don't wait for its response
targets.retainAll(rpcManager.getTransport().getMembers());

RpcOptionsBuilder rpcOptionsBuilder = rpcManager.getRpcOptionsBuilder(ResponseMode.WAIT_FOR_VALID_RESPONSE, false);
int lastTopologyId = -1;
InternalCacheEntry value = null;
while (value == null) {
final CacheTopology cacheTopology = stateTransferManager.getCacheTopology();
final int currentTopologyId = cacheTopology.getTopologyId();

if (trace) {
log.tracef("Perform remote get for key %s. topologyId=%s, currentTopologyId=%s",
key, lastTopologyId, currentTopologyId);
}
List<Address> targets;
if (lastTopologyId < currentTopologyId) {
// Cache topology has changed or it is the first time.
lastTopologyId = currentTopologyId;
targets = new ArrayList<Address>(cacheTopology.getReadConsistentHash().locateOwners(key));
} else if (lastTopologyId == currentTopologyId) {
// Same topologyId, but the owners could have already installed the next topology
// Lets try with write consistent owners (the read owners in the next topology)
lastTopologyId = currentTopologyId + 1;
targets = new ArrayList<Address>(cacheTopology.getWriteConsistentHash().locateOwners(key));
// Remove already contacted nodes
targets.removeAll(cacheTopology.getReadConsistentHash().locateOwners(key));
if (targets.isEmpty()) {
if (trace) {
log.tracef("No valid values found for key '%s' (topologyId=%s).", key, currentTopologyId);
}
break;
}
} else { // lastTopologyId > currentTopologyId
// We have not received a valid value from the write CH owners either, and the topology id hasn't changed
if (trace) {
log.tracef("No valid values found for key '%s' (topologyId=%s).", key, currentTopologyId);
}
break;
}

value = invokeClusterGetCommandRemotely(targets, rpcOptionsBuilder, get, key);
if (trace) {
log.tracef("Remote get of key '%s' (topologyId=%s) returns %s", key, currentTopologyId, value);
}
}
return value;
}

private InternalCacheEntry invokeClusterGetCommandRemotely(List<Address> targets, RpcOptionsBuilder rpcOptionsBuilder,
ClusteredGetCommand get, Object key) {
ResponseFilter filter = new ClusteredGetResponseValidityFilter(targets, rpcManager.getAddress());
RpcOptions options = rpcManager.getRpcOptionsBuilder(ResponseMode.WAIT_FOR_VALID_RESPONSE, false)
.responseFilter(filter).build();
RpcOptions options = rpcOptionsBuilder.responseFilter(filter).build();
Map<Address, Response> responses = rpcManager.invokeRemotely(targets, get, options);

if (!responses.isEmpty()) {
for (Response r : responses.values()) {
if (r instanceof SuccessfulResponse) {

// The response value might be null.
SuccessfulResponse response = (SuccessfulResponse)r;
if( response.getResponseValue() == null )
return null;

InternalCacheValue cacheValue = (InternalCacheValue) response.getResponseValue();
SuccessfulResponse response = (SuccessfulResponse) r;
Object responseValue = response.getResponseValue();
if (responseValue == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I don't think ClusteredGetCommand targets never return a SuccessfulResponse with a null value - if the value doesn't exist locally, a null Response is returned instead (see DistributedResponseGenerator).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it returns a SuccessfulResponse with null.

if (command.getCommandId() == ClusteredGetCommand.COMMAND_ID) {         
   ClusteredGetCommand clusteredGet = (ClusteredGetCommand) command;
   if (distributionManager.isAffectedByRehash(clusteredGet.getKey()))
      return UnsureResponse.INSTANCE;
   return SuccessfulResponse.create(returnValue);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, sorry :)

I wonder if it wouldn't be better to return UnsureResponse.INSTANCE if the value is null and the current topology id is different from the ClusteredGetCommand's topology id. If the value is non-null, it should mean that it's also up-to-date, so there's no need for an unsure response. If the originator got back only unsure responses, it would know to retry either on the read CH owners (if its topology also changed), or on the write CH owners (if its topology didn't change).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so I remove the if (distributionManager.isAffectedByRehash(clusteredGet.getKey())) and the condition is if (returnValue == null && command.getTopologyId() != currentTopologyId)?

in the requestor side, a SuccessfulResponse with null will return immediately. right?

I think that it can work :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, @pruivo.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this does not work very well because we don't set the topologyId in the clustered get and it breaks totally the ClusterLoader...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another nice-sounding idea that doesn't really work in practice :)
I guess that means it's ready to be integrated.

continue;
}

InternalCacheValue cacheValue = (InternalCacheValue) responseValue;
InternalCacheEntry ice = cacheValue.toInternalCacheEntry(key);
if (rvrl != null) {
rvrl.remoteValueFound(ice);
Expand All @@ -99,10 +148,6 @@ protected final InternalCacheEntry retrieveFromRemoteSource(Object key, Invocati
}
}
}

// TODO If everyone returned null, and the read CH has changed, retry the remote get.
// Otherwise our get command might be processed by the old owners after they have invalidated their data
// and we'd return a null even though the key exists on
return null;
}

Expand Down