Permalink
Browse files

Data translator changes for supporting Pegasus union with aliases.

RB=1020864
BUG=SI-3473
G=si-dev
R=kbalasub,nshankar,mnchen
A=kbalasub,mnchen
  • Loading branch information...
saponniah committed Jun 15, 2017
1 parent c0a51b1 commit f43087cbbc24ed554c1aeb4c31219500033d7e11
@@ -321,27 +321,36 @@ private Object translate(Object value, DataSchema dataSchema, Schema avroSchema)
break;
case UNION:
UnionDataSchema unionDataSchema = (UnionDataSchema) dereferencedDataSchema;
Map.Entry<DataSchema, Schema> memberSchemas = findUnionMemberSchema(value, unionDataSchema, avroSchema);
if (memberSchemas == null)
if (unionDataSchema.areMembersAliased())
{
result = BAD_RESULT;
break;
}
if (value == null)
{
// schema must be "null" schema
result = Data.NULL;
// Since Pegasus 'union with aliases' are represented as an Avro record, the translation
// is handled separately.
result = translateAvroRecordToPegasusUnionWithAliases(value, unionDataSchema, avroSchema);
}
else
{
DataSchema memberDataSchema = memberSchemas.getKey();
Schema memberAvroSchema = memberSchemas.getValue();
String key = memberDataSchema.getUnionMemberKey();
dataMap = new DataMap(1);
_path.addLast(key);
dataMap.put(key, translate(value, memberDataSchema, memberAvroSchema));
_path.removeLast();
result = dataMap;
Map.Entry<DataSchema, Schema> memberSchemas = findUnionMemberSchema(value, unionDataSchema, avroSchema);
if (memberSchemas == null)
{
result = BAD_RESULT;
break;
}
if (value == null)
{
// schema must be "null" schema
result = Data.NULL;
}
else
{
DataSchema memberDataSchema = memberSchemas.getKey();
Schema memberAvroSchema = memberSchemas.getValue();
String key = memberDataSchema.getUnionMemberKey();
dataMap = new DataMap(1);
_path.addLast(key);
dataMap.put(key, translate(value, memberDataSchema, memberAvroSchema));
_path.removeLast();
result = dataMap;
}
}
break;
default:
@@ -367,17 +376,17 @@ private Object translate(Object value, DataSchema dataSchema, Schema avroSchema)
default:
key = memberAvroSchema.getType().toString().toLowerCase();
}
DataSchema memberDataSchema = unionDataSchema.getType(key);
DataSchema memberDataSchema = unionDataSchema.getTypeByMemberKey(key);
if (memberDataSchema == null)
{
for (DataSchema dataSchema : unionDataSchema.getTypes())
for (UnionDataSchema.Member member : unionDataSchema.getMembers())
{
AvroOverride avroOverride = getAvroOverride(dataSchema);
AvroOverride avroOverride = getAvroOverride(member.getType());
if (avroOverride != null)
{
if (avroOverride.getAvroSchemaFullName().equals(key))
{
memberDataSchema = dataSchema;
memberDataSchema = member.getType();
break;
}
}
@@ -390,6 +399,37 @@ private Object translate(Object value, DataSchema dataSchema, Schema avroSchema)
}
return new AbstractMap.SimpleEntry<DataSchema, Schema>(memberDataSchema, memberAvroSchema);
}
/**
 * Translates an Avro record that stands in for a Pegasus 'union with aliases' into the Pegasus
 * union form: a single-entry {@link DataMap} keyed by the selected member key.
 *
 * <p>The record's discriminator field names the selected member. If it names the null type,
 * {@code Data.NULL} is returned. Otherwise the record field with the same name as the
 * discriminator holds the member value, which is translated recursively.
 *
 * @param value the Avro value to translate; must be a {@link GenericRecord}
 * @param unionDataSchema the Pegasus union schema whose members are looked up by member key
 * @param avroSchema the Avro schema of {@code value}; either the record schema itself or a
 *                   union wrapping it (see {@code extractNonnullSchema})
 * @return {@code Data.NULL}, a single-entry {@link DataMap}, or {@code BAD_RESULT} (with a
 *         message appended) when the record cannot be mapped onto the union
 */
private Object translateAvroRecordToPegasusUnionWithAliases(Object value, UnionDataSchema unionDataSchema, Schema avroSchema)
{
  Schema recordAvroSchema = extractNonnullSchema(avroSchema);

  // Report a null or non-record value as a translation error instead of failing on the cast.
  if (!(value instanceof GenericRecord))
  {
    appendMessage("cannot translate %1$s to a union with aliases, expected an Avro record", value);
    return BAD_RESULT;
  }
  GenericRecord record = (GenericRecord) value;

  Object fieldDiscriminatorValue = record.get(DataSchemaConstants.DISCRIMINATOR_FIELD);
  if (fieldDiscriminatorValue == null)
  {
    appendMessage("cannot find required field %1$s in record %2$s", DataSchemaConstants.DISCRIMINATOR_FIELD, record);
    return BAD_RESULT;
  }
  String fieldDiscriminator = fieldDiscriminatorValue.toString();

  if (DataSchemaConstants.NULL_TYPE.equals(fieldDiscriminator))
  {
    return Data.NULL;
  }

  // The record schema is only needed for a non-null member, so validate it here.
  if (recordAvroSchema == null)
  {
    // extractNonnullSchema has already appended a message describing the problem.
    return BAD_RESULT;
  }
  if (recordAvroSchema.getType() != Schema.Type.RECORD)
  {
    appendMessage("cannot translate union with aliases from non-record Avro schema %1$s", recordAvroSchema);
    return BAD_RESULT;
  }

  // Bail out if the discriminator names a member the Pegasus union does not declare.
  DataSchema memberDataSchema = unionDataSchema.getTypeByMemberKey(fieldDiscriminator);
  if (memberDataSchema == null)
  {
    appendMessage("cannot find member key %1$s in union %2$s", fieldDiscriminator, unionDataSchema);
    return BAD_RESULT;
  }

  // Bail out if the Avro record schema has no field for the selected member.
  Schema.Field avroField = recordAvroSchema.getField(fieldDiscriminator);
  if (avroField == null)
  {
    appendMessage("cannot find field %1$s in record %2$s", fieldDiscriminator, recordAvroSchema);
    return BAD_RESULT;
  }

  Object fieldValue = record.get(fieldDiscriminator);
  DataMap result = new DataMap(1);
  _path.addLast(fieldDiscriminator);
  result.put(fieldDiscriminator, translate(fieldValue, memberDataSchema, avroField.schema()));
  _path.removeLast();
  return result;
}
}
private static class DataMapToGenericRecordTranslator extends DataTranslator
@@ -547,15 +587,14 @@ else if (fieldValue == null)
Object defaultValue = field.getDefault();
if (defaultValue != null)
{
Object fieldAvroValue = translate(defaultValue, fieldDataSchema, fieldAvroSchema);
avroRecord.put(fieldName, fieldAvroValue);
fieldValue = defaultValue;
}
else
{
appendMessage("required field is absent");
_path.removeLast();
continue;
}
_path.removeLast();
continue;
}
Object fieldAvroValue = translate(fieldValue, fieldDataSchema, fieldAvroSchema);
avroRecord.put(fieldName, fieldAvroValue);
@@ -565,6 +604,7 @@ else if (fieldValue == null)
break;
case UNION:
UnionDataSchema unionDataSchema = (UnionDataSchema) dereferencedDataSchema;
String key;
Object memberValue;
if (value == Data.NULL)
@@ -579,18 +619,27 @@ else if (fieldValue == null)
key = entry.getKey();
memberValue = entry.getValue();
}
DataSchema memberDataSchema = unionDataSchema.getType(key);
Map.Entry<String, Schema> memberAvroEntry = findUnionMember(memberDataSchema, avroSchema);
if (memberAvroEntry == null)
if (unionDataSchema.areMembersAliased())
{
result = BAD_RESULT;
break;
// Since Pegasus 'union with aliases' are represented as an Avro record, the translation
// is handled separately.
result = translatePegasusUnionWithAliasesToAvroRecord(key, memberValue, unionDataSchema, avroSchema);
}
else
{
DataSchema memberDataSchema = unionDataSchema.getTypeByMemberKey(key);
Map.Entry<String, Schema> memberAvroEntry = findUnionMember(memberDataSchema, avroSchema);
if (memberAvroEntry == null) {
result = BAD_RESULT;
break;
}
Schema memberAvroSchema = memberAvroEntry.getValue();
_path.addLast(memberAvroEntry.getKey());
Object memberAvroValue = translate(memberValue, memberDataSchema, memberAvroSchema);
_path.removeLast();
result = memberAvroValue;
}
Schema memberAvroSchema = memberAvroEntry.getValue();
_path.addLast(memberAvroEntry.getKey());
Object memberAvroValue = translate(memberValue, memberDataSchema, memberAvroSchema);
_path.removeLast();
result = memberAvroValue;
break;
default:
appendMessage("schema type unknown %1$s", dereferencedDataSchema.getType());
@@ -599,6 +648,88 @@ else if (fieldValue == null)
}
return result;
}
/**
 * Translates a Pegasus 'union with aliases' value into the Avro record that represents it.
 *
 * <p>The resulting record carries the translated member value in the field named after the
 * member key (skipped when the member value is {@code Data.NULL}) and an enum symbol for the
 * member key in the discriminator field.
 *
 * @param memberKey the key of the selected union member
 * @param memberValue the Pegasus value of the selected member, or {@code Data.NULL}
 * @param unionDataSchema the Pegasus union schema whose members are looked up by member key
 * @param avroSchema the target Avro schema; either the record schema itself or a union
 *                   wrapping it (see {@code extractNonnullSchema})
 * @return the populated Avro record, or {@code BAD_RESULT} (with a message appended) when the
 *         union value cannot be mapped onto the record schema
 */
private Object translatePegasusUnionWithAliasesToAvroRecord(String memberKey, Object memberValue, UnionDataSchema unionDataSchema, Schema avroSchema)
{
  Schema recordAvroSchema = extractNonnullSchema(avroSchema);
  if (recordAvroSchema == null)
  {
    // extractNonnullSchema has already appended a message describing the problem.
    return BAD_RESULT;
  }
  // GenericData.Record rejects non-record schemas with an exception; report it as a
  // translation error instead.
  if (recordAvroSchema.getType() != Schema.Type.RECORD)
  {
    appendMessage("cannot translate union with aliases to non-record Avro schema %1$s", recordAvroSchema);
    return BAD_RESULT;
  }

  GenericData.Record avroRecord = new GenericData.Record(recordAvroSchema);

  // Bail out if the pegasus union data has an invalid member key
  DataSchema memberDataSchema = unionDataSchema.getTypeByMemberKey(memberKey);
  if (memberDataSchema == null)
  {
    appendMessage("cannot find member key %1$s in union %2$s", memberKey, unionDataSchema);
    return BAD_RESULT;
  }

  // If the member value is null, don't try to map this to a field as the Avro record will not have
  // a field for a null union member
  if (memberValue != Data.NULL)
  {
    Schema.Field avroField = recordAvroSchema.getField(memberKey);
    if (avroField == null)
    {
      appendMessage("cannot find field %1$s in record %2$s", memberKey, recordAvroSchema);
      return BAD_RESULT;
    }
    _path.addLast(memberKey);
    Object translatedValue = translate(memberValue, memberDataSchema, avroField.schema());
    avroRecord.put(memberKey, translatedValue);
    _path.removeLast();
  }

  Schema.Field avroDiscriminatorField = recordAvroSchema.getField(DataSchemaConstants.DISCRIMINATOR_FIELD);
  if (avroDiscriminatorField == null)
  {
    appendMessage("cannot find field %1$s in record %2$s", DataSchemaConstants.DISCRIMINATOR_FIELD, recordAvroSchema);
    return BAD_RESULT;
  }

  // The discriminator is an enum whose symbol is the selected member key.
  _path.addLast(DataSchemaConstants.DISCRIMINATOR_FIELD);
  Object fieldDiscriminator = _avroAdapter.createEnumSymbol(avroDiscriminatorField.schema(), memberKey);
  avroRecord.put(DataSchemaConstants.DISCRIMINATOR_FIELD, fieldDiscriminator);
  _path.removeLast();
  return avroRecord;
}
}
/**
 * Unwraps the non-null member of an Avro union that models an optional field.
 *
 * <p>Avro expresses an optional field as a union of the actual type and null. A schema that
 * is not a union is handed back untouched.
 *
 * @param avroSchema the Avro schema to inspect
 * @return {@code avroSchema} itself when it is not a union, or when it is a union that does
 *         not have exactly two members (a message is appended in the latter case); otherwise
 *         the first member whose type is not null, or {@code null} (with a message appended)
 *         when both members are null types
 */
protected Schema extractNonnullSchema(Schema avroSchema)
{
  // Anything other than a union needs no unwrapping.
  if (avroSchema.getType() != Schema.Type.UNION)
  {
    return avroSchema;
  }

  // A union coming from an optional field is expected to hold exactly two members:
  // the translated type and null.
  List<Schema> unionMembers = avroSchema.getTypes();
  if (unionMembers.size() != 2)
  {
    appendMessage("found more than two types in an union with null for an optional field %1$s", avroSchema);
    return avroSchema;
  }

  // Hand back the first member that is not the null type.
  for (int i = 0; i < unionMembers.size(); i++)
  {
    Schema candidate = unionMembers.get(i);
    if (candidate.getType() != Schema.Type.NULL)
    {
      return candidate;
    }
  }

  appendMessage("cannot find a non-null type in an union with null for an optional field %1$s", avroSchema);
  return null;
}
protected Map.Entry<String, Schema> findUnionMember(DataSchema dataSchema, Schema avroSchema)
Oops, something went wrong.

0 comments on commit f43087c

Please sign in to comment.