Skip to content

Commit

Permalink
Updating default parsing in AvscParser (#546)
Browse files Browse the repository at this point in the history
* Default for a node can be processed if its type is union and first element of union is resolved.

* Updated processing of defaults to be done after resolution of schemas in the same file is complete. Added UT

* fix checkstyle

* refactor logic

* unnecessary continue removed
  • Loading branch information
li-ukumar committed Feb 5, 2024
1 parent e2849e7 commit 2de30d1
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,34 @@ public void resolveReferences() {
for (AvroSchema schema : definedSchemas) {
resolveReferences(schema);
}

// Once all schemas are resolved for this file, we can process the defaults for fields
for (AvroSchema schema : definedSchemas) {
processAllDefaults(schema);
}
}

private void processAllDefaults(AvroSchema schema) {
AvroType type = schema.type();
if (type == AvroType.RECORD) {
AvroRecordSchema recordSchema = (AvroRecordSchema) schema;
List<AvroSchemaField> fields = recordSchema.getFields();
for (AvroSchemaField field : fields) {
// If field has no default value, we can skip it
if (!field.hasDefaultValue()) {
continue;
}
SchemaOrRef fieldSchema = field.getSchemaOrRef();
// If schema is not fully defined, add it to the list of fields with unparsed defaults
if (!fieldSchema.isFullyDefined()) {
fieldsWithUnparsedDefaults.add(field);

// schema is defined and default value is unparsed.
} else if (field.getDefaultValue() instanceof AvscUnparsedLiteral) {
AvscParseUtil.lateParseFieldDefault(field, this);
}
}
}
}

private void resolveReferences(AvroSchema schema) {
Expand All @@ -188,23 +216,9 @@ private void resolveReferences(AvroSchema schema) {
List<AvroSchemaField> fields = recordSchema.getFields();
for (AvroSchemaField field : fields) {
SchemaOrRef fieldSchema = field.getSchemaOrRef();
boolean hasDefault = field.hasDefaultValue();
boolean wasDefinedBefore = fieldSchema.isFullyDefined();
if (!wasDefinedBefore) {
resolveReferences(fieldSchema);
if (!fieldSchema.isFullyDefined()) {
if (hasDefault) {
fieldsWithUnparsedDefaults.add(field);
}
continue;
}
}
//fully defined if we're here
if (!hasDefault) {
continue;
}
if (field.getDefaultValue() instanceof AvscUnparsedLiteral) {
AvscParseUtil.lateParseFieldDefault(field, this);
}
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.linkedin.avroutil1.parser.exceptions.JsonParseException;
import com.linkedin.avroutil1.parser.exceptions.UnresolvedReferenceException;
import com.linkedin.avroutil1.testcommon.TestUtil;
import com.linkedin.avroutil1.writer.avsc.AvscSchemaWriter;
import java.io.IOException;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -688,6 +689,40 @@ public void testSomething() throws Exception {
Assert.assertEquals(recordParseResult.getIssues().size(), 1);
}

@Test
public void testRecursiveReferenceUnionWithDefault() throws Exception {
String recordAvsc = TestUtil.load("schemas/RecursiveRefUnion.avsc");
AvscParser avscParser = new AvscParser();

AvscParseResult recordParseResult = avscParser.parse(recordAvsc);
AvroRecordSchema topLevelSchema = (AvroRecordSchema) recordParseResult.getTopLevelSchema();

// Tests we don't have unparsed default values in for of AvscUnparsedLiteral.
new AvscSchemaWriter().writeSingle(topLevelSchema);

Assert.assertFalse(hasUnparsedDefault(topLevelSchema));
Assert.assertEquals(recordParseResult.getIssues().size(), 0);
}

/**
* Recursively goes through all fields and their schemas to check if any field has an unparsed default value
* @param schema
* @return
*/
private boolean hasUnparsedDefault(AvroRecordSchema schema) {
for(AvroSchemaField field : schema.getFields()) {
if (field.getDefaultValue() instanceof AvscUnparsedLiteral) {
return true;
}
if (field.getSchema().type() == AvroType.RECORD) {
if (hasUnparsedDefault((AvroRecordSchema) field.getSchema())) {
return true;
}
}
}
return false;
}

private Schema vanillaParse(String resource) throws Exception {
String avsc = TestUtil.load(resource);
Schema.Parser vanillaParser = new Schema.Parser();
Expand Down
89 changes: 89 additions & 0 deletions parser/src/test/resources/schemas/RecursiveRefUnion.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
{
"type": "record",
"name": "RecursiveRefUnion",
"namespace": "com.test",
"doc": "Top record",
"fields": [
{
"name": "innerField2",
"type": [
"null",
{
"type": "record",
"name": "NestedType2",
"doc": "",
"fields": [
{
"name": "innerField3",
"type": {
"type": "record",
"name": "NestedType3",
"fields": [
{
"name": "innerField4",
"type": [
"null",
"NestedType2"
],
"doc": "",
"default": null
},
{
"name": "innerField5",
"type": {
"type": "record",
"name": "NestedType4",
"fields": [
{
"name": "innerField6",
"type": [
"null",
"NestedType2"
],
"doc": "!!!!",
"default": null
},
{
"name": "innerField7",
"type": [
"null",
{
"type": "record",
"name": "NestedType5",
"doc": "",
"fields": [
{
"name": "field",
"type": "string"
}
]
}
],
"doc": "",
"default": null
}
]
},
"doc": "."
},
{
"name": "innerField8",
"type": [
"null",
"NestedType5"
],
"doc": "!!!!",
"default": null
}
]
},
"doc": "."
}
]
}
],
"doc": "",
"default": null
}
]
}

0 comments on commit 2de30d1

Please sign in to comment.