
Kafka Sink: Support encoding Confluent Kafka wire format #19872

@silverwind

Description


A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Use Cases

Confluent Kafka uses a proprietary wire format in which each message starts with a 5-byte header: a magic byte (`0`) followed by the 4-byte big-endian schema registry ID. Vector's encoders currently cannot produce this header.
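A minimal sketch of producing this header, assuming a hypothetical `encode_confluent_header` helper (not an existing Vector API) and a schema ID that fits in a `u32`. It only prepends bytes to an already-encoded payload, so it works the same regardless of whether the payload is Avro, Protobuf, or JSON:

```rust
/// Magic byte that starts every Confluent wire-format message.
const CONFLUENT_MAGIC_BYTE: u8 = 0;
/// Prefix length: 1 magic byte + 4-byte schema ID.
const CONFLUENT_SCHEMA_PREFIX_LEN: usize = 5;

/// Hypothetical helper: prepend the Confluent header to an encoded payload.
/// The schema ID is written as a 4-byte big-endian integer.
fn encode_confluent_header(schema_id: u32, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(CONFLUENT_SCHEMA_PREFIX_LEN + payload.len());
    out.push(CONFLUENT_MAGIC_BYTE);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(payload);
    out
}
```

For example, `encode_confluent_header(1, b"abc")` yields the bytes `0, 0, 0, 0, 1` followed by `abc`.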

Vector already recognizes and strips these 5 bytes during Avro decoding via the `decoding.avro.strip_schema_id_prefix` option. However, the header is not actually specific to Avro: on Confluent Kafka it can also prefix Protobuf and JSON payloads.

I therefore suggest implementing the header encoding in the Kafka sink and moving the header decoding to the Kafka source. For reference, this is the current prefix-stripping logic in the Avro decoder:

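```rust
// Excerpt from Vector's Avro decoder: when `strip_schema_id_prefix` is
// enabled, require the Confluent magic byte and drop the 5-byte prefix.
let bytes = if self.strip_schema_id_prefix {
    if bytes.len() >= CONFLUENT_SCHEMA_PREFIX_LEN && bytes[0] == CONFLUENT_MAGIC_BYTE {
        // Skip the magic byte and the 4-byte schema ID.
        bytes.slice(CONFLUENT_SCHEMA_PREFIX_LEN..)
    } else {
        return Err(vector_common::Error::from(
            "Expected avro datum to be prefixed with schema id",
        ));
    }
} else {
    bytes
};
```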
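Because the header layout does not depend on the payload format, source-side decoding could be a format-agnostic split step that runs before any codec. A minimal sketch, assuming a hypothetical `split_confluent_header` helper (not part of Vector) and the same constant names as the excerpt above, with their values assumed from the wire-format description:

```rust
/// Magic byte that starts every Confluent wire-format message.
const CONFLUENT_MAGIC_BYTE: u8 = 0;
/// Prefix length: 1 magic byte + 4-byte schema ID.
const CONFLUENT_SCHEMA_PREFIX_LEN: usize = 5;

/// Hypothetical helper: split a message into (schema ID, payload) without
/// assuming anything about how the payload is serialized.
/// Returns `None` if the message is too short or lacks the magic byte.
fn split_confluent_header(bytes: &[u8]) -> Option<(u32, &[u8])> {
    if bytes.len() < CONFLUENT_SCHEMA_PREFIX_LEN || bytes[0] != CONFLUENT_MAGIC_BYTE {
        return None;
    }
    // Bytes 1..5 hold the schema ID as a big-endian u32.
    let id = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
    Some((id, &bytes[CONFLUENT_SCHEMA_PREFIX_LEN..]))
}
```

The returned payload could then be handed to the Avro, Protobuf, or JSON decoder unchanged. Note that Confluent's Protobuf serializer additionally writes message indexes after the schema ID, so a Protobuf decoder would still need to handle those.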

Proposal

Add two options to the Kafka sink's `encoding` configuration to produce this header:

```yaml
encoding:
  confluent_magic_byte: 0
  confluent_schema_id: 1
```
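One way the sink could apply these two options is sketched below. The `ConfluentFraming` struct and its `frame` method are hypothetical; the field names mirror the proposed YAML but do not exist in Vector today. The sketch assumes the header is written only when a schema ID is configured, and that the magic byte defaults to `0` when omitted:

```rust
/// Hypothetical in-memory form of the proposed sink options.
#[derive(Debug, Default, Clone)]
struct ConfluentFraming {
    confluent_magic_byte: Option<u8>,
    confluent_schema_id: Option<u32>,
}

impl ConfluentFraming {
    /// Frame an already-encoded payload. Without a schema ID the payload is
    /// passed through unchanged; otherwise the 5-byte header is prepended.
    fn frame(&self, payload: &[u8]) -> Vec<u8> {
        match self.confluent_schema_id {
            None => payload.to_vec(),
            Some(id) => {
                let mut out = Vec::with_capacity(5 + payload.len());
                // Assumed default: Confluent's magic byte is 0.
                out.push(self.confluent_magic_byte.unwrap_or(0));
                // Schema ID as 4 big-endian bytes.
                out.extend_from_slice(&id.to_be_bytes());
                out.extend_from_slice(payload);
                out
            }
        }
    }
}
```

Keeping the framing as a separate step after the codec means the same options would work for any `encoding.codec`, which matches the point that the header is not tied to Avro.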

References

#561
#19546

Version

vector 0.35.0 (x86_64-unknown-linux-gnu e57c0c0 2024-01-08 14:42:10.103908779)

Labels

  • domain: codecs
  • source: kafka
  • type: feature

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions