Permalink
Browse files

ISPN-2346 Create a RollingUpgradeSynchronizer to move data from an ol…

…d cluster to a new one
  • Loading branch information...
maniksurtani authored and tristantarrant committed Sep 26, 2012
1 parent ff96964 commit ddb276280a7c550235356460d85a2e64ef796fbf
Showing with 218 additions and 0 deletions.
  1. +1 −0 pom.xml
  2. +42 −0 upgrade-tools/pom.xml
  3. +175 −0 upgrade-tools/src/main/java/org/infinispan/upgrade/RollingUpgradeSynchronizer.java
View
@@ -63,6 +63,7 @@
<module>server/rest</module>
<module>client/hotrod-client</module>
<module>rhq-plugin</module>
+ <module>upgrade-tools</module>
<module>spring</module>
<module>cli/cli-server</module>
<module>cli/cli-client</module>
View
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2012 Red Hat, Inc. and/or its affiliates.
+ ~
+ ~ This is free software; you can redistribute it and/or modify it
+ ~ under the terms of the GNU Lesser General Public License as
+ ~ published by the Free Software Foundation; either version 2.1 of
+ ~ the License, or (at your option) any later version.
+ ~
+ ~ This software is distributed in the hope that it will be useful,
+ ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ ~ Lesser General Public License for more details.
+ ~
+ ~ You should have received a copy of the GNU Lesser General Public
+ ~ License along with this library; if not, write to the Free Software
+ ~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ ~ 02110-1301 USA
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-parent</artifactId>
+ <version>5.2.0-SNAPSHOT</version>
+ <relativePath>../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>upgrade-tools</artifactId>
+ <name>Infinispan Rolling Upgrade Tooling</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-client-hotrod</artifactId>
+ <version>5.2.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
@@ -0,0 +1,175 @@
+package org.infinispan.upgrade;
+
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.io.ByteBuffer;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.marshall.BufferSizePredictor;
+import org.infinispan.marshall.Marshaller;
+import org.infinispan.marshall.jboss.GenericJBossMarshaller;
+import org.infinispan.util.ByteArrayKey;
+import org.infinispan.util.FileLookup;
+import org.infinispan.util.FileLookupFactory;
+import org.infinispan.util.Util;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Manik Surtani
+ * @since 5.1
+ */
+
+// TODO: make this accessible via JMX on the new cluster as well. Not just via the command-line with a boat load of jars!
+public class RollingUpgradeSynchronizer {
+
+ private final Properties oldCluster;
+ private final Properties newCluster;
+ private final String cacheName;
+ private int threads;
+
+ public static void main(String[] args) throws UnsupportedEncodingException {
+ RollingUpgradeSynchronizer r = new RollingUpgradeSynchronizer(args);
+ r.start();
+ }
+
+ public RollingUpgradeSynchronizer(String[] args) {
+ if (args.length < 2)
+ helpAndExit();
+
+ String oldClusterCfg = args[0];
+ String newClusterCfg = args[1];
+
+ oldCluster = readProperties(oldClusterCfg);
+ newCluster = readProperties(newClusterCfg);
+
+ if (args.length >= 3)
+ cacheName = args[2];
+ else
+ cacheName = CacheContainer.DEFAULT_CACHE_NAME;
+
+ threads = Runtime.getRuntime().availableProcessors(); // default to the number of CPUs
+ if (args.length >= 4) {
+ try {
+ threads = Integer.parseInt(args[3]);
+ } catch (Exception e) {
+ System.out.printf(" WARN: parameter %s should represent the nunber of threads to use, and be an integer. Using the default number of threads instead.%n", args[3]);
+ }
+ }
+ }
+
+ private static void helpAndExit() {
+ System.out.println(" Usage: RollingUpgradeSynchronizer <old cluster properties file> <new cluster properties file> <cache name> <num threads to use>");
+ System.out.println();
+ System.out.println(" The last two parameters are optional, defaulting to the default cache and number of processors, respectively.");
+ System.out.println();
+ System.exit(0);
+ }
+
+ private static Properties readProperties(String propsFile) {
+ try {
+ Properties p = new Properties();
+ FileLookup lookup = FileLookupFactory.newInstance();
+ p.load(lookup.lookupFile(propsFile, RollingUpgradeSynchronizer.class.getClassLoader()));
+ return p;
+ } catch (Exception e) {
+ System.out.printf(" FATAL: Unable to load properties file %s! Exiting!%n", propsFile);
+ System.exit(-1);
+ return null;
+ }
+ }
+
+ private void start() {
+ long start = System.currentTimeMillis();
+ // TODO: Take in more parameters, e.g., port, etc., possibly via a config file.
+ // TODO: Should also take in a cache name, or even a set of cache names to migrate.
+ Marshaller m = new MigrationMarshaller();
+
+ RemoteCacheManager rcmOld = new RemoteCacheManager(m, oldCluster);
+ final RemoteCacheManager rcmNew = new RemoteCacheManager(m, newCluster);
+
+ Set<ByteArrayKey> keys = (Set<ByteArrayKey>) rcmOld.getCache(cacheName).get("___MigrationManager_HotRod_KnownKeys___");
+
+ System.out.printf(">> Retrieved %s keys stored in cache %s on the old cluster.%n", keys.size(), cacheName);
+
+ ExecutorService es = Executors.newFixedThreadPool(threads);
+
+ final AtomicInteger count = new AtomicInteger(0);
+ for (final ByteArrayKey key: keys) {
+ es.submit(new Runnable() {
+ @Override
+ public void run() {
+ // the custom marshaller registered above will make sure this byte array is placed, verbatim, on the stream
+ rcmNew.getCache(cacheName).get(key.getData());
+ int i = count.get();
+ if (i % 100 == 0) System.out.printf(">> Moved %s keys%n", i);
+ }
+ });
+ count.getAndIncrement();
+ }
+
+ es.shutdown();
+ while (!es.isShutdown()) LockSupport.parkNanos(TimeUnit.NANOSECONDS.convert(100, TimeUnit.MILLISECONDS));
+ System.out.printf(">> Transferred %s entries in cache %s from the old cluster to the new, in %s%n", keys.size(), cacheName, Util.prettyPrintTime(System.currentTimeMillis() - start));
+ }
+
+ private static class MigrationMarshaller implements Marshaller {
+
+ private final Marshaller delegate = new GenericJBossMarshaller();
+
+ @Override
+ public byte[] objectToByteBuffer(Object o, int i) throws IOException, InterruptedException {
+ if (o instanceof byte[])
+ return (byte[]) o;
+ else
+ return delegate.objectToByteBuffer(o, i);
+ }
+
+ @Override
+ public byte[] objectToByteBuffer(Object o) throws IOException, InterruptedException {
+ if (o instanceof byte[])
+ return (byte[]) o;
+ else
+ return delegate.objectToByteBuffer(o);
+ }
+
+ @Override
+ public Object objectFromByteBuffer(byte[] bytes) throws IOException, ClassNotFoundException {
+ return delegate.objectFromByteBuffer(bytes);
+ }
+
+ @Override
+ public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws IOException, ClassNotFoundException {
+ return delegate.objectFromByteBuffer(bytes, i, i1);
+ }
+
+ @Override
+ public ByteBuffer objectToBuffer(Object o) throws IOException, InterruptedException {
+ if (o instanceof byte[]) {
+ byte[] bytes = (byte[]) o;
+ return new ByteBuffer(bytes, 0, bytes.length);
+ } else {
+ return delegate.objectToBuffer(o);
+ }
+ }
+
+ @Override
+ public boolean isMarshallable(Object o) throws Exception {
+ return o instanceof byte[] || delegate.isMarshallable(o);
+ }
+
+ @Override
+ public BufferSizePredictor getBufferSizePredictor(Object o) {
+ return delegate.getBufferSizePredictor(o);
+ }
+ }
+}
+

0 comments on commit ddb2762

Please sign in to comment.