feat(plugin): add capability to merge schemas deriving from records
This commit adds a new config property: merge.value.connect.schemas. When enabled, the Connect schema configured for the record value is merged with the schema derived from each record.
fhussonnois committed Sep 8, 2021
1 parent 99c374f commit 0e29ce2
Showing 7 changed files with 232 additions and 234 deletions.
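The diffs below wire this property through to record serialization. As a minimal sketch, enabling it in a FilePulse source connector configuration might look as follows; connector.class and topic are assumed standard settings, not something introduced by this commit:

```properties
# Illustrative FilePulse source connector configuration (assumed keys).
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic=example-topic
# New in this commit: when true, the configured value schema is merged
# with the Connect schema derived from each record.
merge.value.connect.schemas=true
```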
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
@@ -61,4 +61,5 @@
     <suppress checks="NPathComplexity" files="KafkaBasedLog.java"/>
     <!-- Classes used for configuration -->
     <suppress checks="(LineLength|CyclomaticComplexity|NPathComplexity|JavaNCSS)" files=".*Config.java"/>
+    <suppress checks="(LineLength|CyclomaticComplexity|NPathComplexity|JavaNCSS)" files="SchemaUtils.java"/>
 </suppressions>
io/streamthoughts/kafka/connect/filepulse/internal/SchemaUtils.java
@@ -21,117 +21,178 @@
 import io.streamthoughts.kafka.connect.filepulse.data.DataException;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.data.SchemaBuilder;
 
-import java.util.Collection;
-import java.util.List;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
  * Utility methods to manipulate Connect {@link Schema}.
  */
 public class SchemaUtils {
 
-    public static List<String> getAllFieldNames(final Schema schema) {
-        return schema.fields().stream().map(Field::name).collect(Collectors.toList());
-    }
-
-    public static SchemaBuilder copySchemaBasics(final Schema source) {
-        return copySchemaBasics(source, new SchemaBuilder(source.type()));
-    }
-
-    public static SchemaBuilder copySchemaBasics(final Schema source, final SchemaBuilder builder) {
-        builder.name(source.name());
-        builder.version(source.version());
-        builder.doc(source.doc());
-
-        final Map<String, String> params = source.parameters();
-        if (params != null) {
-            builder.parameters(params);
-        }
-
-        return builder;
-    }
-
-    public static void merge(final Schema leftSchema,
-                             final Schema rightSchema,
-                             final SchemaBuilder builder,
-                             final Set<String> overwrite) {
-        final Map<String, Field> rightFields = groupFieldByName(rightSchema.fields());
-        for (Field leftField : leftSchema.fields()) {
-            final String leftFieldName = leftField.name();
-
-            boolean isOverwrite = overwrite.contains(leftFieldName);
-
-            final Field rightField = rightFields.get(leftFieldName);
-
-            if (rightField == null) {
-                builder.field(leftFieldName, leftField.schema());
-                continue;
-            }
-
-            if (isOverwrite) {
-                continue; // skip the left field
-            }
-
-            checkIfFieldsCanBeMerged(leftField, rightField);
-
-            if (isTypeOf(leftField, Schema.Type.ARRAY)) {
-                builder.field(leftFieldName, leftField.schema());
-            } else {
-                final SchemaBuilder optionalArray = SchemaBuilder
-                        .array(leftField.schema())
-                        .optional();
-                builder.field(leftFieldName, optionalArray);
-            }
-            rightFields.remove(leftFieldName);
-        }
-
-        // Copy all remaining fields from right struct.
-        for (Field f : rightFields.values()) {
-            Schema schema = f.schema();
-            builder.field(f.name(), schema);
-        }
-    }
-
-    private static void checkIfFieldsCanBeMerged(final Field left, final Field right) {
-        final String name = left.name();
-        if (isTypeOf(left, Schema.Type.ARRAY)) {
-            Schema valueSchemaLeft = left.schema().valueSchema();
-            if (isTypeOf(right, Schema.Type.ARRAY)) {
-                Schema valueSchemaRight = right.schema().valueSchema();
-                throwIfTypesAreNotEqual(name, valueSchemaLeft, valueSchemaRight,
-                    "Cannot merge fields '%s' of type array with different value types : Array[%s]<>Array[%s]");
-            } else {
-                throwIfTypesAreNotEqual(name, valueSchemaLeft, right.schema(),
-                    "Cannot merge fields '%s' with different array value types : Array[%s]<>%s");
-            }
-        } else if (isTypeOf(right, Schema.Type.ARRAY)) {
-            Schema valueSchemaRight = right.schema().valueSchema();
-            throwIfTypesAreNotEqual(name, left.schema(), valueSchemaRight,
-                "Cannot merge fields '%s' with different array value types : %s<>Array[%s]");
-        // neither left or right is of type array
-        } else {
-            throwIfTypesAreNotEqual(name, left.schema(), right.schema(),
-                "Cannot merge fields '%s' with different types : %s<>%s");
-        }
-    }
-
-    private static void throwIfTypesAreNotEqual(final String field,
-                                                final Schema s1,
-                                                final Schema s2,
-                                                final String message) {
-        if (!s1.type().equals(s2.type())) {
-            throw new DataException(String.format(message, field, s1.type(), s2.type()));
-        }
-    }
-
-    public static boolean isTypeOf(final Field field, final Schema.Type type) {
-        return field.schema().type().equals(type);
-    }
-
-    public static Map<String, Field> groupFieldByName(final Collection<Field> fields) {
-        return fields.stream().collect(Collectors.toMap(Field::name, f -> f));
-    }
+    /**
+     * @return the merge of the two given {@link Schema}.
+     */
+    public static Schema merge(final Schema left, final Schema right) {
+
+        if (left.equals(right)) return left;
+
+        if (left.type() == Type.ARRAY ||
+            right.type() == Type.ARRAY) {
+            return mergeArray(left, right);
+        }
+
+        // Struct can only be merged with another Struct
+        if (left.type() == Type.STRUCT &&
+            right.type() == Type.STRUCT) {
+            return mergeStruct(left, right);
+        }
+
+        // Map can only be merged with another Map
+        if (left.type() == Type.MAP &&
+            right.type() == Type.MAP) {
+            return mergeMap(left, right);
+        }
+
+        if (left.type() == right.type()) {
+            return left;
+        }
+
+        if (left.type() == Type.STRING || right.type() == Type.STRING)
+            return mergeMetadata(left, right, SchemaBuilder.string());
+
+        if ((left.type() == Type.INT64 && right.type() == Type.INT32) ||
+            (right.type() == Type.INT64 && left.type() == Type.INT32)) {
+            return mergeMetadata(left, right, SchemaBuilder.int64());
+        }
+
+        if ((left.type() == Type.FLOAT64 && isNumber(right.type())) ||
+            (right.type() == Type.FLOAT64 && isNumber(left.type()))) {
+            return mergeMetadata(left, right, SchemaBuilder.float64());
+        }
+
+        throw new DataException("Cannot merge incompatible schema types " + left.type() + "<>" + right.type());
+    }
+
+    private static Schema mergeMap(final Schema left, final Schema right) {
+        final SchemaBuilder merged = SchemaBuilder.map(
+            merge(left.keySchema(), right.keySchema()),
+            merge(left.valueSchema(), right.valueSchema())
+        );
+
+        return mergeMetadata(left, right, merged).build();
+    }
+
+    private static Schema mergeArray(final Schema left, final Schema right) {
+        final Schema valueSchema;
+
+        // Merge Array<?> with Array<?>
+        if (left.type() == Type.ARRAY &&
+            right.type() == Type.ARRAY) {
+            valueSchema = merge(left.valueSchema(), right.valueSchema());
+
+        // Merge Array<?> with ?
+        } else if (left.type() == Type.ARRAY) {
+            valueSchema = merge(left.valueSchema(), right);
+
+        // Merge ? with Array<?>
+        } else {
+            valueSchema = merge(left, right.valueSchema());
+        }
+        return SchemaBuilder
+                .array(valueSchema)
+                .optional()
+                .defaultValue(null)
+                .build();
+    }
+
+    private static Schema mergeStruct(final Schema left, final Schema right) {
+
+        if (!Objects.equals(left.name(), right.name()))
+            throw new DataException(
+                "Cannot merge two schemas with different names " + left.name() + "<>" + right.name());
+
+        final SchemaBuilder merged = mergeMetadata(left, right, new SchemaBuilder(Type.STRUCT));
+
+        final Map<String, Schema> remaining = left.fields()
+            .stream()
+            .collect(Collectors.toMap(Field::name, Field::schema));
+
+        // Iterate over RIGHT fields and compare to LEFT fields.
+        for (final Field rightField : right.fields()) {
+
+            final String name = rightField.name();
+
+            // field exists only on the RIGHT schema.
+            if (!remaining.containsKey(name)) {
+                merged.field(name, rightField.schema());
+                continue;
+            }
+
+            // field exists on both the LEFT and RIGHT schemas.
+            final Schema leftSchema = remaining.remove(name);
+
+            try {
+                final Schema fieldMergedSchema = merge(leftSchema, rightField.schema());
+                merged.field(name, fieldMergedSchema);
+            } catch (Exception e) {
+                throw new DataException("Failed to merge schemas for field '" + name + "'.", e);
+            }
+        }
+
+        // remaining fields exist only on the LEFT schema.
+        remaining.forEach(merged::field);
+
+        return merged;
+    }
+
+    private static SchemaBuilder mergeMetadata(final Schema left,
+                                               final Schema right,
+                                               final SchemaBuilder merged) {
+
+        merged.name(left.name());
+        merged.doc(left.doc());
+
+        if (left.isOptional() || right.isOptional()) {
+            merged.optional();
+        }
+
+        if (left.defaultValue() != null) {
+            merged.defaultValue(left.defaultValue());
+        } else if (right.defaultValue() != null) {
+            merged.defaultValue(right.defaultValue());
+        }
+
+        final Map<String, String> parameters = new HashMap<>();
+        if (left.parameters() != null) {
+            parameters.putAll(left.parameters());
+        }
+
+        if (right.parameters() != null) {
+            parameters.putAll(right.parameters());
+        }
+
+        if (!parameters.isEmpty()) {
+            merged.parameters(parameters);
+        }
+
+        return merged;
+    }
+
+    private static boolean isInteger(final Type type) {
+        return type == Type.INT8 ||
+               type == Type.INT16 ||
+               type == Type.INT32 ||
+               type == Type.INT64;
+    }
+
+    private static boolean isNumber(final Type type) {
+        return isInteger(type) || Arrays.asList(Type.FLOAT32, Type.FLOAT64).contains(type);
+    }
 }
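To make the merge rules concrete, here is a small, self-contained usage sketch; the field names and the SchemaMergeExample class are invented for illustration, while SchemaUtils.merge and its package come from the diff above.

```java
import io.streamthoughts.kafka.connect.filepulse.internal.SchemaUtils;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class SchemaMergeExample {

    public static void main(String[] args) {
        // LEFT: schema configured by the user.
        final Schema left = SchemaBuilder.struct()
                .field("id", Schema.INT32_SCHEMA)
                .field("name", Schema.STRING_SCHEMA)
                .build();

        // RIGHT: schema derived from an incoming record; 'id' is wider (INT64)
        // and 'score' does not exist on the LEFT schema.
        final Schema right = SchemaBuilder.struct()
                .field("id", Schema.INT64_SCHEMA)
                .field("score", Schema.FLOAT64_SCHEMA)
                .build();

        // Per the rules above: INT32/INT64 resolve to INT64, and fields present
        // on only one side are carried over unchanged.
        final Schema merged = SchemaUtils.merge(left, right);
        for (final Field field : merged.fields()) {
            // Expected output: id : INT64, score : FLOAT64, name : STRING
            System.out.println(field.name() + " : " + field.schema().type());
        }
    }
}
```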
@@ -62,5 +62,6 @@ SourceRecord toSourceRecord(
             final FileObjectMeta metadata,
             final String defaultTopic,
             final Integer defaultPartition,
-            final Schema connectSchema);
+            final Schema connectSchema,
+            final boolean connectSchemaMergeEnabled);
 }
TypedFileRecord.java
@@ -20,6 +20,7 @@
 
 import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
+import io.streamthoughts.kafka.connect.filepulse.internal.SchemaUtils;
 import io.streamthoughts.kafka.connect.filepulse.source.internal.ConnectSchemaMapper;
 import io.streamthoughts.kafka.connect.filepulse.source.internal.InternalSourceRecordBuilder;
 import org.apache.kafka.connect.data.Schema;
@@ -44,7 +45,6 @@ public TypedFileRecord(final FileRecordOffset offset,
                            final TypedStruct struct) {
         super(offset, struct);
         internalSourceRecordBuilder = new InternalSourceRecordBuilder();
-
     }
 
     /**
@@ -56,19 +56,33 @@ public SourceRecord toSourceRecord(final Map<String, ?> sourcePartition,
                                        final FileObjectMeta metadata,
                                        final String defaultTopic,
                                        final Integer defaultPartition,
-                                       final Schema connectSchema) {
+                                       final Schema connectSchema,
+                                       final boolean connectSchemaMergeEnabled) {
 
         final TypedStruct value = value();
 
-        if (connectSchema != null) {
+        final Schema valueSchema;
+        if (connectSchemaMergeEnabled && value != null) {
+            Schema recordValueSchema = value.schema().map(ConnectSchemaMapper.INSTANCE);
+            if (connectSchema != null) {
+                valueSchema = SchemaUtils.merge(connectSchema, recordValueSchema);
+            } else {
+                valueSchema = recordValueSchema;
+            }
+        } else {
+            valueSchema = connectSchema;
+        }
+
+        if (valueSchema != null) {
             internalSourceRecordBuilder.withValue(() ->
-                value == null ? null : ConnectSchemaMapper.INSTANCE.map(connectSchema, value)
+                value == null ? null : ConnectSchemaMapper.INSTANCE.map(valueSchema, value)
             );
         } else {
             internalSourceRecordBuilder.withValue(() ->
                 value == null ? null : ConnectSchemaMapper.INSTANCE.map(value.schema(), value)
             );
         }
 
         return internalSourceRecordBuilder.build(
             sourcePartition,
             sourceOffset,
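The schema-resolution logic introduced in toSourceRecord can be restated as a standalone sketch; resolveValueSchema and ValueSchemaResolution are hypothetical names for illustration, not part of the commit.

```java
import io.streamthoughts.kafka.connect.filepulse.internal.SchemaUtils;
import org.apache.kafka.connect.data.Schema;

public final class ValueSchemaResolution {

    /**
     * Hypothetical restatement of the logic in TypedFileRecord#toSourceRecord.
     *
     * @param connectSchema     the schema configured by the user, may be null.
     * @param recordValueSchema the schema derived from the record value, may be null.
     * @param mergeEnabled      the value of merge.value.connect.schemas.
     */
    static Schema resolveValueSchema(final Schema connectSchema,
                                     final Schema recordValueSchema,
                                     final boolean mergeEnabled) {
        if (mergeEnabled && recordValueSchema != null) {
            // Merge the configured schema with the record-derived one, or fall
            // back to the record-derived schema when none is configured.
            return connectSchema != null
                    ? SchemaUtils.merge(connectSchema, recordValueSchema)
                    : recordValueSchema;
        }
        // Merging disabled (or no record value): use the configured schema,
        // which may be null; the caller then maps the record's own schema.
        return connectSchema;
    }
}
```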
