Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,15 @@ spring:
streams:
binder:
applicationId: "${spring.application.name}"
brokers: "localhost:9094"
brokers: "localhost:9092"
configuration:
default:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
```

With this configuration:
* **Spring Cloud Stream** will create a **Kafka Streams binder** connected to **localhost:9094**
* **Spring Cloud Stream** will create a **Kafka Streams binder** connected to **localhost:9092**
* We need to create a **@Bean** named **userStateStream** that implements the **Function<KStream, KStream>** interface
* This **@Bean** will connect a **KStream** subscribed to the **pub.user.token** topic to another **KStream** that publishes to the **pub.user.state** topic

Expand Down Expand Up @@ -471,13 +471,13 @@ docker-compose down
Then you can use [kcat](https://github.com/edenhill/kcat) to produce/consume to/from **Kafka**:
```shell
# consume
kcat -b localhost:9094 -C -t pub.user.token -f '%k %s\n'
kcat -b localhost:9094 -C -t pub.user.state -f '%k %s\n'
kcat -b localhost:9092 -C -t pub.user.token -f '%k %s\n'
kcat -b localhost:9092 -C -t pub.user.state -f '%k %s\n'

# produce
echo '1:{"userId":"1", "token":1}' | kcat -b localhost:9094 -P -t pub.user.token -K:
echo '1:{"userId":"1", "token":2}' | kcat -b localhost:9094 -P -t pub.user.token -K:
echo '1:{"userId":"1", "token":3}' | kcat -b localhost:9094 -P -t pub.user.token -K:
echo '1:{"userId":"1", "token":4}' | kcat -b localhost:9094 -P -t pub.user.token -K:
echo '1:{"userId":"1", "token":5}' | kcat -b localhost:9094 -P -t pub.user.token -K:
echo '1:{"userId":"1", "token":1}' | kcat -b localhost:9092 -P -t pub.user.token -K:
echo '1:{"userId":"1", "token":2}' | kcat -b localhost:9092 -P -t pub.user.token -K:
echo '1:{"userId":"1", "token":3}' | kcat -b localhost:9092 -P -t pub.user.token -K:
echo '1:{"userId":"1", "token":4}' | kcat -b localhost:9092 -P -t pub.user.token -K:
echo '1:{"userId":"1", "token":5}' | kcat -b localhost:9092 -P -t pub.user.token -K:
```
39 changes: 17 additions & 22 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
version: "3"

services:

zookeeper:
image: confluentinc/cp-zookeeper:7.9.1
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

broker:
image: confluentinc/cp-kafka:7.9.1
hostname: broker
depends_on:
- zookeeper
kafka-kraft:
image: confluentinc/cp-kafka:8.0.0
hostname: kafka-kraft
ports:
- "9094:9094"
- "9101:9101"
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://broker:29094,LISTENER_DOCKER_EXTERNAL://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
# From https://docs.confluent.io/platform/current/installation/docker/config-reference.html#cp-kafka-example
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-kraft:29092,PLAINTEXT_HOST://localhost:9092"
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: "localhost"
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-kraft:29093"
KAFKA_LISTENERS: "PLAINTEXT://kafka-kraft:29092,CONTROLLER://kafka-kraft:29093,PLAINTEXT_HOST://0.0.0.0:9092"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
2 changes: 1 addition & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ spring:
streams:
binder:
applicationId: "${spring.application.name}"
brokers: "localhost:9094"
brokers: "localhost:9092"
configuration:
default:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ import java.io.File

class DockerComposeContainerHelper {
companion object {
private const val BROKER = "broker"
private const val BROKER_PORT = 9094
private const val ZOOKEEPER = "zookeeper"
private const val ZOOKEEPER_PORT = 2181
private const val BROKER = "kafka-kraft"
private const val BROKER_PORT = 9092
}

fun createContainer(): ComposeContainer {
Expand All @@ -23,14 +21,7 @@ class DockerComposeContainerHelper {
BROKER_PORT,
WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
.withStrategy(forListeningPort())
.withStrategy(forLogMessage(".*started.*", 1)),
)
.withExposedService(
ZOOKEEPER,
ZOOKEEPER_PORT,
WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
.withStrategy(forListeningPort())
.withStrategy(forLogMessage(".*Started.*", 1)),
.withStrategy(forLogMessage(".*Kafka Server started.*", 1)),
)
}
}