Skip to content

Commit

Permalink
ISPN-15494 Near cache with invalidation fails to start in Spring Boot
Browse files Browse the repository at this point in the history
Updated for the invalidated near cache instances. We check if the
MANAGER marshaller is capable of handling the server type, and only then
proceed with the usual flow. Either we register listener and set data
format, or the inverse.

If the marshaller is compatible with the server type, we exchange the
type information when installing the listener. See, this might hide bugs
that bite back during runtime! This might be found during runtime when
executing some key based operation against the server.

For internal caches, where the user won't interact directly through HR
(resp), this is fine. We also have a warning in any case.
  • Loading branch information
jabolina authored and karesti committed Jan 24, 2024
1 parent f28484f commit dfa172e
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.client.hotrod.near.NearCacheService;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.marshall.Marshaller;

/**
* Near {@link org.infinispan.client.hotrod.RemoteCache} implementation enabling
Expand Down Expand Up @@ -192,17 +194,19 @@ CompletableFuture<V> invalidateNearCacheIfNeeded(boolean hasForceReturnValue, Ob
}

@Override
public void start() {
super.start();
}

@Override
public void resolveStorage(boolean objectStorage) {
super.resolveStorage(objectStorage);

// Only register the listener *after* resolving the data format to use.
// This is necessary for the listener to correctly marshall the data.
listenerAddress = nearcache.start(this);
public void resolveStorage(MediaType key, MediaType value, boolean objectStorage) {
if (key != null && !delegate.getRemoteCacheManager().getMarshaller().mediaType().match(key)) {
// The server has a storage type which the cache marshaller does not handle.
// This could lead to losing events in the bloom filter, where the client and server see the key differently.
// To avoid this issue, the client negotiate the type when installing the listener, but it needs a proper configuration.
Class<? extends Marshaller> marshallerClass = delegate.getRemoteCacheManager().getMarshaller().getClass();
log.invalidateNearDefaultMarshallerMismatch(delegate.getName(), marshallerClass, key);
listenerAddress = nearcache.start(this);
delegate.resolveStorage(key, value, objectStorage);
} else {
delegate.resolveStorage(key, value, objectStorage);
listenerAddress = nearcache.start(this);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,4 +420,8 @@ public interface Log extends BasicLogger {
@LogMessage(level = WARN)
@Message(value = "Client cannot marshall the server's key media type ('%s'). This could cause poor performance.", id = 4116)
void serverKeyTypeNotRecognized(MediaType serverKeyType);

@LogMessage(level = WARN)
@Message(value = "Cache '%s' with marshaller %s does not handle server storage type '%s'. Configure the cache or default marshaller.", id = 4117)
void invalidateNearDefaultMarshallerMismatch(String cacheName, Class<?> clazz, MediaType serverKeyType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,36 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.infinispan.server.hotrod.test.HotRodTestingUtil.hotRodCacheConfiguration;

import java.util.stream.Stream;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.NearCacheMode;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.marshall.JavaSerializationMarshaller;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.ProtoStreamMarshaller;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.jboss.marshalling.commons.GenericJBossMarshaller;
import org.infinispan.test.TestingUtil;
import org.testng.annotations.Test;

@Test(groups = "functional", testName = "client.hotrod.event.ClusteredListenerMarshallerTest")
public class ClusterNearCacheMarshallingTest extends MultiHotRodServersTest {

private static final String SERVER_DEFINED_CACHE = "other-cache";
private Class<? extends Marshaller> marshaller;
private MediaType storeType;
private boolean bloomFilter;

public ClusterNearCacheMarshallingTest() { }

protected ClusterNearCacheMarshallingTest(Class<? extends Marshaller> marshaller, MediaType storeType) {
protected ClusterNearCacheMarshallingTest(Class<? extends Marshaller> marshaller, MediaType storeType, boolean bloomFilter) {
this.marshaller = marshaller;
this.storeType = storeType;
this.bloomFilter = bloomFilter;
}

@Override
Expand All @@ -35,19 +42,31 @@ protected void createCacheManagers() throws Throwable {
createHotRodServers(2, serverCfg);

waitForClusterToForm();

org.infinispan.configuration.cache.ConfigurationBuilder cb = new org.infinispan.configuration.cache.ConfigurationBuilder();
cb.encoding().key().mediaType(MediaType.APPLICATION_OCTET_STREAM_TYPE);
cb.encoding().value().mediaType(MediaType.APPLICATION_OCTET_STREAM_TYPE);
TestingUtil.defineConfiguration(manager(0), SERVER_DEFINED_CACHE, cb.build());
}

@Override
protected org.infinispan.client.hotrod.configuration.ConfigurationBuilder createHotRodClientConfigurationBuilder(String host, int serverPort) {
org.infinispan.client.hotrod.configuration.ConfigurationBuilder cb = super.createHotRodClientConfigurationBuilder(host, serverPort);
if (marshaller != null) cb.marshaller(marshaller);
cb.nearCache().mode(NearCacheMode.INVALIDATED).maxEntries(100);
cb.remoteCache("").nearCacheMode(NearCacheMode.INVALIDATED)
.nearCacheMaxEntries(2)
.nearCacheUseBloomFilter(true);
.nearCacheUseBloomFilter(bloomFilter);
cb.connectionPool().maxActive(1);
return cb;
}

public void testServerDefinedCache() {
RemoteCacheManager cacheManager = client(0);
assertThat(cacheManager.getCache(SERVER_DEFINED_CACHE)).isNotNull();
assertThat(cacheManager.getCache()).isNotNull();
}

public void testRemoteWriteOnLocal() {
RemoteCacheManager cacheManager = client(0);
RemoteCacheManager cacheManager1 = client(1);
Expand All @@ -71,17 +90,20 @@ public void testRemoteWriteOnLocal() {

@Override
public Object[] factory() {
return new Object[] {
new ClusterNearCacheMarshallingTest(null, null),
new ClusterNearCacheMarshallingTest(GenericJBossMarshaller.class, MediaType.APPLICATION_JBOSS_MARSHALLING),
new ClusterNearCacheMarshallingTest(ProtoStreamMarshaller.class, MediaType.APPLICATION_PROTOSTREAM),
new ClusterNearCacheMarshallingTest(ProtoStreamMarshaller.class, null),
new ClusterNearCacheMarshallingTest(GenericJBossMarshaller.class, null),
};
return Stream.of(true, false)
.flatMap(bloomFilter -> Stream.of(
new ClusterNearCacheMarshallingTest(null, null, bloomFilter),
new ClusterNearCacheMarshallingTest(GenericJBossMarshaller.class, MediaType.APPLICATION_JBOSS_MARSHALLING, bloomFilter),
new ClusterNearCacheMarshallingTest(ProtoStreamMarshaller.class, MediaType.APPLICATION_PROTOSTREAM, bloomFilter),
new ClusterNearCacheMarshallingTest(ProtoStreamMarshaller.class, null, bloomFilter),
new ClusterNearCacheMarshallingTest(GenericJBossMarshaller.class, null, bloomFilter),
new ClusterNearCacheMarshallingTest(JavaSerializationMarshaller.class, null, bloomFilter)
))
.toArray();
}

@Override
protected String parameters() {
return String.format("(marshaller=%s, mediaType=%s", (marshaller != null ? marshaller.getSimpleName() : "null"), storeType);
return String.format("(marshaller=%s, mediaType=%s, bloomFilter=%b", (marshaller != null ? marshaller.getSimpleName() : "null"), storeType, bloomFilter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@
import org.infinispan.client.hotrod.configuration.NearCacheMode;
import org.infinispan.client.hotrod.test.SingleHotRodServerTest;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.marshall.JavaSerializationMarshaller;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.ProtoStreamMarshaller;
import org.infinispan.jboss.marshalling.commons.GenericJBossMarshaller;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

@Test(groups = "functional", testName = "client.hotrod.near.NearCacheMarshallingTest")
public class NearCacheMarshallingTest extends SingleHotRodServerTest {

private static final String SERVER_DEFINED_CACHE = "other-cache";
private final Class<? extends Marshaller> marshaller;
private final MediaType storeType;
private final boolean useBloomFilter;
Expand All @@ -36,7 +39,12 @@ protected NearCacheMarshallingTest(Class<? extends Marshaller> marshaller, Media
protected EmbeddedCacheManager createCacheManager() throws Exception {
org.infinispan.configuration.cache.ConfigurationBuilder serverCfg = new org.infinispan.configuration.cache.ConfigurationBuilder();
if (storeType != null) hotRodCacheConfiguration(serverCfg, storeType);
return TestCacheManagerFactory.createCacheManager(contextInitializer(), serverCfg);
EmbeddedCacheManager ecm = TestCacheManagerFactory.createCacheManager(contextInitializer(), serverCfg);
org.infinispan.configuration.cache.ConfigurationBuilder cb = new org.infinispan.configuration.cache.ConfigurationBuilder();
cb.encoding().key().mediaType(MediaType.APPLICATION_OCTET_STREAM_TYPE);
cb.encoding().value().mediaType(MediaType.APPLICATION_OCTET_STREAM_TYPE);
TestingUtil.defineConfiguration(ecm, SERVER_DEFINED_CACHE, cb.build());
return ecm;
}

@Override
Expand All @@ -50,13 +58,23 @@ protected RemoteCacheManager getRemoteCacheManager() {
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.addServer().host("127.0.0.1").port(hotrodServer.getPort());
if (marshaller != null) builder.marshaller(marshaller);
builder.nearCache().mode(NearCacheMode.INVALIDATED).maxEntries(100);
builder.remoteCache("").nearCacheMode(NearCacheMode.INVALIDATED)
.nearCacheMaxEntries(2)
.nearCacheUseBloomFilter(useBloomFilter);
builder.connectionPool().maxActive(1);
return new RemoteCacheManager(builder.build());
}

public void testServerDefinedCache() {
RemoteCacheManager cacheManager = getRemoteCacheManager();

assertThat(cacheManager.getCache(SERVER_DEFINED_CACHE)).isNotNull();
assertThat(cacheManager.getCache()).isNotNull();

cacheManager.stop();
}

public void testRemoteWriteOnLocal() throws Exception {
RemoteCacheManager cacheManager = getRemoteCacheManager();
RemoteCacheManager cacheManager1 = getRemoteCacheManager();
Expand Down Expand Up @@ -90,7 +108,8 @@ protected static Object[] testInstances() {
new NearCacheMarshallingTest(GenericJBossMarshaller.class, MediaType.APPLICATION_JBOSS_MARSHALLING, useBloomFilter),
new NearCacheMarshallingTest(ProtoStreamMarshaller.class, MediaType.APPLICATION_PROTOSTREAM, useBloomFilter),
new NearCacheMarshallingTest(ProtoStreamMarshaller.class, null, useBloomFilter),
new NearCacheMarshallingTest(GenericJBossMarshaller.class, null, useBloomFilter)
new NearCacheMarshallingTest(GenericJBossMarshaller.class, null, useBloomFilter),
new NearCacheMarshallingTest(JavaSerializationMarshaller.class, null, useBloomFilter)
))
.toArray();
}
Expand Down

0 comments on commit dfa172e

Please sign in to comment.