
Kafka Receiver

Status

  • Stability: beta (metrics, logs, traces)
  • Distributions: core, contrib
  • Code Owners: @pavolloffay, @MovieStoreGuy

The Kafka receiver receives traces, metrics, and logs from Kafka. The message payload encoding is configurable.

Note that metrics and logs only support OTLP.

Getting Started

The following settings are required:

  • protocol_version (no default): Kafka protocol version, e.g. 2.0.0

The following settings can be optionally configured:

  • brokers (default = localhost:9092): The list of Kafka brokers
  • resolve_canonical_bootstrap_servers_only (default = false): Whether to resolve then reverse-lookup broker IPs during startup
  • topic (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the Kafka topic to read from. Only one telemetry type may be used for a given topic.
  • encoding (default = otlp_proto): The encoding of the payload received from Kafka (a logs sketch with a non-default encoding follows the first example below). Available encodings:
    • otlp_proto: the payload is deserialized to ExportTraceServiceRequest, ExportLogsServiceRequest or ExportMetricsServiceRequest respectively.
    • jaeger_proto: the payload is deserialized to a single Jaeger proto Span.
    • jaeger_json: the payload is deserialized to a single Jaeger JSON Span using jsonpb.
    • zipkin_proto: the payload is deserialized into a list of Zipkin proto spans.
    • zipkin_json: the payload is deserialized into a list of Zipkin V2 JSON spans.
    • zipkin_thrift: the payload is deserialized into a list of Zipkin Thrift spans.
    • raw: (logs only) the payload's bytes are inserted as the body of a log record.
    • text: (logs only) the payload is decoded as text and inserted as the body of a log record. By default, UTF-8 is used to decode. You can use text_<ENCODING>, e.g. text_utf-8 or text_shift_jis, to customize this behavior.
    • json: (logs only) the payload is decoded as JSON and inserted as the body of a log record.
    • azure_resource_logs: (logs only) the payload is converted from the Azure Resource Logs format to the OTel format.
  • group_id (default = otel-collector): The consumer group that the receiver will consume messages from (a combined sketch of these consumer options appears after the first example below)
  • client_id (default = otel-collector): The consumer client ID that the receiver will use
  • initial_offset (default = latest): The initial offset to use if no offset was previously committed. Must be latest or earliest.
  • session_timeout (default = 10s): The request timeout for detecting client failures when using Kafka’s group management facilities.
  • heartbeat_interval (default = 3s): The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
  • auth
    • plain_text
      • username: The username to use.
      • password: The password to use.
    • sasl
      • username: The username to use.
      • password: The password to use.
      • mechanism: The SASL mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM or PLAIN)
      • aws_msk.region: AWS region, used with the AWS_MSK_IAM mechanism (see the sketch after the SASL/TLS example below)
      • aws_msk.broker_addr: MSK broker address, used with the AWS_MSK_IAM mechanism
    • tls
      • ca_file: path to the CA cert. For a client this verifies the server certificate. Should only be used if insecure is set to false.
      • cert_file: path to the TLS cert to use for TLS required connections. Should only be used if insecure is set to false.
      • key_file: path to the TLS key to use for TLS required connections. Should only be used if insecure is set to false.
      • insecure (default = false): Disable verifying the server's certificate chain and host name (InsecureSkipVerify in the tls config)
      • server_name_override: ServerName indicates the name of the server requested by the client in order to support virtual hosting.
    • kerberos (see the sketch after the SASL/TLS example below)
      • service_name: Kerberos service name
      • realm: Kerberos realm
      • use_keytab: If true, the keytab file will be used for authentication instead of the password
      • username: The Kerberos username used to authenticate with the KDC
      • password: The Kerberos password used to authenticate with the KDC
      • config_file: Path to the Kerberos configuration, e.g. /etc/krb5.conf
      • keytab_file: Path to the keytab file, e.g. /etc/security/kafka.keytab
      • disable_fast_negotiation: Disable PA-FX-FAST negotiation (Pre-Authentication Framework - Fast). Some common Kerberos implementations do not support PA-FX-FAST negotiation. Defaults to false.
  • metadata
    • full (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to the broker at startup.
    • retry
      • max (default = 3): The number of retries to get metadata
      • backoff (default = 250ms): How long to wait between metadata retries
  • autocommit
    • enable: (default = true) Whether or not to auto-commit updated offsets back to the broker
    • interval: (default = 1s) How frequently to commit updated offsets. Ineffective unless auto-commit is enabled
  • message_marking:
    • after: (default = false) If true, the messages are marked after the pipeline execution
    • on_error: (default = false) If false, only successfully processed messages are marked. Note: this can block the entire partition if message processing returns a permanent error.
  • header_extraction:
    • extract_headers (default = false): Allows the user to attach header fields to resource attributes in the OTel pipeline
    • headers (default = []): List of headers to extract from the Kafka record. Note: matching is exact; regexes are not supported as of now.

Example:

receivers:
  kafka:
    protocol_version: 2.0.0
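
For logs, a non-default encoding can be paired with a topic. A minimal sketch, assuming a topic named app_logs that carries UTF-8 text payloads (the topic name is illustrative):

receivers:
  kafka:
    protocol_version: 2.0.0
    topic: app_logs
    encoding: text_utf-8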
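
A fuller sketch combining several of the optional consumer settings documented above; all values here are illustrative, not recommendations:

receivers:
  kafka:
    protocol_version: 2.0.0
    brokers: ["broker1:9092", "broker2:9092"]
    topic: otlp_spans
    group_id: otel-collector
    client_id: otel-collector
    initial_offset: earliest
    autocommit:
      enable: true
      interval: 1s
    message_marking:
      after: true
      on_error: true
    metadata:
      retry:
        max: 3
        backoff: 250ms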

Example of connecting to Kafka using SASL and TLS:

receivers:
  kafka:
    auth:
      sasl:
        username: "user"
        password: "secret"
        mechanism: "SCRAM-SHA-512"
      tls:
        insecure: false
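
A sketch for connecting to AWS MSK with the AWS_MSK_IAM mechanism, using the aws_msk fields documented above; the region and broker address are placeholders for your cluster:

receivers:
  kafka:
    protocol_version: 2.0.0
    auth:
      sasl:
        mechanism: AWS_MSK_IAM
        aws_msk:
          region: us-east-1
          broker_addr: my-cluster.example.us-east-1.amazonaws.com:9098
      tls:
        insecure: false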
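
A sketch of Kerberos authentication using the fields documented above; the service name, realm, username, and file paths are placeholders for your environment:

receivers:
  kafka:
    protocol_version: 2.0.0
    auth:
      kerberos:
        service_name: kafka
        realm: EXAMPLE.COM
        use_keytab: true
        username: "otel"
        keytab_file: /etc/security/kafka.keytab
        config_file: /etc/krb5.conf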

Example of header extraction:

receivers:
  kafka:
    topic: test
    header_extraction: 
      extract_headers: true
      headers: ["header1", "header2"]
If we feed the following Kafka record to the test topic and use the above config:
{
  event: Hello,
  headers: {
    header1: value1,
    header2: value2,
  }
}

we will get a log record in the collector similar to:

{
  ...
  body: Hello,
  resource: {
    kafka.header.header1: value1,
    kafka.header.header2: value2,
  },
  ...
}
Here you can see the Kafka record headers header1 and header2 being added as resource attributes. Every matching Kafka header key is prefixed with kafka.header. and attached to the resource attributes.
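
Finally, a minimal end-to-end sketch wiring the receiver into a traces pipeline. The debug exporter is used here only as a stand-in for whatever backend you export to:

receivers:
  kafka:
    protocol_version: 2.0.0

exporters:
  debug:

service:
  pipelines:
    traces:
      receivers: [kafka]
      exporters: [debug]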