Skip to content

Commit

Permalink
ISPN-9128 RehashWithSharedStoreTest.testRehashes random failures
Browse files Browse the repository at this point in the history
  • Loading branch information
rvansa committed Jul 13, 2018
1 parent 0d143a6 commit 21617ac
Show file tree
Hide file tree
Showing 14 changed files with 364 additions and 67 deletions.
Expand Up @@ -26,13 +26,17 @@
public interface AvailabilityStrategy {
/**
* Compute the read consistent hash for a topology with a {@code null} union consistent hash.
* Originally a copy of {@link CacheTopology#getReadConsistentHash()} but differs in case of scattered cache.
*/
static ConsistentHash readConsistentHash(CacheTopology topology, ConsistentHashFactory chFactory) {
static ConsistentHash ownersConsistentHash(CacheTopology topology, ConsistentHashFactory chFactory) {
switch (topology.getPhase()) {
case NO_REBALANCE:
return topology.getCurrentCH();
case TRANSITORY:
return topology.getPendingCH();
// This is used to determine nodes that own the entries. In scattered cache (which uses transitory topology)
// the pendingCH is used for reading but the nodes in there are not guaranteed to have the data yet.
// CurrentCH should be safe - the nodes either have the data or the owner is unknown.
return topology.getCurrentCH();
case CONFLICT_RESOLUTION:
case READ_OLD_WRITE_ALL:
return topology.getCurrentCH();
Expand Down
@@ -1,6 +1,6 @@
package org.infinispan.partitionhandling.impl;

import static org.infinispan.partitionhandling.impl.AvailabilityStrategy.readConsistentHash;
import static org.infinispan.partitionhandling.impl.AvailabilityStrategy.ownersConsistentHash;
import static org.infinispan.util.logging.events.Messages.MESSAGES;

import java.util.ArrayList;
Expand Down Expand Up @@ -262,7 +262,7 @@ private List<Partition> computePartitions(Map<Address, CacheStatusResponse> stat
// The node hasn't properly joined yet, so it can't be part of a partition
continue;
}
ConsistentHash readCH = readConsistentHash(topology, response.getCacheJoinInfo().getConsistentHashFactory());
ConsistentHash readCH = ownersConsistentHash(topology, response.getCacheJoinInfo().getConsistentHashFactory());
Partition p = new Partition(sender, topology, response.getStableTopology(), readCH);
partitions.add(p);
}
Expand Down
@@ -1,6 +1,6 @@
package org.infinispan.partitionhandling.impl;

import static org.infinispan.partitionhandling.impl.AvailabilityStrategy.readConsistentHash;
import static org.infinispan.partitionhandling.impl.AvailabilityStrategy.ownersConsistentHash;
import static org.infinispan.util.logging.events.Messages.MESSAGES;

import java.util.ArrayList;
Expand Down Expand Up @@ -208,13 +208,13 @@ public void onPartitionMerge(AvailabilityStrategyContext context, Map<Address, C
for (CacheStatusResponse response : statusResponseMap.values()) {
CacheTopology cacheTopology = response.getCacheTopology();
if (cacheTopology != null) {
ConsistentHash readCH = readConsistentHash(cacheTopology, joinInfo.getConsistentHashFactory());
ConsistentHash readCH = ownersConsistentHash(cacheTopology, joinInfo.getConsistentHashFactory());
if (readCH != null && !readCH.getMembers().isEmpty()) {
distinctHashes.add(readCH);
}
}
}
ConsistentHash preferredHash = readConsistentHash(mergedTopology, joinInfo.getConsistentHashFactory());
ConsistentHash preferredHash = ownersConsistentHash(mergedTopology, joinInfo.getConsistentHashFactory());
ConsistentHash conflictHash = context.calculateConflictHash(preferredHash, distinctHashes, expectedMembers);

mergedTopology = new CacheTopology(++maxTopologyId, maxRebalanceId + 1, conflictHash, null,
Expand Down
Expand Up @@ -499,6 +499,11 @@ private void applyValues(Address address, List<Object> keys, Response response)
for (int i = 0; i < keys.size(); ++i) {
Object key = keys.get(i);
InternalCacheValue icv = values[i];
if (icv == null) {
// The entry got lost in the meantime - this can happen when the container is cleared concurrently to processing
// the GetAllCommand. We'll just avoid NPEs here: data is lost as > 1 nodes have left.
continue;
}
PutKeyValueCommand put = commandsFactory.buildPutKeyValueCommand(key, icv.getValue(),
keyPartitioner.getSegment(key), icv.getMetadata(), STATE_TRANSFER_FLAGS);
try {
Expand Down
@@ -1,16 +1,20 @@
package org.infinispan.remoting.transport;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import static org.infinispan.test.TestingUtil.wrapGlobalComponent;
import static org.testng.AssertJUnit.assertTrue;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.remoting.responses.Response;
import org.infinispan.util.concurrent.ReclosableLatch;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.XSiteReplicateCommand;

/**
Expand All @@ -21,63 +25,81 @@
* @since 7.0
*/
public class ControlledTransport extends AbstractDelegatingTransport {
private static final Log log = LogFactory.getLog(ControlledTransport.class);
private static final Predicate<ReplicableCommand> NEVER = cmd -> false;

private final ReclosableLatch replicationLatch = new ReclosableLatch(true);
private final ReclosableLatch blockingLatch = new ReclosableLatch(true);
private volatile Set<Class> blockBeforeFilter = Collections.emptySet();
private volatile Set<Class> blockAfterFilter = Collections.emptySet();
private volatile Set<Class> failFilter = Collections.emptySet();
private volatile Predicate<ReplicableCommand> blockBeforeFilter = NEVER;
private volatile Predicate<ReplicableCommand> blockAfterFilter = NEVER;
private volatile Predicate<ReplicableCommand> failFilter = NEVER;

public ControlledTransport(Transport realOne) {
private ControlledTransport(Transport realOne) {
super(realOne);
}

public static ControlledTransport replace(Cache<?, ?> cache) {
return wrapGlobalComponent(cache.getCacheManager(), Transport.class, ControlledTransport::new, true);
}

@Override
public void start() {
//skip start it again.
}

public void failFor(Class... filter) {
this.failFilter = new HashSet<>(Arrays.asList(filter));
failFor(classListToFilter(filter));
}

private void failFor(Predicate<ReplicableCommand> filter) {
this.failFilter = filter;
blockingLatch.open();
}

public void stopFailing() {
this.failFilter = Collections.emptySet();
this.failFilter = NEVER;
blockingLatch.open();
}

public void blockBefore(Class... filter) {
this.blockBeforeFilter = new HashSet<>(Arrays.asList(filter));
blockBefore(classListToFilter(filter));
}

public void blockBefore(Predicate<ReplicableCommand> filter) {
this.blockBeforeFilter = filter;
replicationLatch.close();
blockingLatch.close();
}

public void blockAfter(Class... filter) {
this.blockAfterFilter = new HashSet<>(Arrays.asList(filter));
blockAfter(classListToFilter(filter));
}

public void blockAfter(Predicate<ReplicableCommand> filter) {
this.blockAfterFilter = filter;
replicationLatch.close();
blockingLatch.close();
}

public void stopBlocking() {
getLog().tracef("Stop blocking commands");
blockBeforeFilter = Collections.emptySet();
blockAfterFilter = Collections.emptySet();
log.tracef("Stop blocking commands");
blockBeforeFilter = NEVER;
blockAfterFilter = NEVER;
replicationLatch.open();
blockingLatch.open();
}

public void waitForCommandToBlock() throws InterruptedException {
getLog().tracef("Waiting for at least one command to block");
blockingLatch.await(30, TimeUnit.SECONDS);
log.tracef("Waiting for at least one command to block");
assertTrue(blockingLatch.await(30, TimeUnit.SECONDS));
}

public boolean waitForCommandToBlock(long time, TimeUnit unit) throws InterruptedException {
return blockingLatch.await(time, unit);
}

public void failIfNeeded(ReplicableCommand rpcCommand) {
if (failFilter.contains(getActualClass(rpcCommand))) {
if (failFilter.test(rpcCommand)) {
throw new IllegalStateException("Induced failure!");
}
}
Expand All @@ -90,21 +112,21 @@ protected void waitAfter(ReplicableCommand rpcCommand) {
waitForReplicationLatch(rpcCommand, blockAfterFilter);
}

protected void waitForReplicationLatch(ReplicableCommand rpcCommand, Set<Class> filter) {
Class cmdClass = getActualClass(rpcCommand);
if (!filter.contains(cmdClass)) {
protected void waitForReplicationLatch(ReplicableCommand rpcCommand, Predicate<ReplicableCommand> filter) {
if (!filter.test(rpcCommand)) {
log.tracef("Not blocking command %s", rpcCommand);
return;
}

try {
if (!blockingLatch.isOpened()) {
getLog().debugf("Replication trigger called, releasing any waiters for command to block.");
log.debugf("Replication trigger called, releasing any waiters for command to block.");
blockingLatch.open();
}

getLog().debugf("Replication trigger called, waiting for latch to open.");
replicationLatch.await(30, TimeUnit.SECONDS);
getLog().trace("Replication latch opened, continuing.");
log.debugf("Replication trigger called, waiting for latch to open.");
assertTrue(replicationLatch.await(30, TimeUnit.SECONDS));
log.trace("Replication latch opened, continuing.");
} catch (Exception e) {
throw new RuntimeException("Unexpected exception!", e);
}
Expand Down Expand Up @@ -134,6 +156,13 @@ protected BackupResponse afterBackupRemotely(ReplicableCommand command, BackupRe
return response;
}

private Predicate<ReplicableCommand> classListToFilter(Class[] filter) {
return cmd -> {
Class<?> actualClass = getActualClass(cmd);
return Stream.of(filter).anyMatch(clazz -> clazz.isAssignableFrom(actualClass));
};
}

private Class getActualClass(ReplicableCommand rpcCommand) {
Class cmdClass = rpcCommand.getClass();
if (cmdClass.equals(SingleRpcCommand.class)) {
Expand Down

0 comments on commit 21617ac

Please sign in to comment.