From 3ca6fcc20fd6fdca82cb86737c65480fb44c2153 Mon Sep 17 00:00:00 2001 From: Pedro Ruivo Date: Thu, 7 Dec 2017 19:21:52 +0000 Subject: [PATCH] ISPN-8599 DistSyncNonTxStateTransferTest.testStateTransferWithClusterIdle random failures * refactoring and cleaning tests under xsite.statetransfer package --- .../infinispan/xsite/AbstractXSiteTest.java | 4 +- .../AbstractStateTransferTest.java | 98 ++++ .../BackupForStateTransferTest.java | 76 +-- .../statetransfer/BaseStateTransferTest.java | 464 ++++++------------ .../DistSyncNonTxStateTransferTest.java | 6 +- .../DistSyncOnePhaseTxStateTransferTest.java | 6 +- .../DistSyncTwoPhasesTxStateTransferTest.java | 25 +- .../statetransfer/XSiteProviderDelegator.java | 2 +- .../failures/AbstractTopologyChangeTest.java | 197 +++----- .../failures/RetryMechanismTest.java | 179 +++---- .../SiteConsumerTopologyChangeTest.java | 7 +- .../StateTransferLinkFailuresTest.java | 143 ++---- 12 files changed, 453 insertions(+), 754 deletions(-) create mode 100644 core/src/test/java/org/infinispan/xsite/statetransfer/AbstractStateTransferTest.java diff --git a/core/src/test/java/org/infinispan/xsite/AbstractXSiteTest.java b/core/src/test/java/org/infinispan/xsite/AbstractXSiteTest.java index 2f9276c537db..d6d086882236 100644 --- a/core/src/test/java/org/infinispan/xsite/AbstractXSiteTest.java +++ b/core/src/test/java/org/infinispan/xsite/AbstractXSiteTest.java @@ -312,7 +312,9 @@ public AdvancedCache advancedCache(int index) { } public Cache cache(String cacheName, int index) { - return cacheManagers.get(index).getCache(cacheName); + return cacheName == null ? + cache(index) : + cacheManagers.get(index).getCache(cacheName); } public List cacheManagers() { diff --git a/core/src/test/java/org/infinispan/xsite/statetransfer/AbstractStateTransferTest.java b/core/src/test/java/org/infinispan/xsite/statetransfer/AbstractStateTransferTest.java new file mode 100644 index 000000000000..59cc337dfb7b --- /dev/null +++ b/core/src/test/java/org/infinispan/xsite/statetransfer/AbstractStateTransferTest.java @@ -0,0 +1,98 @@ +package org.infinispan.xsite.statetransfer; + +import static org.infinispan.test.TestingUtil.extractComponent; +import static org.infinispan.xsite.XSiteAdminOperations.OFFLINE; +import static org.infinispan.xsite.XSiteAdminOperations.ONLINE; +import static org.infinispan.xsite.XSiteAdminOperations.SUCCESS; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.infinispan.Cache; +import org.infinispan.context.Flag; +import org.infinispan.statetransfer.CommitManager; +import org.infinispan.xsite.AbstractTwoSitesTest; +import org.infinispan.xsite.XSiteAdminOperations; + +/** + * Base class for state transfer tests with some utility method. + * + * @author Pedro Ruivo + * @since 9.2 + */ +public abstract class AbstractStateTransferTest extends AbstractTwoSitesTest { + + void assertNoStateTransferInReceivingSite(String cacheName) { + assertInSite(NYC, cacheName, this::assertNotReceivingStateForCache); + } + + void assertNoStateTransferInSendingSite() { + assertInSite(LON, cache -> assertTrue(isNotSendingStateForCache(cache))); + } + + void assertEventuallyNoStateTransferInSendingSite() { + assertEventuallyInSite(LON, this::isNotSendingStateForCache, 30, TimeUnit.SECONDS); + } + + void assertEventuallyStateTransferNotRunning() { + eventually(() -> adminOperations().getRunningStateTransfer().isEmpty(), 30, + TimeUnit.SECONDS); + } + + int chunkSize() { + return cache(LON, 0).getCacheConfiguration().sites().allBackups().get(0).stateTransfer().chunkSize(); + } + + protected void assertEventuallyNoStateTransferInReceivingSite(String cacheName) { + assertEventuallyInSite(NYC, cacheName, this::isNotReceivingStateForCache, 30, TimeUnit.SECONDS); + } + + protected void startStateTransfer() { + startStateTransfer(cache(LON, 0), NYC); + } + + protected void startStateTransfer(Cache cache, String toSite) { + XSiteAdminOperations operations = extractComponent(cache, XSiteAdminOperations.class); + assertEquals(SUCCESS, operations.pushState(toSite)); + } + + protected void takeSiteOffline() { + XSiteAdminOperations operations = extractComponent(cache(LON, 0), XSiteAdminOperations.class); + assertEquals(SUCCESS, operations.takeSiteOffline(NYC)); + } + + protected void assertOffline() { + XSiteAdminOperations operations = extractComponent(cache(LON, 0), XSiteAdminOperations.class); + assertEquals(OFFLINE, operations.siteStatus(NYC)); + } + + protected void assertOnline(String localSite, String remoteSite) { + XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); + assertEquals(ONLINE, operations.siteStatus(remoteSite)); + } + + protected XSiteAdminOperations adminOperations() { + return extractComponent(cache(LON, 0), XSiteAdminOperations.class); + } + + private void assertNotReceivingStateForCache(Cache cache) { + CommitManager commitManager = extractComponent(cache, CommitManager.class); + assertFalse(commitManager.isTracking(Flag.PUT_FOR_STATE_TRANSFER)); + assertFalse(commitManager.isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER)); + assertTrue(commitManager.isEmpty()); + } + + private boolean isNotReceivingStateForCache(Cache cache) { + CommitManager commitManager = extractComponent(cache, CommitManager.class); + return !commitManager.isTracking(Flag.PUT_FOR_STATE_TRANSFER) && + !commitManager.isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER) && + commitManager.isEmpty(); + } + + private boolean isNotSendingStateForCache(Cache cache) { + return extractComponent(cache, XSiteStateProvider.class).getCurrentStateSending().isEmpty(); + } + +} diff --git a/core/src/test/java/org/infinispan/xsite/statetransfer/BackupForStateTransferTest.java b/core/src/test/java/org/infinispan/xsite/statetransfer/BackupForStateTransferTest.java index ab57dfe615bb..4c09f3571699 100644 --- a/core/src/test/java/org/infinispan/xsite/statetransfer/BackupForStateTransferTest.java +++ b/core/src/test/java/org/infinispan/xsite/statetransfer/BackupForStateTransferTest.java @@ -1,18 +1,14 @@ package org.infinispan.xsite.statetransfer; -import static org.infinispan.test.TestingUtil.extractComponent; +import static org.infinispan.test.TestingUtil.k; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; -import java.util.concurrent.TimeUnit; +import java.lang.reflect.Method; import org.infinispan.configuration.cache.BackupConfigurationBuilder; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; -import org.infinispan.context.Flag; -import org.infinispan.statetransfer.CommitManager; -import org.infinispan.xsite.AbstractTwoSitesTest; -import org.infinispan.xsite.XSiteAdminOperations; import org.testng.annotations.Test; /** @@ -22,7 +18,7 @@ * @since 7.0 */ @Test(groups = "xsite", testName = "xsite.statetransfer.BackupForStateTransferTest") -public class BackupForStateTransferTest extends AbstractTwoSitesTest { +public class BackupForStateTransferTest extends AbstractStateTransferTest { private static final String VALUE = "value"; private static final String LON_BACKUP_CACHE_NAME = "lonBackup"; @@ -32,18 +28,18 @@ public BackupForStateTransferTest() { this.implicitBackupCache = false; } - public void testStateTransferWithClusterIdle() throws InterruptedException { - takeSiteOffline(LON, NYC); - assertOffline(LON, NYC); - assertNoStateTransferInReceivingSite(NYC, LON_BACKUP_CACHE_NAME); - assertNoStateTransferInSendingSite(LON); + public void testStateTransferWithClusterIdle(Method method) { + takeSiteOffline(); + assertOffline(); + assertNoStateTransferInReceivingSite(LON_BACKUP_CACHE_NAME); + assertNoStateTransferInSendingSite(); //NYC is offline... lets put some initial data in //we have 2 nodes in each site and the primary owner sends the state. Lets try to have more key than the chunk //size in order to each site to send more than one chunk. - final int amountOfData = chunkSize(LON) * 4; + final int amountOfData = chunkSize() * 4; for (int i = 0; i < amountOfData; ++i) { - cache(LON, 0).put(key(i), VALUE); + cache(LON, 0).put(k(method, i), VALUE); } //check if NYC is empty (LON backup cache) @@ -52,25 +48,24 @@ public void testStateTransferWithClusterIdle() throws InterruptedException { //check if NYC is empty (default cache) assertInSite(NYC, cache -> assertTrue(cache.isEmpty())); - startStateTransfer(LON, NYC); + startStateTransfer(); - eventually(() -> extractComponent(cache(LON, 0), XSiteAdminOperations.class).getRunningStateTransfer().isEmpty(), - TimeUnit.SECONDS.toMillis(30)); + assertEventuallyStateTransferNotRunning(); assertOnline(LON, NYC); //check if all data is visible (LON backup cache) assertInSite(NYC, LON_BACKUP_CACHE_NAME, cache -> { for (int i = 0; i < amountOfData; ++i) { - assertEquals(VALUE, cache.get(key(i))); + assertEquals(VALUE, cache.get(k(method, i))); } }); //check if NYC is empty (default cache) assertInSite(NYC, cache -> assertTrue(cache.isEmpty())); - assertNoStateTransferInReceivingSite(NYC, LON_BACKUP_CACHE_NAME); - assertNoStateTransferInSendingSite(LON); + assertEventuallyNoStateTransferInReceivingSite(LON_BACKUP_CACHE_NAME); + assertNoStateTransferInSendingSite(); } @Override @@ -88,45 +83,4 @@ protected void adaptLONConfiguration(BackupConfigurationBuilder builder) { builder.site(NYC).stateTransfer().chunkSize(10); } - private void startStateTransfer(String fromSite, String toSite) { - XSiteAdminOperations operations = extractComponent(cache(fromSite, 0), XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.SUCCESS, operations.pushState(toSite)); - } - - private void takeSiteOffline(String localSite, String remoteSite) { - XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.SUCCESS, operations.takeSiteOffline(remoteSite)); - } - - private void assertOffline(String localSite, String remoteSite) { - XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.OFFLINE, operations.siteStatus(remoteSite)); - } - - private void assertOnline(String localSite, String remoteSite) { - XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.ONLINE, operations.siteStatus(remoteSite)); - } - - private int chunkSize(String site) { - return cache(site, 0).getCacheConfiguration().sites().allBackups().get(0).stateTransfer().chunkSize(); - } - - private void assertNoStateTransferInReceivingSite(String siteName, String cacheName) { - assertEventuallyInSite(siteName, cacheName, cache -> { - CommitManager commitManager = extractComponent(cache, CommitManager.class); - return !commitManager.isTracking(Flag.PUT_FOR_STATE_TRANSFER) && - !commitManager.isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER) && - commitManager.isEmpty(); - }, 30, TimeUnit.SECONDS); - } - - private void assertNoStateTransferInSendingSite(String siteName) { - assertInSite(siteName, - cache -> assertTrue(extractComponent(cache, XSiteStateProvider.class).getCurrentStateSending().isEmpty())); - } - - private Object key(int index) { - return "key-" + index; - } } diff --git a/core/src/test/java/org/infinispan/xsite/statetransfer/BaseStateTransferTest.java b/core/src/test/java/org/infinispan/xsite/statetransfer/BaseStateTransferTest.java index 9e037f2901cd..0a10b76feca4 100644 --- a/core/src/test/java/org/infinispan/xsite/statetransfer/BaseStateTransferTest.java +++ b/core/src/test/java/org/infinispan/xsite/statetransfer/BaseStateTransferTest.java @@ -2,13 +2,18 @@ import static org.infinispan.test.TestingUtil.extractComponent; import static org.infinispan.test.TestingUtil.extractGlobalComponent; +import static org.infinispan.test.TestingUtil.k; import static org.infinispan.test.TestingUtil.replaceComponent; +import static org.infinispan.xsite.XSiteAdminOperations.SUCCESS; +import static org.infinispan.xsite.statetransfer.XSiteStateTransferManager.STATUS_CANCELED; +import static org.infinispan.xsite.statetransfer.XSiteStateTransferManager.STATUS_SENDING; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; import static org.testng.AssertJUnit.assertTrue; +import java.lang.reflect.Method; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -28,16 +33,14 @@ import org.infinispan.commands.write.RemoveCommand; import org.infinispan.commands.write.WriteCommand; import org.infinispan.configuration.cache.BackupConfigurationBuilder; +import org.infinispan.configuration.cache.CacheMode; import org.infinispan.container.entries.InternalCacheValue; -import org.infinispan.context.Flag; -import org.infinispan.distribution.DistributionManager; +import org.infinispan.distribution.LocalizedCacheTopology; import org.infinispan.manager.CacheContainer; import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.ControlledTransport; import org.infinispan.remoting.transport.Transport; -import org.infinispan.statetransfer.CommitManager; import org.infinispan.test.fwk.CheckPoint; -import org.infinispan.xsite.AbstractTwoSitesTest; import org.infinispan.xsite.BackupReceiver; import org.infinispan.xsite.BackupReceiverDelegator; import org.infinispan.xsite.BackupReceiverRepository; @@ -51,41 +54,41 @@ * @author Pedro Ruivo * @since 7.0 */ -public abstract class BaseStateTransferTest extends AbstractTwoSitesTest { +public abstract class BaseStateTransferTest extends AbstractStateTransferTest { - protected static final String LON = "LON"; - protected static final String NYC = "NYC"; + private static final String VALUE = "value"; public BaseStateTransferTest() { this.cleanup = CleanupPhase.AFTER_METHOD; + this.cacheMode = CacheMode.DIST_SYNC; } @Test(groups = "xsite") public void testStateTransferNonExistingSite() { - XSiteAdminOperations operations = extractComponent(cache(LON, 0), XSiteAdminOperations.class); + XSiteAdminOperations operations = adminOperations(); assertEquals("Unable to pushState to 'NO_SITE'. Incorrect site name: NO_SITE", operations.pushState("NO_SITE")); assertTrue(operations.getRunningStateTransfer().isEmpty()); - assertNoStateTransferInSendingSite(LON); + assertNoStateTransferInSendingSite(); } @Test(groups = "xsite") - public void testCancelStateTransfer() throws InterruptedException { - takeSiteOffline(LON, NYC); - assertOffline(LON, NYC); - assertNoStateTransferInReceivingSite(NYC); - assertNoStateTransferInSendingSite(LON); + public void testCancelStateTransfer(Method method) throws InterruptedException { + takeSiteOffline(); + assertOffline(); + assertNoStateTransferInReceivingSite(null); + assertNoStateTransferInSendingSite(); // NYC is offline... lets put some initial data in LON. // The primary owner is the one sending the state to the backup. // We add keys until we have more than one chunk on the LON coordinator. - DistributionManager dm0 = cache(LON, 0).getAdvancedCache().getDistributionManager(); + LocalizedCacheTopology topology = cache(LON, 0).getAdvancedCache().getDistributionManager().getCacheTopology(); Address coordLON = cache(LON, 0).getCacheManager().getAddress(); Set keysOnCoordinator = new HashSet<>(); int i = 0; - while (keysOnCoordinator.size() < chunkSize(LON)) { - Object key = key(i); - cache(LON, 0).put(key, value(0)); - if (dm0.getPrimaryLocation(key).equals(coordLON)) { + while (keysOnCoordinator.size() < chunkSize()) { + Object key = k(method, i); + cache(LON, 0).put(key, VALUE); + if (topology.getDistribution(key).primary().equals(coordLON)) { keysOnCoordinator.add(key); } ++i; @@ -94,205 +97,174 @@ public void testCancelStateTransfer() throws InterruptedException { log.debugf("Coordinator %s is primary owner for %d keys: %s", coordLON, keysOnCoordinator.size(), keysOnCoordinator); //check if NYC is empty - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - assertTrue(cache.isEmpty()); - } - }); + assertInSite(NYC, cache -> assertTrue(cache.isEmpty())); ControlledTransport controllerTransport = replaceTransport(cache(LON, 0)); controllerTransport.blockBefore(XSiteStatePushCommand.class); - startStateTransfer(LON, NYC); + startStateTransfer(); controllerTransport.waitForCommandToBlock(); - assertEquals(XSiteAdminOperations.SUCCESS, extractComponent(cache(LON, 0), XSiteAdminOperations.class).cancelPushState(NYC)); + assertEquals(SUCCESS, adminOperations().cancelPushState(NYC)); controllerTransport.stopBlocking(); - eventually(new Condition() { - @Override - public boolean isSatisfied() throws Exception { - return extractComponent(cache(LON, 0), XSiteAdminOperations.class).getRunningStateTransfer().isEmpty(); - } - }, TimeUnit.SECONDS.toMillis(30)); + assertEventuallyStateTransferNotRunning(); - assertNoStateTransferInReceivingSite(NYC); - assertNoStateTransferInSendingSite(LON); + assertEventuallyNoStateTransferInReceivingSite(null); + assertEventuallyNoStateTransferInSendingSite(); - assertEquals(XSiteStateTransferManager.STATUS_CANCELED, extractComponent(cache(LON, 0), XSiteAdminOperations.class).getPushStateStatus().get(NYC)); + assertEquals(STATUS_CANCELED, adminOperations().getPushStateStatus().get(NYC)); controllerTransport.blockBefore(XSiteStatePushCommand.class); - startStateTransfer(LON, NYC); + startStateTransfer(); controllerTransport.waitForCommandToBlock(); - assertEquals(XSiteStateTransferManager.STATUS_SENDING, extractComponent(cache(LON, 0), XSiteAdminOperations.class).getPushStateStatus().get(NYC)); + assertEquals(STATUS_SENDING, adminOperations().getPushStateStatus().get(NYC)); controllerTransport.stopBlocking(); - eventually(new Condition() { - @Override - public boolean isSatisfied() throws Exception { - return extractComponent(cache(LON, 0), XSiteAdminOperations.class).getRunningStateTransfer().isEmpty(); - } - }, TimeUnit.SECONDS.toMillis(30)); + assertEventuallyStateTransferNotRunning(); - assertNoStateTransferInReceivingSite(NYC); - assertNoStateTransferInSendingSite(LON); + assertEventuallyNoStateTransferInReceivingSite(null); + assertEventuallyNoStateTransferInSendingSite(); //check if all data is visible - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - for (int i = 0; i < numKeys; ++i) { - assertEquals(value(0), cache.get(key(i))); - } + assertInSite(NYC, cache -> { + for (int i1 = 0; i1 < numKeys; ++i1) { + assertEquals(VALUE, cache.get(k(method, i1))); } }); } @Test(groups = "xsite") - public void testStateTransferWithClusterIdle() throws InterruptedException { - takeSiteOffline(LON, NYC); - assertOffline(LON, NYC); - assertNoStateTransferInReceivingSite(NYC); - assertNoStateTransferInSendingSite(LON); + public void testStateTransferWithClusterIdle(Method method) { + takeSiteOffline(); + assertOffline(); + assertNoStateTransferInReceivingSite(null); + assertNoStateTransferInSendingSite(); //NYC is offline... lets put some initial data in //we have 2 nodes in each site and the primary owner sends the state. Lets try to have more key than the chunk //size in order to each site to send more than one chunk. - final int amountOfData = chunkSize(LON) * 4; + final int amountOfData = chunkSize() * 4; for (int i = 0; i < amountOfData; ++i) { - cache(LON, 0).put(key(i), value(0)); + cache(LON, 0).put(k(method, i), VALUE); } //check if NYC is empty - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - assertTrue(cache.isEmpty()); - } - }); + assertInSite(NYC, cache -> assertTrue(cache.isEmpty())); - startStateTransfer(LON, NYC); + startStateTransfer(); - eventually(new Condition() { - @Override - public boolean isSatisfied() throws Exception { - return extractComponent(cache(LON, 0), XSiteAdminOperations.class).getRunningStateTransfer().isEmpty(); - } - }, TimeUnit.SECONDS.toMillis(30)); + assertEventuallyStateTransferNotRunning(); assertOnline(LON, NYC); //check if all data is visible - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - for (int i = 0; i < amountOfData; ++i) { - assertEquals(value(0), cache.get(key(i))); - } + assertInSite(NYC, cache -> { + for (int i = 0; i < amountOfData; ++i) { + assertEquals(VALUE, cache.get(k(method, i))); } }); - assertNoStateTransferInReceivingSite(NYC); - assertNoStateTransferInSendingSite(LON); + assertEventuallyNoStateTransferInReceivingSite(null); + assertEventuallyNoStateTransferInSendingSite(); } @Test(groups = "xsite") - public void testPutOperationBeforeState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.PUT, true); + public void testPutOperationBeforeState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.PUT, true, method); } @Test(groups = "xsite") - public void testPutOperationAfterState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.PUT, false); + public void testPutOperationAfterState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.PUT, false, method); } @Test(groups = "xsite") - public void testRemoveOperationBeforeState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.REMOVE, true); + public void testRemoveOperationBeforeState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.REMOVE, true, method); } @Test(groups = "xsite") - public void testRemoveOperationAfterState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.REMOVE, false); + public void testRemoveOperationAfterState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.REMOVE, false, method); } @Test(groups = "xsite") - public void testRemoveIfMatchOperationBeforeState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.REMOVE_IF_MATCH, true); + public void testRemoveIfMatchOperationBeforeState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.REMOVE_IF_MATCH, true, method); } @Test(groups = "xsite") - public void testRemoveIfMatchOperationAfterState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.REMOVE_IF_MATCH, false); + public void testRemoveIfMatchOperationAfterState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.REMOVE_IF_MATCH, false, method); } @Test(groups = "xsite") - public void testReplaceOperationBeforeState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.REPLACE, true); + public void testReplaceOperationBeforeState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.REPLACE, true, method); } @Test(groups = "xsite") - public void testReplaceOperationAfterState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.REPLACE, false); + public void testReplaceOperationAfterState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.REPLACE, false, method); } @Test(groups = "xsite") - public void testReplaceIfMatchOperationBeforeState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.REPLACE_IF_MATCH, true); + public void testReplaceIfMatchOperationBeforeState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.REPLACE_IF_MATCH, true, method); } @Test(groups = "xsite") - public void testReplaceIfMatchOperationAfterState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.REPLACE_IF_MATCH, false); + public void testReplaceIfMatchOperationAfterState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.REPLACE_IF_MATCH, false, method); } @Test(groups = "xsite") - public void testClearOperationBeforeState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.CLEAR, true); + public void testClearOperationBeforeState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.CLEAR, true, method); } @Test(groups = "xsite") - public void testClearOperationAfterState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.CLEAR, false); + public void testClearOperationAfterState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.CLEAR, false, method); } @Test(groups = "xsite") - public void testPutMapOperationBeforeState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.PUT_MAP, true); + public void testPutMapOperationBeforeState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.PUT_MAP, true, method); } @Test(groups = "xsite") - public void testPutMapOperationAfterState() throws Exception { - testStateTransferWithConcurrentOperation(Operation.PUT_MAP, false); + public void testPutMapOperationAfterState(Method method) throws Exception { + testStateTransferWithConcurrentOperation(Operation.PUT_MAP, false, method); } @Test(groups = "xsite") - public void testPutIfAbsentFail() throws Exception { - testStateTransferWithNoReplicatedOperation(Operation.PUT_IF_ABSENT_FAIL); + public void testPutIfAbsentFail(Method method) throws Exception { + testStateTransferWithNoReplicatedOperation(Operation.PUT_IF_ABSENT_FAIL, method); } @Test(groups = "xsite") - public void testRemoveIfMatchFail() throws Exception { - testStateTransferWithNoReplicatedOperation(Operation.REMOVE_IF_MATCH_FAIL); + public void testRemoveIfMatchFail(Method method) throws Exception { + testStateTransferWithNoReplicatedOperation(Operation.REMOVE_IF_MATCH_FAIL, method); } @Test(groups = "xsite") - public void testReplaceIfMatchFail() throws Exception { - testStateTransferWithNoReplicatedOperation(Operation.REPLACE_IF_MATCH_FAIL); + public void testReplaceIfMatchFail(Method method) throws Exception { + testStateTransferWithNoReplicatedOperation(Operation.REPLACE_IF_MATCH_FAIL, method); } @Test(groups = "xsite") - public void testPutIfAbsent() throws Exception { - testConcurrentOperation(Operation.PUT_IF_ABSENT); + public void testPutIfAbsent(Method method) throws Exception { + testConcurrentOperation(Operation.PUT_IF_ABSENT, method); } @Test(groups = "xsite") - public void testRemoveNonExisting() throws Exception { - testConcurrentOperation(Operation.REMOVE_NON_EXISTING); + public void testRemoveNonExisting(Method method) throws Exception { + testConcurrentOperation(Operation.REMOVE_NON_EXISTING, method); } @Override @@ -300,16 +272,16 @@ protected void adaptLONConfiguration(BackupConfigurationBuilder builder) { builder.stateTransfer().chunkSize(2).timeout(2000); } - private void testStateTransferWithConcurrentOperation(final Operation operation, final boolean performBeforeState) - throws Exception { + private void testStateTransferWithConcurrentOperation(final Operation operation, final boolean performBeforeState, + final Method method) throws Exception { assertNotNull(operation); assertTrue(operation.replicates()); - takeSiteOffline(LON, NYC); - assertOffline(LON, NYC); - assertNoStateTransferInReceivingSite(NYC); - assertNoStateTransferInSendingSite(LON); + takeSiteOffline(); + assertOffline(); + assertNoStateTransferInReceivingSite(null); + assertNoStateTransferInSendingSite(); - final Object key = key(0); + final Object key = k(method, 0); final CheckPoint checkPoint = new CheckPoint(); operation.init(cache(LON, 0), key); @@ -326,7 +298,7 @@ public void beforeCommand(VisitableCommand command) throws Exception { } @Override - public void afterCommand(VisitableCommand command) throws Exception { + public void afterCommand(VisitableCommand command) { if (performBeforeState && isUpdatingKeyWithValue(command, key, operation.finalValue())) { //command was performed before state... let the state continue checkPoint.trigger("apply-state"); @@ -346,7 +318,7 @@ public void beforeState(XSiteStatePushCommand command) throws Exception { } @Override - public void afterState(XSiteStatePushCommand command) throws Exception { + public void afterState(XSiteStatePushCommand command) { if (!performBeforeState && containsKey(command.getChunk(), key)) { //state before command... let the command go... checkPoint.trigger("update-key"); @@ -359,7 +331,7 @@ public void afterState(XSiteStatePushCommand command) throws Exception { } //safe (i.e. not blocking main thread), the state transfer is async - startStateTransfer(LON, NYC); + startStateTransfer(); assertOnline(LON, NYC); //state transfer should send old value @@ -369,40 +341,25 @@ public void afterState(XSiteStatePushCommand command) throws Exception { //safe, perform is async operation.perform(cache(LON, 0), key).get(); - eventually(new Condition() { - @Override - public boolean isSatisfied() throws Exception { - return extractComponent(cache(LON, 0), XSiteAdminOperations.class).getRunningStateTransfer().isEmpty(); - } - }, TimeUnit.SECONDS.toMillis(30)); + assertEventuallyStateTransferNotRunning(); - assertEventuallyNoStateTransferInReceivingSite(NYC, 30, TimeUnit.SECONDS); - assertEventuallyNoStateTransferInSendingSite(LON, 30, TimeUnit.SECONDS); + assertEventuallyNoStateTransferInReceivingSite(null); + assertEventuallyNoStateTransferInSendingSite(); //check if all data is visible - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - assertEquals(operation.finalValue(), cache.get(key)); - } - }); - assertInSite(LON, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - assertEquals(operation.finalValue(), cache.get(key)); - } - }); + assertInSite(NYC, cache -> assertEquals(operation.finalValue(), cache.get(key))); + assertInSite(LON, cache -> assertEquals(operation.finalValue(), cache.get(key))); } - private void testConcurrentOperation(final Operation operation) throws Exception { + private void testConcurrentOperation(final Operation operation, final Method method) throws Exception { assertNotNull(operation); assertTrue(operation.replicates()); - takeSiteOffline(LON, NYC); - assertOffline(LON, NYC); - assertNoStateTransferInReceivingSite(NYC); - assertNoStateTransferInSendingSite(LON); + takeSiteOffline(); + assertOffline(); + assertNoStateTransferInReceivingSite(null); + assertNoStateTransferInSendingSite(); - final Object key = key(0); + final Object key = k(method, 0); operation.init(cache(LON, 0), key); assertNull(operation.initialValue()); @@ -410,12 +367,7 @@ private void testConcurrentOperation(final Operation operation) throws Exception final XSiteStateProviderControl control = XSiteStateProviderControl.replaceInCache(cache(LON, 0)); //safe (i.e. not blocking main thread), the state transfer is async - final Future f = fork(new Runnable() { - @Override - public void run() { - startStateTransfer(LON, NYC); - } - }); + final Future f = fork((Runnable) this::startStateTransfer); //state transfer will be running (nothing to transfer however) while the operation is done. control.await(); @@ -427,40 +379,26 @@ public void run() { control.trigger(); f.get(30, TimeUnit.SECONDS); - eventually(new Condition() { - @Override - public boolean isSatisfied() throws Exception { - return extractComponent(cache(LON, 0), XSiteAdminOperations.class).getRunningStateTransfer().isEmpty(); - } - }, TimeUnit.SECONDS.toMillis(30)); + assertEventuallyStateTransferNotRunning(); - assertEventuallyNoStateTransferInReceivingSite(NYC, 30, TimeUnit.SECONDS); - assertEventuallyNoStateTransferInSendingSite(LON, 30, TimeUnit.SECONDS); + assertEventuallyNoStateTransferInReceivingSite(null); + assertEventuallyNoStateTransferInSendingSite(); //check if all data is visible - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - assertEquals(operation.finalValue(), cache.get(key)); - } - }); - assertInSite(LON, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - assertEquals(operation.finalValue(), cache.get(key)); - } - }); + assertInSite(NYC, cache -> assertEquals(operation.finalValue(), cache.get(key))); + assertInSite(LON, cache -> assertEquals(operation.finalValue(), cache.get(key))); } - private void testStateTransferWithNoReplicatedOperation(final Operation operation) throws Exception { + private void testStateTransferWithNoReplicatedOperation(final Operation operation, final Method method) + throws Exception { assertNotNull(operation); assertFalse(operation.replicates()); - takeSiteOffline(LON, NYC); - assertOffline(LON, NYC); - assertNoStateTransferInReceivingSite(NYC); - assertNoStateTransferInSendingSite(LON); + takeSiteOffline(); + assertOffline(); + assertNoStateTransferInReceivingSite(null); + assertNoStateTransferInSendingSite(); - final Object key = key(0); + final Object key = k(method, 0); final CheckPoint checkPoint = new CheckPoint(); final AtomicBoolean commandReceived = new AtomicBoolean(false); @@ -469,12 +407,12 @@ private void testStateTransferWithNoReplicatedOperation(final Operation operatio final BackupListener listener = new BackupListener() { @Override - public void beforeCommand(VisitableCommand command) throws Exception { + public void beforeCommand(VisitableCommand command) { commandReceived.set(true); } @Override - public void afterCommand(VisitableCommand command) throws Exception { + public void afterCommand(VisitableCommand command) { commandReceived.set(true); } @@ -490,7 +428,7 @@ public void beforeState(XSiteStatePushCommand command) throws Exception { } //safe (i.e. not blocking main thread), the state transfer is async - startStateTransfer(LON, NYC); + startStateTransfer(); assertOnline(LON, NYC); //state transfer should send old value @@ -502,29 +440,14 @@ public void beforeState(XSiteStatePushCommand command) throws Exception { assertFalse(commandReceived.get()); checkPoint.trigger("before-update"); - eventually(new Condition() { - @Override - public boolean isSatisfied() throws Exception { - return extractComponent(cache(LON, 0), XSiteAdminOperations.class).getRunningStateTransfer().isEmpty(); - } - }, TimeUnit.SECONDS.toMillis(30)); + assertEventuallyStateTransferNotRunning(); - assertEventuallyNoStateTransferInReceivingSite(NYC, 30, TimeUnit.SECONDS); - assertEventuallyNoStateTransferInSendingSite(LON, 30, TimeUnit.SECONDS); + assertEventuallyNoStateTransferInReceivingSite(null); + assertEventuallyNoStateTransferInSendingSite(); //check if all data is visible - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - assertEquals(operation.finalValue(), cache.get(key)); - } - }); - assertInSite(LON, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - assertEquals(operation.finalValue(), cache.get(key)); - } - }); + assertInSite(NYC, cache -> assertEquals(operation.finalValue(), cache.get(key))); + assertInSite(LON, cache -> assertEquals(operation.finalValue(), cache.get(key))); } private boolean isUpdatingKeyWithValue(VisitableCommand command, Object key, Object value) { @@ -560,80 +483,6 @@ private boolean containsKey(XSiteState[] states, Object key) { return false; } - private void startStateTransfer(String fromSite, String toSite) { - XSiteAdminOperations operations = extractComponent(cache(fromSite, 0), XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.SUCCESS, operations.pushState(toSite)); - } - - private void takeSiteOffline(String localSite, String remoteSite) { - XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.SUCCESS, operations.takeSiteOffline(remoteSite)); - } - - private void assertOffline(String localSite, String remoteSite) { - XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.OFFLINE, operations.siteStatus(remoteSite)); - } - - private void assertOnline(String localSite, String remoteSite) { - XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.ONLINE, operations.siteStatus(remoteSite)); - } - - private int chunkSize(String site) { - return cache(site, 0).getCacheConfiguration().sites().allBackups().get(0).stateTransfer().chunkSize(); - } - - private Object key(int index) { - return "key-" + index; - } - - private Object value(int index) { - return "value-" + index; - } - - private void assertNoStateTransferInReceivingSite(String siteName) { - assertInSite(siteName, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - CommitManager commitManager = extractComponent(cache, CommitManager.class); - assertFalse(commitManager.isTracking(Flag.PUT_FOR_STATE_TRANSFER)); - assertFalse(commitManager.isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER)); - assertTrue(commitManager.isEmpty()); - } - }); - } - - private void assertEventuallyNoStateTransferInReceivingSite(String siteName, long timeout, TimeUnit unit) { - assertEventuallyInSite(siteName, new EventuallyAssertCondition() { - @Override - public boolean assertInCache(Cache cache) { - CommitManager commitManager = extractComponent(cache, CommitManager.class); - return !commitManager.isTracking(Flag.PUT_FOR_STATE_TRANSFER) && - !commitManager.isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER) && - commitManager.isEmpty(); - } - }, timeout, unit); - } - - private void assertNoStateTransferInSendingSite(String siteName) { - assertInSite(siteName, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - assertTrue(extractComponent(cache, XSiteStateProvider.class).getCurrentStateSending().isEmpty()); - } - }); - } - - private void assertEventuallyNoStateTransferInSendingSite(String siteName, long timeout, TimeUnit unit) { - assertEventuallyInSite(siteName, new EventuallyAssertCondition() { - @Override - public boolean assertInCache(Cache cache) { - return extractComponent(cache, XSiteStateProvider.class).getCurrentStateSending().isEmpty(); - } - }, timeout, unit); - } - private ControlledTransport replaceTransport(Cache cache) { Transport current = extractGlobalComponent(cache.getCacheManager(), Transport.class); ControlledTransport controlled = new ControlledTransport(current); @@ -641,7 +490,7 @@ private ControlledTransport replaceTransport(Cache cache) { return controlled; } - private static enum Operation { + private enum Operation { PUT("v0", "v1") { @Override public void init(Cache cache, K key) { @@ -866,19 +715,19 @@ public boolean replicates() { this.finalValue = finalValue; } - public abstract void init(Cache cache, K key); - - public abstract Future perform(Cache cache, K key); - - public abstract boolean replicates(); - - public final Object initialValue() { + final Object initialValue() { return initialValue; } - public final Object finalValue() { + final Object finalValue() { return finalValue; } + + protected abstract void init(Cache cache, K key); + + protected abstract Future perform(Cache cache, K key); + + protected abstract boolean replicates(); } private static class XSiteStateProviderControl extends XSiteProviderDelegator { @@ -904,27 +753,27 @@ public void startStateTransfer(String siteName, Address requestor, int minTopolo super.startStateTransfer(siteName, requestor, minTopologyId); } - public final void await() throws TimeoutException, InterruptedException { - checkPoint.awaitStrict("before-start", 30, TimeUnit.SECONDS); - } - - public final void trigger() { - checkPoint.trigger("await-start"); - } - - public static XSiteStateProviderControl replaceInCache(Cache cache) { + static XSiteStateProviderControl replaceInCache(Cache cache) { XSiteStateProvider current = extractComponent(cache, XSiteStateProvider.class); XSiteStateProviderControl control = new XSiteStateProviderControl(current); replaceComponent(cache, XSiteStateProvider.class, control, true); return control; } + + final void await() throws TimeoutException, InterruptedException { + checkPoint.awaitStrict("before-start", 30, TimeUnit.SECONDS); + } + + final void trigger() { + checkPoint.trigger("await-start"); + } } private static class BackupReceiverRepositoryWrapper extends BackupReceiverRepositoryDelegator { private final BackupListener listener; - public BackupReceiverRepositoryWrapper(BackupReceiverRepository delegate, BackupListener listener) { + BackupReceiverRepositoryWrapper(BackupReceiverRepository delegate, BackupListener listener) { super(delegate); if (listener == null) { throw new NullPointerException("Listener must not be null."); @@ -957,29 +806,28 @@ public void handleStateTransferState(XSiteStatePushCommand cmd) throws Exception }; } - public static BackupReceiverRepositoryWrapper replaceInCache(CacheContainer cacheContainer, BackupListener listener) { + static void replaceInCache(CacheContainer cacheContainer, BackupListener listener) { BackupReceiverRepository delegate = extractGlobalComponent(cacheContainer, BackupReceiverRepository.class); BackupReceiverRepositoryWrapper wrapper = new BackupReceiverRepositoryWrapper(delegate, listener); replaceComponent(cacheContainer, BackupReceiverRepository.class, wrapper, true); - return wrapper; } } private static abstract class BackupListener { - public void beforeCommand(VisitableCommand command) throws Exception { + void beforeCommand(VisitableCommand command) throws Exception { //no-op by default } - public void afterCommand(VisitableCommand command) throws Exception { + void afterCommand(VisitableCommand command) { //no-op by default } - public void beforeState(XSiteStatePushCommand command) throws Exception { + void beforeState(XSiteStatePushCommand command) throws Exception { //no-op by default } - public void afterState(XSiteStatePushCommand command) throws Exception { + void afterState(XSiteStatePushCommand command) { //no-op by default } diff --git a/core/src/test/java/org/infinispan/xsite/statetransfer/DistSyncNonTxStateTransferTest.java b/core/src/test/java/org/infinispan/xsite/statetransfer/DistSyncNonTxStateTransferTest.java index 58fff4104d8d..2f8839883e47 100644 --- a/core/src/test/java/org/infinispan/xsite/statetransfer/DistSyncNonTxStateTransferTest.java +++ b/core/src/test/java/org/infinispan/xsite/statetransfer/DistSyncNonTxStateTransferTest.java @@ -1,6 +1,5 @@ package org.infinispan.xsite.statetransfer; -import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.testng.annotations.Test; @@ -17,15 +16,16 @@ public class DistSyncNonTxStateTransferTest extends BaseStateTransferTest { public DistSyncNonTxStateTransferTest() { super(); implicitBackupCache = true; + transactional = false; } @Override protected ConfigurationBuilder getNycActiveConfig() { - return getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false); + return getDefaultClusteredCacheConfig(cacheMode, transactional); } @Override protected ConfigurationBuilder getLonActiveConfig() { - return getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false); + return getDefaultClusteredCacheConfig(cacheMode, transactional); } } diff --git a/core/src/test/java/org/infinispan/xsite/statetransfer/DistSyncOnePhaseTxStateTransferTest.java b/core/src/test/java/org/infinispan/xsite/statetransfer/DistSyncOnePhaseTxStateTransferTest.java index 8d6bf98c0967..a680c296bc45 100644 --- a/core/src/test/java/org/infinispan/xsite/statetransfer/DistSyncOnePhaseTxStateTransferTest.java +++ b/core/src/test/java/org/infinispan/xsite/statetransfer/DistSyncOnePhaseTxStateTransferTest.java @@ -1,6 +1,5 @@ package org.infinispan.xsite.statetransfer; -import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.testng.annotations.Test; @@ -18,15 +17,16 @@ public DistSyncOnePhaseTxStateTransferTest() { super(); use2Pc = false; implicitBackupCache = true; + transactional = true; } @Override protected ConfigurationBuilder getNycActiveConfig() { - return getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true); + return getDefaultClusteredCacheConfig(cacheMode, transactional); } @Override protected ConfigurationBuilder getLonActiveConfig() { - return getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true); + return getDefaultClusteredCacheConfig(cacheMode, transactional); } } diff --git a/core/src/test/java/org/infinispan/xsite/statetransfer/DistSyncTwoPhasesTxStateTransferTest.java b/core/src/test/java/org/infinispan/xsite/statetransfer/DistSyncTwoPhasesTxStateTransferTest.java index 95ea56c1916e..bb776052c5e1 100644 --- a/core/src/test/java/org/infinispan/xsite/statetransfer/DistSyncTwoPhasesTxStateTransferTest.java +++ b/core/src/test/java/org/infinispan/xsite/statetransfer/DistSyncTwoPhasesTxStateTransferTest.java @@ -1,6 +1,5 @@ package org.infinispan.xsite.statetransfer; -import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.testng.annotations.Test; @@ -18,34 +17,16 @@ public DistSyncTwoPhasesTxStateTransferTest() { super(); use2Pc = true; implicitBackupCache = true; + transactional = true; } @Override protected ConfigurationBuilder getNycActiveConfig() { - return getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true); + return getDefaultClusteredCacheConfig(cacheMode, transactional); } @Override protected ConfigurationBuilder getLonActiveConfig() { - return getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true); - } - - @Test(enabled = false) - @Override - public void testClearOperationBeforeState() throws Exception { - /* - It does not work with the current clear implementation. this is what is happening: - 0) assume cache is not empty - 1) state transfer and clear happens at the same time - 2) clear (PrepareCommand) arrives first than the state - 2.1) clear is replayed. the behavior is to iterate over all keys, lock, wrap, and mark them as removed. - however, there is no key in the cache - 3) the state arrives. it acquires locks and applies the keys (note that 2.1) didn't acquire any lock) - 4) CommitCommand arrives. For all entries wrapped, it is removed. However, nothing was wrapped so nothing gets - removed - - https://issues.jboss.org/browse/ISPN-4073 - */ - super.testClearOperationBeforeState(); + return getDefaultClusteredCacheConfig(cacheMode, transactional); } } diff --git a/core/src/test/java/org/infinispan/xsite/statetransfer/XSiteProviderDelegator.java b/core/src/test/java/org/infinispan/xsite/statetransfer/XSiteProviderDelegator.java index 34e439a60edb..828ad912988d 100644 --- a/core/src/test/java/org/infinispan/xsite/statetransfer/XSiteProviderDelegator.java +++ b/core/src/test/java/org/infinispan/xsite/statetransfer/XSiteProviderDelegator.java @@ -15,7 +15,7 @@ public class XSiteProviderDelegator implements XSiteStateProvider { protected final XSiteStateProvider xSiteStateProvider; - public XSiteProviderDelegator(XSiteStateProvider xSiteStateProvider) { + protected XSiteProviderDelegator(XSiteStateProvider xSiteStateProvider) { this.xSiteStateProvider = xSiteStateProvider; } diff --git a/core/src/test/java/org/infinispan/xsite/statetransfer/failures/AbstractTopologyChangeTest.java b/core/src/test/java/org/infinispan/xsite/statetransfer/failures/AbstractTopologyChangeTest.java index 885f0bc15ff1..56a606a5b859 100644 --- a/core/src/test/java/org/infinispan/xsite/statetransfer/failures/AbstractTopologyChangeTest.java +++ b/core/src/test/java/org/infinispan/xsite/statetransfer/failures/AbstractTopologyChangeTest.java @@ -2,10 +2,10 @@ import static org.infinispan.distribution.DistributionTestHelper.addressOf; import static org.infinispan.test.TestingUtil.extractComponent; +import static org.infinispan.xsite.statetransfer.XSiteStateTransferManager.STATUS_ERROR; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; -import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -13,12 +13,9 @@ import org.infinispan.configuration.cache.BackupConfigurationBuilder; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; -import org.infinispan.context.Flag; -import org.infinispan.statetransfer.CommitManager; import org.infinispan.statetransfer.StateConsumer; import org.infinispan.statetransfer.StateProvider; -import org.infinispan.xsite.AbstractTwoSitesTest; -import org.infinispan.xsite.XSiteAdminOperations; +import org.infinispan.xsite.statetransfer.AbstractStateTransferTest; import org.infinispan.xsite.statetransfer.XSiteStateProvider; /** @@ -27,181 +24,129 @@ * @author Pedro Ruivo * @since 7.0 */ -public abstract class AbstractTopologyChangeTest extends AbstractTwoSitesTest { +public abstract class AbstractTopologyChangeTest extends AbstractStateTransferTest { - protected static final int NR_KEYS = 20; //10 * chunk size + private static final int NR_KEYS = 20; //10 * chunk size - protected AbstractTopologyChangeTest() { + AbstractTopologyChangeTest() { this.implicitBackupCache = true; this.cleanup = CleanupPhase.AFTER_METHOD; this.initialClusterSize = 3; } - @Override - protected void adaptLONConfiguration(BackupConfigurationBuilder builder) { - builder.stateTransfer().chunkSize(2).timeout(2000); - } - - @Override - protected ConfigurationBuilder getNycActiveConfig() { - return createConfiguration(); - } - - @Override - protected ConfigurationBuilder getLonActiveConfig() { - return createConfiguration(); - } - - protected static ConfigurationBuilder createConfiguration() { + private static ConfigurationBuilder createConfiguration() { ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false); builder.clustering().hash().numOwners(2); return builder; } - protected void awaitLocalStateTransfer(String site) { + void awaitLocalStateTransfer(String site) { log.debugf("Await until rebalance in site '%s' is finished!", site); - assertEventuallyInSite(site, new EventuallyAssertCondition() { - @Override - public boolean assertInCache(Cache cache) { - return !extractComponent(cache, StateConsumer.class).isStateTransferInProgress() && - !extractComponent(cache, StateProvider.class).isStateTransferInProgress(); - } - }, 30, TimeUnit.SECONDS); + assertEventuallyInSite(site, cache -> !extractComponent(cache, StateConsumer.class).isStateTransferInProgress() && + !extractComponent(cache, StateProvider.class).isStateTransferInProgress(), 30, TimeUnit.SECONDS); } - protected void awaitXSiteStateSent(String site) { + void awaitXSiteStateSent(String site) { log.debugf("Await until all nodes in '%s' has sent the state!", site); - assertEventuallyInSite(site, new EventuallyAssertCondition() { - @Override - public boolean assertInCache(Cache cache) { - return extractComponent(cache, XSiteStateProvider.class).getCurrentStateSending().isEmpty(); - } - }, 30, TimeUnit.SECONDS); + assertEventuallyInSite(site, + cache -> extractComponent(cache, XSiteStateProvider.class).getCurrentStateSending().isEmpty(), 30, + TimeUnit.SECONDS); } - protected void awaitXSiteStateReceived(String site) { - log.debugf("Await until all nodes in '%s' has received the state!", site); - assertEventuallyInSite(site, new EventuallyAssertCondition() { - @Override - public boolean assertInCache(Cache cache) { - return !extractComponent(cache, CommitManager.class).isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER); - } - }, 30, TimeUnit.SECONDS); - } - - protected Future triggerTopologyChange(final String siteName, final int removeIndex) { + Future triggerTopologyChange(final String siteName, final int removeIndex) { if (removeIndex >= 0) { - return fork(new Callable() { - @Override - public Void call() throws Exception { - log.debugf("Shutting down cache %s", addressOf(cache(siteName, removeIndex))); - site(siteName).kill(removeIndex); - log.debugf("Wait for cluster to form on caches %s", site(siteName).getCaches(null)); - site(siteName).waitForClusterToForm(null, 60, TimeUnit.SECONDS); - return null; - } + return fork(() -> { + log.debugf("Shutting down cache %s", addressOf(cache(siteName, removeIndex))); + site(siteName).kill(removeIndex); + log.debugf("Wait for cluster to form on caches %s", site(siteName).getCaches(null)); + site(siteName).waitForClusterToForm(null, 60, TimeUnit.SECONDS); + return null; }); } else { log.debug("Adding new cache"); site(siteName).addCache(globalConfigurationBuilderForSite(siteName), lonConfigurationBuilder()); - return fork(new Callable() { - @Override - public Void call() throws Exception { - log.debugf("Wait for cluster to form on caches %s", site(siteName).getCaches(null)); - site(siteName).waitForClusterToForm(null, 60, TimeUnit.SECONDS); - return null; - } + return fork(() -> { + log.debugf("Wait for cluster to form on caches %s", site(siteName).getCaches(null)); + site(siteName).waitForClusterToForm(null, 60, TimeUnit.SECONDS); + return null; }); } } - protected void initBeforeTest() { - takeSiteOffline(LON, NYC); - assertOffline(LON, NYC); + void initBeforeTest() { + takeSiteOffline(); + assertOffline(); putData(); assertDataInSite(LON); - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - assertTrue(cache.isEmpty()); - } - }); + assertInSite(NYC, cache -> assertTrue(cache.isEmpty())); } - protected void putData() { - for (int i = 0; i < NR_KEYS; ++i) { - cache(LON, 0).put(key(Integer.toString(i)), val(Integer.toString(i))); - } - } - - protected void assertData() { + void assertData() { assertDataInSite(LON); assertDataInSite(NYC); } - protected void startStateTransfer(Cache coordinator, String toSite) { - XSiteAdminOperations operations = extractComponent(coordinator, XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.SUCCESS, operations.pushState(toSite)); - } - - protected void takeSiteOffline(String localSite, String remoteSite) { - XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.SUCCESS, operations.takeSiteOffline(remoteSite)); - } - - protected void assertOffline(String localSite, String remoteSite) { - XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.OFFLINE, operations.siteStatus(remoteSite)); - } - - protected void assertOnline(String localSite, String remoteSite) { - XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.ONLINE, operations.siteStatus(remoteSite)); - } - - protected void assertDataInSite(String siteName) { - assertInSite(siteName, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - for (int i = 0; i < NR_KEYS; ++i) { - assertEquals(val(Integer.toString(i)), cache.get(key(Integer.toString(i)))); - } + void assertDataInSite(String siteName) { + assertInSite(siteName, cache -> { + for (int i = 0; i < NR_KEYS; ++i) { + assertEquals(val(Integer.toString(i)), cache.get(key(Integer.toString(i)))); } }); } - protected void assertXSiteStatus(String localSite, String remoteSite, String status) { - assertEquals(status, getXSitePushStatus(localSite, remoteSite)); + void assertXSiteErrorStatus() { + assertEquals(STATUS_ERROR, getXSitePushStatus()); } - protected String getXSitePushStatus(String localSite, String remoteSite) { - return extractComponent(cache(localSite, 0), XSiteAdminOperations.class).getPushStateStatus().get(remoteSite); + String getXSitePushStatus() { + return adminOperations().getPushStateStatus().get(NYC); } - protected TestCaches createTestCache(TopologyEvent topologyEvent, String siteName) { + TestCaches createTestCache(TopologyEvent topologyEvent, String siteName) { switch (topologyEvent) { case JOIN: - return new TestCaches<>(this.cache(LON, 0), this.cache(siteName, 0), -1); + return new TestCaches<>(this.cache(LON, 0), this.cache(siteName, 0), -1); case COORDINATOR_LEAVE: - return new TestCaches<>(this.cache(LON, 1), this.cache(siteName,0), 1); + return new TestCaches<>(this.cache(LON, 1), this.cache(siteName, 0), 1); case LEAVE: - return new TestCaches<>(this.cache(LON, 0), this.cache(siteName, 0), 1); + return new TestCaches<>(this.cache(LON, 0), this.cache(siteName, 0), 1); case SITE_MASTER_LEAVE: - return new TestCaches<>(this.cache(LON, 1), this.cache(siteName, 1), 0); + return new TestCaches<>(this.cache(LON, 1), this.cache(siteName, 1), 0); default: //make sure we select the caches throw new IllegalStateException(); } } - protected void printTestCaches(TestCaches testCaches) { + void printTestCaches(TestCaches testCaches) { log.debugf("Controlled cache=%s, Coordinator cache=%s, Cache to remove=%s", - addressOf(testCaches.controllerCache), - addressOf(testCaches.coordinator), - testCaches.removeIndex < 0 ? "NONE" : addressOf(cache(LON, testCaches.removeIndex))); + addressOf(testCaches.controllerCache), + addressOf(testCaches.coordinator), + testCaches.removeIndex < 0 ? "NONE" : addressOf(cache(LON, testCaches.removeIndex))); + } + + @Override + protected void adaptLONConfiguration(BackupConfigurationBuilder builder) { + builder.stateTransfer().chunkSize(2).timeout(2000); + } + + @Override + protected ConfigurationBuilder getNycActiveConfig() { + return createConfiguration(); + } + + @Override + protected ConfigurationBuilder getLonActiveConfig() { + return createConfiguration(); + } + + private void putData() { + for (int i = 0; i < NR_KEYS; ++i) { + cache(LON, 0).put(key(Integer.toString(i)), val(Integer.toString(i))); + } } - protected static enum TopologyEvent { + protected enum TopologyEvent { /** * Some node joins the cluster. */ @@ -222,10 +167,10 @@ protected static enum TopologyEvent { protected static class TestCaches { public final Cache coordinator; - public final Cache controllerCache; - public final int removeIndex; + final Cache controllerCache; + final int removeIndex; - public TestCaches(Cache coordinator, Cache controllerCache, int removeIndex) { + TestCaches(Cache coordinator, Cache controllerCache, int removeIndex) { this.coordinator = coordinator; this.controllerCache = controllerCache; this.removeIndex = removeIndex; diff --git a/core/src/test/java/org/infinispan/xsite/statetransfer/failures/RetryMechanismTest.java b/core/src/test/java/org/infinispan/xsite/statetransfer/failures/RetryMechanismTest.java index 66abac993f52..0ad407bc9ea2 100644 --- a/core/src/test/java/org/infinispan/xsite/statetransfer/failures/RetryMechanismTest.java +++ b/core/src/test/java/org/infinispan/xsite/statetransfer/failures/RetryMechanismTest.java @@ -1,10 +1,12 @@ package org.infinispan.xsite.statetransfer.failures; -import static org.infinispan.test.TestingUtil.WrapFactory; import static org.infinispan.test.TestingUtil.extractGlobalComponent; import static org.infinispan.test.TestingUtil.replaceComponent; -import static org.infinispan.test.TestingUtil.wrapInboundInvocationHandler; import static org.infinispan.test.TestingUtil.wrapComponent; +import static org.infinispan.test.TestingUtil.wrapInboundInvocationHandler; +import static org.infinispan.xsite.statetransfer.XSiteStateTransferManager.STATUS_ERROR; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNull; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; @@ -27,8 +29,6 @@ import org.infinispan.xsite.statetransfer.XSiteState; import org.infinispan.xsite.statetransfer.XSiteStateConsumer; import org.infinispan.xsite.statetransfer.XSiteStatePushCommand; -import org.infinispan.xsite.statetransfer.XSiteStateTransferManager; -import org.testng.AssertJUnit; import org.testng.annotations.Test; /** @@ -47,29 +47,24 @@ public class RetryMechanismTest extends AbstractTopologyChangeTest { * will apply the data (test retry in NYC). */ public void testExceptionWithSuccessfulRetry() { - takeSiteOffline(LON, NYC); + takeSiteOffline(); final Object key = new MagicKey(cache(NYC, 1)); final FailureHandler handler = FailureHandler.replaceOn(cache(NYC, 1)); final CounterBackupReceiverRepository counterRepository = CounterBackupReceiverRepository.replaceOn(cache(NYC, 0).getCacheManager()); cache(LON, 0).put(key, VALUE); - handler.fail(3); //it fails 3 times and then succeeds. + handler.fail(); //it fails 3 times and then succeeds. - startStateTransfer(cache(LON, 0), NYC); + startStateTransfer(); assertOnline(LON, NYC); awaitXSiteStateSent(LON); - awaitXSiteStateReceived(NYC); - - AssertJUnit.assertEquals(0, handler.remainingFails()); - AssertJUnit.assertEquals(1, counterRepository.counter.get()); - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - AssertJUnit.assertEquals(VALUE, cache.get(key)); - } - }); + assertEventuallyNoStateTransferInReceivingSite(null); + + assertEquals(0, handler.remainingFails()); + assertEquals(1, counterRepository.counter.get()); + assertInSite(NYC, cache -> assertEquals(VALUE, cache.get(key))); } /** @@ -77,7 +72,7 @@ public void assertInCache(Cache cache) { * (test retry in NYC and LON). */ public void testExceptionWithFailedRetry() { - takeSiteOffline(LON, NYC); + takeSiteOffline(); final Object key = new MagicKey(cache(NYC, 1)); final FailureHandler handler = FailureHandler.replaceOn(cache(NYC, 1)); final CounterBackupReceiverRepository counterRepository = CounterBackupReceiverRepository.replaceOn(cache(NYC, 0).getCacheManager()); @@ -86,21 +81,16 @@ public void testExceptionWithFailedRetry() { handler.failAlways(); - startStateTransfer(cache(LON, 0), NYC); + startStateTransfer(); assertOnline(LON, NYC); awaitXSiteStateSent(LON); - awaitXSiteStateReceived(NYC); + assertEventuallyNoStateTransferInReceivingSite(null); - assertXSiteStatus(LON, NYC, XSiteStateTransferManager.STATUS_ERROR); + assertXSiteErrorStatus(); - AssertJUnit.assertEquals(3 /*max_retry + 1*/, counterRepository.counter.get()); - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - AssertJUnit.assertNull(cache.get(key)); - } - }); + assertEquals(3 /*max_retry + 1*/, counterRepository.counter.get()); + assertInSite(NYC, cache -> assertNull(cache.get(key))); } /** @@ -108,36 +98,26 @@ public void assertInCache(Cache cache) { * retry in NYC, from remote to local). */ public void testRetryLocally() throws ExecutionException, InterruptedException { - takeSiteOffline(LON, NYC); + takeSiteOffline(); final Object key = new MagicKey(cache(NYC, 1)); final DiscardHandler handler = DiscardHandler.replaceOn(cache(NYC, 1)); final CounterBackupReceiverRepository counterRepository = CounterBackupReceiverRepository.replaceOn(cache(NYC, 0).getCacheManager()); cache(LON, 0).put(key, VALUE); - startStateTransfer(cache(LON, 0), NYC); + startStateTransfer(); assertOnline(LON, NYC); - eventually(new Condition() { - @Override - public boolean isSatisfied() throws Exception { - return handler.discarded; - } - }); + eventually(() -> handler.discarded); triggerTopologyChange(NYC, 1).get(); awaitXSiteStateSent(LON); - awaitXSiteStateReceived(NYC); + assertEventuallyNoStateTransferInReceivingSite(null); - AssertJUnit.assertEquals(1, counterRepository.counter.get()); + assertEquals(1, counterRepository.counter.get()); - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - AssertJUnit.assertEquals(VALUE, cache.get(key)); - } - }); + assertInSite(NYC, cache -> assertEquals(VALUE, cache.get(key))); } /** @@ -145,41 +125,31 @@ public void assertInCache(Cache cache) { * The 1st and the 2nd time will fail and only the 3rd will succeed (testing local retry) */ public void testMultipleRetryLocally() throws ExecutionException, InterruptedException { - takeSiteOffline(LON, NYC); + takeSiteOffline(); final Object key = new MagicKey(cache(NYC, 1)); final DiscardHandler handler = DiscardHandler.replaceOn(cache(NYC, 1)); final FailureXSiteConsumer failureXSiteConsumer = FailureXSiteConsumer.replaceOn(cache(NYC, 0)); final CounterBackupReceiverRepository counterRepository = CounterBackupReceiverRepository.replaceOn(cache(NYC, 0).getCacheManager()); - failureXSiteConsumer.fail(3); + failureXSiteConsumer.fail(); cache(LON, 0).put(key, VALUE); - startStateTransfer(cache(LON, 0), NYC); + startStateTransfer(); assertOnline(LON, NYC); - eventually(new Condition() { - @Override - public boolean isSatisfied() throws Exception { - return handler.discarded; - } - }); + eventually(() -> handler.discarded); triggerTopologyChange(NYC, 1).get(); awaitXSiteStateSent(LON); - awaitXSiteStateReceived(NYC); + assertEventuallyNoStateTransferInReceivingSite(null); - AssertJUnit.assertEquals(0, failureXSiteConsumer.remainingFails()); + assertEquals(0, failureXSiteConsumer.remainingFails()); - AssertJUnit.assertEquals(1, counterRepository.counter.get()); + assertEquals(1, counterRepository.counter.get()); - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - AssertJUnit.assertEquals(VALUE, cache.get(key)); - } - }); + assertInSite(NYC, cache -> assertEquals(VALUE, cache.get(key))); } /** @@ -187,7 +157,7 @@ public void assertInCache(Cache cache) { * (test retry in the LON site). */ public void testFailRetryLocally() throws ExecutionException, InterruptedException { - takeSiteOffline(LON, NYC); + takeSiteOffline(); final Object key = new MagicKey(cache(NYC, 1)); final DiscardHandler handler = DiscardHandler.replaceOn(cache(NYC, 1)); final FailureXSiteConsumer failureXSiteConsumer = FailureXSiteConsumer.replaceOn(cache(NYC, 0)); @@ -197,41 +167,26 @@ public void testFailRetryLocally() throws ExecutionException, InterruptedExcepti cache(LON, 0).put(key, VALUE); - startStateTransfer(cache(LON, 0), NYC); + startStateTransfer(); assertOnline(LON, NYC); - eventually(new Condition() { - @Override - public boolean isSatisfied() throws Exception { - return handler.discarded; - } - }); + eventually(() -> handler.discarded); triggerTopologyChange(NYC, 1).get(); awaitXSiteStateSent(LON); - awaitXSiteStateReceived(NYC); + assertEventuallyNoStateTransferInReceivingSite(null); //tricky part. When the primary owners dies, the site master or the other node can become the primary owner //if the site master is enabled, it will never be able to apply the state (XSiteStateConsumer is throwing exception!) //otherwise, the other node will apply the state - if (XSiteStateTransferManager.STATUS_ERROR.equals(getXSitePushStatus(LON, NYC))) { - AssertJUnit.assertEquals(3 /*max_retry + 1*/, counterRepository.counter.get()); + if (STATUS_ERROR.equals(getXSitePushStatus())) { + assertEquals(3 /*max_retry + 1*/, counterRepository.counter.get()); - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - AssertJUnit.assertNull(cache.get(key)); - } - }); + assertInSite(NYC, cache -> assertNull(cache.get(key))); } else { - AssertJUnit.assertEquals(2 /*the 1st retry succeed*/, counterRepository.counter.get()); - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - AssertJUnit.assertEquals(VALUE, cache.get(key)); - } - }); + assertEquals(2 /*the 1st retry succeed*/, counterRepository.counter.get()); + assertInSite(NYC, cache -> assertEquals(VALUE, cache.get(key))); } } @@ -261,7 +216,7 @@ public void handleStateTransferState(XSiteStatePushCommand cmd) throws Exception }; } - public static CounterBackupReceiverRepository replaceOn(CacheContainer cacheContainer) { + static CounterBackupReceiverRepository replaceOn(CacheContainer cacheContainer) { BackupReceiverRepository delegate = extractGlobalComponent(cacheContainer, BackupReceiverRepository.class); CounterBackupReceiverRepository wrapper = new CounterBackupReceiverRepository(delegate); replaceComponent(cacheContainer, BackupReceiverRepository.class, wrapper, true); @@ -271,7 +226,7 @@ public static CounterBackupReceiverRepository replaceOn(CacheContainer cacheCont private static class FailureXSiteConsumer implements XSiteStateConsumer { - public static int FAIL_FOR_EVER = -1; + static final int FAIL_FOR_EVER = -1; private final XSiteStateConsumer delegate; //fail if > 0 private int nFailures = 0; @@ -311,35 +266,28 @@ public String getSendingSiteName() { return delegate.getSendingSiteName(); } - public void fail(int nTimes) { - if (nTimes < 0) { - throw new IllegalArgumentException("nTimes should greater than zero but it is " + nTimes); - } + static FailureXSiteConsumer replaceOn(Cache cache) { + return wrapComponent(cache, XSiteStateConsumer.class, (wrapOn, current) -> new FailureXSiteConsumer(current), + true); + } + + void fail() { synchronized (this) { - this.nFailures = nTimes; + this.nFailures = 3; } } - public void failAlways() { + void failAlways() { synchronized (this) { this.nFailures = FAIL_FOR_EVER; } } - public int remainingFails() { + int remainingFails() { synchronized (this) { return nFailures; } } - - public static FailureXSiteConsumer replaceOn(Cache cache) { - return wrapComponent(cache, XSiteStateConsumer.class, new WrapFactory>() { - @Override - public FailureXSiteConsumer wrap(Cache wrapOn, XSiteStateConsumer current) { - return new FailureXSiteConsumer(current); - } - }, true); - } } private static class DiscardHandler extends AbstractDelegatingHandler { @@ -350,7 +298,7 @@ private DiscardHandler(PerCacheInboundInvocationHandler delegate) { super(delegate); } - public static DiscardHandler replaceOn(Cache cache) { + static DiscardHandler replaceOn(Cache cache) { return wrapInboundInvocationHandler(cache, DiscardHandler::new); } @@ -365,7 +313,7 @@ protected boolean beforeHandle(CacheRpcCommand command, Reply reply, DeliverOrde private static class FailureHandler extends AbstractDelegatingHandler { - public static int FAIL_FOR_EVER = -1; + static final int FAIL_FOR_EVER = -1; //fail if > 0 private int nFailures = 0; @@ -374,31 +322,28 @@ private FailureHandler(PerCacheInboundInvocationHandler delegate) { super(delegate); } - public void fail(int nTimes) { - if (nTimes < 0) { - throw new IllegalArgumentException("nTimes should greater than zero but it is " + nTimes); - } + static FailureHandler replaceOn(Cache cache) { + return wrapInboundInvocationHandler(cache, FailureHandler::new); + } + + void fail() { synchronized (this) { - this.nFailures = nTimes; + this.nFailures = 3; } } - public void failAlways() { + void failAlways() { synchronized (this) { this.nFailures = FAIL_FOR_EVER; } } - public int remainingFails() { + int remainingFails() { synchronized (this) { return nFailures; } } - public static FailureHandler replaceOn(Cache cache) { - return wrapInboundInvocationHandler(cache, FailureHandler::new); - } - @Override protected synchronized boolean beforeHandle(CacheRpcCommand command, Reply reply, DeliverOrder order) { if (command instanceof XSiteStatePushCommand) { diff --git a/core/src/test/java/org/infinispan/xsite/statetransfer/failures/SiteConsumerTopologyChangeTest.java b/core/src/test/java/org/infinispan/xsite/statetransfer/failures/SiteConsumerTopologyChangeTest.java index c0c6e98c7da1..03979a8f4d4a 100644 --- a/core/src/test/java/org/infinispan/xsite/statetransfer/failures/SiteConsumerTopologyChangeTest.java +++ b/core/src/test/java/org/infinispan/xsite/statetransfer/failures/SiteConsumerTopologyChangeTest.java @@ -102,7 +102,8 @@ public void handleStateTransferState(XSiteStatePushCommand cmd) throws Exception return; } for (XSiteState state : cmd.getChunk()) { - addressSet.add(manager.getPrimaryLocation(state.key())); + addressSet.add(manager.getCacheTopology().getDistribution(state.key()) + .primary()); } } delegate.handleStateTransferState(cmd); @@ -126,7 +127,7 @@ public void handleStateTransferState(XSiteStatePushCommand cmd) throws Exception awaitXSiteStateSent(LON); awaitLocalStateTransfer(NYC); - awaitXSiteStateReceived(NYC); + assertEventuallyNoStateTransferInReceivingSite(null); assertData(); } @@ -183,7 +184,7 @@ public void handleStateTransferState(XSiteStatePushCommand cmd) throws Exception awaitXSiteStateSent(LON); awaitLocalStateTransfer(NYC); - awaitXSiteStateReceived(NYC); + assertEventuallyNoStateTransferInReceivingSite(null); assertData(); } diff --git a/core/src/test/java/org/infinispan/xsite/statetransfer/failures/StateTransferLinkFailuresTest.java b/core/src/test/java/org/infinispan/xsite/statetransfer/failures/StateTransferLinkFailuresTest.java index 64c616634e13..0f6d429d4c15 100644 --- a/core/src/test/java/org/infinispan/xsite/statetransfer/failures/StateTransferLinkFailuresTest.java +++ b/core/src/test/java/org/infinispan/xsite/statetransfer/failures/StateTransferLinkFailuresTest.java @@ -3,6 +3,7 @@ import static org.infinispan.test.TestingUtil.extractComponent; import static org.infinispan.test.TestingUtil.wrapGlobalComponent; import static org.infinispan.xsite.XSiteAdminOperations.SUCCESS; +import static org.infinispan.xsite.statetransfer.XSiteStateTransferManager.STATUS_ERROR; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertNull; @@ -14,7 +15,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import org.infinispan.Cache; import org.infinispan.configuration.cache.BackupConfigurationBuilder; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; @@ -24,9 +24,7 @@ import org.infinispan.remoting.transport.BackupResponse; import org.infinispan.remoting.transport.Transport; import org.infinispan.statetransfer.CommitManager; -import org.infinispan.test.TestingUtil; import org.infinispan.util.concurrent.TimeoutException; -import org.infinispan.xsite.AbstractTwoSitesTest; import org.infinispan.xsite.XSiteAdminOperations; import org.infinispan.xsite.XSiteBackup; import org.infinispan.xsite.XSiteReplicateCommand; @@ -42,9 +40,7 @@ * @since 7.0 */ @Test(groups = "xsite", testName = "xsite.statetransfer.failures.StateTransferLinkFailuresTest") -public class StateTransferLinkFailuresTest extends AbstractTwoSitesTest { - - protected static final int NR_KEYS = 20; //10 * chunk size +public class StateTransferLinkFailuresTest extends AbstractTopologyChangeTest { public StateTransferLinkFailuresTest() { super(); @@ -58,66 +54,58 @@ public StateTransferLinkFailuresTest() { * lose link while transferring data */ - public void testStartStateTransferWithoutLink() throws Exception { + private static ConfigurationBuilder createConfiguration() { + ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false); + builder.clustering().hash().numOwners(2); + return builder; + } + + public void testStartStateTransferWithoutLink() { initBeforeTest(); - List transports = replaceTransportInSite(LON); + List transports = replaceTransportInSite(); for (ControllerTransport transport : transports) { transport.fail = true; } assertTrue(!SUCCESS.equals(extractComponent(cache(LON, 0), XSiteAdminOperations.class).pushState(NYC))); assertDataInSite(LON); - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - AssertJUnit.assertTrue(cache.isEmpty()); - } - }); - assertTrue(getStatus(LON).isEmpty()); + assertInSite(NYC, cache -> AssertJUnit.assertTrue(cache.isEmpty())); + assertTrue(getStatus().isEmpty()); } public void testLinkBrokenDuringStateTransfer() { initBeforeTest(); - List transports = replaceTransportInSite(LON); + List transports = replaceTransportInSite(); for (ControllerTransport transport : transports) { transport.failAfterFirstChunk = true; } - startStateTransfer(cache(LON, 0), NYC); + startStateTransfer(); assertOnline(LON, NYC); - assertEventuallyInSite(LON, new EventuallyAssertCondition() { - @Override - public boolean assertInCache(Cache cache) { - return extractComponent(cache, XSiteStateTransferManager.class).getRunningStateTransfers().isEmpty(); - } - }, 1, TimeUnit.MINUTES); + assertEventuallyInSite(LON, + cache -> extractComponent(cache, XSiteStateTransferManager.class).getRunningStateTransfers().isEmpty(), 1, + TimeUnit.MINUTES); - assertEquals(1, getStatus(LON).size()); - assertEquals(XSiteStateTransferManager.STATUS_ERROR, getStatus(LON).get(NYC)); + assertEquals(1, getStatus().size()); + assertEquals(STATUS_ERROR, getStatus().get(NYC)); - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - //link is broken. NYC is still expecting state. - assertTrue(extractComponent(cache, CommitManager.class).isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER)); - assertEquals(LON, extractComponent(cache, XSiteAdminOperations.class).getSendingSiteName()); - } + assertInSite(NYC, cache -> { + //link is broken. NYC is still expecting state. + assertTrue(extractComponent(cache, CommitManager.class).isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER)); + assertEquals(LON, extractComponent(cache, XSiteAdminOperations.class).getSendingSiteName()); }); assertEquals(SUCCESS, extractComponent(cache(NYC, 0), XSiteAdminOperations.class).cancelReceiveState(LON)); - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - assertFalse(extractComponent(cache, CommitManager.class).isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER)); - assertNull(extractComponent(cache, XSiteAdminOperations.class).getSendingSiteName()); - } + assertInSite(NYC, cache -> { + assertFalse(extractComponent(cache, CommitManager.class).isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER)); + assertNull(extractComponent(cache, XSiteAdminOperations.class).getSendingSiteName()); }); assertEquals(SUCCESS, extractComponent(cache(LON, 0), XSiteAdminOperations.class).clearPushStateStatus()); - assertTrue(getStatus(LON).isEmpty()); + assertTrue(getStatus().isEmpty()); } @Override @@ -130,84 +118,21 @@ protected ConfigurationBuilder getLonActiveConfig() { return createConfiguration(); } - protected static ConfigurationBuilder createConfiguration() { - ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false); - builder.clustering().hash().numOwners(2); - return builder; - } - @Override protected void adaptLONConfiguration(BackupConfigurationBuilder builder) { builder.stateTransfer().chunkSize(2).timeout(2000); } - - private void putData() { - for (int i = 0; i < NR_KEYS; ++i) { - cache(LON, 0).put(key(Integer.toString(i)), val(Integer.toString(i))); - } - } - - private void initBeforeTest() { - takeSiteOffline(LON, NYC); - assertOffline(LON, NYC); - putData(); - assertDataInSite(LON); - assertInSite(NYC, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - assertTrue(cache.isEmpty()); - } - }); - } - - private void startStateTransfer(Cache coordinator, String toSite) { - XSiteAdminOperations operations = extractComponent(coordinator, XSiteAdminOperations.class); - assertEquals(SUCCESS, operations.pushState(toSite)); - } - - private void takeSiteOffline(String localSite, String remoteSite) { - XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); - assertEquals(SUCCESS, operations.takeSiteOffline(remoteSite)); - } - - private void assertOffline(String localSite, String remoteSite) { - XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.OFFLINE, operations.siteStatus(remoteSite)); - } - - private void assertOnline(String localSite, String remoteSite) { - XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); - assertEquals(XSiteAdminOperations.ONLINE, operations.siteStatus(remoteSite)); - } - - private Map getStatus(String localSite) { - XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); - return operations.getPushStateStatus(); - } - - private void assertDataInSite(String siteName) { - assertInSite(siteName, new AssertCondition() { - @Override - public void assertInCache(Cache cache) { - for (int i = 0; i < NR_KEYS; ++i) { - assertEquals(val(Integer.toString(i)), cache.get(key(Integer.toString(i)))); - } - } - }); + private Map getStatus() { + return adminOperations().getPushStateStatus(); } - private List replaceTransportInSite(String site) { - List transports = new ArrayList<>(site(site).cacheManagers().size()); - for (CacheContainer cacheContainer : site(site).cacheManagers()) { + private List replaceTransportInSite() { + List transports = new ArrayList<>(site(LON).cacheManagers().size()); + for (CacheContainer cacheContainer : site(LON).cacheManagers()) { transports.add(wrapGlobalComponent(cacheContainer, Transport.class, - new TestingUtil.WrapFactory() { - @Override - public ControllerTransport wrap(CacheContainer wrapOn, Transport current) { - return new ControllerTransport(current); - } - }, true)); + (wrapOn, current) -> new ControllerTransport(current), true)); } return transports; } @@ -217,7 +142,7 @@ private static class ControllerTransport extends AbstractDelegatingTransport { private volatile boolean fail; private volatile boolean failAfterFirstChunk; - public ControllerTransport(Transport actual) { + ControllerTransport(Transport actual) { super(actual); }