
Allow headers in NATS Jetstream sink #23509

@benjamin-awd

Description


A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Use Cases

NATS JetStream can deduplicate messages based on the Nats-Msg-Id header, within a duplicate window that is configured at the stream level.
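The duplicate-window semantics can be made concrete with the simplified Rust model below, which shows how a stream might treat repeated Nats-Msg-Id values. This is an illustrative sketch, not the nats-server implementation. The `DuplicateWindow` type, its `accept` method, and the 120-second window are assumptions chosen for the example.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Illustrative model of a per-stream duplicate window (not nats-server code).
struct DuplicateWindow {
    window: Duration,
    // Nats-Msg-Id -> time the message with that ID was first accepted
    seen: HashMap<String, Instant>,
}

impl DuplicateWindow {
    fn new(window: Duration) -> Self {
        Self { window, seen: HashMap::new() }
    }

    /// Returns true if the message should be stored, false if it is a duplicate.
    fn accept(&mut self, msg_id: &str, now: Instant) -> bool {
        // Forget IDs whose duplicate window has expired.
        let window = self.window;
        self.seen
            .retain(|_, first_seen| now.duration_since(*first_seen) < window);
        if self.seen.contains_key(msg_id) {
            return false;
        }
        self.seen.insert(msg_id.to_string(), now);
        true
    }
}

fn main() {
    let start = Instant::now();
    let mut stream = DuplicateWindow::new(Duration::from_secs(120));
    // First publish is stored.
    println!("{}", stream.accept("txn-123", start));
    // A retry 30 s later falls inside the window and is discarded.
    println!("{}", stream.accept("txn-123", start + Duration::from_secs(30)));
    // After the window has passed, the same ID is accepted again.
    println!("{}", stream.accept("txn-123", start + Duration::from_secs(200)));
}
```

The last call illustrates the practical limit of this mechanism: deduplication only protects against retries that arrive within the configured window.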

Setting this header (or any other header) is not currently supported when the nats sink publishes to JetStream. The full list of JetStream headers is documented here: https://docs.nats.io/nats-concepts/jetstream/headers

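Being able to set this header matters because publish-side deduplication is one of the building blocks JetStream uses for exactly-once semantics; without it, retries give at-least-once delivery with possible duplicates.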

The primary use case is to prevent duplicate data from being processed by downstream consumers when a source retries sending events. For example, if a Vector agent is restarted and re-reads a log file from the beginning, we want JetStream to discard events that have already been successfully processed.

To achieve this, the Nats-Msg-Id must be set to a unique identifier derived from the event's content (e.g., a transaction ID, a unique log entry hash, or a combination of fields). Currently, there is no way to control this header from within Vector's configuration.
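As one way to derive such an identifier, the hypothetical helper below hashes selected event fields with 64-bit FNV-1a. FNV-1a is used here only because it is fully specified and therefore stable across restarts and builds (Rust's `DefaultHasher` makes no such guarantee); a cryptographic hash or an existing transaction ID would serve equally well. The function name and the choice of fields are illustrative assumptions, not part of the proposal.

```rust
/// Hypothetical helper: derive a stable Nats-Msg-Id from selected event fields.
/// 64-bit FNV-1a is fully specified, so the same fields always yield the same ID,
/// even after a restart or a rebuild.
fn content_message_id(fields: &[&str]) -> String {
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let mut hash = FNV_OFFSET;
    for (i, field) in fields.iter().enumerate() {
        // Hash a 0x1F separator between fields so ["ab", "c"] and ["a", "bc"] differ.
        if i > 0 {
            hash ^= 0x1f;
            hash = hash.wrapping_mul(FNV_PRIME);
        }
        for byte in field.as_bytes() {
            hash ^= u64::from(*byte);
            hash = hash.wrapping_mul(FNV_PRIME);
        }
    }
    format!("{hash:016x}")
}

fn main() {
    // e.g. host + file path + byte offset identify one log line (illustrative fields)
    let id = content_message_id(&["web-01", "/var/log/app.log", "48213"]);
    println!("Nats-Msg-Id: {id}");
}
```

With a 64-bit hash, two distinct events could in principle collide and the second would be dropped as a duplicate, so a wider hash may be preferable for very high volumes.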

Attempted Solutions

The current nats sink can publish to JetStream streams successfully. However, there are no configuration options to set the Nats-Msg-Id header dynamically.

Without this feature, Vector publishes messages without a Nats-Msg-Id header, so JetStream has no key to compare them by. If the same event is sent twice by Vector, JetStream stores it as two separate messages, leading to data duplication. There is no existing workaround within Vector's configuration to control this behavior for content-based deduplication.

Proposal

Support for headers could be added through a new headers option under the jetstream block. Its first sub-option, message_id, would accept a template string.

This allows users to dynamically construct the Nats-Msg-Id header from the event data itself.
Here is an example of the proposed configuration:

```yaml
sinks:
    my_jetstream_sink:
        type: nats
        inputs: [ "my_source" ]
        endpoint: "nats://localhost:4222"
        subject: "my.subject"
        jetstream:  # `jetstream: true` would still be supported for backwards compatibility
            enabled: true
            headers:
                message_id: "{{ id }}"
```
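The comment in the example implies that `jetstream` would accept either the existing boolean or the new table. A minimal Rust sketch of how the two forms could be normalized into one config follows; all type names are hypothetical. In Vector itself the parsing would presumably go through serde, where an `#[serde(untagged)]` enum is one common way to accept either shape.

```rust
/// Hypothetical shape of the proposed `headers` table.
#[derive(Debug, Clone, Default, PartialEq)]
struct JetStreamHeaders {
    message_id: Option<String>, // template string, e.g. "{{ id }}"
}

/// Hypothetical shape of the proposed `jetstream` table.
#[derive(Debug, Clone, Default, PartialEq)]
struct JetStreamConfig {
    enabled: bool,
    headers: JetStreamHeaders,
}

/// `jetstream:` may be either the legacy boolean or the new table.
#[derive(Debug, Clone, PartialEq)]
enum JetStreamSetting {
    Legacy(bool),
    Full(JetStreamConfig),
}

impl JetStreamSetting {
    /// Normalize both spellings into a single config the sink can act on.
    fn into_config(self) -> JetStreamConfig {
        match self {
            // `jetstream: true` means "enabled, no custom headers".
            JetStreamSetting::Legacy(enabled) => JetStreamConfig {
                enabled,
                headers: JetStreamHeaders::default(),
            },
            JetStreamSetting::Full(config) => config,
        }
    }
}

fn main() {
    let legacy = JetStreamSetting::Legacy(true).into_config();
    let full = JetStreamSetting::Full(JetStreamConfig {
        enabled: true,
        headers: JetStreamHeaders { message_id: Some("{{ id }}".to_string()) },
    })
    .into_config();
    println!("legacy: enabled={}, message_id={:?}", legacy.enabled, legacy.headers.message_id);
    println!("full: enabled={}, message_id={:?}", full.enabled, full.headers.message_id);
}
```

Normalizing early keeps the rest of the sink free of the legacy case: downstream code only ever sees `JetStreamConfig`.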

With this configuration, Vector would render the template for each event and set the Nats-Msg-Id header before publishing the message. If an event like {"id": "123", "message": "hello world"} is processed, the resulting NATS message would have the header Nats-Msg-Id: 123.
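The per-event flow described above (render the template, then attach the header) can be sketched in Rust as follows. This is a simplified stand-in that assumes a template is plain text with `{{ field }}` placeholders looked up in a flat map of string fields. Vector's real templating supports much more, and `render` and `build_headers` are hypothetical names, not Vector APIs.

```rust
use std::collections::HashMap;

/// Simplified stand-in for template rendering: replaces each `{{ field }}`
/// with the event's value. Returns None if a field is missing or a
/// placeholder is unterminated.
fn render(template: &str, event: &HashMap<&str, &str>) -> Option<String> {
    let mut out = String::new();
    let mut rest = template;
    while let Some(start) = rest.find("{{") {
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        let end = after.find("}}")?; // unterminated placeholder
        let field = after[..end].trim();
        out.push_str(event.get(field)?); // missing field
        rest = &after[end + 2..];
    }
    out.push_str(rest);
    Some(out)
}

/// Build the headers to attach to one message.
/// Design choice of this sketch: if rendering fails, omit Nats-Msg-Id.
fn build_headers(
    message_id_template: Option<&str>,
    event: &HashMap<&str, &str>,
) -> Vec<(String, String)> {
    let mut headers = Vec::new();
    if let Some(template) = message_id_template {
        if let Some(id) = render(template, event) {
            headers.push(("Nats-Msg-Id".to_string(), id));
        }
    }
    headers
}

fn main() {
    let event = HashMap::from([("id", "123"), ("message", "hello world")]);
    for (name, value) in build_headers(Some("{{ id }}"), &event) {
        println!("{name}: {value}");
    }
}
```

The proposal leaves open what should happen when a field is missing. This sketch publishes without Nats-Msg-Id, so the event is still delivered but not deduplicated; dropping the event or emitting an error would be equally plausible choices.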

This small addition would unlock a major feature of JetStream and significantly improve the reliability of data pipelines using Vector and NATS.

Metadata

Assignees: No one assigned

Labels: type: feature

Projects: No projects

Milestone: No milestone

Relationships: None yet

Development: No branches or pull requests
