Skip to content

Commit

Permalink
Remove jsonFieldName instead of bqFieldName (#1297)
Browse files Browse the repository at this point in the history
when fields are null or sent to additional_properties
  • Loading branch information
relud committed May 26, 2020
1 parent 3a2adc5 commit 926e5f9
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.pubsub.v1.PubsubMessage;
Expand Down Expand Up @@ -265,7 +265,7 @@ private void transformForBqSchema(ObjectNode parent, List<Field> bqFields,
final Map<String, Field> bqFieldMap = bqFields.stream()
.collect(Collectors.toMap(Field::getName, Function.identity()));

for (String jsonFieldName : Sets.newHashSet(parent.fieldNames())) {
for (String jsonFieldName : Lists.newArrayList(parent.fieldNames())) {
final JsonNode value = parent.get(jsonFieldName);

final String bqFieldName;
Expand Down Expand Up @@ -316,7 +316,7 @@ private static boolean isNestedListType(Field field, JsonNode value) {

private void processField(String jsonFieldName, Field field, JsonNode value, ObjectNode parent,
ObjectNode additionalProperties) {
final String name = field.getName();
final String bqFieldName = field.getName();

// A record of key and value indicates we need to transformForBqSchema a map to an array.
if (isMapType(field)) {
Expand All @@ -332,8 +332,8 @@ private void processField(String jsonFieldName, Field field, JsonNode value, Obj

// An array signifies a fixed length tuple which should be given anonymous field names.
if (value.isArray()) {
updateParent(parent, name, processTupleField(jsonFieldName, field.getSubFields(),
(ArrayNode) value, additionalProperties));
updateParent(parent, jsonFieldName, bqFieldName, processTupleField(jsonFieldName,
field.getSubFields(), (ArrayNode) value, additionalProperties));
} else {
// Only transform value if it is not null
if (value.isObject()) {
Expand All @@ -344,7 +344,7 @@ private void processField(String jsonFieldName, Field field, JsonNode value, Obj
additionalProperties.set(jsonFieldName, props);
}
}
updateParent(parent, name, value);
updateParent(parent, jsonFieldName, bqFieldName, value);
}

// Likewise, we need to recursively call transformForBqSchema on repeated record types.
Expand Down Expand Up @@ -388,20 +388,17 @@ private void processField(String jsonFieldName, Field field, JsonNode value, Obj
if (!Streams.stream(repeatedAdditionalProperties).allMatch(EMPTY_OBJECT::equals)) {
additionalProperties.set(jsonFieldName, repeatedAdditionalProperties);
}
updateParent(parent, name, value);
updateParent(parent, jsonFieldName, bqFieldName, value);

// If we've made it here, we have a basic type or a list of basic types.
} else {
final Optional<JsonNode> coerced = coerceToBqType(value, field);
if (coerced.isPresent()) {
updateParent(parent, name, coerced.get());
} else {
// An empty coerced value means the actual type didn't match expected and we don't define
// a coercion. We put the value to additional_properties instead.
if (additionalProperties != null) {
additionalProperties.set(jsonFieldName, value);
}
parent.remove(name);
// use coerced.orElse(null) to remove the field via updateParent if necessary
updateParent(parent, jsonFieldName, bqFieldName, coerced.orElse(null));
// If coerced is not present that means the actual type didn't match expected and we don't
// define a coercion. We put the value to additional_properties instead.
if (!coerced.isPresent() && additionalProperties != null) {
additionalProperties.set(jsonFieldName, value);
}
}
}
Expand Down Expand Up @@ -456,10 +453,12 @@ private void expandMapType(String jsonFieldName, ObjectNode value, Field field,

final ArrayNode unmapped = Json.createArrayNode();
value.fields().forEachRemaining(e -> {
ObjectNode kv = Json.createObjectNode().put(FieldName.KEY, e.getKey());
ObjectNode kv = Json.createObjectNode();
valueFieldOption
.ifPresent(valueField -> processField(e.getKey(), valueField, e.getValue(), kv, props));
unmapped.add(kv);
// add key after processField so it can't be dropped due to e.getKey() matching
// FieldName.KEY when e.getValue() is null or empty or can't be coerced
unmapped.add(kv.put(FieldName.KEY, e.getKey()));
});
if (!Json.isNullOrEmpty(props)) {
additionalProperties.set(jsonFieldName, props);
Expand Down Expand Up @@ -567,11 +566,12 @@ private Optional<JsonNode> coerceSingleValueToBqType(JsonNode o, Field field) {
}
}

private static void updateParent(ObjectNode parent, String name, JsonNode value) {
private static void updateParent(ObjectNode parent, String jsonFieldName, String bqFieldName,
JsonNode value) {
if (Json.isNullOrEmpty(value)) {
parent.remove(name);
parent.remove(jsonFieldName);
} else {
parent.set(name, value);
parent.set(bqFieldName, value);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@
{"additional_properties":{},"test_nested_list":[{"list":["a","b","c"]}],"test_nested_csv":[{"list":"a,b,c"}],"test_list":["a","b","c"]}
{"additional_properties":{},"test_nested_list":[{}]}
{"additional_properties":{}}
{"additional_properties":{"test_int64_":"invalid"},"test_int64":8}
{"additional_properties":{},"test_map":[{"key":"key"},{"key":"value"},{"key":"other","value":"other"}]}
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_nested_list":[["a","b","c"]],"test_nested_csv":[{"list":"a,b,c"}],"test_list":["a","b","c"]}}
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_list":null,"test_nested_list":[null],"test_record":null}}
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_record":{"key":null}}}
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_int64 ":null,"test_int64":7,"test_int64_":"invalid","test_int64+":8}}
{"attributeMap":{"document_namespace":"live-sink","document_type":"test","document_version":"1"},"payload":{"test_map":{"key":null,"value":null,"other":"other"}}}

0 comments on commit 926e5f9

Please sign in to comment.