Skip to content

Commit

Permalink
ISPN-9469 Implement timeout for queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Gustavo Fernandes committed May 22, 2020
1 parent 4930b1f commit f0af2df
Show file tree
Hide file tree
Showing 19 changed files with 326 additions and 134 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package org.infinispan.query.core.impl;

import static java.util.Spliterators.spliteratorUnknownSize;
import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -82,19 +86,15 @@ private QueryResult<T> listInternal() {
} else {
Comparator<Comparable<FilterResult>[]> comparator = getComparator();
if (comparator == null) {
results = StreamSupport.stream(spliterator(), false).collect(Collectors.toList());
results = StreamSupport.stream(spliterator(), false).collect(new TimedCollector<>(Collectors.toList(), timeout));
} else {
log.warnPerfSortedNonIndexed(queryString);
// collect and sort results, in reverse order for now
PriorityQueue<FilterResult> filterResults = new PriorityQueue<>(INITIAL_CAPACITY, new ReverseFilterResultComparator(comparator));
while (iterator.hasNext()) {
FilterResult entry = iterator.next();
filterResults.add(entry);
if (maxResults != -1 && filterResults.size() > startOffset + maxResults) {
// remove the head, which is actually the highest result
filterResults.remove();
}
}
PriorityQueue<FilterResult> queue = new PriorityQueue<>(INITIAL_CAPACITY, new ReverseFilterResultComparator(comparator));
PriorityQueue<FilterResult> filterResults = StreamSupport
.stream(spliteratorUnknownSize(iterator, 0), false)
.collect(new TimedCollector<>(Collector.of(() -> queue, this::addToPriorityQueue, (q1, q2) -> q1, IDENTITY_FINISH), timeout));

// collect and reverse
if (filterResults.size() > startOffset) {
Object[] res = new Object[filterResults.size() - startOffset];
Expand All @@ -114,6 +114,13 @@ private QueryResult<T> listInternal() {
return new QueryResultImpl<>(OptionalLong.empty(), (List<T>) results);
}

private void addToPriorityQueue(PriorityQueue<FilterResult> queue, FilterResult filterResult) {
queue.add(filterResult);
if (maxResults != -1 && queue.size() > startOffset + maxResults) {
queue.remove();
}
}

/**
* Create a comparator to be used for ordering the results returned by {@link #getIterator()}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.objectfilter.impl.syntax.parser.IckleParsingResult;
Expand Down Expand Up @@ -93,6 +94,9 @@ private Query<T> createQuery() {
// the query is created first time only
if (query == null) {
query = queryEngine.buildQuery(queryFactory, parsingResult, namedParameters, startOffset, maxResults, queryMode);
if (timeout > 0) {
query.timeout(timeout, TimeUnit.NANOSECONDS);
}
}
return query;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.infinispan.query.core.impl;

import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

import org.infinispan.commons.time.DefaultTimeService;
import org.infinispan.commons.time.TimeService;
import org.infinispan.query.dsl.SearchTimeoutException;

/**
* @since 11.0
*/
class TimedCollector<U, A, R> implements Collector<U, A, R> {
private final Collector<U, A, R> collector;
private final long timeout;

private static final TimeService TIME_SERVICE = DefaultTimeService.INSTANCE;

public TimedCollector(Collector<U, A, R> collector, long timeout) {
this.collector = collector;
this.timeout = timeout;
}

@Override
public Supplier<A> supplier() {
return collector.supplier();
}

@Override
public BiConsumer<A, U> accumulator() {
BiConsumer<A, U> accumulator = collector.accumulator();

if (timeout < 0) return accumulator;

return new BiConsumer<A, U>() {
int index = 0;
long limit = TIME_SERVICE.time() + timeout;

boolean divBy32(int i) {
return (i & ((1 << 5) - 1)) == 0;
}

@Override
public void accept(A a, U u) {
if (divBy32(index++) && TIME_SERVICE.isTimeExpired(limit)) {
throw new SearchTimeoutException();
}
accumulator.accept(a, u);
}
};
}

@Override
public BinaryOperator<A> combiner() {
return collector.combiner();
}

@Override
public Function<A, R> finisher() {
return collector.finisher();
}

@Override
public Set<Characteristics> characteristics() {
return collector.characteristics();
}
}
7 changes: 7 additions & 0 deletions query-dsl/src/main/java/org/infinispan/query/dsl/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.infinispan.commons.util.CloseableIterator;

Expand Down Expand Up @@ -91,4 +92,10 @@ public interface Query<T> extends Iterable<T>, PaginationContext<Query<T>>, Para
* @return the results of the query as an iterator.
*/
CloseableIterator<T> iterator();

/**
* Set the timeout for this query. If the query hasn't finished processing before the timeout,
* a {@link SearchTimeoutException} will be thrown.
*/
Query<T> timeout(long timeout, TimeUnit timeUnit);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.infinispan.query.dsl;

/**
* Thrown when a query timeout occurs.
*
* @since 11.0
*/
public class SearchTimeoutException extends RuntimeException {

public SearchTimeoutException() {
}

public SearchTimeoutException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.QueryFactory;
Expand Down Expand Up @@ -33,6 +34,8 @@ public abstract class BaseQuery<T> implements Query<T> {

protected int maxResults;

protected long timeout = -1;

//todo [anistor] can startOffset really be a long or it really has to be int due to limitations in query module?
protected BaseQuery(QueryFactory queryFactory, String queryString,
Map<String, Object> namedParameters, String[] projection, long startOffset, int maxResults) {
Expand Down Expand Up @@ -175,4 +178,10 @@ public Query<T> maxResults(int maxResults) {
resetQuery();
return this;
}

@Override
public Query<T> timeout(long timeout, TimeUnit timeUnit) {
this.timeout = timeUnit.toNanos(timeout);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.query.dsl.Query;
Expand Down Expand Up @@ -51,6 +52,11 @@ public T next() {
};
}

@Override
public Query<T> timeout(long timeout, TimeUnit timeUnit) {
return this;
}

@Override
public String getQueryString() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public List<E> list() throws SearchException {

@Override
public IndexedQuery<E> timeout(long timeout, TimeUnit timeUnit) {
// TODO [anistor] see https://issues.jboss.org/browse/ISPN-9469
throw new UnsupportedOperationException("Clustered queries do not support timeouts yet.");
queryDefinition.setTimeout(timeout, timeUnit);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import org.hibernate.search.exception.SearchException;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.util.Util;
import org.infinispan.query.dsl.SearchTimeoutException;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
Expand Down Expand Up @@ -105,6 +107,10 @@ List<QueryResponse> broadcast(ClusteredQueryOperation operation) {
} catch (InterruptedException e) {
throw new SearchException("Interrupted while searching locally", e);
} catch (ExecutionException e) {
Throwable rootCause = Util.getRootCause(e);
if (rootCause instanceof SearchTimeoutException) {
throw (SearchTimeoutException) rootCause;
}
throw new SearchException("Exception while searching locally", e);
}
return results;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -69,6 +70,8 @@ private IndexedQuery<T> createCacheQuery() {
if (cacheQuery == null) {
validateNamedParameters();
cacheQuery = queryEngine.buildLuceneQuery(parsingResult, namedParameters, startOffset, maxResults, queryMode);
if (timeout > 0)
cacheQuery.timeout(timeout, TimeUnit.NANOSECONDS);
}
return cacheQuery;
}
Expand Down
13 changes: 13 additions & 0 deletions query/src/main/java/org/infinispan/query/impl/QueryDefinition.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Sort;
Expand All @@ -20,6 +21,7 @@
import org.hibernate.search.spi.impl.PojoIndexedTypeIdentifier;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.query.dsl.SearchTimeoutException;
import org.infinispan.query.dsl.embedded.impl.HsQueryRequest;
import org.infinispan.query.dsl.embedded.impl.QueryEngine;
import org.infinispan.query.impl.externalizers.ExternalizerIds;
Expand All @@ -38,6 +40,7 @@ public final class QueryDefinition {
private HSQuery hsQuery;
private int maxResults = 100;
private int firstResult;
private long timeout = -1;
private Set<String> sortableFields;
private Class<?> indexedType;
private Sort sort;
Expand Down Expand Up @@ -98,6 +101,10 @@ public void initialize(AdvancedCache<?, ?> cache) {
hsQuery.projection(hsQueryRequest.getProjections());
hsQuery.firstResult(firstResult);
hsQuery.maxResults(maxResults);
hsQuery.timeoutExceptionFactory((msg, q) -> new SearchTimeoutException(msg + " \"" + q + '\"'));
if (timeout > 0) {
hsQuery.getTimeoutManager().setTimeout(timeout, TimeUnit.NANOSECONDS);
}
}
}

Expand Down Expand Up @@ -127,6 +134,10 @@ public void setNamedParameters(Map<String, Object> params) {
}
}

public void setTimeout(long timeout, TimeUnit timeUnit) {
this.timeout = timeUnit.toNanos(timeout);
}

public Map<String, Object> getNamedParameters() {
return namedParameters;
}
Expand Down Expand Up @@ -205,6 +216,7 @@ public void writeObject(ObjectOutput output, QueryDefinition queryDefinition) th
output.writeInt(queryDefinition.maxResults);
output.writeObject(queryDefinition.sortableFields);
output.writeObject(queryDefinition.indexedType);
output.writeLong(queryDefinition.timeout);
Map<String, Object> namedParameters = queryDefinition.namedParameters;
int paramSize = namedParameters.size();
output.writeShort(paramSize);
Expand All @@ -227,6 +239,7 @@ public QueryDefinition readObject(ObjectInput input) throws IOException, ClassNo
Set<String> sortableField = (Set<String>) input.readObject();
Class<?> indexedTypes = (Class<?>) input.readObject();
queryDefinition.setSortableField(sortableField);
queryDefinition.timeout = input.readLong();
queryDefinition.setIndexedType(indexedTypes);
short paramSize = input.readShort();
if (paramSize != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
import org.infinispan.AdvancedCache;
import org.infinispan.query.CacheQuery;
import org.infinispan.query.MassIndexer;
import org.infinispan.query.dsl.SearchTimeoutException;
import org.infinispan.query.backend.KeyTransformationHandler;
import org.infinispan.query.backend.QueryInterceptor;
import org.infinispan.query.dsl.IndexedQueryMode;
import org.infinispan.query.dsl.embedded.impl.QueryEngine;
import org.infinispan.query.spi.SearchManagerImplementor;
import org.infinispan.util.concurrent.TimeoutException;

/**
* Class that is used to build a {@link org.infinispan.query.CacheQuery} based on a Lucene or an Ickle query, only for
Expand Down Expand Up @@ -55,7 +55,7 @@ public SearchManagerImpl(AdvancedCache<?, ?> cache, QueryEngine<?> queryEngine)
this.keyTransformationHandler = ComponentRegistryUtils.getKeyTransformationHandler(cache);
this.queryEngine = queryEngine;
this.massIndexer = (MassIndexer) ComponentRegistryUtils.getIndexer(cache);
this.timeoutExceptionFactory = (msg, q) -> new TimeoutException(msg + " \"" + q + '\"');
this.timeoutExceptionFactory = (msg, q) -> new SearchTimeoutException(msg + " \"" + q + '\"');
}

@Override
Expand Down

0 comments on commit f0af2df

Please sign in to comment.