Rack Awareness doesn't work with Strimzi Connect #6492

Closed
tsaiboon opened this issue Mar 9, 2022 · 6 comments · Fixed by #6517
tsaiboon commented Mar 9, 2022

Describe the bug
Following the Strimzi documentation, I enabled rack awareness in Connect, but I noticed that my sink connector still consumes from the partition leader, according to the consumer logs in Connect.

For example, referring to the cluster and Connect logs below: the consumer in Connect pod "odf-connect-cluster-connect-5db9446969-29mxp", deployed on the node labelled "ip-10-65-202-151", consumed partition 2 of topic "streams-output-sbsb" from Kafka broker 0, which is deployed on the node labelled "ip-10-65-201-163", where the leader of partition 2 is located.

What I referred to:
https://strimzi.io/docs/operators/0.25.0/using.html#proc-kafka-connect-config-str
https://strimzi.io/docs/operators/0.25.0/using.html#type-Rack-reference (The example for KafkaConnect looks wrong; there is no 'kafka' section in the KafkaConnect kind.)
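(For reference, in a KafkaConnect resource the rack section sits directly under spec, not under a kafka sub-section. A minimal sketch using the topology key from this setup; the metadata name is illustrative:)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  replicas: 3
  rack:
    topologyKey: kubernetes.io/hostname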

To Reproduce
Steps to reproduce the behavior:

  1. Start Kafka with 3 replicas and rack awareness enabled. I was testing on a multi-node cluster, so I used the node label "kubernetes.io/hostname".
  2. Create a Kafka topic with replication factor 3 and 6 partitions (see the sketch after this list).
  3. Start KafkaConnect with 3 replicas and rack awareness enabled, with logger level DEBUG for the class "org.apache.kafka.clients.consumer.internals.Fetcher".
  4. Start my custom sink connector to consume from the topic created earlier.
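(A minimal sketch of the topic from step 2, assuming it is managed by the Strimzi topic operator; the topic and cluster names are taken from the logs below:)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: streams-output-sbsb
  labels:
    strimzi.io/cluster: odf-cluster
spec:
  partitions: 6
  replicas: 3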

Expected behavior
Connector task consumers should consume each topic partition from the Kafka broker deployed on the same node as the KafkaConnect pod.

For example, referring to the cluster allocation below: the consumer in Connect pod "odf-connect-cluster-connect-5db9446969-29mxp", deployed on the node labelled "ip-10-65-202-151", should consume partition 2 of topic "streams-output-sbsb" from the broker deployed on that same node, "ip-10-65-202-151" (i.e. Kafka broker 2 per the allocation below).

Environment:

  • Strimzi version: 0.25.0
  • Installation method: Helm chart
  • Kubernetes cluster: Kubernetes v1.21.5

YAML files and logs
Kafka Deployment

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
...
spec:
  entityOperator:
    ...
  kafka:
    config:
      inter.broker.protocol.version: "2.8"
      log.message.format.version: "2.8"
      offsets.topic.replication.factor: 3
      replica.selector.class: org.apache.kafka.common.replica.RackAwareReplicaSelector
      transaction.state.log.min.isr: 2
      transaction.state.log.replication.factor: 3
    listeners:
      ...
    rack:
      topologyKey: kubernetes.io/hostname
    replicas: 3
    storage:
      ...
    template:
      ...
    version: 2.8.0
  kafkaExporter:
    ...
  zookeeper:
    ...

Connect Deployment

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
...
spec:
  bootstrapServers: odf-cluster-kafka-bootstrap:9092
  config:
    config.storage.replication.factor: 1
    config.storage.topic: connect-cluster-configs
    group.id: connect-cluster
    offset.storage.replication.factor: 1
    offset.storage.topic: connect-cluster-offsets
    status.storage.replication.factor: 1
    status.storage.topic: connect-cluster-status
    value.converter.schema.registry.url: http://odf-cluster-schema-registry:8081
  ...
  logging:
    loggers:
      log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher: DEBUG
    type: inline
  rack:
    topologyKey: kubernetes.io/hostname
  replicas: 3
  template:
    ...
  version: 2.8.0

Cluster Allocation

$ kubectl get pods -o wide
NAME                                           READY   STATUS    RESTARTS   AGE     IP            NODE               NOMINATED NODE   READINESS GATES
odf-cluster-entity-operator-d9bdd7648-z4qj6    3/3     Running   0          3h5m    10.42.2.88    ip-10-65-201-163   <none>           <none>
odf-cluster-kafka-0                            1/1     Running   0          3h7m    10.42.2.87    ip-10-65-201-163   <none>           <none>
odf-cluster-kafka-1                            1/1     Running   0          3h7m    10.42.1.216   ip-10-65-205-21    <none>           <none>
odf-cluster-kafka-2                            1/1     Running   0          3h7m    10.42.0.103   ip-10-65-202-151   <none>           <none>
odf-cluster-schema-registry-6d7db7646d-szgsr   2/2     Running   5          3h8m    10.42.1.214   ip-10-65-205-21    <none>           <none>
odf-cluster-zookeeper-0                        1/1     Running   0          3h8m    10.42.2.85    ip-10-65-201-163   <none>           <none>
odf-cluster-zookeeper-1                        1/1     Running   0          3h8m    10.42.2.84    ip-10-65-201-163   <none>           <none>
odf-cluster-zookeeper-2                        1/1     Running   0          3h8m    10.42.2.83    ip-10-65-201-163   <none>           <none>
odf-connect-cluster-connect-5db9446969-29mxp   1/1     Running   0          3m26s   10.42.0.109   ip-10-65-202-151   <none>           <none>
odf-connect-cluster-connect-5db9446969-l9c6h   1/1     Running   0          3m26s   10.42.2.92    ip-10-65-201-163   <none>           <none>
odf-connect-cluster-connect-5db9446969-pl465   1/1     Running   0          3m26s   10.42.1.220   ip-10-65-205-21    <none>           <none>
strimzi-cluster-operator-6d4f759d57-vkqjc      1/1     Running   0          3h10m   10.42.1.213   ip-10-65-205-21    <none>           <none>

Connect "odf-connect-cluster-connect-5db9446969-29mxp" logs

2022-03-09 07:52:45,219 DEBUG [Consumer clientId=connector-consumer-sftpfilestreamsink-1, groupId=connect-sftpfilestreamsink] Added READ_UNCOMMITTED fetch request for partition streams-output-sbsb-2 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[odf-cluster-kafka-0.odf-cluster-kafka-brokers.cgf.svc:9093 (id: 0 rack: ip-10-65-201-163)], epoch=0}} to node odf-cluster-kafka-0.odf-cluster-kafka-brokers.cgf.svc:9093 (id: 0 rack: ip-10-65-201-163) (org.apache.kafka.clients.consumer.internals.Fetcher) [task-thread-sftpfilestreamsink-1]
2022-03-09 07:53:04,237 DEBUG [Consumer clientId=connector-consumer-sftpfilestreamsink-1, groupId=connect-sftpfilestreamsink] Added READ_UNCOMMITTED fetch request for partition streams-output-sbsb-3 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[odf-cluster-kafka-2.odf-cluster-kafka-brokers.cgf.svc:9093 (id: 2 rack: ip-10-65-202-151)], epoch=0}} to node odf-cluster-kafka-2.odf-cluster-kafka-brokers.cgf.svc:9093 (id: 2 rack: ip-10-65-202-151) (org.apache.kafka.clients.consumer.internals.Fetcher) [task-thread-sftpfilestreamsink-1]

Additional context
I did try adding "consumer.override.client.rack" with a hardcoded value in my sink connector, and it works as expected. I'm not sure why rack.topologyKey in KafkaConnect does not make the connector task consumers consume from the Kafka broker with the same rack label.
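(For reference, the workaround was along these lines, assuming the connector is managed as a KafkaConnector resource; the connector class and the hardcoded rack value are illustrative, and the override only takes effect because connector.client.config.override.policy=All is set on the worker, as the logs below show:)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: sftpfilestreamsink
  labels:
    strimzi.io/cluster: odf-connect-cluster
spec:
  class: com.example.SftpFileStreamSinkConnector  # hypothetical class name
  tasksMax: 3
  config:
    topics: streams-output-sbsb
    # hardcoded per-connector override that made rack-aware fetching work
    consumer.override.client.rack: ip-10-65-202-151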

@tsaiboon tsaiboon added the bug label Mar 9, 2022

scholzj commented Mar 9, 2022

You should check your Connect logs to see if the client rack configuration is correctly set in the clients.
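For example, with something like:

$ kubectl logs <connect-pod-name> | grep client.rack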


tsaiboon commented Mar 10, 2022

Hi @scholzj, client.rack does show up with the expected value at the beginning of the Connect logs.

$ kubectl logs odf-connect-cluster-connect-fc7744f96-dkg5x | head -n100
Preparing truststore
Preparing truststore is complete
Starting Kafka Connect with configuration:
# Bootstrap servers
bootstrap.servers=odf-cluster-kafka-bootstrap:9092
# REST Listeners
rest.port=8083
rest.advertised.host.name=X.X.X.X
rest.advertised.port=8083
# Plugins
plugin.path=/opt/kafka/plugins
# Provided configuration
offset.storage.topic=connect-cluster-offsets
value.converter=org.apache.kafka.connect.json.JsonConverter
config.storage.topic=connect-cluster-configs
key.converter=org.apache.kafka.connect.json.JsonConverter
group.id=connect-cluster
status.storage.topic=connect-cluster-status
config.storage.replication.factor=1
connector.client.config.override.policy=All
offset.storage.replication.factor=1
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
status.storage.replication.factor=1
value.converter.schema.registry.url=http://odf-cluster-schema-registry:8081


security.protocol=PLAINTEXT
producer.security.protocol=PLAINTEXT
consumer.security.protocol=PLAINTEXT
admin.security.protocol=PLAINTEXT




# Additional configuration
client.rack=ip-10-65-205-21

2022-03-10 05:54:38,842 INFO WorkerInfo values:
        jvm.args = -Xms128M, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, -XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -XX:MaxInlineLevel=15, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=/opt/kafka, -Dlog4j.configuration=file:/opt/kafka/custom-config/log4j.properties
        jvm.spec = Red Hat, Inc., OpenJDK 64-Bit Server VM, 11.0.12, 11.0.12+7-LTS


scholzj commented Mar 10, 2022

That looks good ... But the actual Kafka logs print the configurations used for the different consumers it creates ... it would be more interesting to check whether it is set there.

tsaiboon commented Mar 11, 2022

I don't see any consumer configurations printed in the Kafka logs, but I do see them in the Connect logs.
FYI, in the logs below I changed the sink connector to use the default file stream sink connector class, and the behaviour is the same. My sink connector is "myfilestreamsink" with 3 tasks.

From the Connect logs below, I notice there are 4 ConsumerConfig blocks printed. The first 3, which I believe belong to the internal consumers, have client.rack set to the expected value. But the last one, which is my sink connector's consumer configuration, shows no client.rack configured.

2022-03-11 06:05:02,206 INFO Kafka version: 2.8.0 (org.apache.kafka.common.utils.AppInfoParser) [DistributedHerder-connect-1-1]
...
2022-03-11 06:05:02,215 INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [odf-cluster-kafka-bootstrap:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-connect-cluster-1
        client.rack = ip-10-65-201-163
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = connect-cluster
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig) [DistributedHerder-connect-1-1]
...
...
2022-03-11 06:05:02,430 INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [odf-cluster-kafka-bootstrap:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-connect-cluster-2
        client.rack = ip-10-65-201-163
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = connect-cluster
        ...
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig) [DistributedHerder-connect-1-1]
...
...
2022-03-11 06:05:02,720 INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [odf-cluster-kafka-bootstrap:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-connect-cluster-3
        client.rack = ip-10-65-201-163
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = connect-cluster
        ...
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig) [DistributedHerder-connect-1-1]
...
...
2022-03-11 06:05:03,598 INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect) [main]
2022-03-11 06:05:05,582 INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully joined group with generation Generation{generationId=5, memberId='connect-1-e9552aae-9c64-4c68-b872-34ad66e4f6a4', protocol='sessioned'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2022-03-11 06:05:05,599 INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully synced group in generation Generation{generationId=5, memberId='connect-1-e9552aae-9c64-4c68-b872-34ad66e4f6a4', protocol='sessioned'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
...
2022-03-11 06:05:05,600 WARN [Worker clientId=connect-1, groupId=connect-cluster] Catching up to assignment's config offset. (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2022-03-11 06:05:05,600 INFO [Worker clientId=connect-1, groupId=connect-cluster] Current config state offset -1 is behind group assignment 6, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2022-03-11 06:05:05,602 INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished reading to end of log and updated config snapshot, new config log offset: 6 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2022-03-11 06:05:05,602 INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connectors and tasks using config offset 6 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2022-03-11 06:05:05,603 INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting task myfilestreamsink-2 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [StartAndStopExecutor-connect-1-1]
2022-03-11 06:05:05,607 INFO Creating task myfilestreamsink-2 (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-1]
2022-03-11 06:05:05,609 INFO ConnectorConfig values:
        config.action.reload = restart
        connector.class = org.apache.kafka.connect.file.FileStreamSinkConnector
        errors.log.enable = false
        errors.log.include.messages = false
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = myfilestreamsink
        predicates = []
        tasks.max = 3
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig) [StartAndStopExecutor-connect-1-1]
...
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig) [StartAndStopExecutor-connect-1-1]
2022-03-11 06:05:05,617 INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [odf-cluster-kafka-bootstrap:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = connector-consumer-myfilestreamsink-2
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = connect-myfilestreamsink
        ...
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig) [StartAndStopExecutor-connect-1-1]
...
...
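(A quick way to compare the rack setting across all four consumers is to pair each client.id with its client.rack, e.g.:)

$ kubectl logs <connect-pod-name> | grep -E 'client\.id|client\.rack'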


scholzj commented Mar 11, 2022

Yeah, sorry, I meant the Kafka Connect logs. But I'm afraid I have no clue why Kafka Connect passes the configuration to 3 out of the 4 consumers but not to the 4th one.

@scholzj scholzj removed the question label Mar 12, 2022
scholzj added a commit to scholzj/strimzi-kafka-operator that referenced this issue Mar 13, 2022

scholzj commented Mar 13, 2022

Ok, you are right, it does not seem to work. I opened a PR which should fix it.
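(For context, a likely explanation: Kafka Connect applies a worker-level client.rack only to the consumers the worker itself creates, while the consumers for sink connector tasks are built from consumer.-prefixed worker properties. Assuming that is the root cause, the generated worker configuration would need something like this sketch:)

# reaches the worker's internal consumers (the first three ConsumerConfig dumps)
client.rack=ip-10-65-205-21
# consumer.-prefixed properties are inherited by sink connector task consumers
consumer.client.rack=ip-10-65-205-21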

scholzj added a commit that referenced this issue Mar 14, 2022