Skip to content

Commit

Permalink
relase 0.4.1
Browse files Browse the repository at this point in the history
  • Loading branch information
vroyer committed Nov 16, 2015
1 parent 39c2e22 commit ef881ab
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 31 deletions.
Expand Up @@ -156,7 +156,7 @@ protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, Sha
try {
UntypedResultSet result = elasticSchemaService.fetchRow(request.index(), item.type(), columns, item.id());
if (!result.isEmpty()) {
Map<String, Object> rowAsMap = elasticSchemaService.rowAsMap(result.one());
Map<String, Object> rowAsMap = elasticSchemaService.rowAsMap(request.index(), item.type(), result.one());
Map<String, GetField> rowAsFieldMap = elasticSchemaService.flattenGetField(item.fields(), "", rowAsMap, new HashMap<String, GetField>());

GetResult getResult = new GetResult(request.index(), item.type(), item.id(), 0L, true, new BytesArray(FBUtilities.json(rowAsMap).getBytes("UTF-8")), rowAsFieldMap);
Expand Down
33 changes: 19 additions & 14 deletions src/main/java/org/elasticsearch/cassandra/ElasticSchemaService.java
Expand Up @@ -98,7 +98,6 @@
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
Expand Down Expand Up @@ -154,6 +153,7 @@ public class ElasticSchemaService extends AbstractComponent implements SchemaSer
public static String MAPPING_UPDATE_TIMEOUT = "cassandra.mapping_update.timeout";

private final ClusterService clusterService;
private final IndicesService indicesService;
private final TimeValue mappingUpdateTimeout;

// ElasticSearch to Cassandra mapping
Expand Down Expand Up @@ -209,9 +209,10 @@ public boolean isNativeCql3Type(String cqlType) {
}

@Inject
public ElasticSchemaService(Settings settings, ClusterService clusterService) {
public ElasticSchemaService(Settings settings, ClusterService clusterService, IndicesService indicesService) {
super(settings);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.mappingUpdateTimeout = settings.getAsTime(MAPPING_UPDATE_TIMEOUT, TimeValue.timeValueSeconds(30));
}

Expand Down Expand Up @@ -995,17 +996,20 @@ public void deleteRow(final String index, final String type, final String id, fi
}

@Override
public Map<String, Object> rowAsMap(UntypedResultSet.Row row) throws IOException {
public Map<String, Object> rowAsMap(final String index, final String type, UntypedResultSet.Row row) throws IOException {
Map<String, Object> mapObject = new HashMap<String, Object>();
rowAsMap(row, mapObject);
rowAsMap(index, type, row, mapObject);
return mapObject;
}

@Override
public int rowAsMap(UntypedResultSet.Row row, Map<String, Object> mapObject) throws IOException {
public int rowAsMap(final String index, final String type, UntypedResultSet.Row row, Map<String, Object> mapObject) throws IOException {
int putCount = 0;
List<ColumnSpecification> columnSpecs = row.getColumns();


IndexService indexService = indicesService.indexServiceSafe(index);
DocumentMapper documentMapper = indexService.mapperService().documentMapper(type);

for (int columnIndex = 0; columnIndex < columnSpecs.size(); columnIndex++) {
ColumnSpecification colSpec = columnSpecs.get(columnIndex);
String columnName = colSpec.name.toString();
Expand All @@ -1015,6 +1019,7 @@ public int rowAsMap(UntypedResultSet.Row row, Map<String, Object> mapObject) thr
continue;

if (cql3Type instanceof CQL3Type.Native) {
FieldMapper<?> fieldMapper = documentMapper.mappers().smartNameFieldMapper(columnName);
switch ((CQL3Type.Native) cql3Type) {
case ASCII:
case TEXT:
Expand All @@ -1024,38 +1029,38 @@ public int rowAsMap(UntypedResultSet.Row row, Map<String, Object> mapObject) thr
break;
case TIMEUUID:
case UUID:
mapObject.put(columnName, row.getUUID(colSpec.name.toString()));
mapObject.put(columnName, row.getUUID(colSpec.name.toString()).toString() );
putCount++;
break;
case TIMESTAMP:
mapObject.put(columnName, row.getTimestamp(colSpec.name.toString()).getTime());
mapObject.put(columnName, fieldMapper.valueForSearch( row.getTimestamp(colSpec.name.toString()).getTime()));
putCount++;
break;
case INT:
mapObject.put(columnName, row.getInt(colSpec.name.toString()));
mapObject.put(columnName, fieldMapper.valueForSearch( row.getInt(colSpec.name.toString())) );
putCount++;
break;
case BIGINT:
mapObject.put(columnName, row.getLong(colSpec.name.toString()));
mapObject.put(columnName, fieldMapper.valueForSearch( row.getLong(colSpec.name.toString())) );
putCount++;
break;
case DECIMAL:
case DOUBLE:
mapObject.put(columnName, row.getDouble(colSpec.name.toString()));
mapObject.put(columnName, fieldMapper.valueForSearch( row.getDouble(colSpec.name.toString())));
putCount++;
break;
case BLOB:
mapObject.put(columnName, row.getBytes(colSpec.name.toString()));
mapObject.put(columnName, fieldMapper.valueForSearch( row.getBytes(colSpec.name.toString())));
putCount++;
break;
case BOOLEAN:
mapObject.put(columnName, row.getBoolean(colSpec.name.toString()));
mapObject.put(columnName, fieldMapper.valueForSearch( row.getBoolean(colSpec.name.toString())));
putCount++;
break;
case COUNTER:
break;
case INET:
mapObject.put(columnName, row.getInetAddress(colSpec.name.toString()));
mapObject.put(columnName, fieldMapper.valueForSearch( row.getInetAddress(colSpec.name.toString()).getHostAddress()));
putCount++;
break;
default:
Expand Down
Expand Up @@ -83,13 +83,13 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

/**
Expand Down Expand Up @@ -324,7 +324,7 @@ public void complete() throws JsonGenerationException, JsonMappingException, IOE
logger.debug(" {}.{} id={} read fields={}",metadata.ksName, metadata.cfName, id(), mustReadColumns);
SchemaService schemaService = ElassandraDaemon.injector().getInstance(SchemaService.class);
Row row = schemaService.fetchRowInternal(metadata.ksName, metadata.cfName, mustReadColumns, pkColumns).one();
int putCount = schemaService.rowAsMap(row, docMap);
int putCount = schemaService.rowAsMap(metadata.ksName, metadata.cfName, row, docMap);
if (putCount > 0) docLive = true;
} catch (RequestValidationException | IOException e) {
logger.error("Failed to fetch columns {}",mustReadColumns,e);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/elasticsearch/cassandra/SchemaService.java
Expand Up @@ -82,8 +82,8 @@ public UntypedResultSet fetchRow(final String index, final String type, final St
public UntypedResultSet fetchRowInternal(String index, String type, Collection<String> requiredColumns, String id) throws ConfigurationException, IOException;
public UntypedResultSet fetchRowInternal(String ksName, String cfName, Collection<String> requiredColumns, Object[] pkColumns) throws ConfigurationException, IOException;

public Map<String, Object> rowAsMap(UntypedResultSet.Row row) throws IOException;
public int rowAsMap(UntypedResultSet.Row row, Map<String, Object> map) throws IOException;
public Map<String, Object> rowAsMap(final String index, final String type, UntypedResultSet.Row row) throws IOException;
public int rowAsMap(final String index, final String type, UntypedResultSet.Row row, Map<String, Object> map) throws IOException;

public void deleteRow(String index, String type, String id, ConsistencyLevel cl) throws InvalidRequestException, RequestExecutionException, RequestValidationException, IOException;

Expand Down
Expand Up @@ -19,7 +19,16 @@

package org.elasticsearch.common.xcontent;

import com.google.common.base.Charsets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.net.InetAddress;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
Expand All @@ -36,14 +45,7 @@
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;
import com.google.common.base.Charsets;

/**
*
Expand Down Expand Up @@ -367,6 +369,16 @@ public XContentBuilder field(XContentBuilderString name, Integer value) throws I
}
return this;
}

public XContentBuilder field(XContentBuilderString name, InetAddress value) throws IOException {
field(name);
if (value == null) {
generator.writeNull();
} else {
generator.writeString(value.getHostAddress());
}
return this;
}

public XContentBuilder field(String name, int value) throws IOException {
field(name);
Expand Down
Expand Up @@ -296,7 +296,7 @@ public GetResult innerGet(String type, String id, String[] gFields, boolean real
if (result.isEmpty()) {
return new GetResult(shardId.index().name(), type, id, -1, false, null, null);
}
sourceAsMap = schemaService.rowAsMap(result.one());
sourceAsMap = schemaService.rowAsMap(shardId.index().name(), type, result.one());
if (fetchSourceContext.fetchSource())
sourceToBeReturned = XContentFactory.contentBuilder(XContentType.JSON).map(sourceAsMap).bytes();
} catch (RequestExecutionException | RequestValidationException | IOException e1) {
Expand Down Expand Up @@ -365,7 +365,7 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[]
if (fieldVisitor != null) {
try {
// fetch source from cassandra
Map<String, Object> sourceMap = schemaService.rowAsMap(schemaService.fetchRow(shardId.index().name(), type, id).one());
Map<String, Object> sourceMap = schemaService.rowAsMap(shardId.index().name(), type, schemaService.fetchRow(shardId.index().name(), type, id).one());
source = XContentFactory.contentBuilder(XContentType.JSON).map(sourceMap).bytes();
fieldVisitor.source( source.toBytes() );
//docIdAndVersion.context.reader().document(docIdAndVersion.docId, fieldVisitor);
Expand Down
Expand Up @@ -438,7 +438,7 @@ private void loadStoredFields(SearchContext searchContext, AtomicReaderContext r
ImmutableList.<String> copyOf(fieldVisitor.cassandraColumns(searchContext.mapperService(), justUidFieldsVisitor.uid().type())),
justUidFieldsVisitor.uid().id());
if (!result.isEmpty()) {
Map<String, Object> mapObject = elasticSchemaService.rowAsMap(result.one());
Map<String, Object> mapObject = elasticSchemaService.rowAsMap(searchContext.request().index(), justUidFieldsVisitor.uid().type(), result.one());

if (fieldVisitor.needFields()) {
Map<String, List<Object>> flatMap = new HashMap<String, List<Object>>();
Expand Down

0 comments on commit ef881ab

Please sign in to comment.