Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-6550 Remote iterator does not work in compatibility mode #4271

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/
public interface KeyTracker {

boolean track(byte[] key);
boolean track(byte[] key, short status);

void segmentsFinished(byte[] finishedSegments);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash;
import org.infinispan.client.hotrod.impl.consistenthash.SegmentConsistentHash;
import org.infinispan.commons.marshall.Marshaller;

import java.util.Set;

Expand All @@ -14,10 +15,10 @@ final class KeyTrackerFactory {
private KeyTrackerFactory() {
}

public static KeyTracker create(ConsistentHash hash, int topologyId, Set<Integer> segments) {
public static KeyTracker create(Marshaller marshaller, ConsistentHash hash, int topologyId, Set<Integer> segments) {
if (topologyId == -1) return new NoOpSegmentKeyTracker();
if (hash == null) return new ReplKeyTracker();
return new SegmentKeyTracker((SegmentConsistentHash) hash, segments);
return new SegmentKeyTracker(marshaller, (SegmentConsistentHash) hash, segments);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
class NoOpSegmentKeyTracker implements KeyTracker {

@Override
public boolean track(byte[] key) {
public boolean track(byte[] key, short status) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.infinispan.client.hotrod.impl.transport.Transport;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.util.CloseableIterator;

import java.util.LinkedList;
Expand Down Expand Up @@ -39,8 +40,8 @@ public class RemoteCloseableIterator<E> implements CloseableIterator<Entry<Objec
private KeyTracker segmentKeyTracker;
private Transport transport;
private String iterationId;
boolean endOfIteration = false;
boolean closed;
private boolean endOfIteration = false;
private boolean closed;
private Queue<Entry<Object, E>> nextElements = new LinkedList<>();

public RemoteCloseableIterator(OperationsFactory operationsFactory, String filterConverterFactory,
Expand Down Expand Up @@ -131,6 +132,8 @@ private IterationStartResponse startInternal(Set<Integer> segments) {

public void start() {
IterationStartResponse startResponse = startInternal(segments);
this.segmentKeyTracker = KeyTrackerFactory.create(startResponse.getSegmentConsistentHash(), startResponse.getTopologyId(), segments);
Marshaller marshaller = startResponse.getTransport().getTransportFactory().getMarshaller();
this.segmentKeyTracker = KeyTrackerFactory.create(
marshaller, startResponse.getSegmentConsistentHash(), startResponse.getTopologyId(), segments);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ReplKeyTracker implements KeyTracker {
private Set<byte[]> keys = CollectionFactory.makeSet(ByteArrayEquivalence.INSTANCE);

@Override
public boolean track(byte[] key) {
public boolean track(byte[] key, short status) {
return keys.add(key);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package org.infinispan.client.hotrod.impl.iteration;

import org.infinispan.client.hotrod.impl.consistenthash.SegmentConsistentHash;
import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.client.hotrod.marshall.MarshallerUtil;
import org.infinispan.commons.equivalence.ByteArrayEquivalence;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.commons.util.Util;

Expand All @@ -23,8 +26,10 @@ class SegmentKeyTracker implements KeyTracker {

private final AtomicReferenceArray<Set<byte[]>> keysPerSegment;
private final SegmentConsistentHash segmentConsistentHash;
private final Marshaller marshaller;

public SegmentKeyTracker(SegmentConsistentHash segmentConsistentHash, Set<Integer> segments) {
public SegmentKeyTracker(Marshaller marshaller, SegmentConsistentHash segmentConsistentHash, Set<Integer> segments) {
this.marshaller = marshaller;
int numSegments = segmentConsistentHash.getNumSegments();
keysPerSegment = new AtomicReferenceArray<>(numSegments);
if (log.isDebugEnabled()) log.debugf("Created SegmentKeyTracker with %d segments", numSegments);
Expand All @@ -34,8 +39,10 @@ public SegmentKeyTracker(SegmentConsistentHash segmentConsistentHash, Set<Intege
segmentStream.forEach(i -> keysPerSegment.set(i, CollectionFactory.makeSet(ByteArrayEquivalence.INSTANCE)));
}

public boolean track(byte[] key) {
int segment = segmentConsistentHash.getSegment(key);
public boolean track(byte[] key, short status) {
int segment = HotRodConstants.hasCompatibility(status) ?
segmentConsistentHash.getSegment(MarshallerUtil.bytes2obj(marshaller, key, status)) :
segmentConsistentHash.getSegment(key);
boolean result = keysPerSegment.get(segment).add(key);
if (log.isTraceEnabled())
log.trackingSegmentKey(Util.printArray(key), segment, !result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public IterationNextResponse<E> execute() {
value = new MetadataValueImpl<>(creation, lifespan, lastUsed, maxIdle, version, value);
}

if (segmentKeyTracker.track(key)) {
if (segmentKeyTracker.track(key, status)) {
entries.add(new SimpleEntry<>(unmarshall(key, status), (E) value));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.infinispan.client.hotrod.impl.consistenthash.SegmentConsistentHash;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.protocol.HeaderParams;
import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.client.hotrod.impl.transport.Transport;
import org.infinispan.client.hotrod.impl.transport.TransportFactory;
import org.infinispan.client.hotrod.logging.Log;
Expand All @@ -31,9 +32,9 @@ public class IterationStartOperation extends RetryOnFailureOperation<IterationSt
private final TransportFactory transportFactory;
private final boolean metadata;

protected IterationStartOperation(Codec codec, int flags, byte[] cacheName, AtomicInteger topologyId,
String filterConverterFactory, byte[][] filterParameters, Set<Integer> segments,
int batchSize, TransportFactory transportFactory, boolean metadata) {
IterationStartOperation(Codec codec, int flags, byte[] cacheName, AtomicInteger topologyId,
String filterConverterFactory, byte[][] filterParameters, Set<Integer> segments,
int batchSize, TransportFactory transportFactory, boolean metadata) {
super(codec, transportFactory, cacheName, topologyId, flags);
this.filterConverterFactory = filterConverterFactory;
this.filterParameters = filterParameters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ public class IterationStartResponse {
private final int topologyId;
private final Transport transport;

public IterationStartResponse(String iterationId, SegmentConsistentHash segmentConsistentHash, int topologyId, Transport transport) {
IterationStartResponse(String iterationId, SegmentConsistentHash segmentConsistentHash, int topologyId, Transport transport) {
this.iterationId = iterationId;
this.segmentConsistentHash = segmentConsistentHash;
this.topologyId = topologyId;
this.transport = transport;

}

public String getIterationId() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.infinispan.client.hotrod.impl.iteration;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.commons.equivalence.AnyEquivalence;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.testng.annotations.Test;

import java.util.Map;
import java.util.Set;

import static org.infinispan.server.hotrod.test.HotRodTestingUtil.hotRodCacheConfiguration;
import static org.testng.AssertJUnit.assertEquals;

@Test(groups = "functional", testName = "client.hotrod.iteration.DistFailOverRemoteIteratorTest")
public class MultiServerCompatTest extends MultiHotRodServersTest implements AbstractRemoteIteratorTest {

private static final int NUM_SERVERS = 2;
private static final int CACHE_SIZE = 10;

@Override
protected void createCacheManagers() throws Throwable {
createHotRodServers(NUM_SERVERS, getCacheConfiguration());
}

private org.infinispan.configuration.cache.ConfigurationBuilder getCacheConfiguration() {
ConfigurationBuilder builder = hotRodCacheConfiguration(getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false));
builder.clustering().hash().numSegments(60).numOwners(1);
builder.compatibility().enable();
builder.dataContainer().keyEquivalence(AnyEquivalence.getInstance());
return builder;
}

@Test
public void testIteration() throws Exception {
RemoteCache<Integer, String> remoteCache = clients.get(0).getCache();
populateCache(CACHE_SIZE, i -> "value", remoteCache);
Set<Map.Entry<Object, Object>> entries = extractEntries(remoteCache.retrieveEntries(null, 5));

assertEquals(CACHE_SIZE, entries.size());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -41,12 +40,12 @@ private ConfigurationBuilder getCacheConfiguration() {
}


class TestSegmentKeyTracker implements KeyTracker {
private static class TestSegmentKeyTracker implements KeyTracker {

List<Integer> finished = new ArrayList<>();

@Override
public boolean track(byte[] key) {
public boolean track(byte[] key, short status) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class DefaultIterationManager(val cacheManager: EmbeddedCacheManager) extends It
})

val segmentListener = new IterationSegmentsListener
val compatInfo = CompatInfo(cacheManager.getCacheConfiguration(cacheName).compatibility())
val compatInfo = CompatInfo(cacheManager.getCache(cacheName).getCacheConfiguration.compatibility())
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason cacheManager.getCacheConfiguration(cacheName).compatibility() returns the wrong cache configuration (from the default one) instead of the cacheName's cache configuration.
This needs more investigation (another JIRA)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for another JIRA

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


val filteredStream = for {
(name, params) <- namedFactory
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.infinispan.server.test.client.hotrod;

import org.infinispan.arquillian.core.InfinispanResource;
import org.infinispan.arquillian.core.RemoteInfinispanServer;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.server.test.category.HotRodClustered;
import org.infinispan.server.test.category.HotRodSingleNode;
import org.infinispan.server.test.util.RemoteCacheManagerFactory;
import org.jboss.arquillian.junit.Arquillian;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.IOException;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.IntStream;

import static org.junit.Assert.assertEquals;

@RunWith(Arquillian.class)
@Category({HotRodSingleNode.class, HotRodClustered.class})
public class HotRodRemoteCacheCompatIT {

private static final String CACHE_NAME = "compatibilityCache";
private static final int CACHE_SIZE = 1000;
private static RemoteCacheManager remoteCacheManager;

private RemoteCache<Integer, String> remoteCache;

@InfinispanResource("container1")
RemoteInfinispanServer server1;

@Before
public void setup() throws IOException {
RemoteCacheManagerFactory remoteCacheManagerFactory = new RemoteCacheManagerFactory();
ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
clientBuilder.addServer()
.host(server1.getHotrodEndpoint().getInetAddress().getHostName())
.port(server1.getHotrodEndpoint().getPort());
remoteCacheManager = remoteCacheManagerFactory.createManager(clientBuilder);
remoteCache = remoteCacheManager.getCache(CACHE_NAME);
}

@AfterClass
public static void release() {
if (remoteCacheManager != null) {
remoteCacheManager.stop();
}
}

@Test
public void testIteration() {
remoteCache.clear();
IntStream.range(0, CACHE_SIZE).forEach(k -> remoteCache.put(k, "value" + k));
Set<Object> keys = new HashSet<>();
try (CloseableIterator<Entry<Object, Object>> iter = remoteCache.retrieveEntries(null, 10)) {
iter.forEachRemaining(e -> keys.add(e.getKey()));
}
assertEquals(CACHE_SIZE, keys.size());
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,22 @@
enabled="true"
timeout="600000" />
</distributed-cache>
<distributed-cache-configuration
name="compatibilityCacheConfiguration"
start="EAGER"
mode="SYNC"
segments="1"
owners="2"
batching="false"
l1-lifespan="0"
remote-timeout="60000" >
<transaction mode="NONE" />
<state-transfer
enabled="true"
timeout="600000" />
<compatibility enabled="true"/>
</distributed-cache-configuration>
<distributed-cache name="compatibilityCache" configuration="compatibilityCacheConfiguration"/>
<!-- to get a memcached module to work, there needs to be a named cache called "memcachedCache" -->
<distributed-cache
name="memcachedCache"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
>
<transaction mode="NONE" />
</local-cache>
<local-cache-configuration
name="compatibilityCacheConfiguration"
start="EAGER">
<transaction mode="NONE"/>
<compatibility enabled="true"/>
</local-cache-configuration>
<local-cache name="compatibilityCache" configuration="compatibilityCacheConfiguration"/>
<!-- to get a memcached module to work, there needs to be a named cache called "memcachedCache" -->
<local-cache
name="memcachedCache"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@
enabled="true"
timeout="60000" />
</replicated-cache>
<replicated-cache-configuration
name="compatibilityCacheConfiguration"
start="EAGER"
mode="SYNC"
batching="false"
remote-timeout="60000">
<transaction mode="NONE"/>
<state-transfer
enabled="true"
timeout="60000"/>
<compatibility enabled="true"/>
</replicated-cache-configuration>
<replicated-cache name="compatibilityCache" configuration="compatibilityCacheConfiguration"/>
<!-- to get a memcached module to work, there needs to be a named cache called "memcachedCache" -->
<replicated-cache
name="memcachedCache"
Expand Down