Skip to content

writer.WriteMessages(...) method generates "Denied Operation = Describe" events on non-related topics #963

@chinniehendrix

Description

@chinniehendrix

Describe the bug

When producing keyed messages to a specific topic using the writer.WriteMessages(...) method with mutual TLS and ACLs enabled, the client tries to access other topics (for which it does not have permissions) generating lots of "Denied Operation = Describe" events. This behavior, when shared simultaneously by hundreds of clients, can have a significant impact on the performance of the Kafka cluster.

Kafka Version

  • What version(s) of Kafka are you testing against?
    v2.4.1
  • What version of kafka-go are you using?
    v0.4.33

To Reproduce

All resources can be found in this public repository: https://github.com/chinniehendrix/local-kafka-poc/tree/feat/demo-kafka-go-describe-issue/strimzi/tls-clients

package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"time"

	log "github.com/sirupsen/logrus"

	"github.com/segmentio/kafka-go"
)

func tlsConfig() *tls.Config {
	userCert, errCert := ioutil.ReadFile("/client-certs/user.crt")
	if errCert != nil {
		log.Fatal(errCert)
	}

	userKey, errKey := ioutil.ReadFile("/client-certs/user.key")
	if errKey != nil {
		log.Fatal(errKey)
	}

	caPEM, errCA := ioutil.ReadFile("/ca-certs/ca.crt")
	if errCA != nil {
		log.Fatal(errCA)
	}

	cert, err := tls.X509KeyPair(userCert, userKey)
	if err != nil {
		log.Fatal(err.Error())
	}

	caCertPool := x509.NewCertPool()
	caCertPool.AppendCertsFromPEM([]byte(caPEM))

	config := &tls.Config{
		MinVersion:   tls.VersionTLS12,
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
	}

	fmt.Println("Returning TLS config")
	return config
}

func main() {
	now := time.Now()
	fmt.Println("Go client started")
	fmt.Println(now)
	logger := log.New()

	w := &kafka.Writer{
		Addr:        kafka.TCP("my-cluster-kafka-brokers:9093"),
		Topic:       "my-topic",
		Compression: kafka.Lz4,
		Transport: &kafka.Transport{
			TLS: tlsConfig(),
		},
		//Balancer: &kafka.Hash{},
		Logger: logger,
	}

	for i := 0; i < 1; i++ {
		key := fmt.Sprintf("Key-%d", i)

		kafkaMessage := &kafka.Message{
			Key:   []byte(key),
			Value: []byte(fmt.Sprintf("This is message %d", i)),
		}

		err := w.WriteMessages(context.Background(), *kafkaMessage)
		if err != nil {
			fmt.Println(err)
		} else {
			fmt.Println("produced", key)
		}

	}

	if err := w.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	} else {
		log.Info("Successfully closed writer")
	}

	time.Sleep(3600 * time.Second)
}

Expected Behavior

The client should produce one message to topic "my-topic" and close the connection.

Observed Behavior

The client produces one message to topic "my-topic" and tries to access topic "my-topic-02" to which it does not have permissions, repeatedly causing "Denied Operation = Describe" events on the Kafka cluster (as shown below).

...
my-cluster-kafka-0 kafka 2022-08-02 13:10:18,014 INFO Principal = User:CN=test-producer-01 is Denied Operation = Describe from host = 172.17.0.8 on resource = Topic:LITERAL:__consumer_offsets (kafka.authorizer.logger) [data-plane-kafka-request-handler-7]
my-cluster-kafka-0 kafka 2022-08-02 13:10:18,015 INFO Principal = User:CN=test-producer-01 is Denied Operation = Describe from host = 172.17.0.8 on resource = Topic:LITERAL:my-topic-02 (kafka.authorizer.logger) [data-plane-kafka-request-handler-7]
my-cluster-kafka-0 kafka 2022-08-02 13:10:18,715 INFO Principal = User:CN=test-producer-01 is Denied Operation = Describe from host = 172.17.0.8 on resource = Topic:LITERAL:__consumer_offsets (kafka.authorizer.logger) [data-plane-kafka-request-handler-6]
my-cluster-kafka-0 kafka 2022-08-02 13:10:18,715 INFO Principal = User:CN=test-producer-01 is Denied Operation = Describe from host = 172.17.0.8 on resource = Topic:LITERAL:my-topic-02 (kafka.authorizer.logger) [data-plane-kafka-request-handler-6]
my-cluster-kafka-0 kafka 2022-08-02 13:10:21,331 INFO Principal = User:CN=test-producer-01 is Denied Operation = Describe from host = 172.17.0.8 on resource = Topic:LITERAL:__consumer_offsets (kafka.authorizer.logger) [data-plane-kafka-request-handler-4]
my-cluster-kafka-0 kafka 2022-08-02 13:10:21,331 INFO Principal = User:CN=test-producer-01 is Denied Operation = Describe from host = 172.17.0.8 on resource = Topic:LITERAL:my-topic-02 (kafka.authorizer.logger) [data-plane-kafka-request-handler-4]
my-cluster-kafka-0 kafka 2022-08-02 13:10:25,922 INFO Principal = User:CN=test-producer-01 is Denied Operation = Describe from host = 172.17.0.8 on resource = Topic:LITERAL:__consumer_offsets (kafka.authorizer.logger) [data-plane-kafka-request-handler-6]
my-cluster-kafka-0 kafka 2022-08-02 13:10:25,922 INFO Principal = User:CN=test-producer-01 is Denied Operation = Describe from host = 172.17.0.8 on resource = Topic:LITERAL:my-topic-02 (kafka.authorizer.logger) [data-plane-kafka-request-handler-6]

Additional Context

The described behavior does NOT occur when using kafka.Conn.WriteMessages(...) with a Conn obtained via the DialLeader API.

	conn, err := dialer.DialLeader(context.Background(), "tcp", "my-cluster-kafka-brokers:9093", topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))

	for i := 0; i < 1000; i++ {
		key := fmt.Sprintf("Key-%d", i)

		kafkaMessage := &kafka.Message{
			Key:   []byte(key),
			Value: []byte(fmt.Sprintf("This is message %d", i)),
		}

		_, err := conn.WriteMessages(*kafkaMessage)

		if err != nil {
			fmt.Println(err)
		} else {
			fmt.Println("produced", key)
		}

	}

	if err := conn.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}

Also, this behavior was not (yet) reproduced on any other version of Kafka than v2.4.1

Metadata

Metadata

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions