Implement schema resolution #114

Closed
burdiyan opened this issue Jun 7, 2018 · 3 comments

Comments

burdiyan commented Jun 7, 2018

Currently there's no way to supply the writer's schema to a codec that was built from the reader's schema.

Imagine a codec for the following schema (A):

{
  "type": "record",
  "name": "Person",
  "fields": [
	{
		"type": "string",
		"name": "name"
	}
  ]
}

This is our reader's schema.

And imagine an Avro record like this:

{
	"name": "Foo",
	"deprecated_field": "Bar"
}

This record was written with the corresponding writer's schema (B), which still has the deprecated field. Our current reader has access to this schema through the schema registry (or some other means).

So, AFAIK, goavro would currently fail if codec A tried to read the record written with codec B, even though according to Avro schema evolution rules schema A is backward compatible with schema B: the extra field just needs to be skipped.

So I believe to make it work, a user must be able to pass the writer's schema to the reader's codec somehow, and schema resolution rules should be applied.
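To make the request concrete, here is a minimal Python sketch of the behaviour being asked for. It is not goavro code and not a full Avro implementation: it handles only flat records of string fields, and `read_resolved` is a hypothetical name. Schema B is not shown in the issue, so the version below is an assumption about its shape.

```python
import json

READER_SCHEMA_A = '''{"type": "record", "name": "Person",
  "fields": [{"type": "string", "name": "name"}]}'''

# Assumption: schema B is not shown in the issue; this guesses its shape.
WRITER_SCHEMA_B = '''{"type": "record", "name": "Person",
  "fields": [{"type": "string", "name": "name"},
             {"type": "string", "name": "deprecated_field"}]}'''


def encode_string(s):
    """Avro string: zigzag varint byte length, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    n = len(data) << 1  # zigzag of a non-negative length
    out = bytearray()
    while n > 0x7F:
        out.append((n & 0x7F) | 0x80)
        n >>= 7
    out.append(n)
    return bytes(out) + data


def decode_string(buf, pos):
    """Return (string, new position) for the Avro string starting at pos."""
    shift = raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    length = raw >> 1  # undo zigzag; lengths are never negative
    return buf[pos:pos + length].decode("utf-8"), pos + length


def field_names(schema_json):
    return [field["name"] for field in json.loads(schema_json)["fields"]]


def read_resolved(writer_schema, reader_schema, buf):
    """Walk the bytes in writer order; keep only fields the reader declares."""
    wanted = set(field_names(reader_schema))
    record, pos = {}, 0
    for name in field_names(writer_schema):
        value, pos = decode_string(buf, pos)  # always consume the bytes
        if name in wanted:
            record[name] = value
    return record


# The record from the issue, as schema B would lay it out on the wire.
blob = encode_string("Foo") + encode_string("Bar")
print(read_resolved(WRITER_SCHEMA_B, READER_SCHEMA_A, blob))  # {'name': 'Foo'}
```

The key point is that the writer's schema drives how bytes are consumed, while the reader's schema decides what is kept.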

Is this something that is on the roadmap?

karrick (Contributor) commented Aug 22, 2018

Avro schemas ought to be backwards compatible, meaning that a reader using v2 of the schema ought to be able to read anything written with v1 of the schema. Not the other way around. Similarly, a reader using v3 of the schema ought to be able to read anything written with either v1 or v2 of the schema. In your example above, the writer appears to be using a newer version of the schema than the reader, which is not necessarily backwards compatible.

On that part we ought to all agree. The next part of my answer may make you think twice about evolving Avro schemas. I'm fairly certain I've answered this before on another issue, but I'll try to capture it here as well.

The Avro specification for schema evolution with optional fields is broken. Put another way, the way the Avro specification is written, there is no way for an Avro decoder to make use of the default value for a record field.

There. I said it. To understand this fully, one has to read several parts of the Avro specification over and over until it sinks in. I thought this was merely my interpretation of the specification, but back when I worked on goavro much more, I showed this issue to several talented engineers. The consensus was to not try to evolve your Avro schemas with optional fields, because the Avro specification fails to identify how a decoder, when presented with multiple optional fields, can determine which of them are present.

Consider the following version 1 of a hypothetical schema:

{ "type": "record",
   "name": "Person",
   "fields": [
        { "type": "string", "name": "name" },
        { "type": "string", "name": "phone" }
   ]
}

The following assumption is stated in the Avro specification, and it's invalid if the schema has evolved.

https://avro.apache.org/docs/1.8.2/spec.html#Data+Serialization

Avro data is always serialized with its schema. Files that store Avro data should always also include the schema for that data in the same file. Avro-based remote procedure call (RPC) systems must also guarantee that remote recipients of data have a copy of the schema used to write that data.
Because the schema used to write data is always available when the data is read, Avro data itself is not tagged with type information. The schema is required to parse data.
In general, both serialization and deserialization proceed as a depth-first, left-to-right traversal of the schema, serializing primitive types as they are encountered.

The above assumption, starting with "Because the schema," states why the Avro specification designers omitted tags in the encoded data. It simply was not needed. The first field of this record is always the name, and the second is the phone number. It says so right up there at the top of the data blob where the schema is. The schema always goes with the encoded data.

Now, envision a schema registry, and a schema that evolves. Now in addition to the name and phone, we want to store a mailing address, which will be a simple string.

Version 2 of our hypothetical schema:

{ "type": "record",
   "name": "Person",
   "fields": [
        { "type": "string", "name": "name" },
        { "type": "string", "name": "phone" },
        { "type": "string", "name": "address", "default": "" }
   ]
}

We even use a default declaration for the address field, because the Avro specification describes it as a default value for the field, used when reading instances that lack the field. In other words, a reader using v2 of the schema should be able to use the empty string as the default value of the address field for a record written using v1 of the schema.

I said "should be able to" but sadly it cannot. There is no field tagging in encoded Avro data.

A record is encoded by encoding the values of its fields in the order that they are declared. In other words, a record is encoded as just the concatenation of the encodings of its fields. Field values are encoded per their schema.

The v1 decoder will only decode two strings, one for name and one for phone number, along with their lengths. A v2 decoder will decode the name, then the phone number, and then try to decode the address. That decode first reads bytes to determine the length of the address string; then, assuming those bytes exist in the buffer and decode to a positive number, that many bytes are pulled out of the buffer and put into the string. The v2 decoder does not know whether the record was written with v1, v2, or v3. That ability was overlooked in the Avro specification, because the specification clearly states that the schema must always accompany the data to be decoded:

Because the schema used to write data is always available when the data is read, Avro data itself is not tagged with type information. The schema is required to parse data.
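The failure described above can be reproduced with a small Python sketch. It is a simplification that assumes flat records of string fields only, and the helper names are hypothetical. A record is written using the v1 field list and then decoded using the v2 field list alone, with no knowledge of the writer's schema.

```python
def encode_long(n):
    """Avro long: zigzag, then base-128 varint, least significant group first."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def decode_long(buf, pos):
    shift = raw = 0
    while True:
        if pos >= len(buf):
            raise EOFError("ran out of bytes while reading a length")
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def encode_string(s):
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def decode_string(buf, pos):
    length, pos = decode_long(buf, pos)
    if length < 0 or pos + length > len(buf):
        raise EOFError("buffer too short for string")
    return buf[pos:pos + length].decode("utf-8"), pos + length


def encode_record(fields, record):
    # A record is just the concatenation of its field encodings, in order.
    return b"".join(encode_string(record[name]) for name in fields)


def decode_record(fields, buf):
    pos, out = 0, {}
    for name in fields:
        out[name], pos = decode_string(buf, pos)
    return out


V1_FIELDS = ["name", "phone"]
V2_FIELDS = ["name", "phone", "address"]

blob = encode_record(V1_FIELDS, {"name": "Alice", "phone": "555-0100"})
print(blob)                            # b'\nAlice\x10555-0100'
print(decode_record(V1_FIELDS, blob))  # {'name': 'Alice', 'phone': '555-0100'}

try:
    decode_record(V2_FIELDS, blob)     # no writer schema: address bytes are missing
except EOFError as err:
    print("v2-only decode failed:", err)
```

Nothing in the bytes says which fields are present, so a decoder built from v2 alone has no signal telling it to substitute the default. If more data followed the record in the same buffer, the decoder would read those bytes as the address instead of failing.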

So, my recommendation is to feel free to evolve your schemas, but you still need to build a decoder from the schema used to encode the Avro blobs. We use a schema registry at my work, and when a schema is updated, it gets a new digest, and that schema blob is available at the registry by concatenating that digest to a URL stub. When we pull data off the wire, a fixed number of prefix bytes holds the schema digest. We strip off the prefix and check a map[string]*goavro.Codec to see whether we have a codec for that message. If not, we download the schema from the schema registry, build a new codec from the JSON schema, then use that to decode the message blob. All of this happens for a particular schema and its line of descendants. As long as the schema evolves sensibly, those schema changes do not affect our program reading the data. It still pulls the same data out of the same fields it used to. If we needed to deprecate a field, we'd update the schema, update our writers to write to the deprecated and new fields, then add code to the reader to simply refer to the new fields last.
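The lookup flow described above might be sketched as follows, in Python rather than Go to match the other examples in this thread. Everything here is an assumption for illustration: the 8-byte digest length, the `CodecCache` class, and the stand-in `fetch_schema` and `build_codec` callables, which replace a real registry download and a real codec constructor.

```python
from typing import Callable, Dict

DIGEST_LEN = 8  # assumption: the comment only says "a fixed number" of bytes

Decoder = Callable[[bytes], dict]


class CodecCache:
    """Caches one decoder per schema digest, fetching schemas on demand."""

    def __init__(self, fetch_schema: Callable[[bytes], str],
                 build_codec: Callable[[str], Decoder]) -> None:
        self._fetch_schema = fetch_schema
        self._build_codec = build_codec
        self._codecs: Dict[bytes, Decoder] = {}

    def decode(self, message: bytes) -> dict:
        if len(message) < DIGEST_LEN:
            raise ValueError("message shorter than the digest prefix")
        digest, payload = message[:DIGEST_LEN], message[DIGEST_LEN:]
        codec = self._codecs.get(digest)
        if codec is None:
            # Cache miss: fetch the writer's schema and build a codec once.
            codec = self._build_codec(self._fetch_schema(digest))
            self._codecs[digest] = codec
        return codec(payload)


# Stand-ins so the sketch runs without a network or an Avro library.
FAKE_REGISTRY = {b"digest01": '{"type": "string"}'}
fetch_calls = []


def fetch_schema(digest: bytes) -> str:
    fetch_calls.append(digest)
    return FAKE_REGISTRY[digest]


def build_codec(schema_json: str) -> Decoder:
    # Not real Avro decoding: returns the payload as text with its schema.
    return lambda payload: {"schema": schema_json,
                            "value": payload.decode("utf-8")}


cache = CodecCache(fetch_schema, build_codec)
first = cache.decode(b"digest01" + b"hello")
second = cache.decode(b"digest01" + b"world")
print(first["value"], second["value"], len(fetch_calls))  # hello world 1
```

The registry is hit once per digest; every later message with the same prefix reuses the cached codec.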

karrick closed this as completed Aug 22, 2018

mikepk commented Dec 5, 2018

Hi @karrick, I think you may be misunderstanding the intent of the schema evolution mechanism in the Avro spec. I've written a decoder as well, for Python (https://github.com/pluralsight/spavro), and implemented schema evolution, so I know it's possible :).

What you're highlighting is that the V2 decoder doesn't know whether the record is V1 or V2, so because of the binary encoding it can't decide how to proceed with the optional field. That's assuming you would have a mix of V1 and V2 records in one Avro encoding. What you need is a "compatible" decoder that handles V1->V2, which isn't a pure V2 decoder.

Disambiguating optional fields or avro encodings is not really what the spec is for. As you point out the write schema must always be present with the record, so you always know the version of the record. So what's the point of the schema evolution? The idea is that you have a read schema that can be used with records from a compatible write schema, but all of the records are written with that write schema. So you know the version of the records, no guessing! The idea is that you can create records that conform to a newer version of the schema, even though all of the records you're reading are a previous version. What you end up with is a new decoder that's not a straight V2 decoder, it's really a decoder that reads V1 records specifically and outputs V2 records.

The intent of schema evolution is to have a separate codec for each combination of a read schema and a write schema. Normally you would have a latest read schema C and a series of previous write schemas A, B, and lastly C. You can create a single "view" of your data by reading Avro written with the A schema with a codec (C, A), Avro written with B with a codec (C, B), and Avro written with C with just a simple codec (C). You could then create a single data set that works with your processing but uses older records that had compatible schemas. The fields that didn't exist previously would all take their default values. So again, you know where the schemas mismatch, and the codec just inserts the optional field where it should be, because it knows that the data won't be there with the previous write schemas.
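As a rough illustration of a codec built from a (read schema, write schema) pair, here is a minimal Python sketch that reuses the Person v1 and v2 schemas from earlier in this thread. It is not spavro's implementation: it assumes flat records of string fields, represents a schema as a list of `(name, default)` pairs, and `make_reader` is a hypothetical helper. The writer schema comes first in the argument list.

```python
MISSING = object()  # marks a field that declares no default


def encode_string(s):
    data = s.encode("utf-8")
    n = len(data) << 1  # zigzag of a non-negative length
    out = bytearray()
    while n > 0x7F:
        out.append((n & 0x7F) | 0x80)
        n >>= 7
    out.append(n)
    return bytes(out) + data


def decode_string(buf, pos):
    shift = raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    length = raw >> 1
    return buf[pos:pos + length].decode("utf-8"), pos + length


def encode_record(fields, record):
    return b"".join(encode_string(record[name]) for name, _ in fields)


def make_reader(writer_fields, reader_fields=None):
    """Build a reader for one (writer, reader) pair, checked up front."""
    reader_fields = reader_fields or writer_fields
    written = {name for name, _ in writer_fields}
    for name, default in reader_fields:
        if name not in written and default is MISSING:
            raise ValueError(f"reader field {name!r} has no default")

    def read(buf):
        decoded, pos = {}, 0
        for name, _ in writer_fields:      # bytes follow the writer's layout
            decoded[name], pos = decode_string(buf, pos)
        # Output follows the reader's layout; absent fields take defaults.
        return {name: decoded.get(name, default)
                for name, default in reader_fields}

    return read


V1 = [("name", MISSING), ("phone", MISSING)]
V2 = [("name", MISSING), ("phone", MISSING), ("address", "")]

read_v1_as_v2 = make_reader(V1, V2)  # the "(V2, V1)" codec
read_v2 = make_reader(V2)            # the plain "(V2)" codec

blob_v1 = encode_record(V1, {"name": "Alice", "phone": "555-0100"})
blob_v2 = encode_record(
    V2, {"name": "Bob", "phone": "555-0199", "address": "1 Main St"})

print(read_v1_as_v2(blob_v1))
# {'name': 'Alice', 'phone': '555-0100', 'address': ''}
print(read_v2(blob_v2))
# {'name': 'Bob', 'phone': '555-0199', 'address': '1 Main St'}
```

Because the pair is fixed when the reader is built, the mismatch between schemas is resolved once, and no per-record guessing is needed.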

This becomes important in things like Kafka applications using Avro, where every record is tagged with its write schema. Again, the key is that you know the schema the record was written in, so there's no ambiguity there. The compatibility codec knows how to handle the evolution cases. I can point you to examples in Python that might make it clearer.

mikepk commented Dec 5, 2018

Here's an example... I'm leaving out a bunch of the details but the gist / pattern is here. Three schemas, compatible, A, B and C. Write out some binary records for each. Then read each set back as schema C.

schema_a = '''
{
  "type": "record",
  "name": "Person",
  "fields": [
    {
        "type": "string",
        "name": "name"
    },
    {
        "type": "string",
        "name": "deprecated_field"
    }
  ]
}'''

schema_b = '''
{
  "type": "record",
  "name": "Person",
  "fields": [
    {
        "type": "string",
        "name": "name"
    }
  ]
}'''

schema_c = '''
{
  "type": "record",
  "name": "Person",
  "fields": [
    {
        "type": "string",
        "name": "new_field",
        "default": "A default string"
    },
    {
        "type": "string",
        "name": "name"
    },
    {
        "type": ["null", "string"],
        "name": "new_field_2_optional",
        "default": null
    }
  ]
}'''

Some sample records for each schema:

sample_records_a = [{'deprecated_field': 'Deprecated 1', 'name': 'Alice'},
                    {'deprecated_field': 'Deprecated 2', 'name': 'Bob'},
                    {'deprecated_field': 'Deprecated 3', 'name': 'Charlie'}]


sample_records_b = [{'name': 'Doug'}, {'name': 'Eugene'}, {'name': 'Frank'}]


sample_records_c = [{'name': 'Ginny', 'new_field': 'optional 1'},
                    {'name': 'Ignacio', 'new_field': 'optional 2'},
                    {'name': 'Helen',
                     'new_field': 'optional 3',
                     'new_field_2_optional': 'The other optional field!'}]

skipping some stuff

Now make some binary serialized avro records for each schema:

schema_a_writer = get_avro_binary_writer(schema_a)
schema_b_writer = get_avro_binary_writer(schema_b)
schema_c_writer = get_avro_binary_writer(schema_c)

a_binary_data = [schema_a_writer(record) for record in sample_records_a]
b_binary_data = [schema_b_writer(record) for record in sample_records_b]
c_binary_data = [schema_c_writer(record) for record in sample_records_c]

Then make a resolved reader for each permutation:

# called with writerschema, readerschema as the arguments, reader schema is optional
schema_c_a_reader = get_avro_reader(schema_a, schema_c)
schema_c_b_reader = get_avro_reader(schema_b, schema_c)
schema_c_reader = get_avro_reader(schema_c)

# all of these records will be output with schema C
for binary_data in a_binary_data:
    print(schema_c_a_reader(binary_data))

for binary_data in b_binary_data:
    print(schema_c_b_reader(binary_data))

for binary_data in c_binary_data:
    print(schema_c_reader(binary_data))

The output is:

{'name': 'Alice', 'new_field': 'A default string', 'new_field_2_optional': None}
{'name': 'Bob', 'new_field': 'A default string', 'new_field_2_optional': None}
{'name': 'Charlie', 'new_field': 'A default string', 'new_field_2_optional': None}
{'name': 'Doug', 'new_field': 'A default string', 'new_field_2_optional': None}
{'name': 'Eugene', 'new_field': 'A default string', 'new_field_2_optional': None}
{'name': 'Frank', 'new_field': 'A default string', 'new_field_2_optional': None}
{'new_field': 'optional 1', 'name': 'Ginny', 'new_field_2_optional': None}
{'new_field': 'optional 2', 'name': 'Ignacio', 'new_field_2_optional': None}
{'new_field': 'optional 3', 'name': 'Helen', 'new_field_2_optional': 'The other optional field!'}

You can see this is reading all of the old records from schema A and B but outputting them to conform to schema C.
