SNOW-119851 SMT native converter parse Struct into json #131

Merged: 1 commit, Apr 20, 2020
@@ -400,8 +400,15 @@ private void insert(final SinkRecord record)
SinkRecord snowflakeRecord;
if (!(record.value() instanceof SnowflakeRecordContent))
{
// throw SnowflakeErrors.ERROR_0019.getException();
SnowflakeRecordContent newSFContent = new SnowflakeRecordContent(record.value());
SnowflakeRecordContent newSFContent;
try
{
newSFContent = new SnowflakeRecordContent(record.valueSchema(), record.value());
} catch (Exception e)
{
newSFContent = new SnowflakeRecordContent();
logError("native content parser error:\n{}", e.getMessage());
}
// create new sinkRecord
snowflakeRecord = new SinkRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), new SnowflakeJsonSchema(),
newSFContent, record.kafkaOffset(), record.timestamp(), record.timestampType(), record.headers());
@@ -3,9 +3,25 @@
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.PropertyAccessor;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonInclude;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.JsonNodeFactory;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ArrayNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.DataException;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Map;
import java.util.TimeZone;

public class SnowflakeRecordContent
{
@@ -17,11 +33,17 @@ public class SnowflakeRecordContent
private int schemaID;
private boolean isBroken;

public static final SimpleDateFormat ISO_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
public static final SimpleDateFormat TIME_FORMAT = new SimpleDateFormat("HH:mm:ss.SSSZ");
static {
ISO_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
}


/**
* constructor for null value
*/
SnowflakeRecordContent()
public SnowflakeRecordContent()
{
content = new JsonNode[1];
content[0] = MAPPER.createObjectNode();
@@ -30,19 +52,161 @@ public class SnowflakeRecordContent

/**
* constructor for native json converter
* @param data json map
* @param schema schema of the object
* @param data object produced by native avro/json converters
*/
public SnowflakeRecordContent(Object data)
public SnowflakeRecordContent(Schema schema, Object data)
{
this.content = new JsonNode[1];
MAPPER.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
this.content[0] = MAPPER.valueToTree(data);;
this.isBroken = false;
this.schemaID = NON_AVRO_SCHEMA;
try
{
this.content[0] = convertToJson(schema, data);
} catch (DataException e)
{
this.isBroken = true;
this.brokenData = data.toString().getBytes();
return;
}
this.isBroken = false;
this.brokenData = null;
}

/**
* Convert a value in the org.apache.kafka.connect.data format into its JSON representation.
* @param schema schema of the object
* @param logicalValue object to be converted
* @return a JsonNode of the object
*/
private static JsonNode convertToJson(Schema schema, Object logicalValue) {
if (logicalValue == null) {
if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema
return null;
if (schema.defaultValue() != null)
return convertToJson(schema, schema.defaultValue());
if (schema.isOptional())
return JsonNodeFactory.instance.nullNode();
throw new DataException("Conversion error: null value for field that is required and has no default value");
}

Object value = logicalValue;
try {
final Schema.Type schemaType;
if (schema == null) {
schemaType = ConnectSchema.schemaType(value.getClass());
if (schemaType == null)
throw new DataException("Java class " + value.getClass() + " does not have corresponding schema type.");
} else {
schemaType = schema.type();
}
switch (schemaType) {
case INT8:
return JsonNodeFactory.instance.numberNode((Byte) value);
case INT16:
return JsonNodeFactory.instance.numberNode((Short) value);
case INT32:
if (schema != null && Date.LOGICAL_NAME.equals(schema.name())) {
return JsonNodeFactory.instance.textNode(ISO_DATE_FORMAT.format((java.util.Date) value));
}
if (schema != null && Time.LOGICAL_NAME.equals(schema.name())) {
return JsonNodeFactory.instance.textNode(TIME_FORMAT.format((java.util.Date) value));
}
return JsonNodeFactory.instance.numberNode((Integer) value);
case INT64:
if (schema != null && Timestamp.LOGICAL_NAME.equals(schema.name())) {
return JsonNodeFactory.instance.numberNode(Timestamp.fromLogical(schema, (java.util.Date) value));
}
return JsonNodeFactory.instance.numberNode((Long) value);
case FLOAT32:
return JsonNodeFactory.instance.numberNode((Float) value);
case FLOAT64:
return JsonNodeFactory.instance.numberNode((Double) value);
case BOOLEAN:
return JsonNodeFactory.instance.booleanNode((Boolean) value);
case STRING:
CharSequence charSeq = (CharSequence) value;
return JsonNodeFactory.instance.textNode(charSeq.toString());
case BYTES:
if (schema != null && Decimal.LOGICAL_NAME.equals(schema.name())) {
return JsonNodeFactory.instance.numberNode((BigDecimal) value);
Very late to the party here, but my team at work is having trouble with avro logical decimal types being stored in snowflake properly (we're seeing what's reported in #122 ) and this looks odd to me.

Two things look odd:

  1. this does a cast from a byte array but does not specify the precision or scale of the logical data type to the resulting BigDecimal
  2. json doesn't support arbitrary precision in its numeric type (it's basically a double). The only safe way to represent a java BigDecimal in json is to use a string
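
A minimal sketch (not part of this PR) of the handling the reviewer suggests, assuming the value can arrive either as an already-converted BigDecimal or as the raw unscaled byte[] of the Connect Decimal logical type, and writing it out as a JSON string so no precision is lost:

// Hypothetical alternative for the Decimal branch above: recover the scale from the
// schema and emit the value as text rather than a JSON number.
if (schema != null && Decimal.LOGICAL_NAME.equals(schema.name())) {
    BigDecimal decimal;
    if (value instanceof BigDecimal)
        decimal = (BigDecimal) value;                        // converter already produced the logical value
    else if (value instanceof byte[])
        decimal = Decimal.toLogical(schema, (byte[]) value); // unscaled bytes + scale from the schema
    else
        throw new DataException("Invalid type for Decimal: " + value.getClass());
    return JsonNodeFactory.instance.textNode(decimal.toPlainString());
}

Downstream, the string form can still be cast back to a number in Snowflake, avoiding the double-precision rounding the reviewer describes.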

}

byte[] valueArr = null;
if (value instanceof byte[])
valueArr = (byte[]) value;
else if (value instanceof ByteBuffer)
valueArr = ((ByteBuffer) value).array();

if (valueArr == null)
throw new DataException("Invalid type for bytes type: " + value.getClass());

return JsonNodeFactory.instance.binaryNode(valueArr);

case ARRAY: {
Collection collection = (Collection) value;
ArrayNode list = JsonNodeFactory.instance.arrayNode();
for (Object elem : collection) {
Schema valueSchema = schema == null ? null : schema.valueSchema();
JsonNode fieldValue = convertToJson(valueSchema, elem);
list.add(fieldValue);
}
return list;
}
case MAP: {
Map<?, ?> map = (Map<?, ?>) value;
// If true, using string keys and JSON object; if false, using non-string keys and Array-encoding
boolean objectMode;
if (schema == null) {
objectMode = true;
for (Map.Entry<?, ?> entry : map.entrySet()) {
if (!(entry.getKey() instanceof String)) {
objectMode = false;
break;
}
}
} else {
objectMode = schema.keySchema().type() == Schema.Type.STRING;
}
ObjectNode obj = null;
ArrayNode list = null;
if (objectMode)
obj = JsonNodeFactory.instance.objectNode();
else
list = JsonNodeFactory.instance.arrayNode();
for (Map.Entry<?, ?> entry : map.entrySet()) {
Schema keySchema = schema == null ? null : schema.keySchema();
Schema valueSchema = schema == null ? null : schema.valueSchema();
JsonNode mapKey = convertToJson(keySchema, entry.getKey());
JsonNode mapValue = convertToJson(valueSchema, entry.getValue());

if (objectMode)
obj.set(mapKey.asText(), mapValue);
else
list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
}
return objectMode ? obj : list;
}
case STRUCT: {
Struct struct = (Struct) value;
if (struct.schema() != schema)
throw new DataException("Mismatching schema.");
ObjectNode obj = JsonNodeFactory.instance.objectNode();
for (Field field : schema.fields()) {
obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
}
return obj;
}
}

throw new DataException("Couldn't convert " + value + " to JSON.");
} catch (ClassCastException e) {
throw new DataException("Invalid type for " + schema.type() + ": " + value.getClass());
}
}


/**
* constructor for json converter
* @param data json node
@@ -2,13 +2,22 @@

import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import com.snowflake.kafka.connector.internal.TestUtils;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.type.TypeReference;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;


import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class RecordContentTest
{
@@ -63,5 +72,44 @@ public void test() throws IOException
assert content.getData().length == 1;
assert content.getData()[0].size() == 0;
assert content.getData()[0].toString().equals("{}");

// AVRO struct object
SchemaBuilder builder = SchemaBuilder.struct()
.field("int8", SchemaBuilder.int8().defaultValue((byte) 2).doc("int8 field").build())
.field("int16", Schema.INT16_SCHEMA)
.field("int32", Schema.INT32_SCHEMA)
.field("int64", Schema.INT64_SCHEMA)
.field("float32", Schema.FLOAT32_SCHEMA)
.field("float64", Schema.FLOAT64_SCHEMA)
.field("boolean", Schema.BOOLEAN_SCHEMA)
.field("string", Schema.STRING_SCHEMA)
.field("bytes", Schema.BYTES_SCHEMA)
.field("array", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
.field("map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build())
.field("mapNonStringKeys", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA)
.build());
Schema schema = builder.build();
Struct original = new Struct(schema)
.put("int8", (byte) 12)
.put("int16", (short) 12)
.put("int32", 12)
.put("int64", 12L)
.put("float32", 12.2f)
.put("float64", 12.2)
.put("boolean", true)
.put("string", "foo")
.put("bytes", ByteBuffer.wrap("foo".getBytes()))
.put("array", Arrays.asList("a", "b", "c"))
.put("map", Collections.singletonMap("field", 1))
.put("mapNonStringKeys", Collections.singletonMap(1, 1));

content = new SnowflakeRecordContent(schema, original);
assert content.getData()[0].toString().equals("{\"int8\":12,\"int16\":12,\"int32\":12,\"int64\":12,\"float32\":12.2,\"float64\":12.2,\"boolean\":true,\"string\":\"foo\",\"bytes\":\"Zm9v\",\"array\":[\"a\",\"b\",\"c\"],\"map\":{\"field\":1},\"mapNonStringKeys\":[[1,1]]}");

// JSON map object
JsonNode jsonObject = mapper.readTree("{\"int8\":12,\"int16\":12,\"int32\":12,\"int64\":12,\"float32\":12.2,\"float64\":12.2,\"boolean\":true,\"string\":\"foo\",\"bytes\":\"Zm9v\",\"array\":[\"a\",\"b\",\"c\"],\"map\":{\"field\":1},\"mapNonStringKeys\":[[1,1]]}");
Map<String, Object> jsonMap = mapper.convertValue(jsonObject, new TypeReference<Map<String, Object>>(){});
content = new SnowflakeRecordContent(null, jsonMap);
assert content.getData()[0].toString().equals("{\"int8\":12,\"int16\":12,\"int32\":12,\"int64\":12,\"float32\":12.2,\"float64\":12.2,\"boolean\":true,\"string\":\"foo\",\"bytes\":\"Zm9v\",\"array\":[\"a\",\"b\",\"c\"],\"map\":{\"field\":1},\"mapNonStringKeys\":[[1,1]]}");
}
}