Skip to content

Commit

Permalink
GH-3145 : Sample for next gen consumer-group rebalance protocol
Browse files Browse the repository at this point in the history
Fixes: #3145 

* Add a sample demonstrating the next-generation consumer-group rebalance protocol.
* Provide a docker-compose script for spinning up a `kraft` based Kafka broker since the new consumer 
  protocol only works in `kraft` mode. 
* Add README explaining the sample.
  • Loading branch information
chickenchickenlove committed May 20, 2024
1 parent bd8b82e commit 00fd11b
Show file tree
Hide file tree
Showing 13 changed files with 613 additions and 0 deletions.
1 change: 1 addition & 0 deletions samples/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
* sample-04 - Topic based (non-blocking) retry
* sample-05 - Global embedded Kafka testing
* sample-06 - Kafka Streams tests with TopologyTestDriver
* sample-07 - The New consumer rebalance protocol in spring-kafka
37 changes: 37 additions & 0 deletions samples/sample-07/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
HELP.md
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/

### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/

### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/

### VS Code ###
.vscode/
50 changes: 50 additions & 0 deletions samples/sample-07/README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
== Sample 7

This sample demonstrates the application of the new consumer rebalance protocol in Spring for Apache Kafka.

The new consumer rebalance protocol refers to the Server Side rebalance protocol proposed in link:https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848].

`Spring Boot` starts the `Kafka Broker` container defined in the `compose.yaml` file upon startup.

```yaml
version: '3'
services:
broker:
image: bitnami/kafka:3.7.0
...
# KIP-848
KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer"
KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE: "false"
```

The config of `group.protocol = conumser` should be added to `Consumer` configuration to apply new consumer rebalance protocol.

The `group.protocol` can be configured in the `resources/application.yaml` as follows:

```yaml
spring:
kafka:
consumer:
properties:
group.protocol: consumer
```

Next, the `Consumer` created by `@KafkaListener` will request a subscription to the `test-topic` from the `Broker`.

The `Broker` will then send the Topic Partition Assign information to the `Consumer`. This means that the `Consumer` rebalancing has finished, and the `Consumer` has started to poll messages.

```java
@Component
public class Sample07KafkaListener {

@KafkaListener(topics = "test-topic", groupId = "sample07-1")
public void listenWithGroup1(String message) {
System.out.println("Received message at group sample07-1: " + message);
}

@KafkaListener(topics = "test-topic", groupId = "sample07-2")
public void listenWithGroup2(String message) {
System.out.println("Received message at group sample07-2: " + message);
}
}
```
32 changes: 32 additions & 0 deletions samples/sample-07/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
plugins {
id 'java'
id 'org.springframework.boot' version '3.3.0-SNAPSHOT'
id 'io.spring.dependency-management' version '1.1.5'
}

group = 'com.example'
version = '3.2.0-SNAPSHOT'

java {
sourceCompatibility = '17'
}

repositories {
mavenCentral()
maven { url 'https://repo.spring.io/milestone' }
maven { url 'https://repo.spring.io/snapshot' }
}

dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'
developmentOnly 'org.springframework.boot:spring-boot-docker-compose'

testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testImplementation 'org.springframework.boot:spring-boot-testcontainers'
}

tasks.named('test') {
useJUnitPlatform()
}
34 changes: 34 additions & 0 deletions samples/sample-07/compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
version: '3'
services:
broker:
image: bitnami/kafka:3.7.0
hostname: broker
container_name: broker
ports:
- "9092:9092"
- "10000:9094"
environment:
# Kraft Settings
KAFKA_CFG_NODE_ID: 0
KAFKA_KRAFT_CLUSTER_ID: HsDBs9l6UUmQq7Y5E6bNlw
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_PROCESS_ROLES: controller,broker

# Listeners
KAFKA_CFG_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://0.0.0.0:9092, EXTERNAL://:9094, CONTROLLER://:9093
KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://broker:9092, EXTERNAL://127.0.0.1:10000
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

# Clustering
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 1

# KIP-848
KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer"
KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE: "false"
Binary file not shown.
7 changes: 7 additions & 0 deletions samples/sample-07/gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

0 comments on commit 00fd11b

Please sign in to comment.