Skip to content

Commit

Permalink
ISPN-6395 Deprecate SearchManager.getClusteredQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
Gustavo Fernandes authored and anistor committed Dec 13, 2017
1 parent 378dd5b commit 9d82d98
Show file tree
Hide file tree
Showing 29 changed files with 670 additions and 167 deletions.
@@ -0,0 +1,20 @@
package org.infinispan.query.dsl;

/**
* Defines the execution mode of an indexed query.
*
* @since 9.2
*/
public enum IndexedQueryMode {
/**
* Query is sent to all nodes, and results are combined before returning to the caller. This allows each node to have
* its own index, and the query will return the cluster wide results.
*/
BROADCAST,

/**
* Query is executed locally in the caller. The whole index must be available in order to return full
* results, otherwise only results available at the caller's local index are returned.
*/
FETCH
}
112 changes: 112 additions & 0 deletions query/src/main/java/org/infinispan/query/QueryDefinition.java
@@ -0,0 +1,112 @@
package org.infinispan.query;

import java.util.Optional;

import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Sort;
import org.hibernate.search.filter.FullTextFilter;
import org.hibernate.search.query.engine.spi.HSQuery;
import org.infinispan.AdvancedCache;
import org.infinispan.query.dsl.embedded.impl.EmbeddedQueryEngine;
import org.infinispan.query.dsl.embedded.impl.HsQueryRequest;
import org.infinispan.query.dsl.embedded.impl.QueryEngine;
import org.infinispan.query.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
* Stores the query to be executed in a cache in either a String or {@link HSQuery} form together with pagination
* and sort information.
*
* @since 9.2
*/
public class QueryDefinition {

private static final Log log = LogFactory.getLog(QueryDefinition.class, Log.class);

private String queryString;
private HSQuery hsQuery;
private int maxResults = 100;
private int firstResult;

private transient Sort sort;

public QueryDefinition(String queryString) {
this.queryString = queryString;
}

public QueryDefinition(HSQuery hsQuery) {
this.hsQuery = hsQuery;
}

public Optional<String> getQueryString() {
return Optional.ofNullable(queryString);
}

public void initialize(AdvancedCache<?, ?> cache) {
if (hsQuery == null) {
QueryEngine queryEngine = cache.getComponentRegistry().getComponent(EmbeddedQueryEngine.class);
HsQueryRequest hsQueryRequest = queryEngine.createHsQuery(queryString);
this.hsQuery = hsQueryRequest.getHsQuery();
this.sort = hsQueryRequest.getSort();
hsQuery.firstResult(firstResult);
hsQuery.maxResults(maxResults);
}
}

public HSQuery getHsQuery() {
if (hsQuery == null) {
throw new IllegalStateException("The QueryDefinition has not been initialized, make sure to call initialize(...) first");
}
return hsQuery;
}

public int getMaxResults() {
return maxResults;
}

public void setMaxResults(int maxResults) {
this.maxResults = maxResults;
if (hsQuery != null) {
hsQuery.maxResults(maxResults);
}
}

public int getFirstResult() {
return firstResult;
}

public void setFirstResult(int firstResult) {
if (hsQuery != null) {
hsQuery.firstResult(firstResult);
}
this.firstResult = firstResult;
}

public Sort getSort() {
return sort;
}

public void setSort(Sort sort) {
if (queryString != null) {
throw log.sortNotSupportedWithQueryString();
}
hsQuery.sort(sort);
this.sort = sort;
}


public void filter(Filter filter) {
if (queryString != null) throw log.filterNotSupportedWithQueryString();
hsQuery.filter(filter);
}

public FullTextFilter enableFullTextFilter(String name) {
if (queryString != null) throw log.filterNotSupportedWithQueryString();
return hsQuery.enableFullTextFilter(name);
}

public void disableFullTextFilter(String name) {
if (queryString != null) throw log.filterNotSupportedWithQueryString();
hsQuery.disableFullTextFilter(name);
}
}
36 changes: 25 additions & 11 deletions query/src/main/java/org/infinispan/query/SearchManager.java
Expand Up @@ -4,6 +4,7 @@
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.hibernate.search.query.dsl.EntityContext; import org.hibernate.search.query.dsl.EntityContext;
import org.hibernate.search.stat.Statistics; import org.hibernate.search.stat.Statistics;
import org.infinispan.query.dsl.IndexedQueryMode;


/** /**
* The SearchManager is the entry point to create full text queries on top of a cache. * The SearchManager is the entry point to create full text queries on top of a cache.
Expand All @@ -18,27 +19,40 @@ public interface SearchManager {
* in. If no classes are passed in, it is assumed that no type filtering is performed and so all known types will * in. If no classes are passed in, it is assumed that no type filtering is performed and so all known types will
* be searched. * be searched.
* *
* @param luceneQuery - {@link org.apache.lucene.search.Query} * @param luceneQuery {@link org.apache.lucene.search.Query}
* @param classes - optionally only return results of type that matches this list of acceptable types * @param indexedQueryMode The {@link IndexedQueryMode} used when executing the query.
* @return the CacheQuery object which can be used to iterate through results * @param classes Optionally only return results of type that matches this list of acceptable types.
* @return the CacheQuery object which can be used to iterate through results.
*/ */
<E> CacheQuery<E> getQuery(Query luceneQuery, Class<?>... classes); <E> CacheQuery<E> getQuery(Query luceneQuery, IndexedQueryMode indexedQueryMode, Class<?>... classes);


/** /**
* Experimental. * Builds a {@link CacheQuery} from a query string.
* Provides Hibernate Search DSL to build full text queries *
* @return * @throws org.hibernate.search.exception.SearchException if the queryString cannot be converted to an indexed query,
* due to lack of indexes to resolve it fully or if contains
* aggregations and grouping.
* @see #getQuery(Query, IndexedQueryMode, Class[])
*/ */
EntityContext buildQueryBuilderForClass(Class<?> entityType); <E> CacheQuery<E> getQuery(String queryString, IndexedQueryMode indexedQueryMode, Class<?>... classes);


/** /**
* Experimental! * @see #getQuery(Query, IndexedQueryMode, Class[])
* Use it to try out the newly introduced distributed queries. */
* <E> CacheQuery<E> getQuery(Query luceneQuery, Class<?>... classes);

/***
* @return {@link EntityContext}
*/
EntityContext buildQueryBuilderForClass(Class<?> entityType);

/***
* @param luceneQuery * @param luceneQuery
* @param classes * @param classes
* @return * @return
* @deprecated since 9.2, use {@link #getQuery(Query, IndexedQueryMode, Class[])} with QueryMode.BROADCAST
*/ */
@Deprecated
<E> CacheQuery<E> getClusteredQuery(Query luceneQuery, Class<?>... classes); <E> CacheQuery<E> getClusteredQuery(Query luceneQuery, Class<?>... classes);


/** /**
Expand Down
Expand Up @@ -14,6 +14,7 @@
import org.infinispan.AdvancedCache; import org.infinispan.AdvancedCache;
import org.infinispan.query.CacheQuery; import org.infinispan.query.CacheQuery;
import org.infinispan.query.FetchOptions; import org.infinispan.query.FetchOptions;
import org.infinispan.query.QueryDefinition;
import org.infinispan.query.ResultIterator; import org.infinispan.query.ResultIterator;
import org.infinispan.query.backend.KeyTransformationHandler; import org.infinispan.query.backend.KeyTransformationHandler;
import org.infinispan.query.impl.CacheQueryImpl; import org.infinispan.query.impl.CacheQueryImpl;
Expand All @@ -26,8 +27,6 @@
*/ */
public class ClusteredCacheQueryImpl<E> extends CacheQueryImpl<E> { public class ClusteredCacheQueryImpl<E> extends CacheQueryImpl<E> {


private Sort sort;

private Integer resultSize; private Integer resultSize;


private final ExecutorService asyncExecutor; private final ExecutorService asyncExecutor;
Expand All @@ -39,35 +38,43 @@ public class ClusteredCacheQueryImpl<E> extends CacheQueryImpl<E> {
private int firstResult = 0; private int firstResult = 0;


public ClusteredCacheQueryImpl(Query luceneQuery, SearchIntegrator searchFactory, public ClusteredCacheQueryImpl(Query luceneQuery, SearchIntegrator searchFactory,
ExecutorService asyncExecutor, AdvancedCache<?, ?> cache, KeyTransformationHandler keyTransformationHandler, Class<?>... classes) { ExecutorService asyncExecutor, AdvancedCache<?, ?> cache, KeyTransformationHandler keyTransformationHandler, Class<?>... classes) {
super(luceneQuery, searchFactory, cache, keyTransformationHandler, null, classes); super(luceneQuery, searchFactory, cache, keyTransformationHandler, null, classes);
this.asyncExecutor = asyncExecutor; this.asyncExecutor = asyncExecutor;
this.hSearchQuery = searchFactory.createHSQuery(luceneQuery, classes); }

public ClusteredCacheQueryImpl(String queryString, ExecutorService asyncExecutor, AdvancedCache<?, ?> cache,
KeyTransformationHandler keyTransformationHandler) {
super(queryString, cache, keyTransformationHandler);
this.asyncExecutor = asyncExecutor;
this.queryDefinition = new QueryDefinition(queryString);
} }


@Override @Override
public CacheQuery<E> maxResults(int maxResults) { public CacheQuery<E> maxResults(int maxResults) {
this.maxResults = maxResults; this.maxResults = maxResults;
this.queryDefinition.setMaxResults(maxResults);
return super.maxResults(maxResults); return super.maxResults(maxResults);
} }


@Override @Override
public CacheQuery<E> firstResult(int firstResult) { public CacheQuery<E> firstResult(int firstResult) {
this.firstResult = firstResult; this.firstResult = firstResult;
this.queryDefinition.setFirstResult(firstResult);
return this; return this;
} }


@Override @Override
public CacheQuery<E> sort(Sort sort) { public CacheQuery<E> sort(Sort sort) {
this.sort = sort; this.queryDefinition.setSort(sort);
return super.sort(sort); return super.sort(sort);
} }


@Override @Override
public int getResultSize() { public int getResultSize() {
int accumulator; int accumulator;
if (resultSize == null) { if (resultSize == null) {
ClusteredQueryCommand command = ClusteredQueryCommand.getResultSize(hSearchQuery, cache); ClusteredQueryCommand command = ClusteredQueryCommand.getResultSize(queryDefinition, cache);


ClusteredQueryInvoker invoker = new ClusteredQueryInvoker(cache, asyncExecutor); ClusteredQueryInvoker invoker = new ClusteredQueryInvoker(cache, asyncExecutor);
List<QueryResponse> responses = invoker.broadcast(command); List<QueryResponse> responses = invoker.broadcast(command);
Expand All @@ -76,32 +83,32 @@ public int getResultSize() {
for (QueryResponse response : responses) { for (QueryResponse response : responses) {
accumulator += response.getResultSize(); accumulator += response.getResultSize();
} }
resultSize = Integer.valueOf(accumulator); resultSize = accumulator;
} else { } else {
accumulator = resultSize.intValue(); accumulator = resultSize;
} }
return accumulator; return accumulator;
} }


@Override @Override
public ResultIterator<E> iterator(FetchOptions fetchOptions) throws SearchException { public ResultIterator<E> iterator(FetchOptions fetchOptions) throws SearchException {
hSearchQuery.maxResults(getNodeMaxResults()); queryDefinition.setMaxResults(getNodeMaxResults());
switch (fetchOptions.getFetchMode()) { switch (fetchOptions.getFetchMode()) {
case EAGER: { case EAGER: {
ClusteredQueryCommand command = ClusteredQueryCommand.createEagerIterator(hSearchQuery, cache); ClusteredQueryCommand command = ClusteredQueryCommand.createEagerIterator(queryDefinition, cache);
HashMap<UUID, ClusteredTopDocs> topDocsResponses = broadcastQuery(command); HashMap<UUID, ClusteredTopDocs> topDocsResponses = broadcastQuery(command);


return new DistributedIterator<>(sort, return new DistributedIterator<>(queryDefinition.getSort(),
fetchOptions.getFetchSize(), this.resultSize, maxResults, fetchOptions.getFetchSize(), this.resultSize, maxResults,
firstResult, topDocsResponses, cache); firstResult, topDocsResponses, cache);
} }
case LAZY: { case LAZY: {
UUID lazyItId = UUID.randomUUID(); UUID lazyItId = UUID.randomUUID();
ClusteredQueryCommand command = ClusteredQueryCommand.createLazyIterator(hSearchQuery, cache, lazyItId); ClusteredQueryCommand command = ClusteredQueryCommand.createLazyIterator(queryDefinition, cache, lazyItId);
HashMap<UUID, ClusteredTopDocs> topDocsResponses = broadcastQuery(command); HashMap<UUID, ClusteredTopDocs> topDocsResponses = broadcastQuery(command);


// Make a sort copy to avoid reversed results // Make a sort copy to avoid reversed results
return new DistributedLazyIterator<>(sort, return new DistributedLazyIterator<>(queryDefinition.getSort(),
fetchOptions.getFetchSize(), this.resultSize, maxResults, fetchOptions.getFetchSize(), this.resultSize, maxResults,
firstResult, lazyItId, topDocsResponses, asyncExecutor, cache); firstResult, lazyItId, topDocsResponses, asyncExecutor, cache);
} }
Expand Down
Expand Up @@ -6,11 +6,11 @@
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


import org.hibernate.search.query.engine.spi.HSQuery;
import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand; import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.BaseRpcCommand; import org.infinispan.commands.remote.BaseRpcCommand;
import org.infinispan.commons.marshall.MarshallUtil; import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.query.QueryDefinition;
import org.infinispan.query.clustered.commandworkers.ClusteredQueryCommandWorker; import org.infinispan.query.clustered.commandworkers.ClusteredQueryCommandWorker;
import org.infinispan.query.impl.CommandInitializer; import org.infinispan.query.impl.CommandInitializer;
import org.infinispan.query.impl.CustomQueryCommand; import org.infinispan.query.impl.CustomQueryCommand;
Expand All @@ -30,7 +30,7 @@ public class ClusteredQueryCommand extends BaseRpcCommand implements ReplicableC


private ClusteredQueryCommandType commandType; private ClusteredQueryCommandType commandType;


private HSQuery query; private QueryDefinition queryDefinition;


// local instance (set only when command arrives on target node) // local instance (set only when command arrives on target node)
private Cache<?, ?> cache; private Cache<?, ?> cache;
Expand Down Expand Up @@ -59,22 +59,22 @@ public void fetchExecutionContext(CommandInitializer ci) {
this.cache = ci.getCacheManager().getCache(cacheName.toString()); this.cache = ci.getCacheManager().getCache(cacheName.toString());
} }


public static ClusteredQueryCommand createLazyIterator(HSQuery query, Cache<?, ?> cache, UUID id) { public static ClusteredQueryCommand createLazyIterator(QueryDefinition queryDefinition, Cache<?, ?> cache, UUID id) {
ClusteredQueryCommand clQuery = new ClusteredQueryCommand(ClusteredQueryCommandType.CREATE_LAZY_ITERATOR, cache.getName()); ClusteredQueryCommand clQuery = new ClusteredQueryCommand(ClusteredQueryCommandType.CREATE_LAZY_ITERATOR, cache.getName());
clQuery.query = query; clQuery.queryDefinition = queryDefinition;
clQuery.lazyQueryId = id; clQuery.lazyQueryId = id;
return clQuery; return clQuery;
} }


public static ClusteredQueryCommand getResultSize(HSQuery query, Cache<?, ?> cache) { public static ClusteredQueryCommand getResultSize(QueryDefinition queryDefinition, Cache<?, ?> cache) {
ClusteredQueryCommand clQuery = new ClusteredQueryCommand(ClusteredQueryCommandType.GET_RESULT_SIZE, cache.getName()); ClusteredQueryCommand clQuery = new ClusteredQueryCommand(ClusteredQueryCommandType.GET_RESULT_SIZE, cache.getName());
clQuery.query = query; clQuery.queryDefinition = queryDefinition;
return clQuery; return clQuery;
} }


public static ClusteredQueryCommand createEagerIterator(HSQuery query, Cache<?, ?> cache) { public static ClusteredQueryCommand createEagerIterator(QueryDefinition queryDefinition, Cache<?, ?> cache) {
ClusteredQueryCommand clQuery = new ClusteredQueryCommand(ClusteredQueryCommandType.CREATE_EAGER_ITERATOR, cache.getName()); ClusteredQueryCommand clQuery = new ClusteredQueryCommand(ClusteredQueryCommandType.CREATE_EAGER_ITERATOR, cache.getName());
clQuery.query = query; clQuery.queryDefinition = queryDefinition;
return clQuery; return clQuery;
} }


Expand Down Expand Up @@ -110,7 +110,7 @@ public CompletableFuture<Object> invokeAsync() throws Throwable {
} }


public QueryResponse perform(Cache<?, ?> cache) { public QueryResponse perform(Cache<?, ?> cache) {
ClusteredQueryCommandWorker worker = commandType.getCommand(cache, query, lazyQueryId, docIndex); ClusteredQueryCommandWorker worker = commandType.getCommand(cache, queryDefinition, lazyQueryId, docIndex);
return worker.perform(); return worker.perform();
} }


Expand All @@ -122,15 +122,15 @@ public byte getCommandId() {
@Override @Override
public void writeTo(ObjectOutput output) throws IOException { public void writeTo(ObjectOutput output) throws IOException {
MarshallUtil.marshallEnum(commandType, output); MarshallUtil.marshallEnum(commandType, output);
output.writeObject(query); output.writeObject(queryDefinition);
MarshallUtil.marshallUUID(lazyQueryId, output, true); MarshallUtil.marshallUUID(lazyQueryId, output, true);
output.writeInt(docIndex); output.writeInt(docIndex);
} }


@Override @Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException { public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
commandType = MarshallUtil.unmarshallEnum(input, ClusteredQueryCommandType::valueOf); commandType = MarshallUtil.unmarshallEnum(input, ClusteredQueryCommandType::valueOf);
query = (HSQuery) input.readObject(); queryDefinition = (QueryDefinition) input.readObject();
lazyQueryId = MarshallUtil.unmarshallUUID(input, true); lazyQueryId = MarshallUtil.unmarshallUUID(input, true);
docIndex = input.readInt(); docIndex = input.readInt();
} }
Expand All @@ -145,7 +145,7 @@ public int hashCode() {
final int prime = 31; final int prime = 31;
int result = 1; int result = 1;
result = prime * result + ((cacheName == null) ? 0 : cacheName.hashCode()); result = prime * result + ((cacheName == null) ? 0 : cacheName.hashCode());
result = prime * result + ((query == null) ? 0 : query.hashCode()); result = prime * result + ((queryDefinition == null) ? 0 : queryDefinition.hashCode());
return result; return result;
} }


Expand All @@ -163,10 +163,10 @@ public boolean equals(Object obj) {
return false; return false;
} else if (!cacheName.equals(other.cacheName)) } else if (!cacheName.equals(other.cacheName))
return false; return false;
if (query == null) { if (queryDefinition == null) {
if (other.query != null) if (other.queryDefinition != null)
return false; return false;
} else if (!query.equals(other.query)) } else if (!queryDefinition.equals(other.queryDefinition))
return false; return false;
return true; return true;
} }
Expand Down

0 comments on commit 9d82d98

Please sign in to comment.