From 4a16101a8203a9a56c4a85768809df925e848e65 Mon Sep 17 00:00:00 2001 From: robooo Date: Sun, 21 Sep 2025 19:53:53 +0200 Subject: [PATCH] Update Docker Compose and AdminClient to support new Kafka version and consumer group types --- examples/docker-compose.yml | 58 +++++++++++------------ examples/test_adminclient.robot | 4 +- src/ConfluentKafkaLibrary/admin_client.py | 9 ++-- src/ConfluentKafkaLibrary/version.py | 2 +- 4 files changed, 38 insertions(+), 35 deletions(-) diff --git a/examples/docker-compose.yml b/examples/docker-compose.yml index 478d594..3520698 100644 --- a/examples/docker-compose.yml +++ b/examples/docker-compose.yml @@ -1,62 +1,62 @@ -version: '3.6' +version: '3.8' services: - zookeeper: - image: confluentinc/cp-zookeeper:7.3.3 - hostname: zookeeper - container_name: zookeeper - ports: - - '2181:2181' - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - healthcheck: - test: ['CMD-SHELL', 'nc -zv localhost 2181 && exit 0 || exit 1'] - broker: - image: confluentinc/cp-enterprise-kafka:7.3.3 + image: confluentinc/cp-server:7.8.0 hostname: broker container_name: broker - depends_on: - - zookeeper ports: - '9092:9092' - '29092:29092' healthcheck: - test: ['CMD-SHELL', 'nc -zv localhost 9092 && exit 0 || exit 1'] + test: ["CMD-SHELL", "nc -z localhost 9092"] + interval: 10s + timeout: 5s + retries: 5 environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_NODE_ID: 1 + CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' + KAFKA_PROCESS_ROLES: 'broker,controller' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:9093' + KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:9093,PLAINTEXT_HOST://0.0.0.0:9092' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 schema-registry: - image: confluentinc/cp-schema-registry:7.3.3 + image: confluentinc/cp-schema-registry:7.8.0 hostname: schema-registry container_name: schema-registry depends_on: - - zookeeper - - broker + broker: + condition: service_healthy ports: - '8081:8081' healthcheck: - test: ['CMD-SHELL', 'nc -zv localhost 8081 && exit 0 || exit 1'] + test: ["CMD-SHELL", "nc -z localhost 8081"] + interval: 10s + timeout: 5s + retries: 5 environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081' - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' rest-proxy: - image: confluentinc/cp-kafka-rest:7.3.3 + image: confluentinc/cp-kafka-rest:7.8.0 depends_on: - - zookeeper - - broker - - schema-registry + broker: + condition: service_healthy + schema-registry: + condition: service_healthy ports: - 8082:8082 healthcheck: - test: ['CMD-SHELL', 'nc -zv localhost 8082 && exit 0 || exit 1'] + test: ["CMD-SHELL", "nc -z localhost 8082"] + interval: 10s + timeout: 5s + retries: 5 hostname: rest-proxy container_name: rest-proxy environment: diff --git a/examples/test_adminclient.robot b/examples/test_adminclient.robot index 52c2d0a..25f986b 100644 --- a/examples/test_adminclient.robot +++ b/examples/test_adminclient.robot @@ -32,7 +32,8 @@ AdminClient List Consumer Groups ${admin_client_id}= Create Admin Client ${states}= Create List ${CONSUMER_GROUP_STATE_STABLE} - ${groups}= List Groups ${admin_client_id} states=${states} + ${types}= Create List ${CONSUMER_GROUP_TYPE_CLASSIC} + ${groups}= List Groups ${admin_client_id} states=${states} types=${types} Log ${groups} Log ${groups.valid} FOR ${group} IN @{groups.valid} @@ -40,6 +41,7 @@ AdminClient List Consumer Groups IF "${group_id}" == "${group.group_id}" Log ${group.group_id} Log ${group.state} + Log ${group.type} Pass Execution "Consumer found in list" END END diff --git a/src/ConfluentKafkaLibrary/admin_client.py b/src/ConfluentKafkaLibrary/admin_client.py index 290c301..dc5a5dd 100644 --- a/src/ConfluentKafkaLibrary/admin_client.py +++ b/src/ConfluentKafkaLibrary/admin_client.py @@ -25,7 +25,7 @@ def create_admin_client( self.admin_clients[group_id] = admin_client return group_id - def list_groups(self, group_id, states=None, request_timeout=10): + def list_groups(self, group_id, states=None, types=None, request_timeout=10): """List consumer groups. - ``states`` (list(ConsumerGroupState)): filter consumer groups which are currently in these states. For example usage see 'AdminClient List Consumer Groups' at @@ -34,9 +34,10 @@ def list_groups(self, group_id, states=None, request_timeout=10): - ``request_timeout`` (int): Maximum response time before timing out. Default: `10`. """ - if states is None: - states = [] - future = self.admin_clients[group_id].list_consumer_groups(request_timeout=request_timeout, states=set(states)) + states = states or [] + types = types or [] + + future = self.admin_clients[group_id].list_consumer_groups(request_timeout=request_timeout, states=set(states), types=set(types)) return future.result() def describe_groups(self, group_id, group_ids, request_timeout=10, **kwargs): diff --git a/src/ConfluentKafkaLibrary/version.py b/src/ConfluentKafkaLibrary/version.py index 9792a61..8267819 100644 --- a/src/ConfluentKafkaLibrary/version.py +++ b/src/ConfluentKafkaLibrary/version.py @@ -1 +1 @@ -VERSION = '2.10.0.post1' +VERSION = '2.10.0.post2'