
Interoperability with confluent kafka + schema registry + container header #22

Closed
cromestant opened this issue Dec 15, 2015 · 11 comments

@cromestant

Hello, I'm working with a Confluent Kafka deployment and using avsc as the publisher in many cases. However, I am running into some problems when trying to use schema validation and the schema registry.
I am wondering whether I have not found the correct way to do this in avsc, or whether it is not yet compatible. In essence, if you look at this,
you will see that along with the serialized data they include the schema in the payload, so that the client can then query the schema registry for the latest version of the schema to deserialize.

also looking at this it mentions that:

When Avro data is read, the schema used when writing it is always present. This permits each datum to be written with no per-value overheads, making serialization both fast and small. This also facilitates use with dynamic, scripting languages, since data, together with its schema, is fully self-describing.

If I run the Java samples included in the distribution, I get no problems; however, serializing with avsc gives me only the serialized payload, not the schema (thus the "magic byte missing" error).

Is this something avsc is compatible with? Or is there a way to have avsc include the schema in the header as described here and here?

Thank you in advance for your reply.

@cromestant cromestant changed the title Interoperability with confluent kafka + schema registry Interoperability with confluent kafka + schema registry + container header Dec 15, 2015
@mtth
Owner

mtth commented Dec 16, 2015

I'm not sure I follow what you're asking.

You can generate container files using a BlockEncoder. This will generate the appropriate magic bytes and header. I'm unsure how this will help for your use case though, since including this in each message would significantly increase message size.
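
For reference, here's a minimal, untested sketch of what that could look like, assuming avsc's streams.BlockEncoder duplex stream (values in, object-container-file bytes out):

var avsc = require('avsc');
var fs = require('fs');

var type = avsc.parse('string');
// The BlockEncoder emits the container file's header (magic bytes, schema,
// sync marker), followed by blocks of serialized values.
var encoder = new avsc.streams.BlockEncoder(type);
encoder.pipe(fs.createWriteStream('values.avro'));
encoder.write('hello');
encoder.end('world'); // `end` also accepts a final value.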

How are you talking to Kafka? Are you using the REST proxy?

@cromestant
Author

Thanks for your reply.
For talking to Kafka I'm using kafka-node, which, if I'm understanding correctly, does not use the REST proxy but the direct TCP protocol.
It does look like the BlockEncoder can help; I'll try it.
I'm still trying to grasp all of the intricacies of using Confluent with Avro, as it seems the schema registry is not as automated as first described. Right now I'm only validating a POC in which I want to write from avsc to the Confluent distro and be able to decode from the sample Java app; at the moment this just gives the magic byte problem.
I will test whether this helps. The overhead is something I am concerned about, so I will try to omit it in production, which means configuring some other schema-inferring mechanism.

@mtth
Owner

mtth commented Dec 16, 2015

I see. I'm not familiar with kafka-node but I'll try to take a look to understand what input the library expects.

Edit: Could you share the code you have so far?

@mtth mtth added the question label Dec 16, 2015
@cromestant
Author

I might not be explaining myself. The error is on the receiving end. If you encode, send, and decode with the Java apps included in Confluent, it works. Changing the producer to a Node app that uses avsc to serialize produces the error. I have not been able to test yet; will report back ASAP.

@gabrielnau

The Confluent stack adds a header to each single Avro message:

< magic byte > < schema id (4 bytes) > < Avro blob >

You can get an idea of how deserialization works here: https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L41

As far as I know, avsc is only about Avro; if you need to integrate with Confluent's stack, you need to handle the specific header yourself.

So I may be mistaken, but I think Avro's "Object Container Files" are another topic.
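
For what it's worth, a rough sketch of handling that header on the receiving end could look like this (typeForId is a hypothetical helper that returns the avsc type registered under a given schema ID; it is not part of avsc):

/**
 * Decode a Confluent-framed message (sketch only).
 *
 * @param buf {Buffer} The raw Kafka message value.
 * @param typeForId {Function} Hypothetical lookup returning the avsc type
 * for a given registry schema ID.
 */
function fromMessageBuffer(buf, typeForId) {
  if (buf[0] !== 0) {
    throw new Error('missing magic byte'); // Not a Confluent-framed message.
  }
  var schemaId = buf.readInt32BE(1); // 4-byte big-endian schema ID.
  var type = typeForId(schemaId);
  return type.fromBuffer(buf.slice(5)); // The Avro blob starts at offset 5.
}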

@cromestant
Author

Awesome, thanks!

@mtth
Owner

mtth commented Dec 22, 2015

Thanks @gabrielnau, this makes sense.

@cromestant - you should then be able to generate a valid message, for example, as follows:

/**
 * Encode an Avro value into a message, as expected by Confluent's Kafka Avro
 * deserializer.
 *
 * @param val {...} The Avro value to encode.
 * @param type {Type} Your value's Avro type.
 * @param schemaId {Integer} Your schema's ID (inside the registry).
 * @param length {Integer} Optional initial buffer length. Set it high enough
 * to avoid having to resize. Defaults to 1024.
 *
 */
function toMessageBuffer(val, type, schemaId, length) {
  var buf = new Buffer(length || 1024);
  buf[0] = 0; // Magic byte.
  buf.writeInt32BE(schemaId, 1); // 4-byte big-endian schema ID.

  var pos = type.encode(val, buf, 5); // The Avro blob starts at offset 5.
  if (pos < 0) {
    // The buffer was too short; retry with a larger one (`-pos` is the
    // number of missing bytes).
    return toMessageBuffer(val, type, schemaId, buf.length - pos);
  }
  return buf.slice(0, pos);
}

Sample usage:

var type = avsc.parse('string');
var buf = toMessageBuffer('hello', type, 1); // Assuming 1 is your schema's ID.

@cromestant
Author

You guys are awesome. I'll test all of this in January when back from holidays.

@zcei

zcei commented Jan 18, 2016

Found this while investigating the hassle of switching to Avro, and it made me smile. Very nice work 👍

Our Java guys are already using Avro and they love it, especially together with the schema registry.
That integration into avsc looks really easy!

Would be nice to hear from @cromestant how it worked out; maybe you want to share some insights?

@cromestant
Author

Sorry. The truth is that we have not been able to pick it up yet. The beginning of the new year has left little time for this. Hope to pick it up again soon.

@cromestant
Author

I've finally been able to test this, and it works like a charm.
You have to use a keyed message; here is the relevant code using the method supplied by @mtth:

var type = avsc.parse('./schemas/charz.avsc');
var pet = {kind: 'CATbert', name: 'my other cat'};

producer.on('ready', function () {
  console.log('Ready to send!');
  // var buf = type.toBuffer(pet); // Plain Avro, without the Confluent header.
  var buf = toMessageBuffer(pet, type, 1);
  var km = new KeyedMessage('event', buf);
  var payloads = [
    {topic: 'pets', messages: km}
  ];
  producer.send(payloads, function (err, data) {
    if (err) { console.error(err); }
  });
});

Marking this as closed as it is working.
