This component provides a vert.x wrapper around the most important functions of Kafka’s AdminUtils.
AdminUtils are used to create, modify, and delete topics. Other functionality covered by AdminUtils, but not this wrapper, includes Partition Management, Broker Configuration management, etc.
You can call createTopic
to create a topic.
Parameters are: topic name, number of partitions, number of replicas, and the usual callback to handle the result.
It might return an error, e.g. if the number of requested replicas is greater than the number of brokers.
AdminUtils adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true);
// Create topic 'myNewTopic' with 2 partition and 1 replicas
adminUtils.createTopic("myNewTopic", 2, 1, result -> {
if(result.succeeded())
System.out.println("Creation of topic myNewTopic successful!");
else
System.out.println("Creation of topic myNewTopic failed: "+
result.cause().getLocalizedMessage());
});
You can call deleteTopic
to delete a topic.
Parameters are: topic name, and the usual callback to handle the result.
It might return an error, e.g. if the topic does not exist.
AdminUtils adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true);
// Delete topic 'myNewTopic'
adminUtils.deleteTopic("myNewTopic", result -> {
if(result.succeeded())
System.out.println("Deletion of topic myNewTopic successful!");
else
System.out.println("Deletion of topic myNewTopic failed: "+
result.cause().getLocalizedMessage());
});
If you need to update the configuration of a topic, e.g., you want to update the retention policy,
you can call changeTopicConfig
to update a topic.
Parameters are: topic name, a Map (String → String) with parameters to be changed,
and the usual callback to handle the result.
It might return an error, e.g. if the topic does not exist.
AdminUtils adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true);
// Set retention to 1000 ms and max size of the topic partition to 1 kiByte
Map<String, String> properties = new HashMap<>();
properties.put("delete.retention.ms", "1000");
properties.put("retention.bytes", "1024");
adminUtils.changeTopicConfig("myNewTopic", properties, result -> {
if(result.succeeded())
System.out.println("Configuration change of topic myNewTopic successful!");
else
System.out.println("Configuration change of topic myNewTopic failed: "+
result.cause().getLocalizedMessage());
});}
If you want to check if a topic exists, you can call topicExists
.
Parameters are: topic name, and the usual callback to handle the result.
It might return an error, e.g. if the topic does not exist.
AdminUtils adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true);
adminUtils.topicExists("myNewTopic", result -> {
if(result.succeeded()) {
System.out.println("Topic myNewTopic exists: "+result.result());
}
else
System.out.println("Failed to check if topic myNewTopic exists: "+
result.cause().getLocalizedMessage());
});