connect-cluster

Use docker-compose.yml for a valid multi-node Kafka Connect cluster.

Scenario 01: rest.advertised.host.name set to something completely wrong

Three workers, each with a misconfigured (non-resolvable) CONNECT_REST_ADVERTISED_HOST_NAME.

worker 2:

Joined group at generation 1 and got assignment: Assignment{error=0, leader='connect-1-8c9ca437-de73-4faa-9df8-adcc0feca3fd', leaderUrl='http://foobar2:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

worker 3:

Joined group at generation 2 and got assignment: Assignment{error=0, leader='connect-1-8c9ca437-de73-4faa-9df8-adcc0feca3fd', leaderUrl='http://foobar2:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

worker 1:

Joined group at generation 3 and got assignment: Assignment{error=0, leader='connect-1-8c9ca437-de73-4faa-9df8-adcc0feca3fd', leaderUrl='http://foobar2:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
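All three workers report the same leaderUrl, and that URL is the piece to watch in the rest of this scenario. Below is a minimal sketch for pulling it out of the herder log lines, assuming the log format shown above. The function name leader_url_from_log is made up for this example.

```shell
#!/bin/sh
# Hypothetical helper (not part of Kafka Connect): read worker log lines on
# stdin and print the leaderUrl from each DistributedHerder assignment line.
# Lines without a leaderUrl field are silently skipped.
leader_url_from_log() {
    sed -n "s/.*leaderUrl='\([^']*\)'.*/\1/p"
}
```

One way to use it, assuming the container names used elsewhere in this document: docker logs kafka-connect-01 2>&1 | leader_url_from_log | sort -u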

No connectors defined yet

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
        jq '.'
{}

Config topic

$ kafkacat -b localhost:9092 -t _kafka-connect-groupA-configs -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupA-configs [0] at offset 0

Send the config to worker 1:

curl -s -X PUT -H  "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-01/config \
    -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "item_details_01",
    "max.interval":250,
    "quickstart": "ratings",
    "tasks.max": 6
}'
{"error_code":500,"message":"IO Error trying to forward REST request: java.net.UnknownHostException: foobar2: Name or service not known"}
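The 500 is raised while the request is being forwarded: the worker that received it could not resolve the host in the leaderUrl. A small sketch, assuming the error body has the shape shown above, that extracts the unresolvable host name so it can be compared against the leaderUrl in the logs. The function name unresolved_host is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: read a Kafka Connect REST error body on stdin and
# print the host name reported by java.net.UnknownHostException, if any.
unresolved_host() {
    sed -n 's/.*UnknownHostException: \([^:"]*\):.*/\1/p'
}
```

Piping the response of the failed PUT into unresolved_host should print the same host that appears in the leaderUrl of the assignment log lines.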

Stop worker 2

worker 1:

Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 4 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 4 and got assignment: Assignment{error=0, leader='connect-1-1bf43113-2bc6-4ebb-aa50-82827da0d1d3', leaderUrl='http://foobar3:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

worker 3:

Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 4 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 4 and got assignment: Assignment{error=0, leader='connect-1-1bf43113-2bc6-4ebb-aa50-82827da0d1d3', leaderUrl='http://foobar3:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Send config to worker 1:

$ curl -s -X PUT -H  "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-01/config \
        -d '{
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "kafka.topic": "item_details_01",
        "max.interval":250,
        "quickstart": "ratings",
        "tasks.max": 6
    }'
{"error_code":500,"message":"IO Error trying to forward REST request: java.net.UnknownHostException: foobar3: Name or service not known"}

Send config to worker 3:

$ curl -s -X PUT -H  "Content-Type:application/json" http://localhost:28083/connectors/source-datagen-01/config \
        -d '{
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "kafka.topic": "item_details_01",
        "max.interval":250,
        "quickstart": "ratings",
        "tasks.max": 6
    }'

Successfully created. Configs topic updates:

$ kafkacat -b localhost:9092 -t _kafka-connect-groupA-configs -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupA-configs [0] at offset 0
Topic _kafka-connect-groupA-configs[0], offset: 0, Headers: , key: connector-source-datagen-01, payload: 274 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","key.converter":"org.apache.kafka.connect.storage.StringConverter","kafka.topic":"item_details_01","max.interval":"250","quickstart":"ratings","tasks.max":"6","name":"source-datagen-01"}}
% Reached end of topic _kafka-connect-groupA-configs [0] at offset 1
Topic _kafka-connect-groupA-configs[0], offset: 1, Headers: , key: task-source-datagen-01-0, payload: 336 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
Topic _kafka-connect-groupA-configs[0], offset: 2, Headers: , key: task-source-datagen-01-1, payload: 336 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
Topic _kafka-connect-groupA-configs[0], offset: 3, Headers: , key: task-source-datagen-01-2, payload: 336 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
Topic _kafka-connect-groupA-configs[0], offset: 4, Headers: , key: task-source-datagen-01-3, payload: 336 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
Topic _kafka-connect-groupA-configs[0], offset: 5, Headers: , key: task-source-datagen-01-4, payload: 336 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
Topic _kafka-connect-groupA-configs[0], offset: 6, Headers: , key: task-source-datagen-01-5, payload: 336 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
Topic _kafka-connect-groupA-configs[0], offset: 7, Headers: , key: commit-source-datagen-01, payload: 11 bytes: {"tasks":6}
% Reached end of topic _kafka-connect-groupA-configs [0] at offset 8

Status topic updates:

$ kafkacat -b localhost:9092 -t _kafka-connect-groupA-status -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupA-status [0] at offset 0
% Reached end of topic _kafka-connect-groupA-status [1] at offset 0
% Reached end of topic _kafka-connect-groupA-status [2] at offset 0
% Reached end of topic _kafka-connect-groupA-status [3] at offset 0
% Reached end of topic _kafka-connect-groupA-status [4] at offset 0
Topic _kafka-connect-groupA-status[3], offset: 0, Headers: , key: status-connector-source-datagen-01, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar3:8083","generation":5}
% Reached end of topic _kafka-connect-groupA-status [3] at offset 1
Topic _kafka-connect-groupA-status[0], offset: 0, Headers: , key: status-task-source-datagen-01-4, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar3:8083","generation":6}
Topic _kafka-connect-groupA-status[1], offset: 0, Headers: , key: status-task-source-datagen-01-2, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar3:8083","generation":6}
% Reached end of topic _kafka-connect-groupA-status [0] at offset 1
Topic _kafka-connect-groupA-status[0], offset: 1, Headers: , key: status-task-source-datagen-01-0, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar3:8083","generation":6}
% Reached end of topic _kafka-connect-groupA-status [1] at offset 1
Topic _kafka-connect-groupA-status[4], offset: 0, Headers: , key: status-task-source-datagen-01-5, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":6}
% Reached end of topic _kafka-connect-groupA-status [0] at offset 2
% Reached end of topic _kafka-connect-groupA-status [4] at offset 1
Topic _kafka-connect-groupA-status[0], offset: 2, Headers: , key: status-task-source-datagen-01-3, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":6}
Topic _kafka-connect-groupA-status[1], offset: 1, Headers: , key: status-task-source-datagen-01-1, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":6}
% Reached end of topic _kafka-connect-groupA-status [0] at offset 3
% Reached end of topic _kafka-connect-groupA-status [1] at offset 2
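The interleaved kafkacat output is hard to read at a glance. As a rough sketch, assuming the -f format string used above, the following counts tasks per worker_id, keeping only the most recent status record for each task key. The function name tasks_per_worker is made up for illustration.

```shell
#!/bin/sh
# Hypothetical helper: read kafkacat output for the status topic on stdin
# (in the format used above) and print "<worker_id> <task count>" lines.
# Only status-task-* keys are counted; a later record for a key replaces
# an earlier one, which mirrors how the status topic is meant to be read.
tasks_per_worker() {
    awk '
    /key: status-task-/ {
        # Extract the record key, e.g. status-task-source-datagen-01-4
        if (match($0, /key: status-task-[^,]*/))
            key = substr($0, RSTART + 5, RLENGTH - 5)
        # Extract the worker_id value from the JSON payload
        if (match($0, /"worker_id":"[^"]*"/))
            latest[key] = substr($0, RSTART + 13, RLENGTH - 14)
    }
    END {
        for (k in latest) count[latest[k]]++
        for (w in count) print w, count[w]
    }' | sort
}
```

The same kafkacat command as above can be piped into tasks_per_worker. Because the consumer runs until interrupted, stop it with Ctrl-C or let it exit at end of partition with kafkacat's -e flag.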

Three tasks started on each of the two remaining workers (1 and 3):

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
        jq '.'
{
"source-datagen-01": {
    "info": {
    "name": "source-datagen-01",
    "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "quickstart": "ratings",
        "tasks.max": "6",
        "name": "source-datagen-01",
        "kafka.topic": "item_details_01",
        "max.interval": "250",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    },
    "tasks": [
        {
        "connector": "source-datagen-01",
        "task": 0
        },
        {
        "connector": "source-datagen-01",
        "task": 1
        },
        {
        "connector": "source-datagen-01",
        "task": 2
        },
        {
        "connector": "source-datagen-01",
        "task": 3
        },
        {
        "connector": "source-datagen-01",
        "task": 4
        },
        {
        "connector": "source-datagen-01",
        "task": 5
        }
    ],
    "type": "source"
    },
    "status": {
    "name": "source-datagen-01",
    "connector": {
        "state": "RUNNING",
        "worker_id": "foobar3:8083"
    },
    "tasks": [
        {
        "id": 0,
        "state": "RUNNING",
        "worker_id": "foobar3:8083"
        },
        {
        "id": 1,
        "state": "RUNNING",
        "worker_id": "foobar1:8083"
        },
        {
        "id": 2,
        "state": "RUNNING",
        "worker_id": "foobar3:8083"
        },
        {
        "id": 3,
        "state": "RUNNING",
        "worker_id": "foobar1:8083"
        },
        {
        "id": 4,
        "state": "RUNNING",
        "worker_id": "foobar3:8083"
        },
        {
        "id": 5,
        "state": "RUNNING",
        "worker_id": "foobar1:8083"
        }
    ],
    "type": "source"
    }
}
}

Stop worker 3

Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 7 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 7 and got assignment: Assignment{error=0, leader='connect-1-d16e56f8-7107-4d23-9f10-f5c70ab089ad', leaderUrl='http://foobar1:8083/', offset=8, connectorIds=[source-datagen-01], taskIds=[source-datagen-01-0, source-datagen-01-2, source-datagen-01-4, source-datagen-01-1, source-datagen-01-3, source-datagen-01-5], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Tasks reassigned to worker 1

Status topic:

Topic _kafka-connect-groupA-status[3], offset: 1, Headers: , key: status-connector-source-datagen-01, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":7}
% Reached end of topic _kafka-connect-groupA-status [3] at offset 2
Topic _kafka-connect-groupA-status[0], offset: 3, Headers: , key: status-task-source-datagen-01-4, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":7}
Topic _kafka-connect-groupA-status[0], offset: 4, Headers: , key: status-task-source-datagen-01-0, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":7}
% Reached end of topic _kafka-connect-groupA-status [0] at offset 5
Topic _kafka-connect-groupA-status[1], offset: 2, Headers: , key: status-task-source-datagen-01-2, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":7}

Connector status

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
        jq '.'
 […]
"status": {
  "name": "source-datagen-01",
  "connector": {
    "state": "RUNNING",
    "worker_id": "foobar1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "foobar1:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "foobar1:8083"
    },
    {
      "id": 2,
      "state": "RUNNING",
      "worker_id": "foobar1:8083"
    },
    {
      "id": 3,
      "state": "RUNNING",
      "worker_id": "foobar1:8083"
    },
    {
      "id": 4,
      "state": "RUNNING",
      "worker_id": "foobar1:8083"
    },
    {
      "id": 5,
      "state": "RUNNING",
      "worker_id": "foobar1:8083"
    }
  ],
  "type": "source"
 […]

tl;dr If the rest.advertised.host.name of the leader cannot be resolved by the other workers, then REST requests that change config will fail when sent to any worker other than the leader, because they cannot be forwarded. It doesn’t seem to impact the execution of connectors though, since this is coordinated through the Kafka topic.

Scenario 02: rest.advertised.host.name set to localhost

localhost in the context of a worker is the worker’s own machine. In the case of Docker this means each individual container, not the host machine on which the containers run.

Bounced the workers a few times to pick up the correct config. Note that this scenario uses new Kafka topics (groupB), not the ones from the previous scenario.

Worker 1

Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 16 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 16 and got assignment: Assignment{error=0, leader='connect-1-410154a3-a7cc-4c60-adff-79ea9938b431', leaderUrl='http://localhost:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker 2

Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 16 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 16 and got assignment: Assignment{error=0, leader='connect-1-410154a3-a7cc-4c60-adff-79ea9938b431', leaderUrl='http://localhost:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker 3

Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 16 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 16 and got assignment: Assignment{error=0, leader='connect-1-410154a3-a7cc-4c60-adff-79ea9938b431', leaderUrl='http://localhost:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Note that each one has the leaderUrl of localhost:8083.

No consumer groups

$ docker exec -it kafka kafka-consumer-groups \
                    --bootstrap-server kafka:29092 \
                    --list
$

Nothing in any of the Kafka Connect topics.

$ kafkacat -b localhost:9092 -t _kafka-connect-groupB-configs -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupB-configs [0] at offset 0
$ kafkacat -b localhost:9092 -t _kafka-connect-groupB-offsets -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupB-offsets [0] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [1] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [2] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [3] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [4] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [5] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [6] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [7] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [8] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [9] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [10] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [11] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [12] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [13] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [14] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [15] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [16] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [17] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [18] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [19] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [20] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [21] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [22] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [23] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [24] at offset 0
$ kafkacat -b localhost:9092 -t _kafka-connect-groupB-status -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupB-status [0] at offset 0
% Reached end of topic _kafka-connect-groupB-status [1] at offset 0
% Reached end of topic _kafka-connect-groupB-status [2] at offset 0
% Reached end of topic _kafka-connect-groupB-status [3] at offset 0
% Reached end of topic _kafka-connect-groupB-status [4] at offset 0

Consumer offsets topic

$ kafkacat -b localhost:9092 -t __consumer_offsets -o beginning -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\nHeaders: %h\nKey (%K bytes): %k\nPayload (%S bytes): %s\n--\n' -u -C

Most recent message

Topic __consumer_offsets / Partition 33 / Offset: 15 / Timestamp: 1574417598705
Headers:
Key (24 bytes): kafka-connect-groupB
Payload (855 bytes): connect
compatible.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431n��.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431��   connect-1/172.28.0.5�`'�http://localhost:8083/��������`.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/��������`.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/��������.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637��        connect-1/172.28.0.7�`'&http://localhost:8083/������������`.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/��������.connect-1-9a6ad1d8-cd5e-4a04-9dc6-fad03fb005af�� connect-1/172.28.0.6�`'&http://localhost:8083/������������`.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/��������

Docker network IP of each worker

$ docker exec kafka-connect-01 bash -c "ip addr show eth0"
201: eth0@if202: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default
    link/ether 02:42:ac:1c:00:07 brd ff:ff:ff:ff:ff:ff
    inet 172.28.0.7/16 brd 172.28.255.255 scope global eth0
    valid_lft forever preferred_lft forever
$ docker exec kafka-connect-02 bash -c "ip addr show eth0"
199: eth0@if200: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default
    link/ether 02:42:ac:1c:00:06 brd ff:ff:ff:ff:ff:ff
    inet 172.28.0.6/16 brd 172.28.255.255 scope global eth0
    valid_lft forever preferred_lft forever
$ docker exec kafka-connect-03 bash -c "ip addr show eth0"
197: eth0@if198: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default
    link/ether 02:42:ac:1c:00:05 brd ff:ff:ff:ff:ff:ff
    inet 172.28.0.5/16 brd 172.28.255.255 scope global eth0
    valid_lft forever preferred_lft forever

Send the config to worker 1:

curl -s -X PUT -H  "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-01/config \
    -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "item_details_01",
    "max.interval":250,
    "quickstart": "ratings",
    "tasks.max": 6
}'
{"error_code":409,"message":"Cannot complete request because of a conflicting operation (e.g. worker rebalance)"}

Worker 1 logs:

kafka-connect-01    | [2019-11-22 10:43:54,923] INFO AbstractConfig values:
kafka-connect-01    |  (org.apache.kafka.common.config.AbstractConfig)
kafka-connect-01    | [2019-11-22 10:43:54,936] INFO AbstractConfig values:
kafka-connect-01    |  (org.apache.kafka.common.config.AbstractConfig)
kafka-connect-01    | [2019-11-22 10:43:54,948] INFO AbstractConfig values:
kafka-connect-01    |  (org.apache.kafka.common.config.AbstractConfig)

Send it to Worker 2

curl -s -X PUT -H  "Content-Type:application/json" http://localhost:18083/connectors/source-datagen-01/config \
    -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "item_details_01",
    "max.interval":250,
    "quickstart": "ratings",
    "tasks.max": 6
}'
{"error_code":409,"message":"Cannot complete request because of a conflicting operation (e.g. worker rebalance)"}

Worker 2 logs:

kafka-connect-02    | [2019-11-22 10:44:34,443] INFO AbstractConfig values:
kafka-connect-02    |  (org.apache.kafka.common.config.AbstractConfig)
kafka-connect-02    | [2019-11-22 10:44:34,566] INFO AbstractConfig values:
kafka-connect-02    |  (org.apache.kafka.common.config.AbstractConfig)
kafka-connect-02    | [2019-11-22 10:44:34,579] INFO AbstractConfig values:
kafka-connect-02    |  (org.apache.kafka.common.config.AbstractConfig)

Send it to Worker 3

curl -s -X PUT -H  "Content-Type:application/json" http://localhost:28083/connectors/source-datagen-01/config \
    -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "item_details_01",
    "max.interval":250,
    "quickstart": "ratings",
    "tasks.max": 6
}'

Connector gets created and executed across all three workers:

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
        jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
        column -s : -t| sed 's/\"//g'| sort
source  |  source-datagen-01  |  RUNNING  |  RUNNING  |  RUNNING  |  RUNNING  |  RUNNING  |  RUNNING  |  RUNNING  |  io.confluent.kafka.connect.datagen.DatagenConnector

But in the status topic and in the REST API, workers are identified only by their advertised host and port. Every worker therefore shows up as localhost:8083, which means you can’t tell which worker is running which task:

$ kafkacat -b localhost:9092 -t _kafka-connect-groupB-status -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupB-status [0] at offset 0
% Reached end of topic _kafka-connect-groupB-status [1] at offset 0
% Reached end of topic _kafka-connect-groupB-status [2] at offset 0
% Reached end of topic _kafka-connect-groupB-status [3] at offset 0
% Reached end of topic _kafka-connect-groupB-status [4] at offset 0
Topic _kafka-connect-groupB-status[3], offset: 0, Headers: , key: status-connector-source-datagen-01, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":17}
% Reached end of topic _kafka-connect-groupB-status [3] at offset 1
Topic _kafka-connect-groupB-status[0], offset: 0, Headers: , key: status-task-source-datagen-01-3, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":18}
Topic _kafka-connect-groupB-status[0], offset: 1, Headers: , key: status-task-source-datagen-01-0, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":18}
Topic _kafka-connect-groupB-status[0], offset: 2, Headers: , key: status-task-source-datagen-01-4, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":18}
% Reached end of topic _kafka-connect-groupB-status [0] at offset 3
Topic _kafka-connect-groupB-status[1], offset: 0, Headers: , key: status-task-source-datagen-01-1, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":18}
Topic _kafka-connect-groupB-status[1], offset: 1, Headers: , key: status-task-source-datagen-01-2, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":18}
% Reached end of topic _kafka-connect-groupB-status [1] at offset 2
Topic _kafka-connect-groupB-status[4], offset: 0, Headers: , key: status-task-source-datagen-01-5, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":18}
% Reached end of topic _kafka-connect-groupB-status [4] at offset 1

Checking in each worker log shows that the tasks are running on each.

Consumer offsets topic:

% Reached end of topic __consumer_offsets [33] at offset 17
Topic __consumer_offsets / Partition 33 / Offset: 17 / Timestamp: 1574419513301
Headers:
Key (24 bytes): kafka-connect-groupB
Payload (1167 bytes): connect
compatible.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431n��?�.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431�� connect-1/172.28.0.5�`'�http://localhost:8083{.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/source-datagen-01����.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083source-datagen-01.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637��     connect-1/172.28.0.7�`'�http://localhost:8083`.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083source-datagen-01.connect-1-9a6ad1d8-cd5e-4a04-9dc6-fad03fb005af��  connect-1/172.28.0.6�`'�http://localhost:8083`.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083source-datagen-01
--

Kill worker 3

% Reached end of topic __consumer_offsets [33] at offset 18
Topic __consumer_offsets / Partition 33 / Offset: 18 / Timestamp: 1574419726554
Headers:
Key (24 bytes): kafka-connect-groupB
Payload (847 bytes): connect
compatible.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637n����.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637�� connect-1/172.28.0.7�`'�http://localhost:8083.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083source-datagen-01.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637http://localhost:8083source-datagen-01����.connect-1-9a6ad1d8-cd5e-4a04-9dc6-fad03fb005af��       connect-1/172.28.0.6�`'�http://localhost:8083.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083source-datagen-01{.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637http://localhost:8083source-datagen-01
--

Kill worker 2

% Reached end of topic __consumer_offsets [33] at offset 19
Topic __consumer_offsets / Partition 33 / Offset: 19 / Timestamp: 1574419816661
Headers:
Key (24 bytes): kafka-connect-groupB
Payload (446 bytes): connect
compatible.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637n����.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637�� connect-1/172.28.0.7�`'�http://localhost:8083�.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637http://localhost:8083source-datagen-01����`.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637http://localhost:8083��
--
% Reached end of topic __consumer_offsets [33] at offset 20

Tasks are reassigned each time. The __consumer_offsets topic tracks the workers that are still alive.

tl;dr If the rest.advertised.host.name is set to localhost, then REST requests sent to a worker that is not the leader will fail. Kafka Connect forwards these requests from non-leader workers to the leader worker, and if the leader’s advertised host name is localhost then the non-leader worker ends up forwarding the request to itself, which won’t work. It doesn’t seem to impact the execution of connectors though, since this is coordinated through the Kafka topic.
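A pre-flight check can catch this before a worker starts. The sketch below is only an illustration: the function name is_loopback_host is made up, and the list of patterns is an assumption about what counts as a loopback name rather than anything Kafka Connect itself checks.

```shell
#!/bin/sh
# Hypothetical helper: succeed (exit status 0) if the given host name looks
# like a loopback address, which other workers cannot use to reach this one.
is_loopback_host() {
    case "$1" in
        localhost|localhost.*|127.*|::1) return 0 ;;
        *) return 1 ;;
    esac
}
```

For example, a container entrypoint could run: is_loopback_host "$CONNECT_REST_ADVERTISED_HOST_NAME" && echo "advertised host is loopback" >&2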

Scenario 03: Shared config topic but different group.id

One worker in groupX, two workers in groupY. All three using the same config/offset/status topics.
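This kind of setup can be spotted from the worker configs alone. The sketch below assumes you have already collected one line per worker in the form "worker group.id config.storage.topic" (that input format, the function name shared_topic_conflicts, and the topic names in the usage example are all invented for illustration). It prints any config topic that is used by more than one group.id.

```shell
#!/bin/sh
# Hypothetical helper: read "<worker> <group.id> <config.storage.topic>"
# lines on stdin and print "<topic> <group1>,<group2>,..." for every topic
# that is shared by workers with different group.id values.
shared_topic_conflicts() {
    awk '
    {
        group = $2; topic = $3
        # Record each (topic, group) pair once, in order of first appearance
        if (!((topic, group) in seen)) {
            seen[topic, group] = 1
            if (ngroups[topic]++ == 0)
                groups[topic] = group
            else
                groups[topic] = groups[topic] "," group
        }
    }
    END {
        for (t in ngroups)
            if (ngroups[t] > 1) print t, groups[t]
    }' | sort
}
```

With placeholder topic names, feeding it the three lines "kafka-connect-01 groupX cfg", "kafka-connect-02 groupY cfg" and "kafka-connect-03 groupY cfg" reports the topic cfg as shared by groupX and groupY. No output means each group.id has its own config topic.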

Consumer offsets topic

$ kafkacat -b localhost:9092 -t __consumer_offsets -o beginning -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\nHeaders: %h\nKey (%K bytes): %k\nPayload (%S bytes): %s\n--\n' -u -C
Topic __consumer_offsets / Partition 10 / Offset: 0 / Timestamp: 1574421190777
Headers:
Key (24 bytes): kafka-connect-groupY
Payload (563 bytes): connect
compatible.connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743n���\.connect-1-edc46031-885a-4347-ad29-4da7c07f7fb9�� connect-1/172.29.0.7�`'-http://kafka-connect-02:8083/������������g.connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743http://kafka-connect-03:8083/��������.connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743��    connect-1/172.29.0.6�`'-http://kafka-connect-03:8083/������������g.connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743http://kafka-connect-03:8083/��������
--
Topic __consumer_offsets / Partition 11 / Offset: 0 / Timestamp: 1574421190665
Headers:
Key (24 bytes): kafka-connect-groupX
Payload (325 bytes): connect
compatible.connect-1-dd81ede5-5b4b-49c7-98c6-817b3c91ec66n����.connect-1-dd81ede5-5b4b-49c7-98c6-817b3c91ec66�� connect-1/172.29.0.5�`'-http://kafka-connect-01:8083/������������g.connect-1-dd81ede5-5b4b-49c7-98c6-817b3c91ec66http://kafka-connect-01:8083/��������

Worker 1 log

Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 1 and got assignment: Assignment{error=0, leader='connect-1-dd81ede5-5b4b-49c7-98c6-817b3c91ec66', leaderUrl='http://kafka-connect-01:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker 2 log

Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 1 and got assignment: Assignment{error=0, leader='connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743', leaderUrl='http://kafka-connect-03:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker 3 log

Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 1 and got assignment: Assignment{error=0, leader='connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743', leaderUrl='http://kafka-connect-03:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
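
The leaderUrl field is the quickest way to see the split in these logs: worker 1 elected itself, while workers 2 and 3 agree on kafka-connect-03. A small sed filter can pull that field out of the DistributedHerder lines. The function name is made up for illustration, and the docker-compose service name in the usage comment is an assumption.

```shell
# Hypothetical helper: print the leaderUrl value from each "Joined group" log line on stdin.
leader_url_from_log() {
  sed -n "s/.*leaderUrl='\([^']*\)'.*/\1/p"
}

# Possible usage (service name assumed, not taken from the compose file):
#   docker-compose logs kafka-connect-01 | leader_url_from_log
```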

Send config to worker 1 (groupX)

curl -s -X PUT -H  "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-01/config \
        -d '{
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "kafka.topic": "item_details_01",
        "max.interval":250,
        "quickstart": "ratings",
        "tasks.max": 6
    }'

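The logs show that the tasks are started, and keep running, on all three workers. Worker 1 (groupX) runs all six tasks, and workers 2 and 3 (groupY) run three each, so every task is running twice: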

Worker 1:

[Worker clientId=connect-1, groupId=kafka-connect-groupX] Starting task source-datagen-01-0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupX] Starting task source-datagen-01-3 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupX] Starting task source-datagen-01-4 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupX] Starting task source-datagen-01-2 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupX] Starting task source-datagen-01-1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupX] Starting task source-datagen-01-5 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
…
WorkerSourceTask{id=source-datagen-01-5} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-3} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-4} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-2} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)

Worker 2:

[Worker clientId=connect-1, groupId=kafka-connect-groupY] Starting task source-datagen-01-1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupY] Starting task source-datagen-01-3 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupY] Starting task source-datagen-01-5 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
…
WorkerSourceTask{id=source-datagen-01-3} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-5} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)

Worker 3:

[Worker clientId=connect-1, groupId=kafka-connect-groupY] Starting task source-datagen-01-0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupY] Starting task source-datagen-01-4 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupY] Starting task source-datagen-01-2 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
…
WorkerSourceTask{id=source-datagen-01-2} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-4} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
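
The duplication is easier to see when the "Starting task" lines are grouped by task ID. The awk sketch below reads worker log lines in the format shown above and prints every task that was started under more than one group.id. The function name is hypothetical, and it assumes the log prefix contains groupId=… exactly as in these excerpts.

```shell
# Hypothetical helper: list task IDs that were started in more than one Connect group.
tasks_started_in_multiple_groups() {
  awk '
    /Starting task/ {
      # Pull the group.id out of the "[Worker clientId=..., groupId=...]" prefix.
      group = $0
      sub(/^.*groupId=/, "", group)
      sub(/\].*$/, "", group)
      # Pull the task ID that follows "Starting task ".
      task = $0
      sub(/^.*Starting task /, "", task)
      sub(/ .*$/, "", task)
      # Record each (task, group) pair once, in order of first appearance.
      if (!((task, group) in seen)) {
        seen[task, group] = 1
        if (count[task] > 0) groups[task] = groups[task] "," group
        else groups[task] = group
        count[task]++
      }
    }
    END { for (t in count) if (count[t] > 1) print t, groups[t] }
  ' | sort
}
```

Fed the combined logs of all three workers, this would list each of the six tasks against both kafka-connect-groupX and kafka-connect-groupY.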

Status topic

$ kafkacat -b localhost:9092 -t _kafka-connect-status -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-status [0] at offset 0
% Reached end of topic _kafka-connect-status [1] at offset 0
% Reached end of topic _kafka-connect-status [2] at offset 0
% Reached end of topic _kafka-connect-status [3] at offset 0
% Reached end of topic _kafka-connect-status [4] at offset 0
Topic _kafka-connect-status[3], offset: 0, Headers: , key: status-connector-source-datagen-01, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":2}
% Reached end of topic _kafka-connect-status [3] at offset 1
Topic _kafka-connect-status[3], offset: 1, Headers: , key: status-connector-source-datagen-01, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":2}
% Reached end of topic _kafka-connect-status [3] at offset 2
Topic _kafka-connect-status[0], offset: 0, Headers: , key: status-task-source-datagen-01-0, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":3}
Topic _kafka-connect-status[0], offset: 1, Headers: , key: status-task-source-datagen-01-4, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":3}
% Reached end of topic _kafka-connect-status [0] at offset 2
Topic _kafka-connect-status[1], offset: 0, Headers: , key: status-task-source-datagen-01-2, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":3}
% Reached end of topic _kafka-connect-status [1] at offset 1
Topic _kafka-connect-status[4], offset: 0, Headers: , key: status-task-source-datagen-01-5, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-02:8083","generation":3}
% Reached end of topic _kafka-connect-status [4] at offset 1
Topic _kafka-connect-status[1], offset: 1, Headers: , key: status-task-source-datagen-01-1, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-02:8083","generation":3}
Topic _kafka-connect-status[0], offset: 2, Headers: , key: status-task-source-datagen-01-3, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-02:8083","generation":3}
% Reached end of topic _kafka-connect-status [1] at offset 2
% Reached end of topic _kafka-connect-status [0] at offset 3
Topic _kafka-connect-status[1], offset: 2, Headers: , key: status-task-source-datagen-01-2, payload: 86 bytes: {"state":"UNASSIGNED","trace":null,"worker_id":"kafka-connect-03:8083","generation":3}
Topic _kafka-connect-status[0], offset: 3, Headers: , key: status-task-source-datagen-01-4, payload: 86 bytes: {"state":"UNASSIGNED","trace":null,"worker_id":"kafka-connect-03:8083","generation":3}
% Reached end of topic _kafka-connect-status [1] at offset 3
Topic _kafka-connect-status[0], offset: 4, Headers: , key: status-task-source-datagen-01-0, payload: 86 bytes: {"state":"UNASSIGNED","trace":null,"worker_id":"kafka-connect-03:8083","generation":3}
% Reached end of topic _kafka-connect-status [0] at offset 5
Topic _kafka-connect-status[4], offset: 1, Headers: , key: status-task-source-datagen-01-5, payload: 86 bytes: {"state":"UNASSIGNED","trace":null,"worker_id":"kafka-connect-02:8083","generation":3}
Topic _kafka-connect-status[1], offset: 3, Headers: , key: status-task-source-datagen-01-1, payload: 86 bytes: {"state":"UNASSIGNED","trace":null,"worker_id":"kafka-connect-02:8083","generation":3}
% Reached end of topic _kafka-connect-status [4] at offset 2
Topic _kafka-connect-status[0], offset: 5, Headers: , key: status-task-source-datagen-01-3, payload: 86 bytes: {"state":"UNASSIGNED","trace":null,"worker_id":"kafka-connect-02:8083","generation":3}
% Reached end of topic _kafka-connect-status [1] at offset 4
% Reached end of topic _kafka-connect-status [0] at offset 6
Topic _kafka-connect-status[0], offset: 6, Headers: , key: status-task-source-datagen-01-3, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-02:8083","generation":4}
% Reached end of topic _kafka-connect-status [0] at offset 7
Topic _kafka-connect-status[1], offset: 4, Headers: , key: status-task-source-datagen-01-2, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":4}
% Reached end of topic _kafka-connect-status [1] at offset 5
Topic _kafka-connect-status[0], offset: 7, Headers: , key: status-task-source-datagen-01-4, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":4}
Topic _kafka-connect-status[0], offset: 8, Headers: , key: status-task-source-datagen-01-0, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":4}
% Reached end of topic _kafka-connect-status [0] at offset 9
Topic _kafka-connect-status[1], offset: 5, Headers: , key: status-task-source-datagen-01-1, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-02:8083","generation":4}
Topic _kafka-connect-status[4], offset: 2, Headers: , key: status-task-source-datagen-01-5, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-02:8083","generation":4}
% Reached end of topic _kafka-connect-status [1] at offset 6
% Reached end of topic _kafka-connect-status [4] at offset 3
Topic _kafka-connect-status[4], offset: 3, Headers: , key: status-task-source-datagen-01-5, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":3}
% Reached end of topic _kafka-connect-status [4] at offset 4
Topic _kafka-connect-status[0], offset: 9, Headers: , key: status-task-source-datagen-01-3, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":3}
% Reached end of topic _kafka-connect-status [0] at offset 10
Topic _kafka-connect-status[1], offset: 6, Headers: , key: status-task-source-datagen-01-2, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":3}
Topic _kafka-connect-status[1], offset: 7, Headers: , key: status-task-source-datagen-01-1, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":3}
Topic _kafka-connect-status[0], offset: 10, Headers: , key: status-task-source-datagen-01-4, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":3}
Topic _kafka-connect-status[0], offset: 11, Headers: , key: status-task-source-datagen-01-0, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":3}
% Reached end of topic _kafka-connect-status [1] at offset 8
% Reached end of topic _kafka-connect-status [0] at offset 12

Status from worker 1 REST API

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
            jq '."source-datagen-01".status.tasks'
[
{
    "id": 0,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 1,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 2,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 3,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 4,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 5,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
}
]

Status from worker 2 REST API

$ curl -s "http://localhost:18083/connectors?expand=info&expand=status" | \
            jq '."source-datagen-01".status.tasks'
[
{
    "id": 0,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 1,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 2,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 3,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 4,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 5,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
}
]

Status from worker 3 REST API

$ curl -s "http://localhost:28083/connectors?expand=info&expand=status" | \
            jq '."source-datagen-01".status.tasks'
[
{
    "id": 0,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 1,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 2,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 3,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 4,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
},
{
    "id": 5,
    "state": "RUNNING",
    "worker_id": "kafka-connect-01:8083"
}
]

i.e. all three workers report the tasks as running only on worker 1. At a guess, this is because the status topic is compacted and read as a table: since it is keyed on the task ID, only the latest state for each task (which happens to be the one written by worker 1) is read.
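
That guess can be checked against the status topic dump by keeping only the last record seen for each key, which is roughly what reading a compacted topic as a table amounts to. The sketch below does this with awk over kafkacat output in the format used above. The function name is hypothetical, and it relies on all records for one key arriving in offset order, which holds here because a key always maps to the same partition.

```shell
# Hypothetical helper: reduce a status-topic dump to the latest state and worker per key.
latest_status_per_key() {
  awk '
    /key: status-/ {
      key = $0;    sub(/^.*key: /, "", key);            sub(/, payload:.*$/, "", key)
      state = $0;  sub(/^.*"state":"/, "", state);      sub(/".*$/, "", state)
      worker = $0; sub(/^.*"worker_id":"/, "", worker); sub(/".*$/, "", worker)
      latest[key] = state " " worker   # later records overwrite earlier ones
    }
    END { for (k in latest) print k, latest[k] }
  ' | sort
}

# Possible usage (-e makes kafkacat exit at the end of the topic):
#   kafkacat -b localhost:9092 -t _kafka-connect-status -o beginning -e -C \
#     -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' \
#     | latest_status_per_key
```

Applied to the dump above, every task key ends on the record written by kafka-connect-01:8083, which matches what the REST API of each worker returns.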