Permalink
Browse files

ISPN-2875 Intermittent test failure: org.infinispan.statetransfer.Dis…

…tNonTxOperationsDuringStateTransferTest

Delaying execution of Transport.invokeRemotely(CacheTopologyControlCommand) on the coordinator is usually enough
to delay the start of the rebalance on the joiner, but not in the rare case when the joiner receives a non-null
pending CH directly in the initial topology returned to it from the join call. This was causing the assert to fail. A more manageable
way of delaying the start of the rebalance is to override the RebalancePolicy rather than the Transport. This approach ensures (at least in this test scenario) that the initial topology has a null pending CH.
  • Loading branch information...
anistor authored and Mircea Markus committed Mar 12, 2013
1 parent 6513fd0 commit edfa348d8d63b8e0ad5e05396d4cdedfe504527e
@@ -115,7 +115,7 @@ public void start() throws Exception {
configuration.clustering().stateTransfer().timeout()
);
- localTopologyManager.join(cacheName, joinInfo, new CacheTopologyHandler() {
+ CacheTopology initialTopology = localTopologyManager.join(cacheName, joinInfo, new CacheTopologyHandler() {
@Override
public void updateConsistentHash(CacheTopology cacheTopology) {
doTopologyUpdate(cacheTopology, false);
@@ -126,6 +126,10 @@ public void rebalance(CacheTopology cacheTopology) {
doTopologyUpdate(cacheTopology, true);
}
});
+
+ if (trace) {
+ log.tracef("StateTransferManager of cache %s on node %s received initial topology %s", cacheName, rpcManager.getAddress(), initialTopology);
+ }
}
/**
@@ -71,8 +71,8 @@ public void updateCacheStatus(String cacheName, ClusterCacheStatus cacheStatus)
public boolean isBalanced(ConsistentHash ch) {
int numSegments = ch.getNumSegments();
+ int actualNumOwners = Math.min(ch.getMembers().size(), ch.getNumOwners());
for (int i = 0; i < numSegments; i++) {
- int actualNumOwners = Math.min(ch.getMembers().size(), ch.getNumOwners());
if (ch.locateOwnersForSegment(i).size() != actualNumOwners) {
return false;
}
@@ -22,7 +22,6 @@
*/
package org.infinispan.statetransfer;
-import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
@@ -31,24 +30,20 @@
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.VersioningScheme;
-import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.CallInterceptor;
import org.infinispan.interceptors.EntryWrappingInterceptor;
import org.infinispan.interceptors.InvocationContextInterceptor;
import org.infinispan.interceptors.VersionedEntryWrappingInterceptor;
import org.infinispan.interceptors.base.CommandInterceptor;
-import org.infinispan.remoting.responses.Response;
-import org.infinispan.remoting.rpc.ResponseFilter;
-import org.infinispan.remoting.rpc.ResponseMode;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.topology.CacheTopology;
-import org.infinispan.topology.CacheTopologyControlCommand;
+import org.infinispan.topology.ClusterCacheStatus;
+import org.infinispan.topology.DefaultRebalancePolicy;
+import org.infinispan.topology.RebalancePolicy;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
@@ -58,8 +53,6 @@
import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.Test;
-import java.util.Collection;
-import java.util.Map;
import java.util.concurrent.*;
import static org.junit.Assert.*;
@@ -84,7 +77,7 @@
private ConfigurationBuilder cacheConfigBuilder;
- private ReclosableLatch transportGate;
+ private ReclosableLatch rebalanceGate;
protected BaseOperationsDuringStateTransferTest(CacheMode cacheMode, boolean isTransactional,
boolean isOptimistic, boolean supportsConcurrentUpdates) {
@@ -116,25 +109,26 @@ protected void createCacheManagers() {
cacheConfigBuilder.locking().supportsConcurrentUpdates(supportsConcurrentUpdates);
cacheConfigBuilder.clustering().stateTransfer().fetchInMemoryState(true).awaitInitialTransfer(false);
- transportGate = new ReclosableLatch(true);
- GlobalConfigurationBuilder globalConfigurationBuilder = new GlobalConfigurationBuilder();
- globalConfigurationBuilder.transport().transport(new JGroupsTransport() {
- @Override
- public Map<Address, Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout,
- boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception {
- if (rpcCommand instanceof CacheTopologyControlCommand) {
- try {
- transportGate.await();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- return super.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, responseFilter);
- }
- });
+ rebalanceGate = new ReclosableLatch(true);
- addClusterEnabledCacheManager(globalConfigurationBuilder, cacheConfigBuilder);
+ addClusterEnabledCacheManager(cacheConfigBuilder);
waitForClusterToForm();
+
+ TestingUtil.replaceComponent(manager(0), RebalancePolicy.class,
+ new DefaultRebalancePolicy() {
+ @Override
+ public void updateCacheStatus(String cacheName, ClusterCacheStatus cacheStatus) throws Exception {
+ if (cacheStatus.getCacheTopology().getPendingCH() != null) {
+ // block the rebalance until the test reaches the desired spot
+ try {
+ rebalanceGate.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ super.updateCacheStatus(cacheName, cacheStatus);
+ }
+ }, true);
}
public void testRemove() throws Exception {
@@ -160,7 +154,7 @@ protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) thro
});
// do not allow coordinator to send topology updates to node B
- transportGate.close();
+ rebalanceGate.close();
log.info("Adding a new node ..");
addClusterEnabledCacheManager(cacheConfigBuilder);
@@ -199,7 +193,7 @@ public Object call() throws Exception {
assertTrue(cache(1).keySet().isEmpty());
// allow rebalance to start
- transportGate.open();
+ rebalanceGate.open();
// wait for state transfer to end
TestingUtil.waitForRehashToComplete(cache(0), cache(1));
@@ -241,7 +235,7 @@ protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) thro
});
// do not allow coordinator to send topology updates to node B
- transportGate.close();
+ rebalanceGate.close();
log.info("Adding a new node ..");
addClusterEnabledCacheManager(cacheConfigBuilder);
@@ -280,7 +274,7 @@ public Object call() throws Exception {
assertTrue(cache(1).keySet().isEmpty());
// allow rebalance to start
- transportGate.open();
+ rebalanceGate.open();
// wait for state transfer to end
TestingUtil.waitForRehashToComplete(cache(0), cache(1));
@@ -322,7 +316,7 @@ protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) thro
});
// do not allow coordinator to send topology updates to node B
- transportGate.close();
+ rebalanceGate.close();
log.info("Adding a new node ..");
addClusterEnabledCacheManager(cacheConfigBuilder);
@@ -361,7 +355,7 @@ public Object call() throws Exception {
assertTrue(cache(1).keySet().isEmpty());
// allow rebalance to start
- transportGate.open();
+ rebalanceGate.open();
// wait for state transfer to end
TestingUtil.waitForRehashToComplete(cache(0), cache(1));

0 comments on commit edfa348

Please sign in to comment.