Skip to content

Commit

Permalink
ISPN-15738 Avoid individual GET by Key in distributed query aggregation
Browse files Browse the repository at this point in the history
Use getAll to fetch multiple keys in a single RPC request.
The batch is the same as the chunk-size for state transfer.
  • Loading branch information
pruivo committed Feb 21, 2024
1 parent dd3dab8 commit c63bf2f
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ List<QueryResponse> broadcast(ClusteredQueryOperation operation) {

Map<Address, BitSet> split = partitioner.split();
SegmentsClusteredQueryCommand localCommand = new SegmentsClusteredQueryCommand(cache.getName(), operation, split.get(myAddress));
// invoke on own node
CompletionStage<QueryResponse> localResponse = localInvoke(localCommand);
// sends the request remotely first
List<CompletableFuture<QueryResponse>> futureRemoteResponses = split.entrySet().stream()
.filter(e -> !e.getKey().equals(myAddress)).map(e -> {
Address address = e.getKey();
Expand All @@ -69,6 +68,9 @@ List<QueryResponse> broadcast(ClusteredQueryOperation operation) {
rpcManager.getSyncRpcOptions()).toCompletableFuture();
}).map(a -> a.thenApply(r -> (QueryResponse) r.getResponseValue())).collect(Collectors.toList());

// then, invoke on own node
CompletionStage<QueryResponse> localResponse = localInvoke(localCommand);

List<QueryResponse> results = new ArrayList<>();
try {
results.add(await(localResponse.toCompletableFuture()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,14 @@
*/
class DistributedEntryIterator<K, V> extends DistributedIterator<Map.Entry<K, V>> {

DistributedEntryIterator(LocalQueryStatistics queryStatistics, Sort sort, int fetchSize, int resultSize,
DistributedEntryIterator(LocalQueryStatistics queryStatistics, Sort sort, int resultSize,
int maxResults, int firstResult, Map<Address, NodeTopDocs> topDocsResponses,
AdvancedCache<?, ?> cache) {
super(queryStatistics, sort, fetchSize, resultSize, maxResults, firstResult, topDocsResponses, cache);
super(queryStatistics, sort, resultSize, maxResults, firstResult, topDocsResponses, cache);
}

@Override
protected Map.Entry<K, V> decorate(Object key, Object value) {
return new Map.Entry<K, V>() {

@Override
public K getKey() {
return (K) key;
}

@Override
public V getValue() {
return (V) value;
}

@Override
public V setValue(V value) {
throw new UnsupportedOperationException("Entry is immutable");
}
};
return Map.entry((K) key, (V) value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static java.util.Spliterators.spliteratorUnknownSize;
import static java.util.stream.StreamSupport.stream;

import static org.infinispan.query.logging.Log.CONTAINER;

import java.util.Collections;
Expand Down Expand Up @@ -96,7 +95,7 @@ public CloseableIterator<E> iterator() throws SearchException {
aggregation.displayGroupFirst(), luceneSort);
}

return new DistributedIterator<>(queryStatistics, luceneSort, maxResults, resultSize, maxResults,
return new DistributedIterator<>(queryStatistics, luceneSort, resultSize, maxResults,
firstResult, topDocsResponses, cache);
}

Expand All @@ -114,7 +113,7 @@ public <K> CloseableIterator<Map.Entry<K, E>> entryIterator() {
}

return new DistributedEntryIterator<>(queryStatistics, queryDefinition.getSearchQueryBuilder().getLuceneSort(),
maxResults, resultSize, maxResults, firstResult, topDocsResponses, cache);
resultSize, maxResults, firstResult, topDocsResponses, cache);
}

// number of results of each node of cluster
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
package org.infinispan.query.clustered;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.encoding.DataConversion;
import org.infinispan.query.core.stats.impl.LocalQueryStatistics;
import org.infinispan.remoting.transport.Address;

/**
* Iterates on the results of a distributed query returning the values. Subclasses can customize this by overriding the
* {@link #decorate} method.
* {@link #decorate(Object, Object)} method.
*
* @param <T> The return type of the iterator
* @author Israel Lacerra &lt;israeldl@gmail.com&gt;
Expand All @@ -27,13 +32,10 @@
*/
class DistributedIterator<T> implements CloseableIterator<T> {

private final AdvancedCache<?, ?> cache;
private final DataConversion keyDataConversion;
private final AdvancedCache<Object, Object> cache;

private int currentIndex = -1;

// todo [anistor] seems we are ignoring fetchSize https://issues.jboss.org/browse/ISPN-9506
private final int fetchSize;
private final int resultSize;
private final int maxResults;
private final int firstResult;
Expand All @@ -42,16 +44,18 @@ class DistributedIterator<T> implements CloseableIterator<T> {
private final TopDocs mergedResults;
private final LocalQueryStatistics queryStatistics;

DistributedIterator(LocalQueryStatistics queryStatistics, Sort sort, int fetchSize, int resultSize, int maxResults,
private int valueIndex;
private final int batchSize;
private final List<T> values;

DistributedIterator(LocalQueryStatistics queryStatistics, Sort sort, int resultSize, int maxResults,
int firstResult, Map<Address, NodeTopDocs> topDocsResponses, AdvancedCache<?, ?> cache) {
this.queryStatistics = queryStatistics;
this.fetchSize = fetchSize;
this.resultSize = resultSize;
this.maxResults = maxResults;
this.firstResult = firstResult;
this.cache = cache;
this.keyDataConversion = cache.getKeyDataConversion();
final int parallels = topDocsResponses.size();
this.cache = (AdvancedCache<Object, Object>) cache;
int parallels = topDocsResponses.size();
this.partialResults = new NodeTopDocs[parallels];
boolean isFieldDocs = expectTopFieldDocs(topDocsResponses);
TopDocs[] partialTopDocs = isFieldDocs ? new TopFieldDocs[parallels] : new TopDocs[parallels];
Expand All @@ -67,9 +71,11 @@ class DistributedIterator<T> implements CloseableIterator<T> {
} else {
mergedResults = TopDocs.merge(firstResult, maxResults, partialTopDocs, true);
}
batchSize = cache.getCacheConfiguration().clustering().stateTransfer().chunkSize();
values = new ArrayList<>(batchSize);
}

private boolean expectTopFieldDocs(Map<Address, NodeTopDocs> topDocsResponses) {
private static boolean expectTopFieldDocs(Map<Address, NodeTopDocs> topDocsResponses) {
Iterator<NodeTopDocs> it = topDocsResponses.values().iterator();
if (it.hasNext()) {
return it.next().topDocs instanceof TopFieldDocs;
Expand All @@ -88,6 +94,69 @@ public final T next() {
throw new NoSuchElementException();
}

// hasNext populate the values if returns true
assert !values.isEmpty();
assert valueIndex < values.size();

return values.get(valueIndex++);
}

/**
* Extension point for subclasses.
*/
protected T decorate(Object key, Object value) {
return (T) value;
}

@Override
public final boolean hasNext() {
if (valueIndex == values.size()) {
fetchBatch();
}
return valueIndex < values.size();
}

private void fetchBatch() {
// keep the order
Set<Object> keys = new LinkedHashSet<>(batchSize);
values.clear();
valueIndex = 0;
for (int i = 0; i < batchSize; ++i) {
if (!hasMoreKeys()) {
break;
}
Object key = nextKey();
if (key != null) {
keys.add(cache.getKeyDataConversion().fromStorage(key));
}
}

if (keys.isEmpty()) {
return;
}

if (!queryStatistics.isEnabled()) {
getAllAndStore(keys);
return;
}

TimeService timeService = cache.getComponentRegistry().getTimeService();
long start = timeService.time();
getAllAndStore(keys);
queryStatistics.entityLoaded(timeService.timeDuration(start, TimeUnit.NANOSECONDS));
}

private void getAllAndStore(Set<Object> keys) {
Map<Object, Object> map = cache.getAll(keys);
keys.stream().map(key -> decorate(key, map.get(key))).forEach(values::add);
}

private boolean hasMoreKeys() {
int nextIndex = currentIndex + 1;
return firstResult + nextIndex < resultSize && nextIndex < maxResults;
}

private Object nextKey() {
currentIndex++;

ScoreDoc scoreDoc = mergedResults.scoreDocs[currentIndex];
Expand All @@ -106,31 +175,10 @@ public final T next() {

int pos = partialPositionNext[index]++;

Object[] keys = nodeTopDocs.keys;
if (keys == null || keys.length == 0) {
return (T) nodeTopDocs.projections[pos];
if (nodeTopDocs.keys == null || nodeTopDocs.keys.length == 0) {
values.add((T) nodeTopDocs.projections[pos]);
return null;
}

long start = queryStatistics.isEnabled() ? System.nanoTime() : 0;

Object key = keyDataConversion.fromStorage(keys[pos]);
T value = (T) cache.get(key);

if (queryStatistics.isEnabled()) queryStatistics.entityLoaded(System.nanoTime() - start);

return decorate(key, value);
}

/**
* Extension point for subclasses.
*/
protected T decorate(Object key, Object value) {
return (T) value;
}

@Override
public final boolean hasNext() {
int nextIndex = currentIndex + 1;
return firstResult + nextIndex < resultSize && nextIndex < maxResults;
return nodeTopDocs.keys[pos];
}
}

0 comments on commit c63bf2f

Please sign in to comment.