Permalink
Browse files

ISPN-2606 Implement UpgradeSynchronizer as MBean

  • Loading branch information...
1 parent ad832bf commit 77c809e3551cd6b1470436f94f4980f1547dfdeb @tristantarrant tristantarrant committed Dec 7, 2012
Showing with 841 additions and 217 deletions.
  1. +1 −1 cachestore/remote/pom.xml
  2. +4 −0 cachestore/remote/src/main/java/org/infinispan/loaders/remote/RemoteCacheStore.java
  3. +1 −4 ...mote/src/main/java/org/infinispan/loaders/remote/configuration/RemoteCacheStoreConfiguration.java
  4. +1 −1 cli/cli-client/src/main/java/org/infinispan/cli/commands/server/Upgrade.java
  5. +34 −4 cli/cli-client/src/main/resources/help/upgrade.txt
  6. +27 −0 cli/cli-server/pom.xml
  7. +1 −1 cli/cli-server/src/main/antlr3/org/infinispan/cli/interpreter/IspnQL.g
  8. +5 −4 cli/cli-server/src/main/java/org/infinispan/cli/interpreter/Interpreter.java
  9. +7 −0 cli/cli-server/src/main/java/org/infinispan/cli/interpreter/logging/Log.java
  10. +32 −0 cli/cli-server/src/main/java/org/infinispan/cli/interpreter/result/ResultKeys.java
  11. +10 −0 cli/cli-server/src/main/java/org/infinispan/cli/interpreter/statement/Option.java
  12. +77 −5 cli/cli-server/src/main/java/org/infinispan/cli/interpreter/statement/UpgradeStatement.java
  13. +130 −0 cli/cli-server/src/test/java/org/infinispan/cli/interpreter/UpgradeTest.java
  14. +4 −0 core/src/main/java/org/infinispan/loaders/CacheLoaderManager.java
  15. +15 −0 core/src/main/java/org/infinispan/loaders/CacheLoaderManagerImpl.java
  16. +16 −0 core/src/main/java/org/infinispan/loaders/decorators/ChainingCacheStore.java
  17. +57 −10 core/src/main/java/org/infinispan/upgrade/RollingUpgradeManager.java
  18. +6 −2 core/src/main/java/org/infinispan/upgrade/{Migrator.java → SourceMigrator.java}
  19. +45 −0 core/src/main/java/org/infinispan/upgrade/TargetMigrator.java
  20. +9 −2 core/src/main/java/org/infinispan/util/logging/Log.java
  21. +10 −0 parent/pom.xml
  22. +1 −1 server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
  23. +4 −4 ...d/src/main/scala/org/infinispan/server/hotrod/{HotRodMigrator.scala → HotRodSourceMigrator.scala}
  24. +54 −3 upgrade-tools/pom.xml
  25. +0 −175 upgrade-tools/src/main/java/org/infinispan/upgrade/RollingUpgradeSynchronizer.java
  26. +121 −0 upgrade-tools/src/main/java/org/infinispan/upgrade/hotrod/HotRodTargetMigrator.java
  27. +50 −0 upgrade-tools/src/main/java/org/infinispan/upgrade/logging/Log.java
  28. +1 −0 upgrade-tools/src/main/resources/META-INF/services/org.infinispan.upgrade.TargetMigrator
  29. +118 −0 upgrade-tools/src/test/java/org/infinispan/upgrade/hotrod/HotRodUpgradeSynchronizerTest.java
@@ -66,7 +66,7 @@
</dependency>
</dependencies>
- <build>
+ <build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
@@ -233,4 +233,8 @@ public void setInternalCacheEntryFactory(InternalEntryFactory iceFactory) {
}
this.iceFactory = iceFactory;
}
+
+ public RemoteCache<Object, Object> getRemoteCache() {
+ return remoteCache;
+ }
}
@@ -214,8 +214,5 @@ public RemoteCacheStoreConfig adapt() {
config.setHotRodClientProperties(hrp);
return config;
}
-
-
-
-
}
+
@@ -26,7 +26,7 @@
import org.infinispan.cli.shell.Completer;
public class Upgrade extends AbstractServerCommand {
- private static final List<String> OPTIONS = Arrays.asList("--dumpkeys");
+ private static final List<String> OPTIONS = Arrays.asList("--dumpkeys", "--synchronize=", "--disconnectsource=", "--all");
@Override
public String getName() {
@@ -1,13 +1,43 @@
.SH SYNOPSIS
.B upgrade [
-.I --dumpkeys
+.I --dumpkeys | --synchronize=migrator | --disconnectsource=migrator
.B ] [
-.I cachename
+.I cachename | --all
.B ]
.SH DESCRIPTION
This command performs operations used during the rolling upgrade procedure.
.SH ARGUMENTS
.IP --dumpkeys
-Performs the dump of all the keys in the cache to a known entry. It should be performed on the "old" cluster so that the "new" cluster can fetch the entire keyset efficiently to complete the synchronization operation
+Performs the dump of all the keys in the cache to a known entry. It must be performed on the "source" cluster so that the "target" cluster can fetch the entire keyset efficiently to complete the synchronization operation
+.IP --synchronize=migrator
+Performs the synchronization of all data from the "source" cluster to the "target" cluster using the specified migrator. It must be performed on the "target" cluster after the
+.I --dumpkeys
+operation has been performed on the "source" cluster. The only migrator currently available is
+.B hotrod
+which migrates entries between caches exposed via the HotRod remoting protocol.
+.IP --disconnectsource=migrator
+Disconnects the "target" cluster from the "source" cluster. This is performed in a
+.I migrator
+-specific way. After this operation has been performed the "source" cluster can be switched off
+.IP --all
+Specifies that the requested operation should be performed on all caches in the currently selected container
.IP cachename
-(optional) the name of the cache on which to invoke the specified upgrade command
+(optional) the name of the cache on which to invoke the specified upgrade command. If unspecified, the currently selected cache will be used. See also the
+.I --all
+switch above
+.SH USAGE
+In order to perform a rolling upgrade of a HotRod cluster, the following steps must be taken
+ 1. Configure and start a new cluster with a RemoteCacheStore pointing to the old cluster and the
+.I hotRodWrapping
+flag enabled
+ 2. Configure all clients so that they will connect to the new cluster
+ 3. Invoke the
+.I upgrade --dumpkeys
+command on the old cluster for all of the caches that need to be migrated
+ 4. Invoke the
+.I upgrade --synchronize=hotrod
+command on the new cluster to ensure that all data is migrated from the old cluster to the new one
+ 5. Invoke the
+.I upgrade --disconnectsource=hotrod
+command on the new cluster to disable the RemoteCacheStore used to migrate the data
+ 6. Switch off the old cluster
View
@@ -59,6 +59,33 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>infinispan-client-hotrod</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>infinispan-cachestore-remote</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>infinispan-server-hotrod</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>infinispan-server-hotrod</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>infinispan-upgrade-tools</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -233,7 +233,7 @@ statementOptions returns [List<Option> options]
;
statementOption returns [Option option]
- : '--' STRINGLITERAL { $option = new Option(unquote($STRINGLITERAL.text)); }
+ : '--' optionName = STRINGLITERAL ('=' optionParameter = STRINGLITERAL)? { $option = new Option(unquote($optionName.text), unquote($optionParameter.text)); }
;
literal returns [Object o]
@@ -40,6 +40,7 @@
import org.infinispan.cli.interpreter.logging.Log;
import org.infinispan.cli.interpreter.result.EmptyResult;
import org.infinispan.cli.interpreter.result.Result;
+import org.infinispan.cli.interpreter.result.ResultKeys;
import org.infinispan.cli.interpreter.session.Session;
import org.infinispan.cli.interpreter.session.SessionImpl;
import org.infinispan.cli.interpreter.statement.Statement;
@@ -159,19 +160,19 @@ void expireSessions() {
}
}
if (output.length() > 0) {
- response.put("OUTPUT", output.toString());
+ response.put(ResultKeys.OUTPUT.toString(), output.toString());
}
} catch (Exception e) {
log.interpreterError(e);
- response.put("ERROR", e.getMessage());
+ response.put(ResultKeys.ERROR.toString(), e.getMessage());
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
- response.put("STACKTRACE", sw.toString());
+ response.put(ResultKeys.STACKTRACE.toString(), sw.toString());
} finally {
if (session != null) {
session.reset();
- response.put("CACHE", session.getCurrentCacheName());
+ response.put(ResultKeys.CACHE.toString(), session.getCurrentCacheName());
}
SysPropertyActions.setThreadContextClassLoader(oldCL);
@@ -20,6 +20,7 @@
import static org.jboss.logging.Logger.Level.ERROR;
+import org.infinispan.cli.interpreter.result.StatementException;
import org.jboss.logging.Cause;
import org.jboss.logging.LogMessage;
import org.jboss.logging.Message;
@@ -45,4 +46,10 @@
@LogMessage(level = ERROR)
@Message(value = "Interpreter error", id = 19003)
void interpreterError(@Cause Exception e);
+
+ @Message(value = "No action has been specified for the upgrade command", id = 19004)
+ StatementException missingUpgradeAction();
+
+ @Message(value = "No migrator has been specified", id = 19005)
+ StatementException missingMigrator();
}
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2012 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+package org.infinispan.cli.interpreter.result;
+
+/**
+ * ResultKeys.
+ *
+ * @author Tristan Tarrant
+ * @since 5.2
+ */
+public enum ResultKeys {
+ CACHE,
+ ERROR,
+ STACKTRACE,
+ OUTPUT
+}
@@ -26,15 +26,25 @@
*/
public class Option {
final String name;
+ final String parameter;
public Option(String name) {
+ this(name, null);
+ }
+
+ public Option(String name, String parameter) {
this.name = name;
+ this.parameter = parameter;
}
public String getName() {
return name;
}
+ public String getParameter() {
+ return parameter;
+ }
+
@Override
public String toString() {
return name;
@@ -18,14 +18,19 @@
*/
package org.infinispan.cli.interpreter.statement;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.infinispan.Cache;
+import org.infinispan.cli.interpreter.logging.Log;
import org.infinispan.cli.interpreter.result.EmptyResult;
import org.infinispan.cli.interpreter.result.Result;
import org.infinispan.cli.interpreter.result.StatementException;
import org.infinispan.cli.interpreter.session.Session;
+import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.upgrade.RollingUpgradeManager;
+import org.infinispan.util.logging.LogFactory;
/**
* Performs operation related to rolling upgrades
@@ -34,6 +39,7 @@
* @since 5.2
*/
public class UpgradeStatement implements Statement {
+ public static final Log log = LogFactory.getLog(UpgradeStatement.class, Log.class);
final String cacheName;
final private List<Option> options;
@@ -45,17 +51,83 @@ public UpgradeStatement(List<Option> options, String cacheName) {
@Override
public Result execute(Session session) throws StatementException {
- Cache<Object, Object> cache = session.getCache(cacheName);
- RollingUpgradeManager upgradeManager = cache.getAdvancedCache().getComponentRegistry().getComponent(RollingUpgradeManager.class);
+ boolean all = false;
+ UpgradeMode mode = UpgradeMode.NONE;
+ String migratorName = null;
+
for (Option opt : options) {
- if ("dumpkeys".equals(opt.getName())) {
- upgradeManager.recordKnownGlobalKeyset();
+ if ("all".equals(opt.getName())) {
+ all = true;
+ } else if ("dumpkeys".equals(opt.getName())) {
+ mode = UpgradeMode.DUMPKEYS;
+ } else if ("synchronize".equals(opt.getName())) {
+ mode = UpgradeMode.SYNCHRONIZE;
+ migratorName = opt.getParameter();
+ if (migratorName == null) {
+ throw log.missingMigrator();
+ }
+ } else if ("disconnectsource".equals(opt.getName())) {
+ mode = UpgradeMode.DISCONNECTSOURCE;
+ migratorName = opt.getParameter();
+ if (migratorName == null) {
+ throw log.missingMigrator();
+ }
} else {
- throw new StatementException("Unknown option "+opt.getName());
+ throw new StatementException("Unknown option " + opt.getName());
+ }
+ }
+ switch (mode) {
+ case DUMPKEYS: {
+ for (Cache<?, ?> cache : all ? getAllCaches(session) : Collections.singletonList(session.getCache(cacheName))) {
+ RollingUpgradeManager upgradeManager = cache.getAdvancedCache().getComponentRegistry().getComponent(RollingUpgradeManager.class);
+ upgradeManager.recordKnownGlobalKeyset();
+ }
+ break;
+ }
+ case SYNCHRONIZE: {
+ for (Cache<?, ?> cache : all ? getAllCaches(session) : Collections.singletonList(session.getCache(cacheName))) {
+ RollingUpgradeManager upgradeManager = cache.getAdvancedCache().getComponentRegistry().getComponent(RollingUpgradeManager.class);
+ try {
+ upgradeManager.synchronizeData(migratorName);
+ } catch (Exception e) {
+ throw new StatementException(e.getMessage());
+ }
+ }
+ break;
+ }
+ case DISCONNECTSOURCE: {
+ for (Cache<?, ?> cache : all ? getAllCaches(session) : Collections.singletonList(session.getCache(cacheName))) {
+ RollingUpgradeManager upgradeManager = cache.getAdvancedCache().getComponentRegistry().getComponent(RollingUpgradeManager.class);
+ try {
+ upgradeManager.disconnectSource(migratorName);
+ } catch (Exception e) {
+ throw new StatementException(e.getMessage());
+ }
}
+ break;
+ }
+ case NONE: {
+ throw log.missingUpgradeAction();
+ }
}
return EmptyResult.RESULT;
}
+ private List<Cache<?, ?>> getAllCaches(Session session) {
+ List<Cache<?, ?>> caches = new ArrayList<Cache<?, ?>>();
+ EmbeddedCacheManager container = session.getCacheManager();
+ for (String cacheName : container.getCacheNames()) {
+ if (container.isRunning(cacheName)) {
+ caches.add(session.getCache(cacheName));
+ }
+ }
+ caches.add(container.getCache());
+
+ return caches;
+ }
+
+ private enum UpgradeMode {
+ NONE, DUMPKEYS, SYNCHRONIZE, DISCONNECTSOURCE
+ }
}
Oops, something went wrong.

0 comments on commit 77c809e

Please sign in to comment.