
[Bug] MirrorMaker does not use the consumer bootstrapServers value specified in config.yaml #3949

Closed
uliludmann opened this issue Nov 12, 2020 · 5 comments

uliludmann commented Nov 12, 2020

Describe the bug
Mirror Maker does not use the consumer bootstrapServers value specified in its config.yaml.

To Reproduce

  1. Use this config.yaml:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaMirrorMaker
metadata:
  name: my-mirror-maker
  namespace: kafka
spec:
  version: 2.6.0
  replicas: 1
  consumer:
    bootstrapServers: my-bootstrap-server:31090
    groupId: my-grpupID
    tls:
      trustedCertificates:
      - secretName: mirrormakersecret
        certificate: tls.crt
    authentication:
      type: plain
      username: myuser
      passwordSecret: 
        secretName: myuserpassword
        password: data
    config:
      # max.partition.fetch.bytes: Don't set this
      # max.request.size: 4194304
      # fetch.max.bytes: 209715200
      # max.poll.records: 2000

      ## trackunit config
      security.protocol: "SASL_SSL"
      exclude.internal.topics: "true"
      allow.auto.create.topics: "true"
      client.id: "mirror_maker_consumer"
      mirror.topics.whitelist: "mytopic"
      auto.offset.reset: "earliest"

      
  producer:
    bootstrapServers: producer-cluster:9092
    config:
      acks: 1
      # batch.size: 16384
      buffer.memory: 67108864 #2147483648
      compression.type: gzip
      #linger.ms: 15000
      #max.request.size: 4194304
  whitelist: ".*"

  2. Apply it:

kubectl apply -f config.yaml

  3. See the logs of the pod, where the consumer bootstrap server list is empty:
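You can fetch the pod logs with something like this (a sketch; the deployment name assumes the usual Strimzi <name>-mirror-maker naming convention, so adjust it to your setup):

kubectl logs -n kafka deployment/my-mirror-maker-mirror-maker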

INFO ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [] <--- this is empty and should not be
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
....

Note that the producer bootstrap server is filled in correctly.

Expected behavior
The bootstrap server list should be filled with the corresponding server from the config.yaml.

Environment (please complete the following information):

  • Strimzi version: 0.20
  • Installation method: Helm chart
  • Kubernetes cluster: Kubernetes 1.18.6
  • Infrastructure: Azure Kubernetes Service
uliludmann added the bug label Nov 12, 2020

scholzj commented Nov 12, 2020

Can you share the whole log from the Kafka Mirror Maker pod? At the beginning it should print the configuration we pass to Mirror Maker, I think. That should help to investigate it.


scholzj commented Nov 12, 2020

FYI: I do not have any Kafka cluster with SASL PLAIN. But it does not happen for me with SCRAM-SHA-512. So I think your logs will be critical to address this.


uliludmann commented Nov 13, 2020

Hi Jakub,

thanks for your fast reply.

This is the output from the creation logs:

2020-11-13 06:41:54,580 INFO Kafka version: 2.6.0 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-11-13 06:41:54,580 INFO Kafka commitId: 62abe01bee039651 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-11-13 06:41:54,580 INFO Kafka startTimeMs: 1605249714573 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-11-13 06:41:54,594 INFO ConsumerConfig values:
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = []
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = null-0
	client.rack =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = null
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig) [main]
2020-11-13 06:41:54,601 ERROR Exception when starting mirror maker. (kafka.tools.MirrorMaker$) [main]

I changed the authentication type to scram-sha-512 just in case, but the same problem occurs.
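For reference, the scram-sha-512 variant of the authentication block (under spec.consumer) looks roughly like this, using the same secret as above:

    authentication:
      type: scram-sha-512
      username: myuser
      passwordSecret:
        secretName: myuserpassword
        password: data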


scholzj commented Nov 13, 2020

Sorry, but that is not the whole log. At the beginning, when the container starts, our scripts print information about how they configure Mirror Maker, and that is the interesting part. You should see something like this:

Preparing truststore
Certificate was added to keystore
Preparing truststore is complete
Kafka Mirror Maker consumer configuration:
# Bootstrap servers
bootstrap.servers=my-source-cluster-kafka-bootstrap:9094
# Consumer group
group.id=my-grpupID
# Provided configuration
allow.auto.create.topics=true
auto.offset.reset=earliest
client.id=mirror_maker_consumer
exclude.internal.topics=true
mirror.topics.whitelist=mytopic
security.protocol=SASL_SSL
# TLS / SSL
ssl.truststore.location=/tmp/kafka/consumer.truststore.p12
ssl.truststore.password=[hidden]
ssl.truststore.type=PKCS12
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=[hidden]
Kafka Mirror Maker producer configuration:
# Bootstrap servers
bootstrap.servers=my-target-cluster-kafka-bootstrap:9092
# Provided configuration
acks=1
buffer.memory=67108864
compression.type=gzip
security.protocol=PLAINTEXT
2020-11-12 20:42:55,906 INFO Starting readiness poller (io.strimzi.mirrormaker.agent.MirrorMakerAgent) [main]
2020-11-12 20:42:56,025 INFO Starting liveness poller (io.strimzi.mirrormaker.agent.MirrorMakerAgent) [main]
2020-11-12 20:42:56,212 INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) [main]
2020-11-12 20:42:56,498 INFO Starting mirror maker (kafka.tools.MirrorMaker$) [main]
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'
2020-11-12 20:42:56,561 INFO Property acks is overridden to 1 - data loss or message reordering is possible. (kafka.tools.MirrorMaker$) [main]
2020-11-12 20:42:56,582 INFO ProducerConfig values:
	acks = 1
	batch.size = 16384
	bootstrap.servers = [my-target-cluster-kafka-bootstrap:9092]
	buffer.memory = 67108864
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = gzip
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 2147483647
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = false
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 9223372036854775807
	max.in.flight.requests.per.connection = 1
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
 (org.apache.kafka.clients.producer.ProducerConfig) [main]
2020-11-12 20:42:56,652 INFO Kafka version: 2.6.0 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-11-12 20:42:56,653 INFO Kafka commitId: 62abe01bee039651 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-11-12 20:42:56,653 INFO Kafka startTimeMs: 1605213776646 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-11-12 20:42:56,666 INFO ConsumerConfig values:
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [my-source-cluster-kafka-bootstrap:9094]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = my-grpupID-0
	client.rack =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = my-grpupID
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = [hidden]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = SCRAM-SHA-512
	security.protocol = SASL_SSL
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = /tmp/kafka/consumer.truststore.p12
	ssl.truststore.password = [hidden]
	ssl.truststore.type = PKCS12
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig) [main]
2020-11-12 20:42:56,748 INFO Successfully logged in. (org.apache.kafka.common.security.authenticator.AbstractLogin) [main]
2020-11-12 20:42:56,981 INFO [Producer clientId=producer-1] Cluster ID: Hlq8dN5ESN6i-G6TRSLQJw (org.apache.kafka.clients.Metadata) [kafka-producer-network-thread | producer-1]
2020-11-12 20:42:57,215 WARN The configuration 'mirror.topics.whitelist' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig) [main]
2020-11-12 20:42:57,216 INFO Kafka version: 2.6.0 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-11-12 20:42:57,216 INFO Kafka commitId: 62abe01bee039651 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-11-12 20:42:57,216 INFO Kafka startTimeMs: 1605213777216 (org.apache.kafka.common.utils.AppInfoParser) [main]
2020-11-12 20:42:57,223 INFO [mirrormaker-thread-0] Starting mirror maker thread mirrormaker-thread-0 (kafka.tools.MirrorMaker$MirrorMakerThread) [mirrormaker-thread-0]
2020-11-12 20:42:57,224 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] Subscribed to pattern: '.*' (org.apache.kafka.clients.consumer.KafkaConsumer) [mirrormaker-thread-0]
2020-11-12 20:42:57,471 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] Cluster ID: SYF2-yFNQsaJ1e_TYHBrxw (org.apache.kafka.clients.Metadata) [mirrormaker-thread-0]
2020-11-12 20:42:57,472 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] Discovered group coordinator my-source-cluster-kafka-0.my-source-cluster-kafka-brokers.myproject.svc:9094 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-0]
2020-11-12 20:42:57,476 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-0]
2020-11-12 20:42:57,654 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-0]
2020-11-12 20:42:57,654 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-0]
2020-11-12 20:42:57,903 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] Successfully joined group with generation 4 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-0]
2020-11-12 20:42:57,904 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] Notifying assignor about the new Assignment(partitions=[]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [mirrormaker-thread-0]
2020-11-12 20:42:57,904 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] Adding newly assigned partitions:  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [mirrormaker-thread-0]
2020-11-12 20:43:57,926 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] Attempt to heartbeat failed since group is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-0]
2020-11-12 20:43:57,929 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] Revoke previously assigned partitions  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [mirrormaker-thread-0]
2020-11-12 20:43:57,929 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-0]
2020-11-12 20:43:57,934 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] Finished assignment for group at generation 5: {my-grpupID-0-f36013b1-2a57-4761-9f1e-7511223ab3eb=Assignment(partitions=[])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [mirrormaker-thread-0]
2020-11-12 20:43:57,937 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] Successfully joined group with generation 5 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [mirrormaker-thread-0]
2020-11-12 20:43:57,938 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] Notifying assignor about the new Assignment(partitions=[]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [mirrormaker-thread-0]
2020-11-12 20:43:57,938 INFO [Consumer clientId=my-grpupID-0, groupId=my-grpupID] Adding newly assigned partitions:  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [mirrormaker-thread-0]


uliludmann commented Nov 13, 2020

Hello Jakub,

thanks for your help! I was now able to figure out what the problem was.
I had configured the password secret wrong.

The root cause of the problem was that I encoded the password secret with the Windows shell.
This produced the following error:
JAAS config entry not terminated by semi-colon
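For reference, Kafka expects the generated JAAS entry to be terminated by a semicolon, roughly like this (values are placeholders; Strimzi builds this entry from the decoded secret, so a stray trailing newline in the password corrupts it):

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="myuser" password="my-password";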

After two days of trying, I figured out that the password secret has to be base64-encoded in a Linux environment.
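A minimal sketch of how to create the secret correctly (the password value is a placeholder; the key name data matches the passwordSecret.password field in the config.yaml above):

# encode without a trailing newline (GNU coreutils)
echo -n 'my-password' | base64

# or let kubectl handle the encoding for you:
kubectl create secret generic myuserpassword -n kafka \
  --from-literal=data='my-password'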

Now MirrorMaker works fine!

Thanks for your help and the great work on Strimzi!

Best Regards
