Skip to content

Commit

Permalink
ISPN-5219 Expose Distributed Iterators over HotRod
Browse files Browse the repository at this point in the history
  • Loading branch information
Gustavo Fernandes authored and wburns committed Jun 16, 2015
1 parent e3b0d87 commit 4cbbc3a
Show file tree
Hide file tree
Showing 87 changed files with 2,500 additions and 43 deletions.
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.TimeUnit;

import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.concurrent.NotifyingFuture;

/**
Expand Down Expand Up @@ -155,6 +156,20 @@ public interface RemoteCache<K, V> extends BasicCache<K, V> {
*/
NotifyingFuture<Boolean> replaceWithVersionAsync(K key, V newValue, long version, int lifespanSeconds, int maxIdleSeconds);

/**
* Retrieve entries from the server
*
* @param filterConverterFactory Factory name for the KeyValueFilterConverter or null
* @param segments The segments to iterate. If empty or null all segments will be iterated
* @param batchSize The number of entries transferred from the server at a time
* @return Iterator for the entries
*/
CloseableIterator<Entry<Object, Object>> retrieveEntries(String filterConverterFactory, Set<Integer> segments, int batchSize);

/**
* @see #retrieveEntries(String, java.util.Set, int)
*/
CloseableIterator<Entry<Object, Object>> retrieveEntries(String filterConverterFactory, int batchSize);

/**
* Returns the {@link VersionedValue} associated to the supplied key param, or null if it doesn't exist.
Expand Down
Expand Up @@ -51,14 +51,15 @@ public class ConfigurationProperties {
public static final int DEFAULT_SO_TIMEOUT = 60000;
public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
public static final int DEFAULT_MAX_RETRIES = 10;
public static final String PROTOCOL_VERSION_23 = "2.3";
public static final String PROTOCOL_VERSION_22 = "2.2";
public static final String PROTOCOL_VERSION_21 = "2.1";
public static final String PROTOCOL_VERSION_20 = "2.0";
public static final String PROTOCOL_VERSION_13 = "1.3";
public static final String PROTOCOL_VERSION_12 = "1.2";
public static final String PROTOCOL_VERSION_11 = "1.1";
public static final String PROTOCOL_VERSION_10 = "1.0";
public static final String DEFAULT_PROTOCOL_VERSION = PROTOCOL_VERSION_22;
public static final String DEFAULT_PROTOCOL_VERSION = PROTOCOL_VERSION_23;

private final TypedProperties props;

Expand Down
Expand Up @@ -24,10 +24,12 @@
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.client.hotrod.exceptions.RemoteCacheManagerNotStartedException;
import org.infinispan.client.hotrod.impl.operations.*;
import org.infinispan.client.hotrod.impl.iteration.RemoteCloseableIterator;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.client.hotrod.marshall.MarshallerUtil;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.concurrent.NotifyingFuture;
import org.infinispan.commons.util.concurrent.NotifyingFutureImpl;

Expand Down Expand Up @@ -151,6 +153,19 @@ public Boolean call() throws Exception {
return result;
}

@Override
public CloseableIterator<Entry<Object, Object>> retrieveEntries(String filterConverterFactory, Set<Integer> segments, int batchSize) {
assertRemoteCacheManagerIsStarted();
RemoteCloseableIterator remoteCloseableIterator = new RemoteCloseableIterator(operationsFactory, filterConverterFactory, segments, batchSize, marshaller);
remoteCloseableIterator.start();
return remoteCloseableIterator;
}

@Override
public CloseableIterator<Entry<Object, Object>> retrieveEntries(String filterConverterFactory, int batchSize) {
return retrieveEntries(filterConverterFactory, null, batchSize);
}

@Override
public VersionedValue<V> getVersioned(K key) {
assertRemoteCacheManagerIsStarted();
Expand Down
@@ -1,15 +1,15 @@
package org.infinispan.client.hotrod.impl.consistenthash;

import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;

import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.hash.Hash;
import org.infinispan.commons.hash.MurmurHash3;
import org.infinispan.commons.util.Util;

import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;

/**
* @author Galder Zamarreño
*/
Expand All @@ -19,6 +19,7 @@ public final class SegmentConsistentHash implements ConsistentHash {

private final Hash hash = MurmurHash3.getInstance();
private SocketAddress[][] segmentOwners;
private int numSegments;
private int segmentSize;

@Override
Expand All @@ -28,6 +29,7 @@ public void init(Map<SocketAddress, Set<Integer>> servers2Hash, int numKeyOwners

public void init(SocketAddress[][] segmentOwners, int numSegments) {
this.segmentOwners = segmentOwners;
this.numSegments = numSegments;
this.segmentSize = Util.getSegmentSize(numSegments);
}

Expand All @@ -40,7 +42,7 @@ public SocketAddress getServer(byte[] key) {
return segmentOwners[segmentId][0];
}

private int getSegment(Object key) {
public int getSegment(Object key) {
// The result must always be positive, so we make sure the dividend is positive first
return getNormalizedHash(key) / segmentSize;
}
Expand All @@ -50,4 +52,11 @@ public int getNormalizedHash(Object object) {
return Util.getNormalizedHash(object, hash);
}

public int getNumSegments() {
return numSegments;
}

public SocketAddress[][] getSegmentOwners() {
return segmentOwners;
}
}
@@ -0,0 +1,16 @@
package org.infinispan.client.hotrod.impl.iteration;

import java.util.Set;

/**
* @author gustavonalle
* @since 8.0
*/
public interface KeyTracker {

boolean track(byte[] key);

void segmentsFinished(byte[] finishedSegments);

Set<Integer> missedSegments();
}
@@ -0,0 +1,21 @@
package org.infinispan.client.hotrod.impl.iteration;

import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash;
import org.infinispan.client.hotrod.impl.consistenthash.SegmentConsistentHash;

/**
* @author gustavonalle
* @since 8.0
*/
public final class KeyTrackerFactory {

private KeyTrackerFactory() {
}

public static KeyTracker create(ConsistentHash hash, int topologyId) {
if (topologyId == -1) return new NoOpSegmentKeyTracker();
if (hash == null) return new ReplKeyTracker();
return new SegmentKeyTracker((SegmentConsistentHash) hash);
}

}
@@ -0,0 +1,26 @@
package org.infinispan.client.hotrod.impl.iteration;

import java.util.Collections;
import java.util.Set;

/**
* @author gustavonalle
* @since 8.0
*/
public class NoOpSegmentKeyTracker implements KeyTracker {

@Override
public boolean track(byte[] key) {
return true;
}

@Override
public void segmentsFinished(byte[] finishedSegments) {
}

@Override
public Set<Integer> missedSegments() {
return Collections.emptySet();
}

}
@@ -0,0 +1,150 @@
package org.infinispan.client.hotrod.impl.iteration;

import net.jcip.annotations.NotThreadSafe;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.operations.IterationEndResponse;
import org.infinispan.client.hotrod.impl.operations.IterationNextOperation;
import org.infinispan.client.hotrod.impl.operations.IterationNextResponse;
import org.infinispan.client.hotrod.impl.operations.IterationStartOperation;
import org.infinispan.client.hotrod.impl.operations.IterationStartResponse;
import org.infinispan.client.hotrod.impl.operations.OperationsFactory;
import org.infinispan.client.hotrod.impl.transport.Transport;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.util.CloseableIterator;

import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedList;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;

import static org.infinispan.client.hotrod.impl.protocol.HotRodConstants.INVALID_ITERATION;
import static org.infinispan.client.hotrod.impl.protocol.HotRodConstants.NO_ERROR_STATUS;
import static org.infinispan.client.hotrod.marshall.MarshallerUtil.bytes2obj;

/**
* @author gustavonalle
* @since 8.0
*/
@NotThreadSafe
public class RemoteCloseableIterator implements CloseableIterator<Entry<Object, Object>> {

private static final Log log = LogFactory.getLog(RemoteCloseableIterator.class);

private final OperationsFactory operationsFactory;
private final Marshaller marshaller;
private final String filterConverterFactory;
private final Set<Integer> segments;
private final int batchSize;

private KeyTracker segmentKeyTracker;
private Transport transport;
private String iterationId;
boolean endOfIteration = false;
private Queue<SimpleEntry<Object, Object>> nextElements = new LinkedList<>();

public RemoteCloseableIterator(OperationsFactory operationsFactory, String filterConverterFactory, Set<Integer> segments, int batchSize, Marshaller marshaller) {
this.filterConverterFactory = filterConverterFactory;
this.segments = segments;
this.batchSize = batchSize;
this.operationsFactory = operationsFactory;
this.marshaller = marshaller;
}

@Override
public void close() {
IterationEndResponse endResponse = operationsFactory.newIterationEndOperation(iterationId, transport).execute();
short status = endResponse.getStatus();

if (status == NO_ERROR_STATUS) {
log.iterationClosed(iterationId);
}
if (endResponse.getStatus() == INVALID_ITERATION) {
throw log.errorClosingIteration(iterationId);
}
}

@Override
public boolean hasNext() {
if (nextElements.isEmpty()) {
fetch();
}
return !endOfIteration;
}

@Override
public Entry<Object, Object> next() {
if (!hasNext()) throw new NoSuchElementException();
return nextElements.remove();
}

private void fetch() {
try {
IterationNextOperation iterationNextOperation = operationsFactory.newIterationNextOperation(iterationId, transport);

while (nextElements.isEmpty() && !endOfIteration) {
IterationNextResponse iterationNextResponse = iterationNextOperation.execute();
short status = iterationNextResponse.getStatus();
if (status == INVALID_ITERATION) {
throw log.errorRetrievingNext(iterationId);
}
Entry<byte[], byte[]>[] entries = iterationNextResponse.getEntries();

if (entries.length == 0) {
endOfIteration = true;
break;
}
for (Entry<byte[], byte[]> entry : entries) {
if (segmentKeyTracker.track(entry.getKey())) {
nextElements.add(new SimpleEntry<>(unmarshall(entry.getKey()), unmarshall(entry.getValue())));
}
}
segmentKeyTracker.segmentsFinished(iterationNextResponse.getFinishedSegments());
}

} catch (TransportException e) {
log.warnf(e, "Error reaching the server during iteration");
restartIteration(segmentKeyTracker.missedSegments());
fetch();
}
}


private Object unmarshall(byte[] bytes) {
return bytes2obj(marshaller, bytes);
}

private void restartIteration(Set<Integer> missedSegments) {
startInternal(missedSegments);
}

private void start(Set<Integer> fromSegments) {
IterationStartResponse startResponse = startInternal(fromSegments);

this.segmentKeyTracker = KeyTrackerFactory.create(startResponse.getSegmentConsistentHash(), startResponse.getTopologyId());
}

private IterationStartResponse startInternal(Set<Integer> fromSegments) {
if (log.isDebugEnabled()) {
log.debugf("Staring iteration with segments %s", fromSegments);
}
IterationStartOperation iterationStartOperation = operationsFactory.newIterationStartOperation(filterConverterFactory, fromSegments, batchSize);
IterationStartResponse startResponse = iterationStartOperation.execute();
this.transport = startResponse.getTransport();
if (log.isDebugEnabled()) {
log.debugf("Obtained transport", this.transport);
}
this.iterationId = startResponse.getIterationId();
if (log.isDebugEnabled()) {
log.debugf("IterationId:", this.iterationId);
}
return startResponse;
}

public void start() {
start(segments);
}
}
@@ -0,0 +1,37 @@
package org.infinispan.client.hotrod.impl.iteration;


import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static java.nio.ByteBuffer.wrap;
import org.infinispan.commons.equivalence.ByteArrayEquivalence;
import org.infinispan.commons.util.CollectionFactory;

/**
* Tracks all keys seen during iteration. Depends on ISPN-5451 to be done more efficiently, by
* discarding segments as soon as they are completed iterating.
*
* @author gustavonalle
* @since 8.0
*/
public class ReplKeyTracker implements KeyTracker {

private Set<byte[]> keys = CollectionFactory.makeSet(ByteArrayEquivalence.INSTANCE);

@Override
public boolean track(byte[] key) {
return keys.add(key);
}

@Override
public void segmentsFinished(byte[] finishedSegments) {
}

@Override
public Set<Integer> missedSegments() {
return Collections.emptySet();
}
}

0 comments on commit 4cbbc3a

Please sign in to comment.