Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rolling upgrade support #1340

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public Set<InternalCacheEntry> perform(InvocationContext ctx) throws Throwable {
@Override
public String toString() {
return "EntrySetCommand{" +
"set=" + container.entrySet() +
"set=" + container.size() + " elements" +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Set<Object> perform(InvocationContext ctx) throws Throwable {
@Override
public String toString() {
return "KeySetCommand{" +
"set=" + container.keySet() +
"set=" + container.size() + " elements" +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Collection<Object> perform(InvocationContext ctx) throws Throwable {
@Override
public String toString() {
return "ValuesCommand{" +
"values=" + container.values() +
"values=" + container.size() + " elements" +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.jmx.CacheJmxRegistration;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.upgrade.RollingUpgradeManager;
import org.infinispan.transaction.xa.recovery.RecoveryAdminOperations;
import org.infinispan.xsite.CrossSiteReplicationOperations;

Expand Down Expand Up @@ -105,6 +106,8 @@ private void bootstrap(String cacheName, AdvancedCache<?, ?> cache, Configuratio
if (!configuration.sites().inUseBackups().isEmpty()) {
componentRegistry.registerComponent(new CrossSiteReplicationOperations(), CrossSiteReplicationOperations.class.getName(), true);
}
// The RollingUpgradeManager should always be added so it is registered in JMX.
componentRegistry.registerComponent(new RollingUpgradeManager(), RollingUpgradeManager.class.getName(), true);
componentRegistry.prepareWiringCache();
}

Expand Down
32 changes: 32 additions & 0 deletions core/src/main/java/org/infinispan/upgrade/Migrator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2012 Red Hat, Inc. and/or its affiliates.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA
*/

package org.infinispan.upgrade;

/**
* Records all known keys and stores them under a well-known key which can be used for retrieval.
*
* @author Manik Surtani
* @since 5.2
*/
public interface Migrator {
void recordKnownGlobalKeyset();

String getCacheName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2012 Red Hat, Inc. and/or its affiliates.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA
*/

package org.infinispan.upgrade;

import org.infinispan.factories.annotations.SurvivesRestarts;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedOperation;
import org.rhq.helpers.pluginAnnotations.agent.Operation;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* This component handles the control hooks to handle migrating from one version of Infinispan to another.
*
* @author Manik Surtani
* @since 5.2
*/
@MBean(objectName = "RollingUpgradeManager", description = "This component handles the control hooks to handle migrating data from one version of Infinispan to another")
@Scope(value = Scopes.NAMED_CACHE)
@SurvivesRestarts
public class RollingUpgradeManager {
private final ConcurrentMap<String, Migrator> migrators = new ConcurrentHashMap<String, Migrator>(2);
@ManagedOperation(description = "Dumps the global known keyset to a well-known key for retrieval by the upgrade process")
@Operation(displayName = "Dumps the global known keyset to a well-known key for retrieval by the upgrade process")
public void recordKnownGlobalKeyset() {
for (Migrator m: migrators.values()) m.recordKnownGlobalKeyset();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we throw an exception if no migrators are found ?

}

/**
* Registers a migrator for a specific data format or endpoint. In the Infinispan ecosystem, we'd typically have one
* Migrator implementation for Hot Rod, one for memcached, one for REST and one for embedded/in-VM mode, and these
* would typically be added to the upgrade manager on first access via any of these protocols.
* @param migrator
*/
public void addMigrator(Migrator migrator) {
migrators.putIfAbsent(migrator.getCacheName(), migrator);
}
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
<module>server/rest</module>
<module>client/hotrod-client</module>
<module>rhq-plugin</module>
<module>upgrade-tools</module>
<module>spring</module>
<module>cli/cli-server</module>
<module>cli/cli-client</module>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2012 Red Hat, Inc. and/or its affiliates.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA
*/

/**
* An implementation of Migrator, that understands the Hot Rod key and value formats.
*
* @author Manik Surtani
* @since 5.2
*/

package org.infinispan.server.hotrod

import org.infinispan.factories.annotations.{Inject, SurvivesRestarts}
import org.infinispan.factories.scopes.{Scope, Scopes}
import org.infinispan.jmx.annotations.{ManagedOperation, MBean}
import org.infinispan.util.ByteArrayKey
import org.infinispan.server.core.{Operation, CacheValue}
import org.infinispan.Cache
import org.rhq.helpers.pluginAnnotations.agent.Operation
import org.infinispan.server.core.Operation
import org.infinispan.distexec.{DistributedCallable, DefaultExecutorService}
import java.nio.charset.Charset
import org.infinispan.marshall.jboss.JBossMarshaller
import org.infinispan.upgrade.Migrator

class HotRodMigrator(cache: Cache[ByteArrayKey, CacheValue]) extends Migrator {
val KNOWN_KEY = "___MigrationManager_HotRod_KnownKeys___"
val MARSHALLER = new JBossMarshaller // TODO: Hard coded! Yuck! Assumes the Synchronizer service will use the same marshaller. Doesn't matter what actual clients use to store/retrieve data.

@Override
def getCacheName = cache.getName

@Override
def recordKnownGlobalKeyset() {
// TODO: Maybe we should allow passing in of a well-known key prefix, and return the generated key to use?
recordKnownGlobalKeyset(KNOWN_KEY)
}

def recordKnownGlobalKeyset(keyToRecordKnownKeySet: String) {

// TODO: the bulk of ths code is reusable across different implementations of Migrator
val bak = new ByteArrayKey(MARSHALLER.objectToByteBuffer(keyToRecordKnownKeySet))

val cm = cache.getCacheConfiguration().clustering().cacheMode()
var keys: java.util.HashSet[ByteArrayKey] = null
if (cm.isReplicated() || !cm.isClustered()) {
// If cache mode is LOCAL or REPL, dump local keyset.
// Defensive copy to serialize and transmit across a network
keys = new java.util.HashSet[ByteArrayKey](cache.keySet())
} else {
// If cache mode is DIST, use a map/reduce task
val des = new DefaultExecutorService(cache)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things:

  1. Executor service left unclosed.
  2. I doubt migrator tasks are very common, but since the migration manager is a per-cache component, wouldn't it be better to keep the exec service there, cached, and then shut it down when the cache stops?

val keysets = des.submitEverywhere(new GlobalKeysetTask(cache))
val combinedKeyset = new java.util.HashSet[ByteArrayKey]()

for (future <- new IteratorWrapper(keysets.iterator())) {
combinedKeyset addAll future.get()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe remove KNOWN_KEY from the set ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
keys = combinedKeyset
}

// Remove KNOWN_KEY from the key set - just in case it is there from a previous run.
keys remove KNOWN_KEY

// we cannot store the Set as it is; it will break if attempting to be read via Hot Rod. This should be wrapped
// in a CacheValue.
cache.put(bak, new CacheValue(MARSHALLER.objectToByteBuffer(keys), 1))
}
}

class IteratorWrapper[A](iter:java.util.Iterator[A]) {
// TODO: should probably be in some generic scala helper module to allow scala-style iteration over java collections
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this. That's what scala.collection.JavaConversions is there for (used throughout the code base).

def foreach(f: A => Unit): Unit = {
while(iter.hasNext){
f(iter.next)
}
}
}

class GlobalKeysetTask(var cache: Cache[ByteArrayKey, CacheValue]) extends DistributedCallable[ByteArrayKey, CacheValue, java.util.Set[ByteArrayKey]] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure this won't work. The distributed callable needs to be serializable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plus even if you mark it Serializable, the way it's written it will try to serialize the cache instance.

// TODO this could possibly be moved elsewhere and reused
@Override
def call(): java.util.Set[ByteArrayKey] = {
// Defensive copy to serialize and transmit across a network
new java.util.HashSet(cache.keySet())
}

@Override
def setEnvironment(cache: Cache[ByteArrayKey, CacheValue], inputKeys: java.util.Set[ByteArrayKey]) {
this.cache = cache
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.infinispan.context.{InvocationContext, Flag}
import org.infinispan.commands.write.PutKeyValueCommand
import org.infinispan.interceptors.base.BaseCustomInterceptor
import org.infinispan.interceptors.EntryWrappingInterceptor
import org.infinispan.upgrade.RollingUpgradeManager

/**
* Hot Rod server, in charge of defining its encoder/decoder and, if clustered, update the topology information
Expand Down Expand Up @@ -188,11 +189,19 @@ class HotRodServer extends AbstractProtocolServer("HotRod") with Log {
cache = cacheManager.getCache(cacheName)

knownCaches.put(cacheName, cache)
// make sure we register a Migrator for this cache!
tryRegisterMigrationManager(cacheName, cache)
}

cache
}

def tryRegisterMigrationManager(cacheName: String, cache: Cache[ByteArrayKey, CacheValue]) {
val cr = cache.getAdvancedCache.getComponentRegistry
val migrationManager = cr.getComponent(classOf[RollingUpgradeManager])
if (migrationManager != null) migrationManager.addMigrator(new HotRodMigrator(cache))
}

private[hotrod] def getAddressCache = addressCache

/**
Expand Down
42 changes: 42 additions & 0 deletions upgrade-tools/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2012 Red Hat, Inc. and/or its affiliates.
~
~ This is free software; you can redistribute it and/or modify it
~ under the terms of the GNU Lesser General Public License as
~ published by the Free Software Foundation; either version 2.1 of
~ the License, or (at your option) any later version.
~
~ This software is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
~ Lesser General Public License for more details.
~
~ You should have received a copy of the GNU Lesser General Public
~ License along with this library; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
~ 02110-1301 USA
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-parent</artifactId>
<version>5.2.0-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
</parent>

<artifactId>upgrade-tools</artifactId>
<name>Infinispan Rolling Upgrade Tooling</name>

<dependencies>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-client-hotrod</artifactId>
<version>5.2.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>