
SinksKafka

David Parton edited this page Apr 4, 2018 · 2 revisions

The kafka sink accepts only Raw events. Under the hood, it uses the rust-rdkafka crate, a Rust binding for librdkafka, the Apache Kafka C/C++ client. Internally, librdkafka buffers messages as they arrive and flushes them to the brokers in batches, governed by its extensive configuration options.
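To make "buffers messages and flushes them in batches" concrete, here is a simplified model of size-or-time batching. It assumes a batch is flushed either when it reaches a message-count limit or when its oldest message has waited longer than a linger interval. librdkafka exposes comparable knobs (for example `batch.num.messages` and `linger.ms`), but its real batching logic is more involved; `Batcher`, `max_messages` and `linger` are hypothetical names used only for illustration.

```rust
use std::time::{Duration, Instant};

/// Hypothetical size-or-time batcher; a teaching model, not librdkafka's implementation.
pub struct Batcher {
    max_messages: usize,
    linger: Duration,
    pending: Vec<Vec<u8>>,
    oldest: Option<Instant>, // arrival time of the oldest buffered message
}

impl Batcher {
    pub fn new(max_messages: usize, linger: Duration) -> Self {
        Batcher { max_messages, linger, pending: Vec::new(), oldest: None }
    }

    /// Buffer a message; returns a full batch once the count limit is reached.
    pub fn push(&mut self, msg: Vec<u8>, now: Instant) -> Option<Vec<Vec<u8>>> {
        if self.pending.is_empty() {
            self.oldest = Some(now);
        }
        self.pending.push(msg);
        if self.pending.len() >= self.max_messages {
            return Some(self.take());
        }
        None
    }

    /// Returns the pending batch if its oldest message has lingered long enough.
    pub fn poll(&mut self, now: Instant) -> Option<Vec<Vec<u8>>> {
        match self.oldest {
            Some(t) if now.duration_since(t) >= self.linger => Some(self.take()),
            _ => None,
        }
    }

    /// Drain the buffer and reset the linger clock.
    fn take(&mut self) -> Vec<Vec<u8>> {
        self.oldest = None;
        std::mem::take(&mut self.pending)
    }
}
```

Whichever limit is hit first triggers the flush, which is the usual trade-off between throughput (larger batches) and latency (shorter linger).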

Under normal operating conditions, this sink guarantees at-least-once delivery: a record may occasionally be published more than once, but it should not be lost.
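Why can at-least-once delivery produce duplicates? A sender only forgets a message after the broker acknowledges it, so if the broker stores the message but the acknowledgement is lost, the sender resends it. The toy model below illustrates that trade-off. It is a hypothetical sketch, not cernan's or librdkafka's code, and the `Attempt` script simply stands in for real network outcomes.

```rust
/// Outcome of one delivery attempt (hypothetical model of network behaviour).
#[derive(Clone, Copy)]
pub enum Attempt {
    /// Broker stored the message and the sender saw the acknowledgement.
    Acked,
    /// Broker stored the message, but the acknowledgement never arrived.
    StoredAckLost,
    /// Broker never received the message.
    Lost,
}

/// Resend `msg` until an attempt is acknowledged and return what the broker stored.
/// The message is never dropped before it is acked, so it arrives at least once,
/// but every lost ack causes a resend and therefore a duplicate on the broker.
/// The attempt list is finite only so the example terminates.
pub fn deliver_at_least_once(msg: &str, attempts: &[Attempt]) -> Vec<String> {
    let mut broker_log = Vec::new();
    for attempt in attempts {
        match attempt {
            Attempt::Acked => {
                broker_log.push(msg.to_string());
                return broker_log; // ack seen: safe to forget the message
            }
            Attempt::StoredAckLost => broker_log.push(msg.to_string()),
            Attempt::Lost => {} // nothing stored; retry
        }
    }
    broker_log
}
```

For example, `[Attempt::StoredAckLost, Attempt::Acked]` leaves two copies on the broker, which is why downstream consumers of this sink should tolerate duplicates.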

Configuration

The following configuration options are available for kafka sinks:

  • enabled - Whether or not the sink is enabled. Optional, defaults to true.
  • topic - The Kafka topic to publish to. Required. If this is not provided, the sink is treated as disabled.
  • brokers - A comma-separated list of Kafka brokers to connect to. Internally, this is converted into the librdkafka setting bootstrap.servers. Required. If this is not provided, the sink is treated as disabled.
  • flush_interval - The interval, in seconds, at which the sink waits for librdkafka to flush all in-flight messages to Kafka. If librdkafka takes longer than this interval to flush, the sink blocks. Optional, defaults to 1 second.
  • max_message_bytes - The maximum number of bytes that in-flight messages may occupy in memory before the sink's valve closes. This count is reset to zero at the end of each flush_interval, allowing the valve to reopen. Optional, defaults to 10 MB.
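The interplay between max_message_bytes and flush_interval can be sketched as a byte-budget valve. This is a minimal model built only from the description above; `Valve`, `admit` and `on_flush` are hypothetical names, and it assumes a message that pushes the total past the limit is still admitted, with the valve closing only afterwards. cernan's actual implementation may differ.

```rust
/// Hypothetical byte-budget valve modelled on the documented max_message_bytes behaviour.
pub struct Valve {
    max_message_bytes: usize,
    in_flight_bytes: usize, // bytes handed to the producer since the last flush
}

impl Valve {
    pub fn new(max_message_bytes: usize) -> Self {
        Valve { max_message_bytes, in_flight_bytes: 0 }
    }

    /// The valve stays open while buffered bytes are below the limit.
    pub fn is_open(&self) -> bool {
        self.in_flight_bytes < self.max_message_bytes
    }

    /// Account for a message handed to the producer; refused while the valve is closed.
    pub fn admit(&mut self, message_len: usize) -> bool {
        if !self.is_open() {
            return false;
        }
        self.in_flight_bytes += message_len;
        true
    }

    /// Called at the end of each flush_interval once the flush completes:
    /// the count drops back to zero and the valve can reopen.
    pub fn on_flush(&mut self) {
        self.in_flight_bytes = 0;
    }
}
```

While the valve is closed, upstream sources are refused rather than allowed to grow memory use without bound; the next completed flush reopens it.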

In addition, a librdkafka TOML table can be specified to fine-tune the underlying librdkafka client; see librdkafka's configuration documentation for the full list of available properties.
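The rules above (enabled defaults to true, a missing topic or brokers disables the sink, brokers becomes bootstrap.servers, and the librdkafka table passes through to the client) can be sketched as a small resolution step. `KafkaSinkSettings` and `resolve` are hypothetical names. The sketch assumes the librdkafka table values have already been converted to strings, since librdkafka properties are string-valued, and it assumes `brokers` takes precedence over a `bootstrap.servers` key in the table; the source does not say which wins.

```rust
use std::collections::HashMap;

/// Hypothetical resolved settings for one kafka sink.
#[derive(Debug)]
pub struct KafkaSinkSettings {
    pub topic: String,
    /// Properties handed to librdkafka, e.g. "bootstrap.servers".
    pub client_config: HashMap<String, String>,
}

/// Returns None when the sink should be treated as disabled.
pub fn resolve(
    enabled: Option<bool>,
    topic: Option<&str>,
    brokers: Option<&str>,
    librdkafka: &HashMap<String, String>,
) -> Option<KafkaSinkSettings> {
    // `enabled` is optional and defaults to true.
    if !enabled.unwrap_or(true) {
        return None;
    }
    // Both `topic` and `brokers` are required; either missing disables the sink.
    let (topic, brokers) = (topic?, brokers?);
    // Start from the user's librdkafka table, then map `brokers` to bootstrap.servers
    // (assumption: the dedicated option overrides any value set in the table).
    let mut client_config = librdkafka.clone();
    client_config.insert("bootstrap.servers".to_string(), brokers.to_string());
    Some(KafkaSinkSettings { topic: topic.to_string(), client_config })
}
```

Applied to the example configuration, this would yield bootstrap.servers set to the two brokers alongside socket.timeout.ms and socket.blocking.max.ms.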

Example

[sinks]
  [sinks.kafka.neat]
    topic = "myevents"
    brokers = "my-cool-kafka.broker.tld:9092,your-cool-kafka.broker.tld:9092"
    [sinks.kafka.neat.librdkafka]
    "socket.timeout.ms" = 15000
    "socket.blocking.max.ms" = 250

Internal Telemetry

  • cernan.sinks.kafka.publish.success - Total records published successfully.
  • cernan.sinks.kafka.publish.retry - Total records that failed to be published and were retried.
  • cernan.sinks.kafka.publish.failure - Total records that failed to be published and could not be retried.
  • cernan.sinks.kafka.publish.retry.failure - Total records that failed to be retried. This occurs when the error signal from librdkafka does not include the original message.
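One plausible reading of how delivery outcomes map onto these four counters is sketched below. The `DeliveryReport` type is hypothetical and much simpler than the reports rust-rdkafka actually delivers; the mapping assumes a non-retriable error counts as a failure, while a retriable error counts as a retry only if the original message came back with it, and as a retry failure otherwise.

```rust
/// Hypothetical, simplified delivery outcome (not rust-rdkafka's real report type).
pub enum DeliveryReport {
    Delivered,
    Failed { retriable: bool, message_returned: bool },
}

/// One field per documented telemetry counter.
#[derive(Default, Debug, PartialEq)]
pub struct PublishCounters {
    pub success: u64,       // cernan.sinks.kafka.publish.success
    pub retry: u64,         // cernan.sinks.kafka.publish.retry
    pub failure: u64,       // cernan.sinks.kafka.publish.failure
    pub retry_failure: u64, // cernan.sinks.kafka.publish.retry.failure
}

impl PublishCounters {
    pub fn record(&mut self, report: &DeliveryReport) {
        match report {
            DeliveryReport::Delivered => self.success += 1,
            // Error cannot be retried: the record is dropped.
            DeliveryReport::Failed { retriable: false, .. } => self.failure += 1,
            // Original message returned with the error: it can be re-enqueued.
            DeliveryReport::Failed { retriable: true, message_returned: true } => self.retry += 1,
            // Retriable error, but without the message there is nothing to resend.
            DeliveryReport::Failed { retriable: true, message_returned: false } => {
                self.retry_failure += 1
            }
        }
    }
}
```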