Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-15738 Avoid individual GET by Key in distributed query aggregation #12030

Merged
merged 2 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ List<QueryResponse> broadcast(ClusteredQueryOperation operation) {

Map<Address, BitSet> split = partitioner.split();
SegmentsClusteredQueryCommand localCommand = new SegmentsClusteredQueryCommand(cache.getName(), operation, split.get(myAddress));
// invoke on own node
CompletionStage<QueryResponse> localResponse = localInvoke(localCommand);
// sends the request remotely first
List<CompletableFuture<QueryResponse>> futureRemoteResponses = split.entrySet().stream()
.filter(e -> !e.getKey().equals(myAddress)).map(e -> {
Address address = e.getKey();
Expand All @@ -69,6 +68,9 @@ List<QueryResponse> broadcast(ClusteredQueryOperation operation) {
rpcManager.getSyncRpcOptions()).toCompletableFuture();
}).map(a -> a.thenApply(r -> (QueryResponse) r.getResponseValue())).collect(Collectors.toList());

// then, invoke on own node
CompletionStage<QueryResponse> localResponse = localInvoke(localCommand);
fax4ever marked this conversation as resolved.
Show resolved Hide resolved

List<QueryResponse> results = new ArrayList<>();
try {
results.add(await(localResponse.toCompletableFuture()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*/
class DistributedEntryIterator<K, V> extends DistributedIterator<EntityEntry<K, V>> {

DistributedEntryIterator(LocalQueryStatistics queryStatistics, Sort sort, int fetchSize, int resultSize,
DistributedEntryIterator(LocalQueryStatistics queryStatistics, Sort sort, int resultSize,
int maxResults, int firstResult, Map<Address, NodeTopDocs> topDocsResponses,
AdvancedCache<?, ?> cache) {
super(queryStatistics, sort, fetchSize, resultSize, maxResults, firstResult, topDocsResponses, cache);
super(queryStatistics, sort, resultSize, maxResults, firstResult, topDocsResponses, cache);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public CloseableIterator<E> iterator() throws SearchException {
aggregation.displayGroupFirst(), luceneSort);
}

return new DistributedIterator<>(queryStatistics, luceneSort, maxResults, resultSize, maxResults,
return new DistributedIterator<>(queryStatistics, luceneSort, resultSize, maxResults,
firstResult, topDocsResponses, cache);
}

Expand All @@ -115,7 +115,7 @@ public <K> CloseableIterator<EntityEntry<K, E>> entryIterator() {
}

return new DistributedEntryIterator<>(queryStatistics, queryDefinition.getSearchQueryBuilder().getLuceneSort(),
maxResults, resultSize, maxResults, firstResult, topDocsResponses, cache);
resultSize, maxResults, firstResult, topDocsResponses, cache);
}

// number of results of each node of cluster
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
package org.infinispan.query.clustered;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.encoding.DataConversion;
import org.infinispan.query.core.stats.impl.LocalQueryStatistics;
import org.infinispan.remoting.transport.Address;
import org.infinispan.security.actions.SecurityActions;

/**
* Iterates on the results of a distributed query returning the values. Subclasses can customize this by overriding the
* {@link #decorate} method.
* {@link #decorate(Object, Object, float)} method.
*
* @param <T> The return type of the iterator
* @author Israel Lacerra &lt;israeldl@gmail.com&gt;
Expand All @@ -27,13 +32,10 @@
*/
class DistributedIterator<T> implements CloseableIterator<T> {

private final AdvancedCache<?, ?> cache;
private final DataConversion keyDataConversion;
private final AdvancedCache<Object, Object> cache;

private int currentIndex = -1;

// todo [anistor] seems we are ignoring fetchSize https://issues.jboss.org/browse/ISPN-9506
private final int fetchSize;
private final int resultSize;
private final int maxResults;
private final int firstResult;
Expand All @@ -42,16 +44,18 @@ class DistributedIterator<T> implements CloseableIterator<T> {
private final TopDocs mergedResults;
private final LocalQueryStatistics queryStatistics;

DistributedIterator(LocalQueryStatistics queryStatistics, Sort sort, int fetchSize, int resultSize, int maxResults,
private int valueIndex;
private final int batchSize;
private final List<T> values;

DistributedIterator(LocalQueryStatistics queryStatistics, Sort sort, int resultSize, int maxResults,
int firstResult, Map<Address, NodeTopDocs> topDocsResponses, AdvancedCache<?, ?> cache) {
this.queryStatistics = queryStatistics;
this.fetchSize = fetchSize;
this.resultSize = resultSize;
this.maxResults = maxResults;
this.firstResult = firstResult;
this.cache = cache;
this.keyDataConversion = cache.getKeyDataConversion();
final int parallels = topDocsResponses.size();
this.cache = (AdvancedCache<Object, Object>) cache;
int parallels = topDocsResponses.size();
this.partialResults = new NodeTopDocs[parallels];
boolean isFieldDocs = expectTopFieldDocs(topDocsResponses);
TopDocs[] partialTopDocs = isFieldDocs ? new TopFieldDocs[parallels] : new TopDocs[parallels];
Expand All @@ -69,6 +73,8 @@ class DistributedIterator<T> implements CloseableIterator<T> {
} else {
mergedResults = TopDocs.merge(firstResult, maxResults, partialTopDocs);
}
batchSize = Math.min(maxResults, cache.getCacheConfiguration().clustering().stateTransfer().chunkSize());
values = new ArrayList<>(batchSize);
}

// Inspired by org.opensearch.action.search.SearchPhaseController
Expand All @@ -80,7 +86,7 @@ static void setShardIndex(TopDocs topDocs, int shardIndex) {
}
}

private boolean expectTopFieldDocs(Map<Address, NodeTopDocs> topDocsResponses) {
private static boolean expectTopFieldDocs(Map<Address, NodeTopDocs> topDocsResponses) {
Iterator<NodeTopDocs> it = topDocsResponses.values().iterator();
if (it.hasNext()) {
return it.next().topDocs instanceof TopFieldDocs;
Expand All @@ -99,6 +105,78 @@ public final T next() {
throw new NoSuchElementException();
}

// hasNext populate the values if returns true
assert !values.isEmpty();
assert valueIndex < values.size();

return values.get(valueIndex++);
}

/**
* Extension point for subclasses.
*/
protected T decorate(Object key, Object value, float score) {
return (T) value;
}

@Override
public final boolean hasNext() {
if (valueIndex == values.size()) {
fetchBatch();
}
return valueIndex < values.size();
}

private void fetchBatch() {
// keep the order
List<KeyAndScore> keys = new ArrayList<>(batchSize);
values.clear();
valueIndex = 0;
for (int i = 0; i < batchSize; ++i) {
if (!hasMoreKeys()) {
break;
}
KeyAndScore key = nextKey();
if (key != null) {
keys.add(key);
}
}

if (keys.isEmpty()) {
return;
}

if (!queryStatistics.isEnabled()) {
getAllAndStore(keys);
return;
}

TimeService timeService = SecurityActions.getCacheComponentRegistry(cache).getTimeService();
long start = timeService.time();
getAllAndStore(keys);
queryStatistics.entityLoaded(timeService.timeDuration(start, TimeUnit.NANOSECONDS));
fax4ever marked this conversation as resolved.
Show resolved Hide resolved
}

private void getAllAndStore(List<KeyAndScore> keysAndScores) {
var keySet = keysAndScores.stream()
.map(keyAndScore -> keyAndScore.key)
.collect(Collectors.toSet());
Map<Object, Object> map = cache.getAll(keySet);
keysAndScores.stream()
.map(keyAndScore -> decorate(keyAndScore.key, map.get(keyAndScore.key), keyAndScore.score))
.forEach(values::add);
}

private Object keyFromStorage(Object key) {
return cache.getKeyDataConversion().fromStorage(key);
}

private boolean hasMoreKeys() {
int nextIndex = currentIndex + 1;
return firstResult + nextIndex < resultSize && nextIndex < maxResults;
}

private KeyAndScore nextKey() {
currentIndex++;

ScoreDoc scoreDoc = mergedResults.scoreDocs[currentIndex];
Expand All @@ -117,31 +195,20 @@ public final T next() {

int pos = partialPositionNext[index]++;

Object[] keys = nodeTopDocs.keys;
if (keys == null || keys.length == 0) {
return (T) nodeTopDocs.projections[pos];
if (nodeTopDocs.keys == null || nodeTopDocs.keys.length == 0) {
values.add((T) nodeTopDocs.projections[pos]);
return null;
}

long start = queryStatistics.isEnabled() ? System.nanoTime() : 0;

Object key = keyDataConversion.fromStorage(keys[pos]);
T value = (T) cache.get(key);

if (queryStatistics.isEnabled()) queryStatistics.entityLoaded(System.nanoTime() - start);

return decorate(key, value, scoreDoc.score);
return new KeyAndScore(keyFromStorage(nodeTopDocs.keys[pos]), scoreDoc.score);
}

/**
* Extension point for subclasses.
*/
protected T decorate(Object key, Object value, float score) {
return (T) value;
}
static class KeyAndScore {
final Object key;
final float score;

@Override
public final boolean hasNext() {
int nextIndex = currentIndex + 1;
return firstResult + nextIndex < resultSize && nextIndex < maxResults;
KeyAndScore(Object key, float score) {
this.key = key;
this.score = score;
}
}
}