diff --git a/README.md b/README.md index 54f6d3d..59f2c37 100644 --- a/README.md +++ b/README.md @@ -299,7 +299,7 @@ spring: streams: binder: applicationId: "${spring.application.name}" - brokers: "localhost:9094" + brokers: "localhost:9092" configuration: default: key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde @@ -307,7 +307,7 @@ spring: ``` With this configuration: -* **Spring Cloud Stream** will create a **Kafka Streams binder** connected to **localhost:9094** +* **Spring Cloud Stream** will create a **Kafka Streams binder** connected to **localhost:9092** * We need to create a **@Bean** named **userStateStream** that should implement **Function** interface * This **@Bean** will connect a **KStream** subscribed to **pub.user.token** topic to another **KStream** publishing to **pub.user.state** topic @@ -471,13 +471,13 @@ docker-compose down Then you can use [kcat](https://github.com/edenhill/kcat) to produce/consume to/from **Kafka**: ```shell # consume -kcat -b localhost:9094 -C -t pub.user.token -f '%k %s\n' -kcat -b localhost:9094 -C -t pub.user.state -f '%k %s\n' +kcat -b localhost:9092 -C -t pub.user.token -f '%k %s\n' +kcat -b localhost:9092 -C -t pub.user.state -f '%k %s\n' # produce -echo '1:{"userId":"1", "token":1}' | kcat -b localhost:9094 -P -t pub.user.token -K: -echo '1:{"userId":"1", "token":2}' | kcat -b localhost:9094 -P -t pub.user.token -K: -echo '1:{"userId":"1", "token":3}' | kcat -b localhost:9094 -P -t pub.user.token -K: -echo '1:{"userId":"1", "token":4}' | kcat -b localhost:9094 -P -t pub.user.token -K: -echo '1:{"userId":"1", "token":5}' | kcat -b localhost:9094 -P -t pub.user.token -K: +echo '1:{"userId":"1", "token":1}' | kcat -b localhost:9092 -P -t pub.user.token -K: +echo '1:{"userId":"1", "token":2}' | kcat -b localhost:9092 -P -t pub.user.token -K: +echo '1:{"userId":"1", "token":3}' | kcat -b localhost:9092 -P -t pub.user.token -K: +echo '1:{"userId":"1", "token":4}' | kcat -b localhost:9092 -P -t pub.user.token -K: +echo '1:{"userId":"1", "token":5}' | kcat -b localhost:9092 -P -t pub.user.token -K: ``` diff --git a/docker-compose.yml b/docker-compose.yml index 7d21d71..4d4f2ea 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,27 +1,22 @@ -version: "3" - services: - zookeeper: - image: confluentinc/cp-zookeeper:7.9.1 - hostname: zookeeper - ports: - - "2181:2181" - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - - broker: - image: confluentinc/cp-kafka:7.9.1 - hostname: broker - depends_on: - - zookeeper + kafka-kraft: + image: confluentinc/cp-kafka:8.0.0 + hostname: kafka-kraft ports: - - "9094:9094" + - "9101:9101" + - "9092:9092" environment: - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://broker:29094,LISTENER_DOCKER_EXTERNAL://localhost:9094 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL - KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + # From https://docs.confluent.io/platform/current/installation/docker/config-reference.html#cp-kafka-example + KAFKA_NODE_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-kraft:29092,PLAINTEXT_HOST://localhost:9092" + KAFKA_JMX_PORT: 9101 + KAFKA_JMX_HOSTNAME: "localhost" + KAFKA_PROCESS_ROLES: "broker,controller" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-kraft:29093" + KAFKA_LISTENERS: "PLAINTEXT://kafka-kraft:29092,CONTROLLER://kafka-kraft:29093,PLAINTEXT_HOST://0.0.0.0:9092" + KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT" + KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk" diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index af4e258..9874c4d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -14,7 +14,7 @@ spring: streams: binder: applicationId: "${spring.application.name}" - brokers: "localhost:9094" + brokers: "localhost:9092" configuration: default: key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde diff --git a/src/test/kotlin/com/rogervinas/kafkastreams/helper/DockerComposeContainerHelper.kt b/src/test/kotlin/com/rogervinas/kafkastreams/helper/DockerComposeContainerHelper.kt index 615bf1d..821417a 100644 --- a/src/test/kotlin/com/rogervinas/kafkastreams/helper/DockerComposeContainerHelper.kt +++ b/src/test/kotlin/com/rogervinas/kafkastreams/helper/DockerComposeContainerHelper.kt @@ -9,10 +9,8 @@ import java.io.File class DockerComposeContainerHelper { companion object { - private const val BROKER = "broker" - private const val BROKER_PORT = 9094 - private const val ZOOKEEPER = "zookeeper" - private const val ZOOKEEPER_PORT = 2181 + private const val BROKER = "kafka-kraft" + private const val BROKER_PORT = 9092 } fun createContainer(): ComposeContainer { @@ -23,14 +21,7 @@ class DockerComposeContainerHelper { BROKER_PORT, WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY) .withStrategy(forListeningPort()) - .withStrategy(forLogMessage(".*started.*", 1)), - ) - .withExposedService( - ZOOKEEPER, - ZOOKEEPER_PORT, - WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY) - .withStrategy(forListeningPort()) - .withStrategy(forLogMessage(".*Started.*", 1)), + .withStrategy(forLogMessage(".*Kafka Server started.*", 1)), ) } }