Skip to content

Commit

Permalink
ISPN-8599 DistSyncNonTxStateTransferTest.testStateTransferWithCluster…
Browse files Browse the repository at this point in the history
…Idle random failures

* refactoring and cleaning tests under xsite.statetransfer package
  • Loading branch information
pruivo authored and galderz committed Jan 12, 2018
1 parent 2339632 commit 3ca6fcc
Show file tree
Hide file tree
Showing 12 changed files with 453 additions and 754 deletions.
Expand Up @@ -312,7 +312,9 @@ public <K,V> AdvancedCache<K,V> advancedCache(int index) {
} }


public <K, V> Cache<K, V> cache(String cacheName, int index) { public <K, V> Cache<K, V> cache(String cacheName, int index) {
return cacheManagers.get(index).getCache(cacheName); return cacheName == null ?
cache(index) :
cacheManagers.get(index).getCache(cacheName);
} }


public List<EmbeddedCacheManager> cacheManagers() { public List<EmbeddedCacheManager> cacheManagers() {
Expand Down
@@ -0,0 +1,98 @@
package org.infinispan.xsite.statetransfer;

import static org.infinispan.test.TestingUtil.extractComponent;
import static org.infinispan.xsite.XSiteAdminOperations.OFFLINE;
import static org.infinispan.xsite.XSiteAdminOperations.ONLINE;
import static org.infinispan.xsite.XSiteAdminOperations.SUCCESS;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;

import java.util.concurrent.TimeUnit;

import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.infinispan.statetransfer.CommitManager;
import org.infinispan.xsite.AbstractTwoSitesTest;
import org.infinispan.xsite.XSiteAdminOperations;

/**
* Base class for state transfer tests with some utility method.
*
* @author Pedro Ruivo
* @since 9.2
*/
public abstract class AbstractStateTransferTest extends AbstractTwoSitesTest {

void assertNoStateTransferInReceivingSite(String cacheName) {
assertInSite(NYC, cacheName, this::assertNotReceivingStateForCache);
}

void assertNoStateTransferInSendingSite() {
assertInSite(LON, cache -> assertTrue(isNotSendingStateForCache(cache)));
}

void assertEventuallyNoStateTransferInSendingSite() {
assertEventuallyInSite(LON, this::isNotSendingStateForCache, 30, TimeUnit.SECONDS);
}

void assertEventuallyStateTransferNotRunning() {
eventually(() -> adminOperations().getRunningStateTransfer().isEmpty(), 30,
TimeUnit.SECONDS);
}

int chunkSize() {
return cache(LON, 0).getCacheConfiguration().sites().allBackups().get(0).stateTransfer().chunkSize();
}

protected void assertEventuallyNoStateTransferInReceivingSite(String cacheName) {
assertEventuallyInSite(NYC, cacheName, this::isNotReceivingStateForCache, 30, TimeUnit.SECONDS);
}

protected void startStateTransfer() {
startStateTransfer(cache(LON, 0), NYC);
}

protected void startStateTransfer(Cache<?, ?> cache, String toSite) {
XSiteAdminOperations operations = extractComponent(cache, XSiteAdminOperations.class);
assertEquals(SUCCESS, operations.pushState(toSite));
}

protected void takeSiteOffline() {
XSiteAdminOperations operations = extractComponent(cache(LON, 0), XSiteAdminOperations.class);
assertEquals(SUCCESS, operations.takeSiteOffline(NYC));
}

protected void assertOffline() {
XSiteAdminOperations operations = extractComponent(cache(LON, 0), XSiteAdminOperations.class);
assertEquals(OFFLINE, operations.siteStatus(NYC));
}

protected void assertOnline(String localSite, String remoteSite) {
XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class);
assertEquals(ONLINE, operations.siteStatus(remoteSite));
}

protected XSiteAdminOperations adminOperations() {
return extractComponent(cache(LON, 0), XSiteAdminOperations.class);
}

private void assertNotReceivingStateForCache(Cache<?, ?> cache) {
CommitManager commitManager = extractComponent(cache, CommitManager.class);
assertFalse(commitManager.isTracking(Flag.PUT_FOR_STATE_TRANSFER));
assertFalse(commitManager.isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER));
assertTrue(commitManager.isEmpty());
}

private boolean isNotReceivingStateForCache(Cache<?, ?> cache) {
CommitManager commitManager = extractComponent(cache, CommitManager.class);
return !commitManager.isTracking(Flag.PUT_FOR_STATE_TRANSFER) &&
!commitManager.isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER) &&
commitManager.isEmpty();
}

private boolean isNotSendingStateForCache(Cache<?, ?> cache) {
return extractComponent(cache, XSiteStateProvider.class).getCurrentStateSending().isEmpty();
}

}
@@ -1,18 +1,14 @@
package org.infinispan.xsite.statetransfer; package org.infinispan.xsite.statetransfer;


import static org.infinispan.test.TestingUtil.extractComponent; import static org.infinispan.test.TestingUtil.k;
import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue; import static org.testng.AssertJUnit.assertTrue;


import java.util.concurrent.TimeUnit; import java.lang.reflect.Method;


import org.infinispan.configuration.cache.BackupConfigurationBuilder; import org.infinispan.configuration.cache.BackupConfigurationBuilder;
import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.Flag;
import org.infinispan.statetransfer.CommitManager;
import org.infinispan.xsite.AbstractTwoSitesTest;
import org.infinispan.xsite.XSiteAdminOperations;
import org.testng.annotations.Test; import org.testng.annotations.Test;


/** /**
Expand All @@ -22,7 +18,7 @@
* @since 7.0 * @since 7.0
*/ */
@Test(groups = "xsite", testName = "xsite.statetransfer.BackupForStateTransferTest") @Test(groups = "xsite", testName = "xsite.statetransfer.BackupForStateTransferTest")
public class BackupForStateTransferTest extends AbstractTwoSitesTest { public class BackupForStateTransferTest extends AbstractStateTransferTest {


private static final String VALUE = "value"; private static final String VALUE = "value";
private static final String LON_BACKUP_CACHE_NAME = "lonBackup"; private static final String LON_BACKUP_CACHE_NAME = "lonBackup";
Expand All @@ -32,18 +28,18 @@ public BackupForStateTransferTest() {
this.implicitBackupCache = false; this.implicitBackupCache = false;
} }


public void testStateTransferWithClusterIdle() throws InterruptedException { public void testStateTransferWithClusterIdle(Method method) {
takeSiteOffline(LON, NYC); takeSiteOffline();
assertOffline(LON, NYC); assertOffline();
assertNoStateTransferInReceivingSite(NYC, LON_BACKUP_CACHE_NAME); assertNoStateTransferInReceivingSite(LON_BACKUP_CACHE_NAME);
assertNoStateTransferInSendingSite(LON); assertNoStateTransferInSendingSite();


//NYC is offline... lets put some initial data in //NYC is offline... lets put some initial data in
//we have 2 nodes in each site and the primary owner sends the state. Lets try to have more key than the chunk //we have 2 nodes in each site and the primary owner sends the state. Lets try to have more key than the chunk
//size in order to each site to send more than one chunk. //size in order to each site to send more than one chunk.
final int amountOfData = chunkSize(LON) * 4; final int amountOfData = chunkSize() * 4;
for (int i = 0; i < amountOfData; ++i) { for (int i = 0; i < amountOfData; ++i) {
cache(LON, 0).put(key(i), VALUE); cache(LON, 0).put(k(method, i), VALUE);
} }


//check if NYC is empty (LON backup cache) //check if NYC is empty (LON backup cache)
Expand All @@ -52,25 +48,24 @@ public void testStateTransferWithClusterIdle() throws InterruptedException {
//check if NYC is empty (default cache) //check if NYC is empty (default cache)
assertInSite(NYC, cache -> assertTrue(cache.isEmpty())); assertInSite(NYC, cache -> assertTrue(cache.isEmpty()));


startStateTransfer(LON, NYC); startStateTransfer();


eventually(() -> extractComponent(cache(LON, 0), XSiteAdminOperations.class).getRunningStateTransfer().isEmpty(), assertEventuallyStateTransferNotRunning();
TimeUnit.SECONDS.toMillis(30));


assertOnline(LON, NYC); assertOnline(LON, NYC);


//check if all data is visible (LON backup cache) //check if all data is visible (LON backup cache)
assertInSite(NYC, LON_BACKUP_CACHE_NAME, cache -> { assertInSite(NYC, LON_BACKUP_CACHE_NAME, cache -> {
for (int i = 0; i < amountOfData; ++i) { for (int i = 0; i < amountOfData; ++i) {
assertEquals(VALUE, cache.get(key(i))); assertEquals(VALUE, cache.get(k(method, i)));
} }
}); });


//check if NYC is empty (default cache) //check if NYC is empty (default cache)
assertInSite(NYC, cache -> assertTrue(cache.isEmpty())); assertInSite(NYC, cache -> assertTrue(cache.isEmpty()));


assertNoStateTransferInReceivingSite(NYC, LON_BACKUP_CACHE_NAME); assertEventuallyNoStateTransferInReceivingSite(LON_BACKUP_CACHE_NAME);
assertNoStateTransferInSendingSite(LON); assertNoStateTransferInSendingSite();
} }


@Override @Override
Expand All @@ -88,45 +83,4 @@ protected void adaptLONConfiguration(BackupConfigurationBuilder builder) {
builder.site(NYC).stateTransfer().chunkSize(10); builder.site(NYC).stateTransfer().chunkSize(10);
} }


private void startStateTransfer(String fromSite, String toSite) {
XSiteAdminOperations operations = extractComponent(cache(fromSite, 0), XSiteAdminOperations.class);
assertEquals(XSiteAdminOperations.SUCCESS, operations.pushState(toSite));
}

private void takeSiteOffline(String localSite, String remoteSite) {
XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class);
assertEquals(XSiteAdminOperations.SUCCESS, operations.takeSiteOffline(remoteSite));
}

private void assertOffline(String localSite, String remoteSite) {
XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class);
assertEquals(XSiteAdminOperations.OFFLINE, operations.siteStatus(remoteSite));
}

private void assertOnline(String localSite, String remoteSite) {
XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class);
assertEquals(XSiteAdminOperations.ONLINE, operations.siteStatus(remoteSite));
}

private int chunkSize(String site) {
return cache(site, 0).getCacheConfiguration().sites().allBackups().get(0).stateTransfer().chunkSize();
}

private void assertNoStateTransferInReceivingSite(String siteName, String cacheName) {
assertEventuallyInSite(siteName, cacheName, cache -> {
CommitManager commitManager = extractComponent(cache, CommitManager.class);
return !commitManager.isTracking(Flag.PUT_FOR_STATE_TRANSFER) &&
!commitManager.isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER) &&
commitManager.isEmpty();
}, 30, TimeUnit.SECONDS);
}

private void assertNoStateTransferInSendingSite(String siteName) {
assertInSite(siteName,
cache -> assertTrue(extractComponent(cache, XSiteStateProvider.class).getCurrentStateSending().isEmpty()));
}

private Object key(int index) {
return "key-" + index;
}
} }

0 comments on commit 3ca6fcc

Please sign in to comment.