Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-15494 Near cache with invalidation fails to start in Spring Boot #11727

Merged
merged 1 commit into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.client.hotrod.near.NearCacheService;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.marshall.Marshaller;

/**
* Near {@link org.infinispan.client.hotrod.RemoteCache} implementation enabling
Expand Down Expand Up @@ -192,17 +194,19 @@ CompletableFuture<V> invalidateNearCacheIfNeeded(boolean hasForceReturnValue, Ob
}

@Override
public void start() {
super.start();
}

@Override
public void resolveStorage(boolean objectStorage) {
super.resolveStorage(objectStorage);

// Only register the listener *after* resolving the data format to use.
// This is necessary for the listener to correctly marshall the data.
listenerAddress = nearcache.start(this);
public void resolveStorage(MediaType key, MediaType value, boolean objectStorage) {
if (key != null && !delegate.getRemoteCacheManager().getMarshaller().mediaType().match(key)) {
// The server has a storage type which the cache marshaller does not handle.
// This could lead to losing events in the bloom filter, where the client and server see the key differently.
// To avoid this issue, the client negotiate the type when installing the listener, but it needs a proper configuration.
Class<? extends Marshaller> marshallerClass = delegate.getRemoteCacheManager().getMarshaller().getClass();
log.invalidateNearDefaultMarshallerMismatch(delegate.getName(), marshallerClass, key);
listenerAddress = nearcache.start(this);
delegate.resolveStorage(key, value, objectStorage);
} else {
delegate.resolveStorage(key, value, objectStorage);
listenerAddress = nearcache.start(this);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,4 +420,8 @@ public interface Log extends BasicLogger {
@LogMessage(level = WARN)
@Message(value = "Client cannot marshall the server's key media type ('%s'). This could cause poor performance.", id = 4116)
void serverKeyTypeNotRecognized(MediaType serverKeyType);

@LogMessage(level = WARN)
@Message(value = "Cache '%s' with marshaller %s does not handle server storage type '%s'. Configure the cache or default marshaller.", id = 4117)
void invalidateNearDefaultMarshallerMismatch(String cacheName, Class<?> clazz, MediaType serverKeyType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,36 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.infinispan.server.hotrod.test.HotRodTestingUtil.hotRodCacheConfiguration;

import java.util.stream.Stream;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.NearCacheMode;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.marshall.JavaSerializationMarshaller;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.ProtoStreamMarshaller;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.jboss.marshalling.commons.GenericJBossMarshaller;
import org.infinispan.test.TestingUtil;
import org.testng.annotations.Test;

@Test(groups = "functional", testName = "client.hotrod.event.ClusteredListenerMarshallerTest")
public class ClusterNearCacheMarshallingTest extends MultiHotRodServersTest {

private static final String SERVER_DEFINED_CACHE = "other-cache";
private Class<? extends Marshaller> marshaller;
private MediaType storeType;
private boolean bloomFilter;

public ClusterNearCacheMarshallingTest() { }

protected ClusterNearCacheMarshallingTest(Class<? extends Marshaller> marshaller, MediaType storeType) {
protected ClusterNearCacheMarshallingTest(Class<? extends Marshaller> marshaller, MediaType storeType, boolean bloomFilter) {
this.marshaller = marshaller;
this.storeType = storeType;
this.bloomFilter = bloomFilter;
}

@Override
Expand All @@ -35,19 +42,31 @@ protected void createCacheManagers() throws Throwable {
createHotRodServers(2, serverCfg);

waitForClusterToForm();

org.infinispan.configuration.cache.ConfigurationBuilder cb = new org.infinispan.configuration.cache.ConfigurationBuilder();
cb.encoding().key().mediaType(MediaType.APPLICATION_OCTET_STREAM_TYPE);
cb.encoding().value().mediaType(MediaType.APPLICATION_OCTET_STREAM_TYPE);
TestingUtil.defineConfiguration(manager(0), SERVER_DEFINED_CACHE, cb.build());
}

@Override
protected org.infinispan.client.hotrod.configuration.ConfigurationBuilder createHotRodClientConfigurationBuilder(String host, int serverPort) {
org.infinispan.client.hotrod.configuration.ConfigurationBuilder cb = super.createHotRodClientConfigurationBuilder(host, serverPort);
if (marshaller != null) cb.marshaller(marshaller);
cb.nearCache().mode(NearCacheMode.INVALIDATED).maxEntries(100);
cb.remoteCache("").nearCacheMode(NearCacheMode.INVALIDATED)
.nearCacheMaxEntries(2)
.nearCacheUseBloomFilter(true);
.nearCacheUseBloomFilter(bloomFilter);
cb.connectionPool().maxActive(1);
return cb;
}

public void testServerDefinedCache() {
RemoteCacheManager cacheManager = client(0);
assertThat(cacheManager.getCache(SERVER_DEFINED_CACHE)).isNotNull();
assertThat(cacheManager.getCache()).isNotNull();
}

public void testRemoteWriteOnLocal() {
RemoteCacheManager cacheManager = client(0);
RemoteCacheManager cacheManager1 = client(1);
Expand All @@ -71,17 +90,20 @@ public void testRemoteWriteOnLocal() {

@Override
public Object[] factory() {
return new Object[] {
new ClusterNearCacheMarshallingTest(null, null),
new ClusterNearCacheMarshallingTest(GenericJBossMarshaller.class, MediaType.APPLICATION_JBOSS_MARSHALLING),
new ClusterNearCacheMarshallingTest(ProtoStreamMarshaller.class, MediaType.APPLICATION_PROTOSTREAM),
new ClusterNearCacheMarshallingTest(ProtoStreamMarshaller.class, null),
new ClusterNearCacheMarshallingTest(GenericJBossMarshaller.class, null),
};
return Stream.of(true, false)
.flatMap(bloomFilter -> Stream.of(
new ClusterNearCacheMarshallingTest(null, null, bloomFilter),
new ClusterNearCacheMarshallingTest(GenericJBossMarshaller.class, MediaType.APPLICATION_JBOSS_MARSHALLING, bloomFilter),
new ClusterNearCacheMarshallingTest(ProtoStreamMarshaller.class, MediaType.APPLICATION_PROTOSTREAM, bloomFilter),
new ClusterNearCacheMarshallingTest(ProtoStreamMarshaller.class, null, bloomFilter),
new ClusterNearCacheMarshallingTest(GenericJBossMarshaller.class, null, bloomFilter),
new ClusterNearCacheMarshallingTest(JavaSerializationMarshaller.class, null, bloomFilter)
))
.toArray();
}

@Override
protected String parameters() {
return String.format("(marshaller=%s, mediaType=%s", (marshaller != null ? marshaller.getSimpleName() : "null"), storeType);
return String.format("(marshaller=%s, mediaType=%s, bloomFilter=%b", (marshaller != null ? marshaller.getSimpleName() : "null"), storeType, bloomFilter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@
import org.infinispan.client.hotrod.configuration.NearCacheMode;
import org.infinispan.client.hotrod.test.SingleHotRodServerTest;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.marshall.JavaSerializationMarshaller;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.ProtoStreamMarshaller;
import org.infinispan.jboss.marshalling.commons.GenericJBossMarshaller;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

@Test(groups = "functional", testName = "client.hotrod.near.NearCacheMarshallingTest")
public class NearCacheMarshallingTest extends SingleHotRodServerTest {

private static final String SERVER_DEFINED_CACHE = "other-cache";
private final Class<? extends Marshaller> marshaller;
private final MediaType storeType;
private final boolean useBloomFilter;
Expand All @@ -36,7 +39,12 @@ protected NearCacheMarshallingTest(Class<? extends Marshaller> marshaller, Media
protected EmbeddedCacheManager createCacheManager() throws Exception {
org.infinispan.configuration.cache.ConfigurationBuilder serverCfg = new org.infinispan.configuration.cache.ConfigurationBuilder();
if (storeType != null) hotRodCacheConfiguration(serverCfg, storeType);
return TestCacheManagerFactory.createCacheManager(contextInitializer(), serverCfg);
EmbeddedCacheManager ecm = TestCacheManagerFactory.createCacheManager(contextInitializer(), serverCfg);
org.infinispan.configuration.cache.ConfigurationBuilder cb = new org.infinispan.configuration.cache.ConfigurationBuilder();
cb.encoding().key().mediaType(MediaType.APPLICATION_OCTET_STREAM_TYPE);
cb.encoding().value().mediaType(MediaType.APPLICATION_OCTET_STREAM_TYPE);
TestingUtil.defineConfiguration(ecm, SERVER_DEFINED_CACHE, cb.build());
return ecm;
}

@Override
Expand All @@ -50,13 +58,23 @@ protected RemoteCacheManager getRemoteCacheManager() {
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.addServer().host("127.0.0.1").port(hotrodServer.getPort());
if (marshaller != null) builder.marshaller(marshaller);
builder.nearCache().mode(NearCacheMode.INVALIDATED).maxEntries(100);
builder.remoteCache("").nearCacheMode(NearCacheMode.INVALIDATED)
.nearCacheMaxEntries(2)
.nearCacheUseBloomFilter(useBloomFilter);
builder.connectionPool().maxActive(1);
return new RemoteCacheManager(builder.build());
}

public void testServerDefinedCache() {
RemoteCacheManager cacheManager = getRemoteCacheManager();

assertThat(cacheManager.getCache(SERVER_DEFINED_CACHE)).isNotNull();
assertThat(cacheManager.getCache()).isNotNull();

cacheManager.stop();
}

public void testRemoteWriteOnLocal() throws Exception {
RemoteCacheManager cacheManager = getRemoteCacheManager();
RemoteCacheManager cacheManager1 = getRemoteCacheManager();
Expand Down Expand Up @@ -90,7 +108,8 @@ protected static Object[] testInstances() {
new NearCacheMarshallingTest(GenericJBossMarshaller.class, MediaType.APPLICATION_JBOSS_MARSHALLING, useBloomFilter),
new NearCacheMarshallingTest(ProtoStreamMarshaller.class, MediaType.APPLICATION_PROTOSTREAM, useBloomFilter),
new NearCacheMarshallingTest(ProtoStreamMarshaller.class, null, useBloomFilter),
new NearCacheMarshallingTest(GenericJBossMarshaller.class, null, useBloomFilter)
new NearCacheMarshallingTest(GenericJBossMarshaller.class, null, useBloomFilter),
new NearCacheMarshallingTest(JavaSerializationMarshaller.class, null, useBloomFilter)
))
.toArray();
}
Expand Down