diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordCursor.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordCursor.java
index 9d53f7b40cc6f..49f89bc2b08f2 100644
--- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordCursor.java
+++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordCursor.java
@@ -25,8 +25,10 @@
 import org.elasticsearch.search.SearchHit;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -250,11 +252,109 @@ private Object getFieldValue(int field)
 
     private void extractFromSource(SearchHit hit)
     {
-        Map<String, Object> map = hit.getSourceAsMap();
-        for (Map.Entry<String, Object> entry : map.entrySet()) {
-            String jsonPath = entry.getKey();
-            Object entryValue = entry.getValue();
-            setFieldIfExists(jsonPath, entryValue);
+        // Source keys may be dotted paths such as "fields.fielda"; sorting by name makes
+        // keys that share a path prefix adjacent, which unflatten() relies on
+        List<Field> fields = new ArrayList<>();
+        for (Map.Entry<String, Object> entry : hit.getSourceAsMap().entrySet()) {
+            fields.add(new Field(entry.getKey(), entry.getValue()));
+        }
+        fields.sort(Comparator.comparing(Field::getName));
+
+        for (Map.Entry<String, Object> entry : unflatten(fields).entrySet()) {
+            setFieldIfExists(entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * Rebuilds a nested map from fields whose names are dot-separated paths, e.g.
+     * {@code fields.fielda} and {@code fields.fieldb} become one {@code fields} entry
+     * holding a map with keys {@code fielda} and {@code fieldb}.
+     *
+     * @param fields fields sorted by name; may be empty
+     * @return map keyed by first path element; empty if {@code fields} is empty
+     */
+    private static Map<String, Object> unflatten(List<Field> fields)
+    {
+        if (fields.isEmpty()) {
+            // e.g. a document indexed with an empty source; the recursive overload requires length > 0
+            return Collections.emptyMap();
+        }
+        return unflatten(fields, 0, 0, fields.size());
+    }
+
+    /**
+     * Unflattens the {@code length} fields starting at index {@code start}, grouping them
+     * by their path element at {@code level}. Fields in the range are expected to be sorted
+     * by name so that equal path elements at {@code level} are adjacent.
+     */
+    private static Map<String, Object> unflatten(List<Field> fields, int level, int start, int length)
+    {
+        checkArgument(length > 0, "length must be > 0");
+
+        int limit = start + length;
+
+        Map<String, Object> result = new HashMap<>();
+        int anchor = start;
+        int current = start;
+
+        do {
+            // anchor is the first field of the current run; current scans for the end of the run
+            Field field = fields.get(anchor);
+            String name = field.getPathElement(level);
+
+            current++;
+            if (current == limit || !name.equals(fields.get(current).getPathElement(level))) {
+                // We assume that fields can't be both leaves and intermediate nodes
+                Object value;
+                if (level < field.getDepth() - 1) {
+                    value = unflatten(fields, level + 1, anchor, current - anchor);
+                }
+                else {
+                    value = field.getValue();
+                }
+                result.put(name, value);
+                anchor = current;
+            }
+        }
+        while (current < limit);
+
+        return result;
+    }
+
+    /**
+     * A source entry together with its name split on {@code '.'} into path elements.
+     */
+    private static final class Field
+    {
+        private final String name;
+        private final List<String> path;
+        private final Object value;
+
+        public Field(String name, Object value)
+        {
+            this.name = name;
+            this.path = Arrays.asList(name.split("\\."));
+            this.value = value;
+        }
+
+        public String getName()
+        {
+            return name;
+        }
+
+        public int getDepth()
+        {
+            return path.size();
+        }
+
+        public String getPathElement(int level)
+        {
+            return path.get(level);
+        }
+
+        public Object getValue()
+        {
+            return value;
         }
     }
 }
diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchUtils.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchUtils.java
index 75802d56998d4..078e94292ca6e 100644
--- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchUtils.java
+++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchUtils.java
@@ -46,7 +46,9 @@ public static Block serializeObject(Type type, BlockBuilder builder, Object obje
         if (MAP.equals(type.getTypeSignature().getBase()) || ARRAY.equals(type.getTypeSignature().getBase())) {
             throw new IllegalArgumentException("Type not supported: " + type.getDisplayName());
         }
-        return serializePrimitive(type, builder, object);
+        // Primitives are written into the caller's builder, so there is no separate block to return
+        serializePrimitive(type, builder, object);
+        return null;
     }
 
     private static Block serializeStruct(Type type, BlockBuilder builder, Object object)
@@ -54,10 +56,13 @@ private static Block serializeStruct(Type type, BlockBuilder builder, Object obj
         if (object == null) {
             requireNonNull(builder, "builder is null");
             builder.appendNull();
-            return builder.build();
+            // The null entry went into the caller's builder; there is no separate block to return
+            return null;
         }
 
-        if (builder == null) {
+        // A block is returned only when the builder is created here; a caller-supplied builder is filled in place
+        boolean builderSynthesized = builder == null;
+        if (builderSynthesized) {
             builder = type.createBlockBuilder(null, 1);
         }
 
@@ -75,16 +80,27 @@ private static Block serializeStruct(Type type, BlockBuilder builder, Object obj
         }
 
         builder.closeEntry();
-        return (Block) type.getObject(builder, 0);
+        if (builderSynthesized) {
+            return (Block) type.getObject(builder, 0);
+        }
+        else {
+            return null;
+        }
     }
 
-    private static Block serializePrimitive(Type type, BlockBuilder builder, Object object)
+    /**
+     * Writes {@code object} into {@code builder} as a value of the primitive {@code type};
+     * a {@code null} object is written as a null entry.
+     *
+     * @throws IllegalArgumentException if {@code type} is not a supported primitive type
+     */
+    private static void serializePrimitive(Type type, BlockBuilder builder, Object object)
     {
         requireNonNull(builder, "builder is null");
 
         if (object == null) {
             builder.appendNull();
-            return builder.build();
+            return;
         }
 
         if (type.equals(BOOLEAN)) {
@@ -105,6 +121,5 @@ else if (type.equals(VARCHAR) || type.equals(VARBINARY)) {
         else {
             throw new IllegalArgumentException("Unknown primitive type: " + type.getDisplayName());
         }
-        return builder.build();
     }
 }
diff --git a/presto-elasticsearch/src/test/java/io/prestosql/elasticsearch/TestElasticsearchIntegrationSmokeTest.java b/presto-elasticsearch/src/test/java/io/prestosql/elasticsearch/TestElasticsearchIntegrationSmokeTest.java
index dade1d9fb1a6d..5c49c25d7875d 100644
--- a/presto-elasticsearch/src/test/java/io/prestosql/elasticsearch/TestElasticsearchIntegrationSmokeTest.java
+++ b/presto-elasticsearch/src/test/java/io/prestosql/elasticsearch/TestElasticsearchIntegrationSmokeTest.java
@@ -120,4 +120,28 @@ public void testMixedCaseFields()
                 "SELECT name, age FROM test.person",
                 "VALUES ('John', 20)");
     }
+
+    @Test
+    public void testNestedFields()
+    {
+        String indexName = "data";
+        embeddedElasticsearchNode.getClient()
+                .prepareIndex(indexName, "doc")
+                .setSource(ImmutableMap.<String, Object>builder()
+                        .put("name", "nestfield")
+                        .put("fields.fielda", 32)
+                        .put("fields.fieldb", "valueb")
+                        .build())
+                .get();
+
+        embeddedElasticsearchNode.getClient()
+                .admin()
+                .indices()
+                .refresh(refreshRequest(indexName))
+                .actionGet();
+
+        assertQuery(
+                "SELECT name, fields.fielda, fields.fieldb FROM nested.data",
+                "VALUES ('nestfield', 32, 'valueb')");
+    }
 }
diff --git a/presto-elasticsearch/src/test/resources/queryrunner/nested.data.json b/presto-elasticsearch/src/test/resources/queryrunner/nested.data.json
new file mode 100644
index 0000000000000..7cee724418005
--- /dev/null
+++ b/presto-elasticsearch/src/test/resources/queryrunner/nested.data.json
@@ -0,0 +1,25 @@
+{
+  "tableName": "data",
+  "schemaName": "nested",
+  "host": "localhost",
+  "port": "9300",
+  "clusterName": "test",
+  "index": "data",
+  "type": "doc",
+  "columns": [
+    {
+      "name": "name",
+      "type": "varchar",
+      "jsonPath": "name",
+      "jsonType": "varchar",
+      "ordinalPosition": "0"
+    },
+    {
+      "name": "fields",
+      "type": "row(fielda integer, fieldb varchar)",
+      "jsonPath": "fields",
+      "jsonType": "varchar",
+      "ordinalPosition": "1"
+    }
+  ]
+}