Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

KafkaAvroMessageEncoder re-registers schemas that have already been registered #36

Open
theduderog opened this Issue · 13 comments

5 participants

@theduderog

For my registry, I subclassed MemorySchemaRegistry just like the DummySchemaRegistry in the example code. Also, I'm using the KafkaAvroMessageEncoder class in my producer to create a message in Kafka with the magic byte, schema id, and avro binary message content.

The problem is some logic that occurs in the KafkaAvroMessageEncoder (https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/coders/KafkaAvroMessageEncoder.java#L71). If a schema is not found in cache, it tries to "auto-register" the schema. The problem is that my registry already knows about that schema with a different id. MemorySchemaRegistry will overwrite the existing entry with a new id (https://github.com/linkedin/camus/blob/master/camus-schema-registry/src/main/java/com/linkedin/camus/schemaregistry/MemorySchemaRegistry.java#L32) even if it already knows about the old one.

My questions are:
1) Is this the right logic for KafkaAvroMessageEncoder? Should it instead call a registry.getBySchema(topic, schema) first to see if that schema is already registered?

OR

2) Should the logic in MemorySchemaRegistry.register() be changed to return an existing id if the tuple (topic, schema) is already registered?

One way to solve this issue is to generate schema ids using Avro fingerprints.

Also, I think that KafkaAvroMessageEncoder and KafkaAvroMessageDecoder should be changed to encode the schema id in 64 bits, not 32. The Avro fingerprints and even the incrementing ids generated by the current MemorySchemaRegistry implementation are longs.

@theduderog

One reason to consider migration to long (which I think would mean a different the magic byte) is to use the Avro fingerprint as the schema id. That would simplify the producer logic. No need to query the registry and maintain a schema cache. It can just calculate the id directly from the schema.

I think there's also some benefit to having a stable ID for a given schema, no matter what the backing registry implementation. In particular, it means getting rid of the "you must register your schemas in the same order every time" requirement from the MemorySchemaRegistry implementation.

@kengoodhope
Owner
@mariussoutier

I think the MemorySchemaRegistry is only for illustration purposes. It increments a global id instead of per topic, so its practical use is somewhat limited.

@theduderog

After a bit more digging, I realized why the code is the way it is. If you use the Avro specific API then schema like TrackingMonitoringEvent.avsc get compiled into the jar file. For those types of situations, the Registry.register() method needs to be used to map from the schema to the schema id.

For things to work correctly though, the SchemaRegistry.register() must have the following minimum requirements:
1) Multiple producers can have incomplete views of the registry but must be globally consistent in that the same schema id cannot be mapped to different schema
2) Consumers must have a complete view of the registry (i.e. they must be able to lookup a schema for any give schema id)

Having a DB-backed registry guarantees these properties. The MemorySchemaRegistry implementation makes it tricky to guarantee. I realize that the MemorySchemaRegistry is basically a toy just to get started with Camus but it can cause a bit of confusion.

If we add one more requirement to the SchemaRegistry interface, I think it will simply things. We could add a requirement that SchemaRegistry.register() operation be idempotent. Registering the same schema multiple times would always return the same schema id.

That seems like a pretty simple way to guarantee that all registry implementations work correctly. In fact, if you don't do that, you're likely to end up with a new id in the database every time you launch a producer or consumer that uses a schema that was compiled into the code.

@kengoodhope
Owner

Maybe we just need to add a comment that this implementation is meant for illustration purposes and has this unexpected behavior

@theduderog

That would be helpful. Making it idempotent would be even better.

@herriojr

In regards to incremental IDs vs fingerprinting:

The issue comes down to the protocol not being flexible in terms of specifying the length of the ID, even through it is represented as a String everywhere. It would be much more flexible and consistent with the concept of a String if proceeding the MAGIC_BYTE, it contained the ID length and then the ID instead of just assuming the ID is 4 bytes.

Right now, it is being read from the kafka message as an Integer and then being converted to a String and treated as a String everywhere else. The simplest way of upgrading to the aforementioned proposal is to turn the MAGIC_BYTE into a kind of VERSION_BYTE where 0x0 means it's the old format of (0x0, ID, payload) and 0x1 is the new format (0x1, ID-size, ID, payload). I could create a patch for this in a jiffy if anyone is interested. This would be great for both sides of the argument as they can choose if they want a larger messages with more flexibility or retain the same small size if that's more of a concern.

@theduderog
@nbriggs

I'd suggest that if it is implemented as @herriojr suggests -- (0x1, ID-size, ID, payload) -- that it be treated as a byte array, not a String. For example if you were to use a raw sha-256 hash as the ID it could easily not be a valid UTF-8/UTF-16 byte sequence.

@theduderog

Thanks, Nick. The reason for it being a string is to match the SchemaRegistry interface, which uses String to represent schema id.

@nbriggs

Ah... right. Makes sense. So anything such as the sha-256 hash example would need to be represented in a "printable" format.

@herriojr

I actually did this implementation for myself if you guys want me to submit a pull request.

@kengoodhope
Owner
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.