New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ISPN-9128 RehashWithSharedStoreTest.testRehashes random failures #5989
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,20 @@ | ||
package org.infinispan.remoting.transport; | ||
|
||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.HashSet; | ||
import static org.infinispan.test.TestingUtil.wrapGlobalComponent; | ||
import static org.testng.AssertJUnit.assertTrue; | ||
|
||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.Predicate; | ||
import java.util.stream.Stream; | ||
|
||
import org.infinispan.Cache; | ||
import org.infinispan.commands.ReplicableCommand; | ||
import org.infinispan.commands.remote.SingleRpcCommand; | ||
import org.infinispan.remoting.responses.Response; | ||
import org.infinispan.util.concurrent.ReclosableLatch; | ||
import org.infinispan.util.logging.Log; | ||
import org.infinispan.util.logging.LogFactory; | ||
import org.infinispan.xsite.XSiteReplicateCommand; | ||
|
||
/** | ||
|
@@ -21,63 +25,81 @@ | |
* @since 7.0 | ||
*/ | ||
public class ControlledTransport extends AbstractDelegatingTransport { | ||
private static final Log log = LogFactory.getLog(ControlledTransport.class); | ||
private static final Predicate<ReplicableCommand> NEVER = cmd -> false; | ||
|
||
private final ReclosableLatch replicationLatch = new ReclosableLatch(true); | ||
private final ReclosableLatch blockingLatch = new ReclosableLatch(true); | ||
private volatile Set<Class> blockBeforeFilter = Collections.emptySet(); | ||
private volatile Set<Class> blockAfterFilter = Collections.emptySet(); | ||
private volatile Set<Class> failFilter = Collections.emptySet(); | ||
private volatile Predicate<ReplicableCommand> blockBeforeFilter = NEVER; | ||
private volatile Predicate<ReplicableCommand> blockAfterFilter = NEVER; | ||
private volatile Predicate<ReplicableCommand> failFilter = NEVER; | ||
|
||
public ControlledTransport(Transport realOne) { | ||
private ControlledTransport(Transport realOne) { | ||
super(realOne); | ||
} | ||
|
||
public static ControlledTransport replace(Cache<?, ?> cache) { | ||
return wrapGlobalComponent(cache.getCacheManager(), Transport.class, ControlledTransport::new, true); | ||
} | ||
|
||
@Override | ||
public void start() { | ||
//skip start it again. | ||
} | ||
|
||
public void failFor(Class... filter) { | ||
this.failFilter = new HashSet<>(Arrays.asList(filter)); | ||
failFor(classListToFilter(filter)); | ||
} | ||
|
||
private void failFor(Predicate<ReplicableCommand> filter) { | ||
this.failFilter = filter; | ||
blockingLatch.open(); | ||
} | ||
|
||
public void stopFailing() { | ||
this.failFilter = Collections.emptySet(); | ||
this.failFilter = NEVER; | ||
blockingLatch.open(); | ||
} | ||
|
||
public void blockBefore(Class... filter) { | ||
this.blockBeforeFilter = new HashSet<>(Arrays.asList(filter)); | ||
blockBefore(classListToFilter(filter)); | ||
} | ||
|
||
public void blockBefore(Predicate<ReplicableCommand> filter) { | ||
this.blockBeforeFilter = filter; | ||
replicationLatch.close(); | ||
blockingLatch.close(); | ||
} | ||
|
||
public void blockAfter(Class... filter) { | ||
this.blockAfterFilter = new HashSet<>(Arrays.asList(filter)); | ||
blockAfter(classListToFilter(filter)); | ||
} | ||
|
||
public void blockAfter(Predicate<ReplicableCommand> filter) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without any further info on what caused the latest failures, I'd say it's this... the more complex the filter gets, the more likely it is to exclude the wrong command and crash the whole Rube Goldberg machine. True, it's painful to enumerate all the commands that state transfer now needs like I'm trying to do with |
||
this.blockAfterFilter = filter; | ||
replicationLatch.close(); | ||
blockingLatch.close(); | ||
} | ||
|
||
public void stopBlocking() { | ||
getLog().tracef("Stop blocking commands"); | ||
blockBeforeFilter = Collections.emptySet(); | ||
blockAfterFilter = Collections.emptySet(); | ||
log.tracef("Stop blocking commands"); | ||
blockBeforeFilter = NEVER; | ||
blockAfterFilter = NEVER; | ||
replicationLatch.open(); | ||
blockingLatch.open(); | ||
} | ||
|
||
public void waitForCommandToBlock() throws InterruptedException { | ||
getLog().tracef("Waiting for at least one command to block"); | ||
blockingLatch.await(30, TimeUnit.SECONDS); | ||
log.tracef("Waiting for at least one command to block"); | ||
assertTrue(blockingLatch.await(30, TimeUnit.SECONDS)); | ||
} | ||
|
||
public boolean waitForCommandToBlock(long time, TimeUnit unit) throws InterruptedException { | ||
return blockingLatch.await(time, unit); | ||
} | ||
|
||
public void failIfNeeded(ReplicableCommand rpcCommand) { | ||
if (failFilter.contains(getActualClass(rpcCommand))) { | ||
if (failFilter.test(rpcCommand)) { | ||
throw new IllegalStateException("Induced failure!"); | ||
} | ||
} | ||
|
@@ -90,21 +112,21 @@ protected void waitAfter(ReplicableCommand rpcCommand) { | |
waitForReplicationLatch(rpcCommand, blockAfterFilter); | ||
} | ||
|
||
protected void waitForReplicationLatch(ReplicableCommand rpcCommand, Set<Class> filter) { | ||
Class cmdClass = getActualClass(rpcCommand); | ||
if (!filter.contains(cmdClass)) { | ||
protected void waitForReplicationLatch(ReplicableCommand rpcCommand, Predicate<ReplicableCommand> filter) { | ||
if (!filter.test(rpcCommand)) { | ||
log.tracef("Not blocking command %s", rpcCommand); | ||
return; | ||
} | ||
|
||
try { | ||
if (!blockingLatch.isOpened()) { | ||
getLog().debugf("Replication trigger called, releasing any waiters for command to block."); | ||
log.debugf("Replication trigger called, releasing any waiters for command to block."); | ||
blockingLatch.open(); | ||
} | ||
|
||
getLog().debugf("Replication trigger called, waiting for latch to open."); | ||
replicationLatch.await(30, TimeUnit.SECONDS); | ||
getLog().trace("Replication latch opened, continuing."); | ||
log.debugf("Replication trigger called, waiting for latch to open."); | ||
assertTrue(replicationLatch.await(30, TimeUnit.SECONDS)); | ||
log.trace("Replication latch opened, continuing."); | ||
} catch (Exception e) { | ||
throw new RuntimeException("Unexpected exception!", e); | ||
} | ||
|
@@ -134,6 +156,13 @@ protected BackupResponse afterBackupRemotely(ReplicableCommand command, BackupRe | |
return response; | ||
} | ||
|
||
private Predicate<ReplicableCommand> classListToFilter(Class[] filter) { | ||
return cmd -> { | ||
Class<?> actualClass = getActualClass(cmd); | ||
return Stream.of(filter).anyMatch(clazz -> clazz.isAssignableFrom(actualClass)); | ||
}; | ||
} | ||
|
||
private Class getActualClass(ReplicableCommand rpcCommand) { | ||
Class cmdClass = rpcCommand.getClass(); | ||
if (cmdClass.equals(SingleRpcCommand.class)) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant this method to be a mirror of
CacheTopology.getReadConsistentHash()
, only without using theunionCH
field (which isnull
on theClusterTopologyManagerImpl
side). Since this is breaking the symmetry, we should change the method name as well.