From ef881ab1a3d97b538024386d63871638f37ac124 Mon Sep 17 00:00:00 2001 From: vroyer Date: Mon, 16 Nov 2015 23:11:50 +0100 Subject: [PATCH] relase 0.4.1 --- .../get/TransportShardMultiGetAction.java | 2 +- .../cassandra/ElasticSchemaService.java | 33 +++++++++++-------- .../cassandra/ElasticSecondaryIndex.java | 4 +-- .../cassandra/SchemaService.java | 4 +-- .../common/xcontent/XContentBuilder.java | 30 ++++++++++++----- .../index/get/ShardGetService.java | 4 +-- .../search/fetch/FetchPhase.java | 2 +- 7 files changed, 48 insertions(+), 31 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 7ffc878fdf3..917d6218ea1 100644 --- a/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -156,7 +156,7 @@ protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, Sha try { UntypedResultSet result = elasticSchemaService.fetchRow(request.index(), item.type(), columns, item.id()); if (!result.isEmpty()) { - Map rowAsMap = elasticSchemaService.rowAsMap(result.one()); + Map rowAsMap = elasticSchemaService.rowAsMap(request.index(), item.type(), result.one()); Map rowAsFieldMap = elasticSchemaService.flattenGetField(item.fields(), "", rowAsMap, new HashMap()); GetResult getResult = new GetResult(request.index(), item.type(), item.id(), 0L, true, new BytesArray(FBUtilities.json(rowAsMap).getBytes("UTF-8")), rowAsFieldMap); diff --git a/src/main/java/org/elasticsearch/cassandra/ElasticSchemaService.java b/src/main/java/org/elasticsearch/cassandra/ElasticSchemaService.java index 5b70a8219e3..78e018b1608 100644 --- a/src/main/java/org/elasticsearch/cassandra/ElasticSchemaService.java +++ b/src/main/java/org/elasticsearch/cassandra/ElasticSchemaService.java @@ -98,7 +98,6 @@ import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.transport.messages.ResultMessage; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; @@ -154,6 +153,7 @@ public class ElasticSchemaService extends AbstractComponent implements SchemaSer public static String MAPPING_UPDATE_TIMEOUT = "cassandra.mapping_update.timeout"; private final ClusterService clusterService; + private final IndicesService indicesService; private final TimeValue mappingUpdateTimeout; // ElasticSearch to Cassandra mapping @@ -209,9 +209,10 @@ public boolean isNativeCql3Type(String cqlType) { } @Inject - public ElasticSchemaService(Settings settings, ClusterService clusterService) { + public ElasticSchemaService(Settings settings, ClusterService clusterService, IndicesService indicesService) { super(settings); this.clusterService = clusterService; + this.indicesService = indicesService; this.mappingUpdateTimeout = settings.getAsTime(MAPPING_UPDATE_TIMEOUT, TimeValue.timeValueSeconds(30)); } @@ -995,17 +996,20 @@ public void deleteRow(final String index, final String type, final String id, fi } @Override - public Map rowAsMap(UntypedResultSet.Row row) throws IOException { + public Map rowAsMap(final String index, final String type, UntypedResultSet.Row row) throws IOException { Map mapObject = new HashMap(); - rowAsMap(row, mapObject); + rowAsMap(index, type, row, mapObject); return mapObject; } @Override - public int rowAsMap(UntypedResultSet.Row row, Map mapObject) throws IOException { + public int rowAsMap(final String index, final String type, UntypedResultSet.Row row, Map mapObject) throws IOException { int putCount = 0; List columnSpecs = row.getColumns(); - + + IndexService indexService = indicesService.indexServiceSafe(index); + DocumentMapper documentMapper = indexService.mapperService().documentMapper(type); + for (int columnIndex = 0; columnIndex < columnSpecs.size(); columnIndex++) { ColumnSpecification colSpec = columnSpecs.get(columnIndex); String columnName = colSpec.name.toString(); @@ -1015,6 +1019,7 @@ public int rowAsMap(UntypedResultSet.Row row, Map mapObject) thr continue; if (cql3Type instanceof CQL3Type.Native) { + FieldMapper fieldMapper = documentMapper.mappers().smartNameFieldMapper(columnName); switch ((CQL3Type.Native) cql3Type) { case ASCII: case TEXT: @@ -1024,38 +1029,38 @@ public int rowAsMap(UntypedResultSet.Row row, Map mapObject) thr break; case TIMEUUID: case UUID: - mapObject.put(columnName, row.getUUID(colSpec.name.toString())); + mapObject.put(columnName, row.getUUID(colSpec.name.toString()).toString() ); putCount++; break; case TIMESTAMP: - mapObject.put(columnName, row.getTimestamp(colSpec.name.toString()).getTime()); + mapObject.put(columnName, fieldMapper.valueForSearch( row.getTimestamp(colSpec.name.toString()).getTime())); putCount++; break; case INT: - mapObject.put(columnName, row.getInt(colSpec.name.toString())); + mapObject.put(columnName, fieldMapper.valueForSearch( row.getInt(colSpec.name.toString())) ); putCount++; break; case BIGINT: - mapObject.put(columnName, row.getLong(colSpec.name.toString())); + mapObject.put(columnName, fieldMapper.valueForSearch( row.getLong(colSpec.name.toString())) ); putCount++; break; case DECIMAL: case DOUBLE: - mapObject.put(columnName, row.getDouble(colSpec.name.toString())); + mapObject.put(columnName, fieldMapper.valueForSearch( row.getDouble(colSpec.name.toString()))); putCount++; break; case BLOB: - mapObject.put(columnName, row.getBytes(colSpec.name.toString())); + mapObject.put(columnName, fieldMapper.valueForSearch( row.getBytes(colSpec.name.toString()))); putCount++; break; case BOOLEAN: - mapObject.put(columnName, row.getBoolean(colSpec.name.toString())); + mapObject.put(columnName, fieldMapper.valueForSearch( row.getBoolean(colSpec.name.toString()))); putCount++; break; case COUNTER: break; case INET: - mapObject.put(columnName, row.getInetAddress(colSpec.name.toString())); + mapObject.put(columnName, fieldMapper.valueForSearch( row.getInetAddress(colSpec.name.toString()).getHostAddress())); putCount++; break; default: diff --git a/src/main/java/org/elasticsearch/cassandra/ElasticSecondaryIndex.java b/src/main/java/org/elasticsearch/cassandra/ElasticSecondaryIndex.java index 7acdfee12a9..798bf9b0538 100644 --- a/src/main/java/org/elasticsearch/cassandra/ElasticSecondaryIndex.java +++ b/src/main/java/org/elasticsearch/cassandra/ElasticSecondaryIndex.java @@ -83,13 +83,13 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; /** @@ -324,7 +324,7 @@ public void complete() throws JsonGenerationException, JsonMappingException, IOE logger.debug(" {}.{} id={} read fields={}",metadata.ksName, metadata.cfName, id(), mustReadColumns); SchemaService schemaService = ElassandraDaemon.injector().getInstance(SchemaService.class); Row row = schemaService.fetchRowInternal(metadata.ksName, metadata.cfName, mustReadColumns, pkColumns).one(); - int putCount = schemaService.rowAsMap(row, docMap); + int putCount = schemaService.rowAsMap(metadata.ksName, metadata.cfName, row, docMap); if (putCount > 0) docLive = true; } catch (RequestValidationException | IOException e) { logger.error("Failed to fetch columns {}",mustReadColumns,e); diff --git a/src/main/java/org/elasticsearch/cassandra/SchemaService.java b/src/main/java/org/elasticsearch/cassandra/SchemaService.java index c392a2af4f5..b2740b577a8 100644 --- a/src/main/java/org/elasticsearch/cassandra/SchemaService.java +++ b/src/main/java/org/elasticsearch/cassandra/SchemaService.java @@ -82,8 +82,8 @@ public UntypedResultSet fetchRow(final String index, final String type, final St public UntypedResultSet fetchRowInternal(String index, String type, Collection requiredColumns, String id) throws ConfigurationException, IOException; public UntypedResultSet fetchRowInternal(String ksName, String cfName, Collection requiredColumns, Object[] pkColumns) throws ConfigurationException, IOException; - public Map rowAsMap(UntypedResultSet.Row row) throws IOException; - public int rowAsMap(UntypedResultSet.Row row, Map map) throws IOException; + public Map rowAsMap(final String index, final String type, UntypedResultSet.Row row) throws IOException; + public int rowAsMap(final String index, final String type, UntypedResultSet.Row row, Map map) throws IOException; public void deleteRow(String index, String type, String id, ConsistencyLevel cl) throws InvalidRequestException, RequestExecutionException, RequestValidationException, IOException; diff --git a/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java b/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java index 02e0fe1266f..bebe90dc5e4 100644 --- a/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java +++ b/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java @@ -19,7 +19,16 @@ package org.elasticsearch.common.xcontent; -import com.google.common.base.Charsets; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.net.InetAddress; +import java.util.Calendar; +import java.util.Date; +import java.util.Map; + import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; @@ -36,14 +45,7 @@ import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.math.BigDecimal; -import java.math.RoundingMode; -import java.util.Calendar; -import java.util.Date; -import java.util.Map; +import com.google.common.base.Charsets; /** * @@ -367,6 +369,16 @@ public XContentBuilder field(XContentBuilderString name, Integer value) throws I } return this; } + + public XContentBuilder field(XContentBuilderString name, InetAddress value) throws IOException { + field(name); + if (value == null) { + generator.writeNull(); + } else { + generator.writeString(value.getHostAddress()); + } + return this; + } public XContentBuilder field(String name, int value) throws IOException { field(name); diff --git a/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/src/main/java/org/elasticsearch/index/get/ShardGetService.java index 2ec396caf15..a69d52caddb 100644 --- a/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -296,7 +296,7 @@ public GetResult innerGet(String type, String id, String[] gFields, boolean real if (result.isEmpty()) { return new GetResult(shardId.index().name(), type, id, -1, false, null, null); } - sourceAsMap = schemaService.rowAsMap(result.one()); + sourceAsMap = schemaService.rowAsMap(shardId.index().name(), type, result.one()); if (fetchSourceContext.fetchSource()) sourceToBeReturned = XContentFactory.contentBuilder(XContentType.JSON).map(sourceAsMap).bytes(); } catch (RequestExecutionException | RequestValidationException | IOException e1) { @@ -365,7 +365,7 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[] if (fieldVisitor != null) { try { // fetch source from cassandra - Map sourceMap = schemaService.rowAsMap(schemaService.fetchRow(shardId.index().name(), type, id).one()); + Map sourceMap = schemaService.rowAsMap(shardId.index().name(), type, schemaService.fetchRow(shardId.index().name(), type, id).one()); source = XContentFactory.contentBuilder(XContentType.JSON).map(sourceMap).bytes(); fieldVisitor.source( source.toBytes() ); //docIdAndVersion.context.reader().document(docIdAndVersion.docId, fieldVisitor); diff --git a/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java index 67e5f4eba27..da812f4f5f2 100644 --- a/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java +++ b/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java @@ -438,7 +438,7 @@ private void loadStoredFields(SearchContext searchContext, AtomicReaderContext r ImmutableList. copyOf(fieldVisitor.cassandraColumns(searchContext.mapperService(), justUidFieldsVisitor.uid().type())), justUidFieldsVisitor.uid().id()); if (!result.isEmpty()) { - Map mapObject = elasticSchemaService.rowAsMap(result.one()); + Map mapObject = elasticSchemaService.rowAsMap(searchContext.request().index(), justUidFieldsVisitor.uid().type(), result.one()); if (fieldVisitor.needFields()) { Map> flatMap = new HashMap>();