Skip to content

Commit

Permalink
ISPN-13309 Rolling Upgrades with Hot Rod protocol version < 2.8 and e…
Browse files Browse the repository at this point in the history
…ncoding fails
  • Loading branch information
Gustavo committed Sep 22, 2021
1 parent cece073 commit 175382b
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,12 @@ public CompletionStage<Void> start(InitializationContext ctx) {
storageConfigurationManager.getValueStorageMediaType() : MediaType.APPLICATION_OBJECT;

// Older servers don't provide media type information
if (serverKeyStorageType == null && localKeyStorageType.isBinary()) {
if ((serverKeyStorageType == null || serverKeyStorageType.match(MediaType.APPLICATION_UNKNOWN))
&& localKeyStorageType.isBinary()) {
dataFormatBuilder.keyMarshaller(IdentityMarshaller.INSTANCE);
}
if (serverValueStorageType == null && localValueStorageType.isBinary()) {
if ((serverValueStorageType == null || serverValueStorageType.match(MediaType.APPLICATION_UNKNOWN))
&& localValueStorageType.isBinary()) {
dataFormatBuilder.valueMarshaller(IdentityMarshaller.INSTANCE);
}
supportsSegmentation = localKeyStorageType.equals(serverKeyStorageType);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package org.infinispan.persistence.remote.upgrade;

import static org.infinispan.client.hotrod.ProtocolVersion.PROTOCOL_VERSION_25;
import static org.infinispan.client.hotrod.ProtocolVersion.PROTOCOL_VERSION_28;
import static org.infinispan.client.hotrod.ProtocolVersion.PROTOCOL_VERSION_31;
import static org.testng.AssertJUnit.assertEquals;

import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.upgrade.RollingUpgradeManager;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

/**
* Test for Hot Rod Rolling Upgrades using different storage types in caches
*
* @since 12.0
*/
@Test(testName = "upgrade.hotrod.HotRodUpgradeMediaTypesTest", groups = "functional")
public class HotRodUpgradeMediaTypesTest extends AbstractInfinispanTest {
protected TestCluster sourceCluster, targetCluster;

protected static final String CACHE_NAME = "theCache";

public static final int ENTRIES = 10;
private MediaType mediaType;
private ProtocolVersion version;

@Factory
public Object[] factory() {
return new Object[]{
new HotRodUpgradeMediaTypesTest().protocolVersion(PROTOCOL_VERSION_25),
new HotRodUpgradeMediaTypesTest().protocolVersion(PROTOCOL_VERSION_28),
new HotRodUpgradeMediaTypesTest().protocolVersion(PROTOCOL_VERSION_31),
new HotRodUpgradeMediaTypesTest().mediaType(MediaType.TEXT_PLAIN).protocolVersion(PROTOCOL_VERSION_25),
new HotRodUpgradeMediaTypesTest().mediaType(MediaType.TEXT_PLAIN).protocolVersion(PROTOCOL_VERSION_28),
new HotRodUpgradeMediaTypesTest().mediaType(MediaType.TEXT_PLAIN).protocolVersion(PROTOCOL_VERSION_31),
new HotRodUpgradeMediaTypesTest().mediaType(MediaType.APPLICATION_PROTOSTREAM).protocolVersion(PROTOCOL_VERSION_25),
new HotRodUpgradeMediaTypesTest().mediaType(MediaType.APPLICATION_PROTOSTREAM).protocolVersion(PROTOCOL_VERSION_28),
new HotRodUpgradeMediaTypesTest().mediaType(MediaType.APPLICATION_PROTOSTREAM).protocolVersion(PROTOCOL_VERSION_31),
new HotRodUpgradeMediaTypesTest().mediaType(MediaType.APPLICATION_SERIALIZED_OBJECT).protocolVersion(PROTOCOL_VERSION_25),
new HotRodUpgradeMediaTypesTest().mediaType(MediaType.APPLICATION_SERIALIZED_OBJECT).protocolVersion(PROTOCOL_VERSION_28),
new HotRodUpgradeMediaTypesTest().mediaType(MediaType.APPLICATION_SERIALIZED_OBJECT).protocolVersion(PROTOCOL_VERSION_31),
};
}

private HotRodUpgradeMediaTypesTest protocolVersion(ProtocolVersion version) {
this.version = version;
return this;
}

private HotRodUpgradeMediaTypesTest mediaType(MediaType mediaType) {
this.mediaType = mediaType;
return this;
}

@Override
protected String parameters() {
return String.format("[mediaType=%s,version=%s]", mediaType, version.toString().replace(".", "_"));
}

@BeforeMethod
public void setup() throws Exception {
ConfigurationBuilder config = new ConfigurationBuilder();
config.clustering().cacheMode(CacheMode.DIST_SYNC);
if (mediaType != null) {
config.encoding().mediaType(mediaType.toString());
}
sourceCluster = new TestCluster.Builder().setName("sourceCluster").setNumMembers(2)
.cache().name(CACHE_NAME).configuredWith(config)
.build();

targetCluster = new TestCluster.Builder().setName("targetCluster").setNumMembers(2)
.cache().name(CACHE_NAME).configuredWith(config)
.remoteProtocolVersion(version)
.remotePort(sourceCluster.getHotRodPort()).remoteStoreWrapping(true).remoteStoreRawValues(true)
.build();

}

public void testSynchronization() throws Exception {
RemoteCache<Object, Object> sourceRemoteCache = sourceCluster.getRemoteCache(CACHE_NAME, mediaType);
RemoteCache<Object, Object> targetRemoteCache = targetCluster.getRemoteCache(CACHE_NAME, mediaType);

for (int i = 0; i < ENTRIES; i++) {
sourceRemoteCache.put(key(i), value(i));
}

assertEquals(ENTRIES, sourceRemoteCache.size());
assertEquals(ENTRIES, targetRemoteCache.size());
assertEquals(value(5), targetRemoteCache.get(key(5)));

// Do a Rolling Upgrade
RollingUpgradeManager upgradeManager = targetCluster.getRollingUpgradeManager(CACHE_NAME);
long count = upgradeManager.synchronizeData("hotrod");
assertEquals(ENTRIES, count);

// Disconnect remote store
upgradeManager.disconnectSource("hotrod");

// Check migrated data
assertEquals(sourceCluster.getEmbeddedCache(CACHE_NAME).size(), targetCluster.getEmbeddedCache(CACHE_NAME).size());
assertEquals(value(7), targetRemoteCache.get(key(7)));
}

@AfterMethod
public void tearDown() {
sourceCluster.destroy();
targetCluster.destroy();
}

private Object key(int idx) {
return idx;
}

private Object value(int idx) {
return "Value_" + idx;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@
import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.infinispan.client.hotrod.DataFormat;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.TransactionMode;
import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.marshall.JavaSerializationMarshaller;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.ProtoStreamMarshaller;
import org.infinispan.commons.marshall.UTF8StringMarshaller;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.StoreConfiguration;
Expand Down Expand Up @@ -54,6 +59,24 @@ <K, V> RemoteCache<K, V> getRemoteCache(String cacheName) {
return remoteCacheManager.getCache(cacheName);
}

<K, V> RemoteCache<K, V> getRemoteCache(String cacheName, MediaType mediaType) {
if (mediaType == null) return getRemoteCache(cacheName);

Marshaller marshaller;
switch (mediaType.toString()) {
case MediaType.TEXT_PLAIN_TYPE:
marshaller = new UTF8StringMarshaller();
break;
case MediaType.APPLICATION_SERIALIZED_OBJECT_TYPE:
marshaller = new JavaSerializationMarshaller();
break;
default:
marshaller = new ProtoStreamMarshaller();
}
DataFormat dataFormat = DataFormat.builder().keyMarshaller(marshaller).valueMarshaller(marshaller).build();
return remoteCacheManager.getCache(cacheName).withDataFormat(dataFormat);
}

<K, V> RemoteCache<K, V> getRemoteCache(String cacheName, boolean transactional) {
if (!transactional) {
return getRemoteCache(cacheName);
Expand Down

0 comments on commit 175382b

Please sign in to comment.