Skip to content

Commit

Permalink
ISPN-5819 Add global backup site ops to container
Browse files Browse the repository at this point in the history
Operations added:
* takeSiteOffline
* bringSiteOnline
* pushState
* cancelPushState
  • Loading branch information
pruivo committed Oct 27, 2015
1 parent 8c119a7 commit fc113d6
Show file tree
Hide file tree
Showing 10 changed files with 912 additions and 52 deletions.
@@ -1,19 +1,21 @@
package org.infinispan.cli.interpreter.statement;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.infinispan.Cache;
import org.infinispan.cli.interpreter.logging.Log;
import org.infinispan.cli.interpreter.result.EmptyResult;
import org.infinispan.cli.interpreter.result.Result;
import org.infinispan.cli.interpreter.result.StatementException;
import org.infinispan.cli.interpreter.result.StringResult;
import org.infinispan.cli.interpreter.session.Session;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.GlobalXSiteAdminOperations;
import org.infinispan.xsite.XSiteAdminOperations;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
* Performs operation related to Cross-Site Replication
*
Expand All @@ -23,8 +25,9 @@
public class SiteStatement implements Statement {
private static final Log log = LogFactory.getLog(SiteStatement.class, Log.class);

static enum Options {
OFFLINE, ONLINE, STATUS, PUSH, CANCELPUSH, CANCELRECEIVE, PUSHSTATUS, CLEARPUSHSTATUS, SENDINGSITE
enum Options {
OFFLINE, ONLINE, STATUS, PUSH, CANCELPUSH, CANCELRECEIVE, PUSHSTATUS, CLEARPUSHSTATUS, SENDINGSITE,
ONLINEALL, OFFLINEALL, PUSHALL, CANCELPUSHALL
}

final private SiteData siteData;
Expand All @@ -37,31 +40,36 @@ public SiteStatement(List<Option> options, SiteData siteData) {

@Override
public Result execute(Session session) throws StatementException {
if (options.isEmpty()) {
return EmptyResult.RESULT;
}
Options option = options.get(0).toEnum(Options.class);
if (isGlobalOption(option)) {
return executeContainerOperation(option, session);
} else {
return executeCacheOperation(option, session);
}

}

private Result executeCacheOperation(Options option, Session session) throws StatementException {
Cache<Object, Object> cache = session.getCache(siteData != null ? siteData.getCacheName() : null);
String siteName = siteData != null ? siteData.getSiteName() : null;
XSiteAdminOperations xsiteAdmin = cache.getAdvancedCache().getComponentRegistry().getComponent(XSiteAdminOperations.class);
if (xsiteAdmin == null) {
throw log.noBackupsForCache(cache.getName());
}
for (Option opt : options) {
switch (opt.toEnum(Options.class)) {

requireXSiteAdmin(xsiteAdmin, cache.getName());

switch (option) {
case STATUS: {
String status = siteName == null ? xsiteAdmin.status() : xsiteAdmin.siteStatus(siteName);
return new StringResult(status);
return new StringResult(siteName == null ? xsiteAdmin.status() : xsiteAdmin.siteStatus(siteName));
}
case ONLINE: {
if (siteName != null) {
return new StringResult(xsiteAdmin.bringSiteOnline(siteName));
} else {
throw log.siteNameNotSpecified();
}
requireSiteName(siteName);
return new StringResult(xsiteAdmin.bringSiteOnline(siteName));
}
case OFFLINE: {
if (siteName != null) {
return new StringResult(xsiteAdmin.takeSiteOffline(siteName));
} else {
throw log.siteNameNotSpecified();
}
requireSiteName(siteName);
return new StringResult(xsiteAdmin.takeSiteOffline(siteName));
}
case PUSH: {
requireSiteName(siteName);
Expand All @@ -84,10 +92,51 @@ public Result execute(Session session) throws StatementException {
case SENDINGSITE: {
return new StringResult(xsiteAdmin.getSendingSiteName());
}
default:
return EmptyResult.RESULT;
}
}

private Result executeContainerOperation(Options option, Session session) throws StatementException {
EmbeddedCacheManager cacheManager = session.getCacheManager();
GlobalXSiteAdminOperations xSiteAdmin = cacheManager.getGlobalComponentRegistry().getComponent(GlobalXSiteAdminOperations.class);
String siteName = siteData != null ? siteData.getSiteName() : null;
requireSiteName(siteName);

switch (option) {
case OFFLINEALL: {
return new StringResult(xSiteAdmin.takeSiteOffline(siteName));
}
case ONLINEALL: {
return new StringResult(xSiteAdmin.bringSiteOnline(siteName));
}
case PUSHALL: {
return new StringResult(xSiteAdmin.pushState(siteName));
}
case CANCELPUSHALL: {
return new StringResult(xSiteAdmin.cancelPushState(siteName));
}
default:
return EmptyResult.RESULT;
}
}

return EmptyResult.RESULT;
private static boolean isGlobalOption(Options option) {
switch (option) {
case OFFLINEALL:
case ONLINEALL:
case PUSHALL:
case CANCELPUSHALL:
return true;
default:
return false;
}
}

private static void requireXSiteAdmin(XSiteAdminOperations xSiteAdminOperations, String cacheName) throws StatementException {
if (xSiteAdminOperations == null) {
throw log.noBackupsForCache(cacheName);
}
}

private static void requireSiteName(String siteName) throws StatementException {
Expand Down
Expand Up @@ -2,23 +2,28 @@

import org.infinispan.Cache;
import org.infinispan.cli.interpreter.result.ResultKeys;
import org.infinispan.commons.api.BasicCacheContainer;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.Flag;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.statetransfer.CommitManager;
import org.infinispan.test.CacheManagerCallable;
import org.infinispan.xsite.AbstractTwoSitesTest;
import org.infinispan.xsite.XSiteAdminOperations;
import org.infinispan.xsite.statetransfer.XSiteStateProvider;
import org.infinispan.xsite.statetransfer.XSiteStateTransferManager;
import org.testng.annotations.Test;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static java.lang.String.format;
import static org.infinispan.test.TestingUtil.extractComponent;
import static org.infinispan.test.TestingUtil.withCacheManager;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.fail;

/**
* @author Tristan Tarrant
Expand All @@ -27,6 +32,10 @@
@Test(groups = "xsite", testName = "cli.interpreter.SiteStatementTest")
public class SiteStatementTest extends AbstractTwoSitesTest {

public SiteStatementTest() {
implicitBackupCache = true;
}

@Override
protected ConfigurationBuilder getNycActiveConfig() {
return getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true);
Expand All @@ -47,7 +56,7 @@ public void testSiteStatus() throws Exception {

assertInterpreterOutput(lonInterpreter, lonSessionId, "site --status NYC;", "online");

assertInterpreterOutput(nycInterpreter, nycSessionId, String.format("site --status %s.LON;", lonCache), "online");
assertInterpreterOutput(nycInterpreter, nycSessionId, format("site --status %s.LON;", lonCache), "online");

assertInterpreterOutput(lonInterpreter, lonSessionId, "site --offline NYC;", "ok");

Expand All @@ -73,7 +82,7 @@ public void testSiteStateTransfer() throws Exception {
assertInterpreterOutput(nycInterpreter, nycSessionId, "site --sendingsite;", "null");
assertInterpreterOutput(lonInterpreter, lonSessionId, "site --pushstatus;", "NYC=OK");
assertInterpreterOutput(lonInterpreter, lonSessionId, "site --clearpushstatus;", "ok");
assertInterpreterOutput(lonInterpreter, lonSessionId, "site --pushstatus;", null);
assertInterpreterOutput(lonInterpreter, lonSessionId, "site --pushstatus;", (String) null);
assertInterpreterOutput(lonInterpreter, lonSessionId, "site --cancelpush NYC;", "ok");
assertInterpreterOutput(nycInterpreter, nycSessionId, "site --cancelreceive LON;", "ok");
}
Expand All @@ -97,43 +106,120 @@ public void call() {
}
}
});
}

public void testContainerOperations() throws Exception {
site(LON).cacheManagers().forEach(cacheManager -> cacheManager.defineConfiguration("another-cache", lonConfigurationBuilder().build()));
site(LON).cacheManagers().forEach(cacheManager -> cacheManager.defineConfiguration("another-cache-2", getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC).build()));
site(LON).waitForClusterToForm("another-cache");
site(LON).waitForClusterToForm("another-cache-2");

Interpreter lonInterpreter = interpreter(LON, 0);
String lonCache = cache(LON, 0).getName();
String lonSessionId = lonInterpreter.createSessionId(lonCache);

assertInterpreterOutput(lonInterpreter, lonSessionId, "site --offlineall NYC;", (output, error) -> {
assertEquals(null, error);
String outFormat = "%s: %s";
if (!output.contains(format(outFormat, BasicCacheContainer.DEFAULT_CACHE_NAME, XSiteAdminOperations.SUCCESS))) {
fail(format("Cache '%s' should be present in the output: %s", BasicCacheContainer.DEFAULT_CACHE_NAME, output));
}
if (!output.contains(format(outFormat, "another-cache", XSiteAdminOperations.SUCCESS))) {
fail(format("Cache '%s' should be present in the output: %s", "another-cache", output));
}
if (output.contains("another-cache-2")) {
fail(format("Cache '%s' should not be present in the output: %s", "another-cache-2", output));
}
});
assertInterpreterOutput(lonInterpreter, lonSessionId, "site --status NYC;", "offline");
assertInterpreterOutput(lonInterpreter, lonSessionId, "site --status \"another-cache\".NYC;", "offline");

assertInterpreterOutput(lonInterpreter, lonSessionId, "site --onlineall NYC;", (output, error) -> {
assertEquals(null, error);
String outFormat = "%s: %s";
if (!output.contains(format(outFormat, BasicCacheContainer.DEFAULT_CACHE_NAME, XSiteAdminOperations.SUCCESS))) {
fail(format("Cache '%s' should be present in the output: %s", BasicCacheContainer.DEFAULT_CACHE_NAME, output));
}
if (!output.contains(format(outFormat, "another-cache", XSiteAdminOperations.SUCCESS))) {
fail(format("Cache '%s' should be present in the output: %s", "another-cache", output));
}
if (output.contains("another-cache-2")) {
fail(format("Cache '%s' should not be present in the output: %s", "another-cache-2", output));
}
});
assertInterpreterOutput(lonInterpreter, lonSessionId, "site --status NYC;", "online");
assertInterpreterOutput(lonInterpreter, lonSessionId, "site --status \"another-cache\".NYC;", "online");

assertInterpreterOutput(lonInterpreter, lonSessionId, "site --pushall NYC;", (output, error) -> {
assertEquals(null, error);
String outFormat = "%s: %s";
if (!output.contains(format(outFormat, BasicCacheContainer.DEFAULT_CACHE_NAME, XSiteAdminOperations.SUCCESS))) {
fail(format("Cache '%s' should be present in the output: %s", BasicCacheContainer.DEFAULT_CACHE_NAME, output));
}
if (!output.contains(format(outFormat, "another-cache", XSiteAdminOperations.SUCCESS))) {
fail(format("Cache '%s' should be present in the output: %s", "another-cache", output));
}
if (output.contains("another-cache-2")) {
fail(format("Cache '%s' should not be present in the output: %s", "another-cache-2", output));
}
});

assertInterpreterOutput(lonInterpreter, lonSessionId, "site --cancelpushall NYC;", (output, error) -> {
assertEquals(null, error);
String outFormat = "%s: %s";
if (!output.contains(format(outFormat, BasicCacheContainer.DEFAULT_CACHE_NAME, XSiteAdminOperations.SUCCESS))) {
fail(format("Cache '%s' should be present in the output: %s", BasicCacheContainer.DEFAULT_CACHE_NAME, output));
}
if (!output.contains(format(outFormat, "another-cache", XSiteAdminOperations.SUCCESS))) {
fail(format("Cache '%s' should be present in the output: %s", "another-cache", output));
}
if (output.contains("another-cache-2")) {
fail(format("Cache '%s' should not be present in the output: %s", "another-cache-2", output));
}
});

}

private void assertInterpreterOutput(Interpreter interpreter, String sessionId, String command, String output) throws Exception {
Map<String, String> result = interpreter.execute(sessionId, command);
assertEquals(output, result.get(ResultKeys.OUTPUT.toString()));
private void assertInterpreterOutput(Interpreter interpreter, String sessionId, String command, String expected) throws Exception {
assertInterpreterOutput(interpreter, sessionId, command, (output, error) -> {
assertEquals(null, error);
assertEquals(expected, output);
});
}

private void assertInterpreterError(Interpreter interpreter, String sessionId, String command, String output) throws Exception {
private void assertInterpreterOutput(Interpreter interpreter, String sessionId, String command, OutputValidator validator) throws Exception {
Objects.requireNonNull(validator);
Map<String, String> result = interpreter.execute(sessionId, command);
assertEquals(output, result.get(ResultKeys.ERROR.toString()));
validator.validate(result.get(ResultKeys.OUTPUT.toString()), result.get(ResultKeys.ERROR.toString()));
}

private void assertInterpreterError(Interpreter interpreter, String sessionId, String command, String expected) throws Exception {
assertInterpreterOutput(interpreter, sessionId, command, (output, error) -> {
assertEquals(null, output);
assertEquals(expected, error);
});
}

private Interpreter interpreter(String site, int cache) {
return cache(site, cache).getAdvancedCache().getComponentRegistry().getComponent(Interpreter.class);
}

private void assertEventuallyNoStateTransferInReceivingSite(String siteName, String cacheName, long timeout, TimeUnit unit) {
assertEventuallyInSite(siteName, cacheName, new EventuallyAssertCondition<Object, Object>() {
@Override
public boolean assertInCache(Cache<Object, Object> cache) {
CommitManager commitManager = extractComponent(cache, CommitManager.class);
return !commitManager.isTracking(Flag.PUT_FOR_STATE_TRANSFER) &&
!commitManager.isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER) &&
commitManager.isEmpty();
}
assertEventuallyInSite(siteName, cacheName, cache -> {
CommitManager commitManager = extractComponent(cache, CommitManager.class);
return !commitManager.isTracking(Flag.PUT_FOR_STATE_TRANSFER) &&
!commitManager.isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER) &&
commitManager.isEmpty();
}, timeout, unit);
}

private void assertEventuallyNoStateTransferInSendingSite(String siteName, String cacheName, long timeout, TimeUnit unit) {
assertEventuallyInSite(siteName, cacheName, new EventuallyAssertCondition<Object, Object>() {
@Override
public boolean assertInCache(Cache<Object, Object> cache) {
return extractComponent(cache, XSiteStateProvider.class).getCurrentStateSending().isEmpty() &&
extractComponent(cache, XSiteStateTransferManager.class).getRunningStateTransfers().isEmpty();
}
}, timeout, unit);
assertEventuallyInSite(siteName, cacheName, cache ->
extractComponent(cache, XSiteStateProvider.class).getCurrentStateSending().isEmpty() &&
extractComponent(cache, XSiteStateTransferManager.class).getRunningStateTransfers().isEmpty(), timeout, unit);
}

private interface OutputValidator {
void validate(String output, String error);
}
}
Expand Up @@ -32,6 +32,7 @@
import org.infinispan.util.TimeService;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.GlobalXSiteAdminOperations;

import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
Expand Down Expand Up @@ -115,6 +116,7 @@ public GlobalComponentRegistry(GlobalConfiguration configuration,
registerComponent(new InternalCacheRegistryImpl(), InternalCacheRegistry.class);
registerComponent(new ClusterRegistryImpl(), ClusterRegistry.class);
registerComponent(new CacheStoreFactoryRegistry(), CacheStoreFactoryRegistry.class);
registerComponent(new GlobalXSiteAdminOperations(), GlobalXSiteAdminOperations.class);

moduleProperties.loadModuleCommandHandlers(configuredClassLoader);
Map<Byte, ModuleCommandFactory> factories = moduleProperties.moduleCommandFactories();
Expand Down

0 comments on commit fc113d6

Please sign in to comment.