Skip to content

Commit

Permalink
Support nested fields in Elasticsearch Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenxiao committed Jul 12, 2019
1 parent 6b8b336 commit 08c0a30
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 13 deletions.
Expand Up @@ -25,8 +25,10 @@
import org.elasticsearch.search.SearchHit;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -250,11 +252,86 @@ private Object getFieldValue(int field)

private void extractFromSource(SearchHit hit)
{
Map<String, Object> map = hit.getSourceAsMap();
for (Map.Entry<String, Object> entry : map.entrySet()) {
String jsonPath = entry.getKey();
Object entryValue = entry.getValue();
setFieldIfExists(jsonPath, entryValue);
List<Field> fields = new ArrayList<>();
for (Map.Entry<String, Object> entry : hit.getSourceAsMap().entrySet()) {
fields.add(new Field(entry.getKey(), entry.getValue()));
}
Collections.sort(fields, Comparator.comparing(Field::getName));

for (Map.Entry<String, Object> entry : unflatten(fields).entrySet()) {
setFieldIfExists(entry.getKey(), entry.getValue());
}
}

/**
 * Converts a name-sorted list of dotted-path fields into a nested map.
 *
 * <p>An empty list yields an empty map. The recursive overload rejects a
 * zero-length range, so the empty case is answered here instead of being
 * forwarded to it.
 *
 * @param fields fields ordered by name; may be empty
 * @return a map keyed by the first path element of each field, whose values are
 *         either the field value or a nested map for deeper paths
 */
private static Map<String, Object> unflatten(List<Field> fields)
{
    if (fields.isEmpty()) {
        return new HashMap<>();
    }
    return unflatten(fields, 0, 0, fields.size());
}

/**
 * Builds one level of the nested map from the slice
 * {@code fields[start, start + length)}.
 *
 * <p>The slice is scanned for runs of consecutive fields that share the same
 * path element at {@code level}. Each run becomes one map entry: if the first
 * field of the run has further path elements below {@code level}, the run is
 * unflattened recursively at {@code level + 1}; otherwise the first field's
 * value is stored directly.
 *
 * @param fields the full field list; runs are detected by adjacency only
 * @param level index of the path element to group on
 * @param start index of the first field in the slice
 * @param length number of fields in the slice; must be positive
 * @return a map with one entry per run found in the slice
 * @throws IllegalArgumentException if {@code length} is not positive
 */
private static Map<String, Object> unflatten(List<Field> fields, int level, int start, int length)
{
    checkArgument(length > 0, "length must be > 0");

    Map<String, Object> tree = new HashMap<>();
    int end = start + length;
    int runStart = start;

    for (int next = start + 1; next <= end; next++) {
        Field head = fields.get(runStart);
        String key = head.getPathElement(level);

        // The run keeps going while the following field has the same element at this level.
        if (next != end && key.equals(fields.get(next).getPathElement(level))) {
            continue;
        }

        // A name is assumed never to be both a leaf and an intermediate node,
        // so the first field of the run decides which of the two this entry is.
        boolean hasDeeperElements = level < head.getDepth() - 1;
        Object node = hasDeeperElements
                ? unflatten(fields, level + 1, runStart, next - runStart)
                : head.getValue();

        tree.put(key, node);
        runStart = next;
    }

    return tree;
}

/**
 * A single source entry: its full dotted name, that name split on {@code '.'}
 * into path elements, and the associated value.
 */
private static final class Field
{
    private final String dottedName;
    private final Object payload;
    private final List<String> segments;

    /**
     * @param name the full field name; split on literal dots to form the path
     * @param value the value stored under that name
     */
    public Field(String name, Object value)
    {
        String[] parts = name.split("\\.");
        this.dottedName = name;
        this.segments = Arrays.asList(parts);
        this.payload = value;
    }

    /** Returns the full, unsplit field name. */
    public String getName()
    {
        return dottedName;
    }

    /** Returns the value supplied at construction. */
    public Object getValue()
    {
        return payload;
    }

    /** Returns the number of path elements in the name. */
    public int getDepth()
    {
        return segments.size();
    }

    /** Returns the path element at the given zero-based level. */
    public String getPathElement(int level)
    {
        return segments.get(level);
    }
}
}
Expand Up @@ -46,23 +46,29 @@ public static Block serializeObject(Type type, BlockBuilder builder, Object obje
if (MAP.equals(type.getTypeSignature().getBase()) || ARRAY.equals(type.getTypeSignature().getBase())) {
throw new IllegalArgumentException("Type not supported: " + type.getDisplayName());
}
return serializePrimitive(type, builder, object);
serializePrimitive(type, builder, object);
return null;
}

private static Block serializeStruct(Type type, BlockBuilder builder, Object object)
{
if (object == null) {
requireNonNull(builder, "builder is null");
builder.appendNull();
return builder.build();
return null;
}

List<Type> typeParameters = type.getTypeParameters();

BlockBuilder currentBuilder;

boolean builderSynthesized = false;
if (builder == null) {
builderSynthesized = true;
builder = type.createBlockBuilder(null, 1);
}

BlockBuilder currentBuilder = builder.beginBlockEntry();
List<Type> typeParameters = type.getTypeParameters();
currentBuilder = builder.beginBlockEntry();

for (int i = 0; i < typeParameters.size(); i++) {
Optional<String> fieldName = type.getTypeSignature().getParameters().get(i).getNamedTypeSignature().getName();
Expand All @@ -75,16 +81,21 @@ private static Block serializeStruct(Type type, BlockBuilder builder, Object obj
}

builder.closeEntry();
return (Block) type.getObject(builder, 0);
if (builderSynthesized) {
return (Block) type.getObject(builder, 0);
}
else {
return null;
}
}

private static Block serializePrimitive(Type type, BlockBuilder builder, Object object)
private static void serializePrimitive(Type type, BlockBuilder builder, Object object)
{
requireNonNull(builder, "builder is null");

if (object == null) {
builder.appendNull();
return builder.build();
return;
}

if (type.equals(BOOLEAN)) {
Expand All @@ -105,6 +116,6 @@ else if (type.equals(VARCHAR) || type.equals(VARBINARY)) {
else {
throw new IllegalArgumentException("Unknown primitive type: " + type.getDisplayName());
}
return builder.build();
return;
}
}
Expand Up @@ -120,4 +120,28 @@ public void testMixedCaseFields()
"SELECT name, age FROM test.person",
"VALUES ('John', 20)");
}

/**
 * Indexes one document whose source uses dotted keys ({@code fields.fielda},
 * {@code fields.fieldb}), refreshes the index, and checks that the values can
 * be selected as {@code fields.fielda} / {@code fields.fieldb} from
 * {@code nested.data}.
 */
@Test
public void testNestedFields()
{
    String index = "data";
    ImmutableMap<String, Object> document = ImmutableMap.<String, Object>builder()
            .put("name", "nestfield")
            .put("fields.fielda", 32)
            .put("fields.fieldb", "valueb")
            .build();

    embeddedElasticsearchNode.getClient().prepareIndex(index, "doc").setSource(document).get();

    // Refresh before querying so the document indexed above is searchable.
    embeddedElasticsearchNode.getClient().admin().indices().refresh(refreshRequest(index)).actionGet();

    assertQuery(
            "SELECT name, fields.fielda, fields.fieldb FROM nested.data",
            "VALUES ('nestfield', 32, 'valueb')");
}
}
@@ -0,0 +1,25 @@
{
"tableName": "data",
"schemaName": "nested",
"host": "localhost",
"port": "9300",
"clusterName": "test",
"index": "data",
"type": "doc",
"columns": [
{
"name": "name",
"type": "varchar",
"jsonPath": "name",
"jsonType": "varchar",
"ordinalPosition": "0"
},
{
"name": "fields",
"type": "row(fielda integer, fieldb varchar)",
"jsonPath": "fields",
"jsonType": "varchar",
"ordinalPosition": "1"
}
]
}

0 comments on commit 08c0a30

Please sign in to comment.