Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-9469 Implement timeout for queries #8382

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package org.infinispan.query.core.impl;

import static java.util.Spliterators.spliteratorUnknownSize;
import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -82,19 +86,15 @@ private QueryResult<T> listInternal() {
} else {
Comparator<Comparable<FilterResult>[]> comparator = getComparator();
if (comparator == null) {
results = StreamSupport.stream(spliterator(), false).collect(Collectors.toList());
results = StreamSupport.stream(spliterator(), false).collect(new TimedCollector<>(Collectors.toList(), timeout));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the timeout is detected when collecting. But what if the splitterator blocks indefinitely when trying to fetch next?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will probably timeout after the RpcTimeout

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could also set the timeout in the stream itself, but only distributed streams support timeout

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

} else {
log.warnPerfSortedNonIndexed(queryString);
// collect and sort results, in reverse order for now
PriorityQueue<FilterResult> filterResults = new PriorityQueue<>(INITIAL_CAPACITY, new ReverseFilterResultComparator(comparator));
while (iterator.hasNext()) {
FilterResult entry = iterator.next();
filterResults.add(entry);
if (maxResults != -1 && filterResults.size() > startOffset + maxResults) {
// remove the head, which is actually the highest result
filterResults.remove();
}
}
PriorityQueue<FilterResult> queue = new PriorityQueue<>(INITIAL_CAPACITY, new ReverseFilterResultComparator(comparator));
PriorityQueue<FilterResult> filterResults = StreamSupport
.stream(spliteratorUnknownSize(iterator, 0), false)
.collect(new TimedCollector<>(Collector.of(() -> queue, this::addToPriorityQueue, (q1, q2) -> q1, IDENTITY_FINISH), timeout));

// collect and reverse
if (filterResults.size() > startOffset) {
Object[] res = new Object[filterResults.size() - startOffset];
Expand All @@ -114,6 +114,13 @@ private QueryResult<T> listInternal() {
return new QueryResultImpl<>(OptionalLong.empty(), (List<T>) results);
}

private void addToPriorityQueue(PriorityQueue<FilterResult> queue, FilterResult filterResult) {
queue.add(filterResult);
if (maxResults != -1 && queue.size() > startOffset + maxResults) {
queue.remove();
}
}

/**
* Create a comparator to be used for ordering the results returned by {@link #getIterator()}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.objectfilter.impl.syntax.parser.IckleParsingResult;
Expand Down Expand Up @@ -93,6 +94,9 @@ private Query<T> createQuery() {
// the query is created first time only
if (query == null) {
query = queryEngine.buildQuery(queryFactory, parsingResult, namedParameters, startOffset, maxResults, queryMode);
if (timeout > 0) {
query.timeout(timeout, TimeUnit.NANOSECONDS);
}
}
return query;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.infinispan.AdvancedCache;
import org.infinispan.CacheStream;
Expand Down Expand Up @@ -65,6 +66,9 @@ protected CloseableIterator<ObjectFilter.FilterResult> getIterator() {
IckleFilterAndConverter<Object, Object> ickleFilter = (IckleFilterAndConverter<Object, Object>) createFilter();
CacheStream<CacheEntry<Object, Object>> entryStream = ((AdvancedCache<Object, Object>) cache).cacheEntrySet().stream();
CacheStream<ObjectFilter.FilterResult> resultStream = CacheFilters.filterAndConvertToValue(entryStream, ickleFilter);
if(timeout > 0) {
resultStream = resultStream.timeout(timeout, TimeUnit.NANOSECONDS);
}
return Closeables.iterator(resultStream);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.infinispan.query.core.impl;

import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

import org.infinispan.commons.time.DefaultTimeService;
import org.infinispan.commons.time.TimeService;
import org.infinispan.query.SearchTimeoutException;

/**
* @since 11.0
*/
class TimedCollector<U, A, R> implements Collector<U, A, R> {
private final Collector<U, A, R> collector;
private final long timeout;

private static final TimeService TIME_SERVICE = DefaultTimeService.INSTANCE;

public TimedCollector(Collector<U, A, R> collector, long timeout) {
this.collector = collector;
this.timeout = timeout;
}

@Override
public Supplier<A> supplier() {
return collector.supplier();
}

@Override
public BiConsumer<A, U> accumulator() {
BiConsumer<A, U> accumulator = collector.accumulator();

if (timeout < 0) return accumulator;

return new BiConsumer<A, U>() {
int index = 0;
long limit = TIME_SERVICE.time() + timeout;

boolean divBy32(int i) {
return (i & ((1 << 5) - 1)) == 0;
}

@Override
public void accept(A a, U u) {
if (divBy32(index++) && TIME_SERVICE.isTimeExpired(limit)) {
throw new SearchTimeoutException();
}
accumulator.accept(a, u);
}
};
}

@Override
public BinaryOperator<A> combiner() {
return collector.combiner();
}

@Override
public Function<A, R> finisher() {
return collector.finisher();
}

@Override
public Set<Characteristics> characteristics() {
return collector.characteristics();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.infinispan.query;

/**
* Thrown when a query timeout occurs.
*
* @since 11.0
*/
public class SearchTimeoutException extends RuntimeException {

public SearchTimeoutException() {
}

public SearchTimeoutException(String msg) {
super(msg);
}
}
8 changes: 8 additions & 0 deletions query-dsl/src/main/java/org/infinispan/query/dsl/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.query.SearchTimeoutException;

//todo [anistor] We need to deprecate the 'always caching' behaviour and provide a clearCachedResults method

Expand Down Expand Up @@ -91,4 +93,10 @@ public interface Query<T> extends Iterable<T>, PaginationContext<Query<T>>, Para
* @return the results of the query as an iterator.
*/
CloseableIterator<T> iterator();

/**
* Set the timeout for this query. If the query hasn't finished processing before the timeout,
* a {@link SearchTimeoutException} will be thrown.
*/
Query<T> timeout(long timeout, TimeUnit timeUnit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.QueryFactory;
Expand Down Expand Up @@ -33,6 +34,8 @@ public abstract class BaseQuery<T> implements Query<T> {

protected int maxResults;

protected long timeout = -1;

//todo [anistor] can startOffset really be a long or it really has to be int due to limitations in query module?
protected BaseQuery(QueryFactory queryFactory, String queryString,
Map<String, Object> namedParameters, String[] projection, long startOffset, int maxResults) {
Expand Down Expand Up @@ -175,4 +178,10 @@ public Query<T> maxResults(int maxResults) {
resetQuery();
return this;
}

@Override
public Query<T> timeout(long timeout, TimeUnit timeUnit) {
this.timeout = timeUnit.toNanos(timeout);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.query.dsl.Query;
Expand Down Expand Up @@ -51,6 +52,11 @@ public T next() {
};
}

@Override
public Query<T> timeout(long timeout, TimeUnit timeUnit) {
return this;
}

@Override
public String getQueryString() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public List<E> list() throws SearchException {

@Override
public IndexedQuery<E> timeout(long timeout, TimeUnit timeUnit) {
// TODO [anistor] see https://issues.jboss.org/browse/ISPN-9469
throw new UnsupportedOperationException("Clustered queries do not support timeouts yet.");
queryDefinition.setTimeout(timeout, timeUnit);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import org.hibernate.search.exception.SearchException;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.util.Util;
import org.infinispan.query.SearchTimeoutException;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
Expand Down Expand Up @@ -105,6 +107,10 @@ List<QueryResponse> broadcast(ClusteredQueryOperation operation) {
} catch (InterruptedException e) {
throw new SearchException("Interrupted while searching locally", e);
} catch (ExecutionException e) {
Throwable rootCause = Util.getRootCause(e);
if (rootCause instanceof SearchTimeoutException) {
throw (SearchTimeoutException) rootCause;
}
throw new SearchException("Exception while searching locally", e);
}
return results;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -69,6 +70,8 @@ private IndexedQuery<T> createCacheQuery() {
if (cacheQuery == null) {
validateNamedParameters();
cacheQuery = queryEngine.buildLuceneQuery(parsingResult, namedParameters, startOffset, maxResults, queryMode);
if (timeout > 0)
cacheQuery.timeout(timeout, TimeUnit.NANOSECONDS);
}
return cacheQuery;
}
Expand Down
13 changes: 13 additions & 0 deletions query/src/main/java/org/infinispan/query/impl/QueryDefinition.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Sort;
Expand All @@ -20,6 +21,7 @@
import org.hibernate.search.spi.impl.PojoIndexedTypeIdentifier;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.query.SearchTimeoutException;
import org.infinispan.query.dsl.embedded.impl.HsQueryRequest;
import org.infinispan.query.dsl.embedded.impl.QueryEngine;
import org.infinispan.query.impl.externalizers.ExternalizerIds;
Expand All @@ -38,6 +40,7 @@ public final class QueryDefinition {
private HSQuery hsQuery;
private int maxResults = 100;
private int firstResult;
private long timeout = -1;
private Set<String> sortableFields;
private Class<?> indexedType;
private Sort sort;
Expand Down Expand Up @@ -98,6 +101,10 @@ public void initialize(AdvancedCache<?, ?> cache) {
hsQuery.projection(hsQueryRequest.getProjections());
hsQuery.firstResult(firstResult);
hsQuery.maxResults(maxResults);
hsQuery.timeoutExceptionFactory((msg, q) -> new SearchTimeoutException(msg + " \"" + q + '\"'));
if (timeout > 0) {
hsQuery.getTimeoutManager().setTimeout(timeout, TimeUnit.NANOSECONDS);
}
}
}

Expand Down Expand Up @@ -127,6 +134,10 @@ public void setNamedParameters(Map<String, Object> params) {
}
}

public void setTimeout(long timeout, TimeUnit timeUnit) {
this.timeout = timeUnit.toNanos(timeout);
}

public Map<String, Object> getNamedParameters() {
return namedParameters;
}
Expand Down Expand Up @@ -205,6 +216,7 @@ public void writeObject(ObjectOutput output, QueryDefinition queryDefinition) th
output.writeInt(queryDefinition.maxResults);
output.writeObject(queryDefinition.sortableFields);
output.writeObject(queryDefinition.indexedType);
output.writeLong(queryDefinition.timeout);
Map<String, Object> namedParameters = queryDefinition.namedParameters;
int paramSize = namedParameters.size();
output.writeShort(paramSize);
Expand All @@ -227,6 +239,7 @@ public QueryDefinition readObject(ObjectInput input) throws IOException, ClassNo
Set<String> sortableField = (Set<String>) input.readObject();
Class<?> indexedTypes = (Class<?>) input.readObject();
queryDefinition.setSortableField(sortableField);
queryDefinition.timeout = input.readLong();
queryDefinition.setIndexedType(indexedTypes);
short paramSize = input.readShort();
if (paramSize != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
import org.infinispan.AdvancedCache;
import org.infinispan.query.CacheQuery;
import org.infinispan.query.MassIndexer;
import org.infinispan.query.SearchTimeoutException;
import org.infinispan.query.backend.KeyTransformationHandler;
import org.infinispan.query.backend.QueryInterceptor;
import org.infinispan.query.dsl.IndexedQueryMode;
import org.infinispan.query.dsl.embedded.impl.QueryEngine;
import org.infinispan.query.spi.SearchManagerImplementor;
import org.infinispan.util.concurrent.TimeoutException;

/**
* Class that is used to build a {@link org.infinispan.query.CacheQuery} based on a Lucene or an Ickle query, only for
Expand Down Expand Up @@ -55,7 +55,7 @@ public SearchManagerImpl(AdvancedCache<?, ?> cache, QueryEngine<?> queryEngine)
this.keyTransformationHandler = ComponentRegistryUtils.getKeyTransformationHandler(cache);
this.queryEngine = queryEngine;
this.massIndexer = (MassIndexer) ComponentRegistryUtils.getIndexer(cache);
this.timeoutExceptionFactory = (msg, q) -> new TimeoutException(msg + " \"" + q + '\"');
this.timeoutExceptionFactory = (msg, q) -> new SearchTimeoutException(msg + " \"" + q + '\"');
}

@Override
Expand Down