Skip to content

Commit

Permalink
Fix #128 fetch multiple CQL types
Browse files Browse the repository at this point in the history
  • Loading branch information
Vincent Royer committed Oct 14, 2017
1 parent b1dd40d commit 819e4f1
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ public class FieldsVisitor extends StoredFieldVisitor {
protected BytesReference source;
protected String type, id;
protected Map<String, List<Object>> fieldsValues;

protected NavigableSet<String> requiredColumns = null;


public FieldsVisitor(boolean loadSource) {
this.loadSource = loadSource;
requiredFields = new HashSet<>();
Expand All @@ -97,29 +95,25 @@ public Set<String> requestedFields() {
return ImmutableSet.of();
}

// cache the cassandra required columns and return the static+partition columns
public NavigableSet<String> requiredColumns(ClusterService clusterService, SearchContext searchContext) throws IOException {
if (this.requiredColumns == null) {
List<String> requiredColumns = new ArrayList<String>();
if (requestedFields() != null) {
for(String fieldExp : requestedFields()) {
for(String field : searchContext.mapperService().simpleMatchToIndexNames(fieldExp)) {
int i = field.indexOf('.');
String columnName = (i > 0) ? field.substring(0, i) : field;
// TODO: eliminate non-existent columns or (non-static or non-partition-key) for static docs.
if (this.filtredFields == null || this.filtredFields.contains(columnName))
requiredColumns.add(columnName);
}
}
}
if (loadSource()) {
for(String columnName : searchContext.mapperService().documentMapper(type).getColumnDefinitions().keySet())
List<String> requiredColumns = new ArrayList<String>();
if (requestedFields() != null) {
for(String fieldExp : requestedFields()) {
for(String field : searchContext.mapperService().simpleMatchToIndexNames(fieldExp)) {
int i = field.indexOf('.');
String columnName = (i > 0) ? field.substring(0, i) : field;
// TODO: eliminate non-existent columns or (non-static or non-partition-key) for static docs.
if (this.filtredFields == null || this.filtredFields.contains(columnName))
requiredColumns.add( columnName );
requiredColumns.add(columnName);
}
}
this.requiredColumns = new TreeSet<String>(requiredColumns);
}
return this.requiredColumns;
if (loadSource()) {
for(String columnName : searchContext.mapperService().documentMapper(type).getColumnDefinitions().keySet())
if (this.filtredFields == null || this.filtredFields.contains(columnName))
requiredColumns.add( columnName );
}
return new TreeSet<String>(requiredColumns);
}

public void filtredColumns(Collection<String> fields) {
Expand Down
13 changes: 0 additions & 13 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
*/
@Nullable
private final RefreshListeners refreshListeners;

private ConcurrentMap<Pair<String, Set<String>>, ParsedStatement.Prepared> cqlStatementCache =
//new ConcurrentHashMap<Pair<String, Set<String>>, ParsedStatement.Prepared>();
new ConcurrentReferenceHashMap<Pair<String, Set<String>>, ParsedStatement.Prepared>(4, 0.9f, 1, ReferenceType.WEAK);

public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
Expand Down Expand Up @@ -339,15 +335,6 @@ public ShardBitsetFilterCache shardBitsetFilterCache() {
/** Returns this shard's cache of token-range bitset filters. */
public ShardBitsetFilterCache tokenRangesBitsetFilterCache() {
return tokenRangesBitsetFilterCache;
}


/**
 * Looks up a prepared CQL statement in the shard-level cache.
 *
 * @param key pair of (type key, required column names) identifying the fetch statement
 * @return the cached prepared statement, or {@code null} if none was cached for this key
 */
public ParsedStatement.Prepared getCqlPreparedStatement(Pair<String, Set<String>> key) {
return cqlStatementCache.get(key);
}

/**
 * Caches a prepared CQL statement under the given (type key, required columns) pair.
 * NOTE(review): a plain put() may overwrite a statement prepared concurrently for the
 * same key; presumably harmless since equivalent keys yield equivalent statements — verify.
 */
public void putCqlPreparedStatement(Pair<String, Set<String>> key, ParsedStatement.Prepared query) {
cqlStatementCache.put(key, query);
}

public IndexFieldDataService indexFieldDataService() {
return indexFieldDataService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.Pair;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.search.DocIdSetIterator;
Expand Down Expand Up @@ -372,7 +372,6 @@ private void loadStoredFields(SearchContext searchContext, LeafReaderContext rea
throw new FetchPhaseExecutionException(searchContext, "Failed to fetch doc id [" + docId + "]", e);
}


// load field from cassandra
IndexService indexService = searchContext.indexShard().indexService();
try {
Expand All @@ -381,10 +380,9 @@ private void loadStoredFields(SearchContext searchContext, LeafReaderContext rea
if (docPk.isStaticDocument)
typeKey += "_static";

NavigableSet<String> requiredColumns = fieldVisitor.requiredColumns(clusterService, searchContext);
Pair<String, Set<String>> statementKey = Pair.<String, Set<String>>create(typeKey,requiredColumns);
ParsedStatement.Prepared cqlStatement = searchContext.indexShard().getCqlPreparedStatement( statementKey );
ParsedStatement.Prepared cqlStatement = searchContext.getCqlPreparedStatement( typeKey );
if (cqlStatement == null) {
NavigableSet<String> requiredColumns = fieldVisitor.requiredColumns(clusterService, searchContext);
if (requiredColumns.size() > 0) {
IndexMetaData indexMetaData = clusterService.state().metaData().index(searchContext.request().shardId().getIndexName());
if (requiredColumns.contains(NodeFieldMapper.NAME)) {
Expand All @@ -399,8 +397,11 @@ private void loadStoredFields(SearchContext searchContext, LeafReaderContext rea
String query = clusterService.buildFetchQuery(
indexService, fieldVisitor.uid().type(),
requiredColumns.toArray(new String[requiredColumns.size()]), docPk.isStaticDocument, docMapper.getColumnDefinitions());
Logger logger = Loggers.getLogger(FetchPhase.class);
if (logger.isTraceEnabled())
logger.trace("new statement={}",query);
cqlStatement = QueryProcessor.prepareInternal(query);
searchContext.indexShard().putCqlPreparedStatement(statementKey, cqlStatement);
searchContext.putCqlPreparedStatement(typeKey, cqlStatement);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.search.internal;


import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.Query;
Expand Down Expand Up @@ -69,6 +70,8 @@
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -102,6 +105,7 @@ protected SearchContext() {
protected ClusterState clusterState = null;
protected SearchProcessor processor = null;
protected boolean includeNode;
protected ConcurrentMap<String, ParsedStatement.Prepared> cqlStatementCache = new ConcurrentHashMap<String, ParsedStatement.Prepared>();

@Override
public final void close() {
Expand Down Expand Up @@ -156,6 +160,15 @@ public SearchProcessor searchProcessor() {
return processor;
}


/**
 * Returns the prepared CQL fetch statement cached in this search context for {@code key},
 * or {@code null} if none has been prepared yet. The key is the document type, with a
 * "_static" suffix appended for static documents (see the fetch-phase caller).
 */
public ParsedStatement.Prepared getCqlPreparedStatement(String key) {
return cqlStatementCache.get(key);
}

/**
 * Caches a prepared CQL fetch statement under the given type key for reuse by later
 * fetches within this search context. A concurrent put for the same key may be
 * overwritten; equivalent keys produce equivalent statements, so this is benign.
 */
public void putCqlPreparedStatement(String key, ParsedStatement.Prepared query) {
cqlStatementCache.put(key, query);
}

/** Automatically apply all required filters to the given query such as
* alias filters, types filters, etc. */
public abstract Query buildFilteredQuery(Query query);
Expand Down
14 changes: 14 additions & 0 deletions core/src/test/java/org/elassandra/CqlTypesTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,18 @@ public void testTimes() throws Exception {
SearchResponse resp = client().prepareSearch().setIndices("test").setTypes("event_test").setQuery(QueryBuilders.queryStringQuery("day:2010-10-10")).get();
assertThat(resp.getHits().getTotalHits(), equalTo(1L));
}

// see issue #128: fetching documents of multiple mapping types from one index
// used to reuse a single shard-level prepared statement keyed only by columns,
// returning wrong/missing fields for the other types.
@Test
public void testFetchMultipleTypes() throws Exception {
createIndex("test");
ensureGreen("test");

// Three documents in three different types, all sharing the field "x".
assertThat(client().prepareIndex("test", "typeA", "1").setSource("{ \"a\":\"1\", \"x\":\"aaa\" }").get().getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertThat(client().prepareIndex("test", "typeB", "2").setSource("{ \"b\":\"1\", \"x\":\"aaa\" }").get().getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertThat(client().prepareIndex("test", "typeC", "3").setSource("{ \"c\":\"1\", \"x\":\"aaa\" }").get().getResult(), equalTo(DocWriteResponse.Result.CREATED));
// Query the shared field explicitly: "q=aaa" is not query_string field syntax
// ('=' is not a field separator) and only matched through accidental tokenization
// against the default field; "x:aaa" deterministically matches all three docs.
SearchResponse resp = client().prepareSearch().setIndices("test").setQuery(QueryBuilders.queryStringQuery("x:aaa")).get();
assertThat(resp.getHits().getTotalHits(), equalTo(3L));
}
}

0 comments on commit 819e4f1

Please sign in to comment.