Permalink
Browse files

Fix the SchemaTranslator to translate array or map of union with alia…

…ses fields in the Pegasus schema.

RB=1128480
BUG=SI-4653,SI-4659
G=si-dev
R=kbalasub,mnchen
A=kbalasub
  • Loading branch information...
saponniah committed Oct 14, 2017
1 parent d26a4dd commit 1c9c4f5ced926b309b8b93d0b6d5ec2d7b9742a4
View
@@ -1,5 +1,7 @@
15.1.10
-------
(RB=1128480)
Fix the SchemaTranslator to translate array or map of union with aliases fields in the Pegasus schema.
15.1.9
-------
@@ -4,10 +4,12 @@
import com.linkedin.data.Data;
import com.linkedin.data.DataMap;
import com.linkedin.data.message.Message;
import com.linkedin.data.schema.ArrayDataSchema;
import com.linkedin.data.schema.DataSchema;
import com.linkedin.data.schema.DataSchemaConstants;
import com.linkedin.data.schema.DataSchemaTraverse;
import com.linkedin.data.schema.EnumDataSchema;
import com.linkedin.data.schema.MapDataSchema;
import com.linkedin.data.schema.Name;
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.schema.UnionDataSchema;
@@ -59,47 +61,135 @@ public void callback(List<String> path, DataSchema schema)
for (RecordDataSchema.Field field : recordSchema.getFields())
{
DataSchema fieldSchema = field.getType().getDereferencedDataSchema();
if (fieldSchema.getType() == DataSchema.Type.UNION && ((UnionDataSchema) fieldSchema).areMembersAliased())
// The conversion from Pegasus union type to an Avro record is performed when the union appears as either the
// field's direct type or when the field's type is an array or a map whose (nested) elements is of union type.
// This conversion ignores the default value when specified on an array or map typed field. Since the elements in
// the default value collection can have conflicting union members, it will be hard to figure out the optionality
// property of the fields in the generated Avro record.
DataMap modifiedDefaultValue = modifyFieldDefaultValue(field, path);
DataSchema modifiedSchema = modifyFieldSchema(recordSchema, field, fieldSchema, modifiedDefaultValue);
if (modifiedSchema != null)
{
DataMap modifiedDefaultValue = null;
Object value = field.getDefault();
if (value != null)
{
String key;
if (value == Data.NULL)
{
key = DataSchemaConstants.NULL_TYPE;
}
else
{
DataMap unionMap = (DataMap) value;
if (unionMap.size() != 1)
{
Message message = new Message(path.toArray(), "union default value $1%s has more than one entry", value);
throw new IllegalArgumentException(message.toString());
}
Map.Entry<String, Object> entry = unionMap.entrySet().iterator().next();
key = entry.getKey();
}
modifiedDefaultValue = (value == Data.NULL) ? new DataMap() : new DataMap((DataMap) value);
modifiedDefaultValue.put(DataSchemaConstants.DISCRIMINATOR_FIELD, key);
}
overrideUnionFieldSchemaAndDefault(field, modifiedSchema, modifiedDefaultValue);
}
}
}
// Stash the field's original type and default value, so that we can use this for reverting them back after
// the schema translation is complete. This is because we don't want the input schema to have any modifications
// when the control goes back to the caller.
FieldOverride fieldSchemaOverride = new FieldOverride(field.getType(), field.getDefault());
_schemaOverrides.put(field, fieldSchemaOverride);
// If the field is required or the OptionalDefaultMode.TRANSLATE_DEFAULT is used, propagate the default value to the new record
boolean propagateDefault = !field.getOptional() || _options.getOptionalDefaultMode() == OptionalDefaultMode.TRANSLATE_DEFAULT;
DataSchema modifiedSchema = buildContainerRecordFromUnion(
(UnionDataSchema) fieldSchema, field.getName(), recordSchema.getBindingName(), propagateDefault ? modifiedDefaultValue : null);
field.setType(modifiedSchema);
field.setDefault(modifiedDefaultValue);
/**
* Translates the default value specified on a field. The default value translation only happens for fields whose type
* is a union that uses aliaes for its members. The modified default value will be for the equivalent Avro record that
* will be generated for this Union type during schema translation.
*
* @param field Reference to the union field whose default value is modified
* @param path The path of the union field whose default value is modified
* @return An instance of {@link DataMap} which is the modified default value or null if the default value doesn't
* have to be translated
*/
private DataMap modifyFieldDefaultValue(RecordDataSchema.Field field, List<String> path)
{
DataMap modifiedDefaultValue = null;
// Find if the field's type is a union that uses aliases for its members
DataSchema fieldSchema = field.getType().getDereferencedDataSchema();
boolean unionWithMembersAliased = fieldSchema.getType() == DataSchema.Type.UNION && ((UnionDataSchema) fieldSchema).areMembersAliased();
// If the field is required or the OptionalDefaultMode.TRANSLATE_DEFAULT is used, propagate the default value to the new record
boolean propagateDefault = !field.getOptional() || _options.getOptionalDefaultMode() == OptionalDefaultMode.TRANSLATE_DEFAULT;
Object defaultValue = field.getDefault();
if (unionWithMembersAliased && propagateDefault && defaultValue != null)
{
String key;
if (defaultValue == Data.NULL) {
key = DataSchemaConstants.NULL_TYPE;
} else {
DataMap unionMap = (DataMap) defaultValue;
if (unionMap.size() != 1) {
Message message = new Message(path.toArray(), "union default value $1%s has more than one entry", defaultValue);
throw new IllegalArgumentException(message.toString());
}
Map.Entry<String, Object> entry = unionMap.entrySet().iterator().next();
key = entry.getKey();
}
modifiedDefaultValue = (defaultValue == Data.NULL) ? new DataMap() : new DataMap((DataMap) defaultValue);
modifiedDefaultValue.put(DataSchemaConstants.DISCRIMINATOR_FIELD, key);
}
return modifiedDefaultValue;
}
/**
* Modify the schema for the specified field. The schema modification happens only for fields whose type is either a
* union that uses aliases for its members or an array or map that has a similar union as its element type (either
* direct or nested).
*
* @param recordSchema An instance of {@link RecordDataSchema} for the record that contains this field
* @param field An instance of {@link com.linkedin.data.schema.RecordDataSchema.Field} whose schema needs to be translated
* @param dataSchema An instance of {@link DataSchema} that is being translated. The initial call will have the field's
* schema but when called recursively for arrays and maps, the schema will be of it's elements.
* @param modifiedDefaultValue An instance of {@link DataMap} which is the modified default value for the field
* @return An instance of {@link DataSchema} which is the translated schema for the field or null, if the schema
* doesn't have to be translated
*/
private DataSchema modifyFieldSchema(RecordDataSchema recordSchema, RecordDataSchema.Field field,
DataSchema dataSchema, DataMap modifiedDefaultValue)
{
DataSchema modifiedSchema = null;
switch (dataSchema.getType()) {
case ARRAY:
DataSchema itemsSchema = ((ArrayDataSchema) dataSchema).getItems().getDereferencedDataSchema();
// Stop propagating the default value if the field type is an array
DataSchema modifiedItemsSchema = modifyFieldSchema(recordSchema, field, itemsSchema, null);
if (modifiedItemsSchema != null)
{
modifiedSchema = new ArrayDataSchema(modifiedItemsSchema);
}
break;
case MAP:
DataSchema valuesSchema = ((MapDataSchema) dataSchema).getValues().getDereferencedDataSchema();
// Stop propagating the default value if the field type is a map
DataSchema modifiedValuesSchema = modifyFieldSchema(recordSchema, field, valuesSchema, null);
if (modifiedValuesSchema != null)
{
modifiedSchema = new MapDataSchema(modifiedValuesSchema);
}
break;
case UNION:
UnionDataSchema unionDataSchema = (UnionDataSchema) dataSchema;
if (unionDataSchema.areMembersAliased())
{
modifiedSchema = buildContainerRecordFromUnion(
unionDataSchema, field.getName(), recordSchema.getFullName(), modifiedDefaultValue);
}
break;
}
return modifiedSchema;
}
/**
* Helper method to set an override for the field's schema and default value.
*
* @param field Reference to the field whose schema and default value is being overridden
* @param modifiedSchema The override schema to use for the specified field
* @param modifiedDefaultValue The override default value to use for the specified field
*/
private void overrideUnionFieldSchemaAndDefault(RecordDataSchema.Field field,
DataSchema modifiedSchema, Object modifiedDefaultValue)
{
// Stash the field's original type and default value, so that we can use this for reverting them back after
// the schema translation is complete. This is because we don't want the input schema to have any modifications
// when the control goes back to the caller.
FieldOverride fieldSchemaOverride = new FieldOverride(field.getType(), field.getDefault());
_schemaOverrides.put(field, fieldSchemaOverride);
field.setType(modifiedSchema);
field.setDefault(modifiedDefaultValue);
}
/**
Oops, something went wrong.

0 comments on commit 1c9c4f5

Please sign in to comment.