Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support nested fields in Elasticsearch Connector #1001

Merged
merged 1 commit into from Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -25,8 +25,10 @@
import org.elasticsearch.search.SearchHit;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -250,11 +252,86 @@ private Object getFieldValue(int field)

private void extractFromSource(SearchHit hit)
{
Map<String, Object> map = hit.getSourceAsMap();
for (Map.Entry<String, Object> entry : map.entrySet()) {
String jsonPath = entry.getKey();
Object entryValue = entry.getValue();
setFieldIfExists(jsonPath, entryValue);
List<Field> fields = new ArrayList<>();
for (Map.Entry<String, Object> entry : hit.getSourceAsMap().entrySet()) {
fields.add(new Field(entry.getKey(), entry.getValue()));
}
Collections.sort(fields, Comparator.comparing(Field::getName));

for (Map.Entry<String, Object> entry : unflatten(fields).entrySet()) {
setFieldIfExists(entry.getKey(), entry.getValue());
}
}

private static Map<String, Object> unflatten(List<Field> fields)
{
return unflatten(fields, 0, 0, fields.size());
}

private static Map<String, Object> unflatten(List<Field> fields, int level, int start, int length)
{
checkArgument(length > 0, "length must be > 0");

int limit = start + length;

Map<String, Object> result = new HashMap<>();
int anchor = start;
int current = start;

do {
Field field = fields.get(anchor);
String name = field.getPathElement(level);

current++;
if (current == limit || !name.equals(fields.get(current).getPathElement(level))) {
// We assume that fields can't be both leaves and intermediate nodes
Object value;
if (level < field.getDepth() - 1) {
value = unflatten(fields, level + 1, anchor, current - anchor);
}
else {
value = field.getValue();
}
result.put(name, value);
anchor = current;
}
}
while (current < limit);

return result;
}

private static final class Field
{
private final String name;
private final List<String> path;
private final Object value;

public Field(String name, Object value)
{
this.name = name;
this.path = Arrays.asList(name.split("\\."));
this.value = value;
}

public String getName()
{
return name;
}

public int getDepth()
{
return path.size();
}

public String getPathElement(int level)
{
return path.get(level);
}

public Object getValue()
{
return value;
}
}
}
Expand Up @@ -46,23 +46,29 @@ public static Block serializeObject(Type type, BlockBuilder builder, Object obje
if (MAP.equals(type.getTypeSignature().getBase()) || ARRAY.equals(type.getTypeSignature().getBase())) {
throw new IllegalArgumentException("Type not supported: " + type.getDisplayName());
}
return serializePrimitive(type, builder, object);
serializePrimitive(type, builder, object);
return null;
}

private static Block serializeStruct(Type type, BlockBuilder builder, Object object)
{
if (object == null) {
requireNonNull(builder, "builder is null");
builder.appendNull();
return builder.build();
return null;
}

List<Type> typeParameters = type.getTypeParameters();

BlockBuilder currentBuilder;

boolean builderSynthesized = false;
if (builder == null) {
builderSynthesized = true;
builder = type.createBlockBuilder(null, 1);
}

BlockBuilder currentBuilder = builder.beginBlockEntry();
List<Type> typeParameters = type.getTypeParameters();
currentBuilder = builder.beginBlockEntry();

for (int i = 0; i < typeParameters.size(); i++) {
Optional<String> fieldName = type.getTypeSignature().getParameters().get(i).getNamedTypeSignature().getName();
Expand All @@ -75,16 +81,21 @@ private static Block serializeStruct(Type type, BlockBuilder builder, Object obj
}

builder.closeEntry();
return (Block) type.getObject(builder, 0);
if (builderSynthesized) {
return (Block) type.getObject(builder, 0);
}
else {
return null;
}
}

private static Block serializePrimitive(Type type, BlockBuilder builder, Object object)
private static void serializePrimitive(Type type, BlockBuilder builder, Object object)
{
requireNonNull(builder, "builder is null");

if (object == null) {
builder.appendNull();
return builder.build();
return;
}

if (type.equals(BOOLEAN)) {
Expand All @@ -105,6 +116,6 @@ else if (type.equals(VARCHAR) || type.equals(VARBINARY)) {
else {
throw new IllegalArgumentException("Unknown primitive type: " + type.getDisplayName());
}
return builder.build();
return;
}
}
Expand Up @@ -120,4 +120,28 @@ public void testMixedCaseFields()
"SELECT name, age FROM test.person",
"VALUES ('John', 20)");
}

@Test
public void testNestedFields()
{
String indexName = "data";
embeddedElasticsearchNode.getClient()
.prepareIndex(indexName, "doc")
.setSource(ImmutableMap.<String, Object>builder()
.put("name", "nestfield")
.put("fields.fielda", 32)
.put("fields.fieldb", "valueb")
.build())
.get();

embeddedElasticsearchNode.getClient()
.admin()
.indices()
.refresh(refreshRequest(indexName))
.actionGet();

assertQuery(
"SELECT name, fields.fielda, fields.fieldb FROM nested.data",
"VALUES ('nestfield', 32, 'valueb')");
}
}
@@ -0,0 +1,25 @@
{
"tableName": "data",
"schemaName": "nested",
"host": "localhost",
"port": "9300",
"clusterName": "test",
"index": "data",
"type": "doc",
"columns": [
{
"name": "name",
"type": "varchar",
"jsonPath": "name",
"jsonType": "varchar",
"ordinalPosition": "0"
},
{
"name": "fields",
"type": "row(fielda integer, fieldb varchar)",
"jsonPath": "fields",
"jsonType": "varchar",
"ordinalPosition": "1"
}
]
}