Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 29 additions & 29 deletions examples/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,62 +1,62 @@
version: '3.6'
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.3
hostname: zookeeper
container_name: zookeeper
ports:
- '2181:2181'
environment:
ZOOKEEPER_CLIENT_PORT: 2181
healthcheck:
test: ['CMD-SHELL', 'nc -zv localhost 2181 && exit 0 || exit 1']

broker:
image: confluentinc/cp-enterprise-kafka:7.3.3
image: confluentinc/cp-server:7.8.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- '9092:9092'
- '29092:29092'
healthcheck:
test: ['CMD-SHELL', 'nc -zv localhost 9092 && exit 0 || exit 1']
test: ["CMD-SHELL", "nc -z localhost 9092"]
interval: 10s
timeout: 5s
retries: 5
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_NODE_ID: 1
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:9093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:9093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

schema-registry:
image: confluentinc/cp-schema-registry:7.3.3
image: confluentinc/cp-schema-registry:7.8.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- broker
broker:
condition: service_healthy
ports:
- '8081:8081'
healthcheck:
test: ['CMD-SHELL', 'nc -zv localhost 8081 && exit 0 || exit 1']
test: ["CMD-SHELL", "nc -z localhost 8081"]
interval: 10s
timeout: 5s
retries: 5
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

rest-proxy:
image: confluentinc/cp-kafka-rest:7.3.3
image: confluentinc/cp-kafka-rest:7.8.0
depends_on:
- zookeeper
- broker
- schema-registry
broker:
condition: service_healthy
schema-registry:
condition: service_healthy
ports:
- 8082:8082
healthcheck:
test: ['CMD-SHELL', 'nc -zv localhost 8082 && exit 0 || exit 1']
test: ["CMD-SHELL", "nc -z localhost 8082"]
interval: 10s
timeout: 5s
retries: 5
hostname: rest-proxy
container_name: rest-proxy
environment:
Expand Down
4 changes: 3 additions & 1 deletion examples/test_adminclient.robot
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ AdminClient List Consumer Groups

${admin_client_id}= Create Admin Client
${states}= Create List ${CONSUMER_GROUP_STATE_STABLE}
${groups}= List Groups ${admin_client_id} states=${states}
${types}= Create List ${CONSUMER_GROUP_TYPE_CLASSIC}
${groups}= List Groups ${admin_client_id} states=${states} types=${types}
Log ${groups}
Log ${groups.valid}
FOR ${group} IN @{groups.valid}
Log ${group.group_id}
IF "${group_id}" == "${group.group_id}"
Log ${group.group_id}
Log ${group.state}
Log ${group.type}
Pass Execution "Consumer found in list"
END
END
Expand Down
9 changes: 5 additions & 4 deletions src/ConfluentKafkaLibrary/admin_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def create_admin_client(
self.admin_clients[group_id] = admin_client
return group_id

def list_groups(self, group_id, states=None, request_timeout=10):
def list_groups(self, group_id, states=None, types=None, request_timeout=10):
"""List consumer groups.
- ``states`` (list(ConsumerGroupState)): filter consumer groups which are currently in these states.
For example usage see 'AdminClient List Consumer Groups' at
Expand All @@ -34,9 +34,10 @@ def list_groups(self, group_id, states=None, request_timeout=10):
- ``request_timeout`` (int): Maximum response time before timing out.
Default: `10`.
"""
if states is None:
states = []
future = self.admin_clients[group_id].list_consumer_groups(request_timeout=request_timeout, states=set(states))
states = states or []
types = types or []

future = self.admin_clients[group_id].list_consumer_groups(request_timeout=request_timeout, states=set(states), types=set(types))
return future.result()

def describe_groups(self, group_id, group_ids, request_timeout=10, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion src/ConfluentKafkaLibrary/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = '2.10.0.post1'
VERSION = '2.10.0.post2'