Skip to content

Commit

Permalink
ISPN-15197 Near Cache With Bloom Filter enabled doesn't work properly…
Browse files Browse the repository at this point in the history
… if jboss-marshalling is used
  • Loading branch information
jabolina committed Sep 29, 2023
1 parent 874a7c7 commit f95a49d
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 1 deletion.
6 changes: 6 additions & 0 deletions client/hotrod-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@
<artifactId>bcpkix-jdk15to18</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-jboss-marshalling</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ CompletableFuture<V> invalidateNearCacheIfNeeded(boolean hasForceReturnValue, Ob
@Override
public void start() {
super.start();
}

@Override
public void resolveStorage(boolean objectStorage) {
super.resolveStorage(objectStorage);

// Only register the listener *after* resolving the data format to use.
// This is necessary for the listener to correctly marshall the data.
listenerAddress = nearcache.start(this);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package org.infinispan.client.hotrod.near;

import static org.assertj.core.api.Assertions.assertThat;
import static org.infinispan.server.hotrod.test.HotRodTestingUtil.hotRodCacheConfiguration;

import java.util.stream.Stream;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.configuration.NearCacheMode;
import org.infinispan.client.hotrod.test.SingleHotRodServerTest;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.ProtoStreamMarshaller;
import org.infinispan.jboss.marshalling.commons.GenericJBossMarshaller;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

@Test(groups = "functional", testName = "client.hotrod.near.NearCacheMarshallingTest")
public class NearCacheMarshallingTest extends SingleHotRodServerTest {

private final Class<? extends Marshaller> marshaller;
private final MediaType storeType;
private final boolean useBloomFilter;

protected NearCacheMarshallingTest(Class<? extends Marshaller> marshaller, MediaType storeType, boolean useBloomFilter) {
this.marshaller = marshaller;
this.storeType = storeType;
this.useBloomFilter = useBloomFilter;
}

@Override
protected EmbeddedCacheManager createCacheManager() throws Exception {
org.infinispan.configuration.cache.ConfigurationBuilder serverCfg = new org.infinispan.configuration.cache.ConfigurationBuilder();
if (storeType != null) hotRodCacheConfiguration(serverCfg, storeType);
return TestCacheManagerFactory.createCacheManager(contextInitializer(), serverCfg);
}

@Override
protected void setup() throws Exception {
cacheManager = createCacheManager();
hotrodServer = createHotRodServer();
}

@Override
protected RemoteCacheManager getRemoteCacheManager() {
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.addServer().host("127.0.0.1").port(hotrodServer.getPort());
if (marshaller != null) builder.marshaller(marshaller);
builder.remoteCache("").nearCacheMode(NearCacheMode.INVALIDATED)
.nearCacheMaxEntries(2)
.nearCacheUseBloomFilter(useBloomFilter);
builder.connectionPool().maxActive(1);
return new RemoteCacheManager(builder.build());
}

public void testRemoteWriteOnLocal() throws Exception {
RemoteCacheManager cacheManager = getRemoteCacheManager();
RemoteCacheManager cacheManager1 = getRemoteCacheManager();

RemoteCache<String, String> cache = cacheManager.getCache();
cache.put("K", "V");
assertThat(cache.get("K")).isEqualTo("V");

RemoteCache<String, String> cache1 = cacheManager1.getCache();
assertThat(cache1.get("K")).isEqualTo("V");

// Another client updates the value.
cache1.replace("K", "V1");

// Take effect immediately.
assertThat(cache1.get("K")).isEqualTo("V1");

// The other cache eventually updates to reflect the replace.
eventually(() -> cache.get("K").equals("V1"));

cacheManager.stop();
cacheManager1.stop();
}

@Factory
protected static Object[] testInstances() {
return Stream.of(true, false)
.flatMap(useBloomFilter ->
Stream.of(
new NearCacheMarshallingTest(null, null, useBloomFilter), // Let default.
new NearCacheMarshallingTest(GenericJBossMarshaller.class, MediaType.APPLICATION_JBOSS_MARSHALLING, useBloomFilter),
new NearCacheMarshallingTest(ProtoStreamMarshaller.class, MediaType.APPLICATION_PROTOSTREAM, useBloomFilter),
new NearCacheMarshallingTest(ProtoStreamMarshaller.class, null, useBloomFilter),
new NearCacheMarshallingTest(GenericJBossMarshaller.class, null, useBloomFilter)
))
.toArray();
}

@Override
protected String parameters() {
return String.format("(marshaller=%s, mediaType=%s, bloomFilter=%b", (marshaller != null ? marshaller.getSimpleName() : "null"), storeType, useBloomFilter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Collections;
import java.util.Set;

import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.marshall.Ids;
import org.infinispan.factories.ComponentRegistry;
Expand All @@ -25,9 +26,16 @@
@Scope(Scopes.NONE)
public class KeyValueFilterConverterAsCacheEventFilterConverter<K, V, C> implements CacheEventFilterConverter<K, V, C> {
private final KeyValueFilterConverter<K, V, C> keyValueFilterConverter;
private final MediaType format;

public KeyValueFilterConverterAsCacheEventFilterConverter(KeyValueFilterConverter<K, V, C> keyValueFilterConverter) {
this(keyValueFilterConverter, MediaType.APPLICATION_OBJECT);
}

public KeyValueFilterConverterAsCacheEventFilterConverter(KeyValueFilterConverter<K, V, C> keyValueFilterConverter, MediaType format) {
this.keyValueFilterConverter = keyValueFilterConverter;
// If the format is unknown, defaults to use the storage type.
this.format = format == MediaType.APPLICATION_UNKNOWN ? null : format;
}

@Override
Expand All @@ -45,6 +53,11 @@ public boolean accept(K key, V oldValue, Metadata oldMetadata, V newValue, Metad
return keyValueFilterConverter.accept(key, newValue, newMetadata);
}

@Override
public MediaType format() {
return format;
}

@Inject
protected void injectDependencies(ComponentRegistry cr) {
cr.wireDependencies(keyValueFilterConverter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ CompletionStage<Void> addClientListener(Channel ch, HotRodHeader h, byte[] liste
assert !includeState;
eventType = ClientEventType.createType(false, useRawData, h.version);
filter = null;
converter = new KeyValueFilterConverterAsCacheEventFilterConverter<>(HotRodServer.ToEmptyBytesKeyValueFilterConverter.INSTANCE);
converter = new KeyValueFilterConverterAsCacheEventFilterConverter<>(HotRodServer.ToEmptyBytesKeyValueFilterConverter.INSTANCE, cache.getKeyDataConversion().getRequestMediaType());
} else {
boolean hasFilter = filterFactory != null && !filterFactory.isEmpty();
boolean hasConverter = converterFactory != null && !converterFactory.isEmpty();
Expand Down

0 comments on commit f95a49d

Please sign in to comment.