The Kafka Router allows you to consume data from multiple Kafka instances / topics and route it to other Kafka instances / topics. Typical use cases:
- clone Kafka data, e.g. consume data from a demo environment and replicate it to your local Kafka.
- proxy external Kafka instances, e.g. have one whitelisted host consume from external Kafka sources and replicate the data to internal Kafka instances. This also works the other way, routing data from internal Kafka sources to external Kafka instances.
- merge messages from multiple topics into one aggregate topic.
Two environment variables are used:
- CONFIG_FILE: the path to the config file. Default: /config/config.yaml
- RESSOURCE_PATH: the path to the directory for additional resources (truststores, keystores, etc.). Default: /config

The config file can be YAML (*.yaml, *.yml) or JSON (*.json); the format is auto-detected from the file suffix.
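The lookup of the two environment variables and the suffix-based format detection can be sketched in Java, the language the router is built with. ConfigLocation and its methods are hypothetical names for illustration, not the router's actual code; the environment is passed in as a map so the logic can be tried without setting real variables.

```java
import java.nio.file.Path;
import java.util.Map;

/**
 * Hypothetical helper (not the Kafka Router's code): resolves the config file and
 * resource directory from the environment and detects the config format by suffix.
 */
final class ConfigLocation {

    enum Format { YAML, JSON }

    static final String DEFAULT_CONFIG_FILE = "/config/config.yaml";
    static final String DEFAULT_RESOURCE_PATH = "/config";

    /** CONFIG_FILE if set, otherwise the documented default. */
    static Path configFile(Map<String, String> env) {
        return Path.of(env.getOrDefault("CONFIG_FILE", DEFAULT_CONFIG_FILE));
    }

    /** RESSOURCE_PATH (spelled as documented) if set, otherwise the documented default. */
    static Path resourcePath(Map<String, String> env) {
        return Path.of(env.getOrDefault("RESSOURCE_PATH", DEFAULT_RESOURCE_PATH));
    }

    /** YAML for *.yaml / *.yml, JSON for *.json; any other suffix is rejected. */
    static Format detectFormat(Path file) {
        String name = file.getFileName().toString();
        if (name.endsWith(".yaml") || name.endsWith(".yml")) {
            return Format.YAML;
        }
        if (name.endsWith(".json")) {
            return Format.JSON;
        }
        throw new IllegalArgumentException("Unsupported config file suffix: " + file);
    }
}
```

In a real application the map would simply be System.getenv(), e.g. ConfigLocation.detectFormat(ConfigLocation.configFile(System.getenv())).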
General configuration:

Properties:
- consumer-group: name of the consumer group for the Kafka Router (allows multiple instances to run concurrently and to resume data processing from committed offsets). Default: kafka-router
- backoff-strategy.backoff-time-seconds: when messages were read but could not be delivered (e.g. because the target Kafka is not reachable), they are not committed in the source; instead, their delivery is retried. The backoff times (a list of seconds, as decimal numbers) state how long to wait before the first, second, ... retry. Default: 0.5, 1, 2, 3, 5, 10 (after the 5th retry, wait 10 seconds before trying again).
Example:

consumer-group: kafka-router
backoff-strategy:
  backoff-time-seconds: 1,2,3,4,5,10,30,60
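The retry behaviour described above can be sketched as follows. BackoffStrategy, delayMillis and deliverThenCommit are hypothetical names, not the router's API. Reusing the last backoff value once the list is exhausted is an assumption based on the description of the default, and parsing the comma-separated form follows the example above. Delivery, commit and sleeping are passed in as functions so the sketch runs without a Kafka cluster.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

/**
 * Sketch of the documented behaviour: delivery is retried with increasing waits,
 * and the source is only committed after a successful delivery.
 * Names are hypothetical, not the Kafka Router's API.
 */
final class BackoffStrategy {

    /** Sleeps for the given number of milliseconds (injectable for tests). */
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    private final List<Double> backoffSeconds;

    BackoffStrategy(List<Double> backoffSeconds) {
        if (backoffSeconds.isEmpty()) {
            throw new IllegalArgumentException("at least one backoff time is required");
        }
        this.backoffSeconds = List.copyOf(backoffSeconds);
    }

    /** Parses a comma-separated value such as "1,2,3,4,5,10,30,60". */
    static BackoffStrategy parse(String commaSeparated) {
        return new BackoffStrategy(Arrays.stream(commaSeparated.split(","))
                .map(String::trim)
                .map(Double::parseDouble)
                .collect(Collectors.toList()));
    }

    /** Wait before the given retry (1 = first retry); the last value is reused afterwards. */
    long delayMillis(int retry) {
        int index = Math.min(Math.max(retry, 1), backoffSeconds.size()) - 1;
        return Math.round(backoffSeconds.get(index) * 1000);
    }

    /** Retries delivery until it succeeds, then commits; returns the number of retries. */
    int deliverThenCommit(BooleanSupplier deliver, Runnable commit, Sleeper sleeper)
            throws InterruptedException {
        int retry = 0;
        while (!deliver.getAsBoolean()) {
            retry++;
            sleeper.sleep(delayMillis(retry));
        }
        commit.run(); // only reached after a successful delivery
        return retry;
    }
}
```

With the default list, delayMillis(1) is 500 and every retry from the 6th on waits 10000 ms; with the example value above, retries from the 8th on wait 60 seconds. A real caller would pass Thread::sleep as the Sleeper.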
Configure the Kafka instances you want to work with (as a consumer or producer, or both).

Config prefix: kafka. This is a map from Kafka instance name (used in the routing config) to that instance's Kafka properties.
The names of the Kafka instances are lower-kebab-case; letters and digits are allowed.

Properties:
- bootstrap-servers: Kafka bootstrap servers, required.
- truststore-path: path to the X.509 truststore, for TLS connections.
- truststore-password: password of the truststore.
- keystore-path: path to the X.509 keystore, for mTLS connections.
- keystore-password: password of the keystore.
- properties: additional Kafka properties, as key/value pairs.
kafka:
  city-winterthur:
    bootstrap-servers: localhost:9092
  country-switzerland:
    bootstrap-servers: external.kafka-host.org:443
    truststore-path: example-truststore.p12
    truststore-password: SeCrEt-007!
    keystore-path: example-keystore.p12
    keystore-password: SeCrEt-007!
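The instance settings above could map onto standard Kafka client configuration roughly as sketched below. The keys bootstrap.servers, security.protocol and ssl.truststore.* / ssl.keystore.* are real Kafka client settings; the class KafkaInstanceConfig, the exact name rule encoded in the regex, and resolving relative store paths against RESSOURCE_PATH are assumptions for illustration, not taken from the router's source.

```java
import java.nio.file.Path;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch (not the Kafka Router's code): validates an instance name and
 * maps the documented instance settings to standard Kafka client configuration keys.
 * Resolving relative store paths against RESSOURCE_PATH is an assumption.
 */
final class KafkaInstanceConfig {

    /** Assumed lower-kebab-case rule: runs of lowercase letters/digits joined by single hyphens. */
    private static final Pattern INSTANCE_NAME = Pattern.compile("[a-z0-9]+(-[a-z0-9]+)*");

    static boolean isValidInstanceName(String name) {
        return INSTANCE_NAME.matcher(name).matches();
    }

    /** Store paths and passwords may be null; extra properties are applied last. */
    static Properties toClientProperties(String bootstrapServers,
                                         String truststorePath, String truststorePassword,
                                         String keystorePath, String keystorePassword,
                                         Map<String, String> extraProperties,
                                         Path resourcePath) {
        if (bootstrapServers == null || bootstrapServers.isBlank()) {
            throw new IllegalArgumentException("bootstrap-servers is required");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        if (truststorePath != null || keystorePath != null) {
            // a truststore enables TLS; adding a keystore enables mutual TLS
            props.setProperty("security.protocol", "SSL");
        }
        if (truststorePath != null) {
            props.setProperty("ssl.truststore.location", resourcePath.resolve(truststorePath).toString());
            putIfPresent(props, "ssl.truststore.password", truststorePassword);
        }
        if (keystorePath != null) {
            props.setProperty("ssl.keystore.location", resourcePath.resolve(keystorePath).toString());
            putIfPresent(props, "ssl.keystore.password", keystorePassword);
        }
        // additional key/value pairs from "properties" win on conflicts in this sketch
        extraProperties.forEach(props::setProperty);
        return props;
    }

    private static void putIfPresent(Properties props, String key, String value) {
        if (value != null) {
            props.setProperty(key, value);
        }
    }
}
```

In this sketch, city-winterthur from the example would get only bootstrap.servers, while country-switzerland would get security.protocol=SSL and, with the default RESSOURCE_PATH, ssl.truststore.location=/config/example-truststore.p12.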
Configure the message routing (from which sources you want to consume data, and where it should be published).

Config prefix: routes. This is a map of named routes, each with the following properties:
- source: Kafka instance name (see above) from which data should be consumed.
- source-topic: source topic name, as a regular expression (allows specifying several source topics).
- target: Kafka instance name (see above) to which data should be routed.
- target-topic: optional target topic. If absent, the message is published to the same topic it was consumed from.

The route name becomes the name of the route's Kafka consumer group, as every route needs to consume messages independently of the other routes.
routes:
  # copy the sales topic from winterthur to switzerland
  sales-winterthur-to-sales-switzerland:
    source: city-winterthur
    source-topic: sales
    target: country-switzerland
    target-topic: sales-winterthur
  # copy the sales topic from chur to switzerland
  sales-chur-to-sales-switzerland:
    source: city-chur
    source-topic: sales
    target: country-switzerland
    target-topic: sales-chur
  # collect the sales from all locations to a sales-all topic
  sales-all-switzerland:
    - source: country-switzerland
      source-topic: sales-.+
      target: country-switzerland
      target-topic: sales-all
  # collect all sales globally
  sales-global:
    - source: country-switzerland
      source-topic: sales-all
      target: global
      target-topic: all-sales
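How a single route decides where a message goes can be sketched like this. Route and targetTopicFor are hypothetical names; treating source-topic as a full match of the whole topic name (as java.util.regex.Matcher.matches() does) is an assumption about the router's regex semantics.

```java
import java.util.Optional;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch of one route: source-topic is a regular expression, target-topic
 * is optional, and the route name doubles as the consumer group.
 * Full-match regex semantics are an assumption.
 */
final class Route {

    final String name;
    final String source;
    final Pattern sourceTopic;
    final String target;
    final String targetTopic; // null means: keep the source topic name

    Route(String name, String source, String sourceTopicRegex, String target, String targetTopic) {
        this.name = name;
        this.source = source;
        this.sourceTopic = Pattern.compile(sourceTopicRegex);
        this.target = target;
        this.targetTopic = targetTopic;
    }

    /** Every route consumes independently, so its name is used as the consumer group. */
    String consumerGroup() {
        return name;
    }

    /** Target topic for a message read from (instance, topic), or empty if the route does not apply. */
    Optional<String> targetTopicFor(String instance, String topic) {
        if (!source.equals(instance) || !sourceTopic.matcher(topic).matches()) {
            return Optional.empty();
        }
        return Optional.of(targetTopic != null ? targetTopic : topic);
    }
}
```

Under full matching, sales-.+ also matches sales-all, the target topic of the same route, so sales-all-switzerland would consume its own output again. If the router does not guard against such loops, a pattern that excludes the target, such as sales-(?!all$).+, avoids it.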
Build with Maven (default goals: clean install):
mvn
This builds an Uber JAR (portable Java application with libraries included), which you can run with:
java -jar target/kafka-router.jar config.yaml
Build the docker image:
mvn && docker build -t pwalser75/kafka-router .
Run the docker container:
docker run -it --volume ./docker-example-config.yaml:/config/config.yaml pwalser75/kafka-router
Complete example with Kafka-Router, Kafka and Redpanda Console (Kafka viewer):
docker-compose up
or in detached mode:
docker-compose up -d
You can then view the logs with:
docker logs -f kafka-router
__ __ _____ ___ __
/ //_/__ _/ _/ /_____ _ / _ \___ __ __/ /____ ____
/ ,< / _ `/ _/ '_/ _ `/ / , _/ _ \/ // / __/ -_) __/
/_/|_|\_,_/_//_/\_\\_,_/ /_/|_|\___/\_,_/\__/\__/_/
https://github.com/pwalser75/kafka-router
13:15:49.439 INFO [main] | c.f.tools.kafkarouter.ResourceLoader - reading configuration from /config/config.yaml
13:15:49.593 INFO [main] | c.f.tools.kafkarouter.KafkaRouterMain - Configuring routes:
13:15:49.593 INFO [main] | c.f.tools.kafkarouter.KafkaRouterMain - - kafka (sales-winterthur) -> kafka (sales-switzerland)
13:15:49.712 INFO [main] | c.f.tools.kafkarouter.KafkaRouterMain - - kafka (sales-chur) -> kafka (sales-switzerland)
13:15:49.719 INFO [main] | c.f.tools.kafkarouter.KafkaRouterMain - Starting routes:
13:15:49.720 INFO [main] | c.f.tools.kafkarouter.KafkaRouter - Joining consumer group...
13:15:49.720 INFO [main] | c.f.tools.kafkarouter.KafkaRouter - Subscribing to initial topics:
13:15:52.758 INFO [main] | c.f.tools.kafkarouter.KafkaRouter - - sales-winterthur
13:15:52.759 INFO [main] | c.f.tools.kafkarouter.KafkaRouter - Joining consumer group...
13:15:52.759 INFO [main] | c.f.tools.kafkarouter.KafkaRouter - Subscribing to initial topics:
13:15:52.763 INFO [main] | c.f.tools.kafkarouter.KafkaRouter - - sales-chur
13:15:52.764 INFO [main] | c.f.tools.kafkarouter.KafkaRouterMain - Startup complete, ready to route messages...
The Redpanda Console (Kafka viewer) is accessible at http://localhost:9000. There you can see the topics, publish test messages (select a topic, then Actions > Publish Message) and check whether they are routed.
The consumer groups (one for each route) are visible there as well.
The Kafka Router also adds a header X-Kafka-Router-Source to each routed message, stating from which source / topic / partition / offset the message was routed.