- KafkaCurator was designed to provide a simple and supervised way to manage everything related to Kafka topics.
- A lot of the code in this library was inspired by KafkaFlow.
- .NET Core 2.1 or above.
- .NET Framework 4.6.1 or above.
Install the required NuGet packages:
Below is a simple console application that runs the curator to create the set of topics configured in the topics.json
file.
For the complete project, feel free to view the Samples folder.
using System;
using Confluent.Kafka;
using KafkaCurator.Extensions.Microsoft.DependencyInjection;
using KafkaCurator.LogHandler.Console;
using Microsoft.Extensions.DependencyInjection;
using SecurityProtocol = KafkaCurator.Abstractions.Configuration.SecurityProtocol;
var services = new ServiceCollection();
services.AddKafkaCurator(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers("localhost:9094")
        .WithSecurityInformation(information => information.SecurityProtocol = SecurityProtocol.Ssl)
        .ConfigureChangesManager(changesManager => changesManager
            .WithAdminConfig(new AdminClientConfig())
            .WithTopicPrefixToExclude("__")
            .WithTopicPrefixToExclude("kafka-s3-sink")
            .WithTimeout(TimeSpan.FromSeconds(15)))
        .AddTopicsJsonFile("topics.json")
    ));
var provider = services.BuildServiceProvider();
var curator = provider.CreateCurator();
return await curator.ExecuteAsync();
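The two `WithTopicPrefixToExclude` calls in the sample presumably keep internal and connector-owned topics out of the curator's reach. The sketch below illustrates that idea only; it assumes exclusion is a plain, case-sensitive "starts with" check, which this README does not confirm, and the topic names `__consumer_offsets` and `kafka-s3-sink-status` are made up for illustration.

```csharp
using System;
using System.Linq;

// Prefixes taken from the sample configuration.
var excludedPrefixes = new[] { "__", "kafka-s3-sink" };

// Hypothetical topics that might already exist on the cluster.
var existingTopics = new[]
{
    "__consumer_offsets",
    "kafka-s3-sink-status",
    "Rick.Sanchez",
    "Morty.Smith",
};

// Assumption: a topic is skipped when its name starts with any excluded prefix.
var managedTopics = existingTopics
    .Where(topic => !excludedPrefixes.Any(prefix => topic.StartsWith(prefix, StringComparison.Ordinal)))
    .ToArray();

Console.WriteLine(string.Join(", ", managedTopics));
```

Under that assumption only `Rick.Sanchez` and `Morty.Smith` would remain under management.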
All topic configurations are supported except Replication Factor, which will be supported later on.
Below is a list of the configurations:
CleanupPolicy
CompressionType
MessageDownConversionEnable
MinInSyncReplicas
SegmentJitterMs
FlushMs
FollowerReplicationThrottledReplicas
SegmentBytes
RetentionMs
FlushMessages
MessageFormatVersion
MaxCompactionLagMs
MaxMessageBytes
MinCompactionLagMs
MessageTimestampType
Preallocate
MinCleanableDirtyRatio
IndexIntervalBytes
UncleanLeaderElectionEnable
RetentionBytes
DeleteRetentionMs
SegmentMs
MessageTimestampDifferenceMaxMs
SegmentIndexBytes
FileDeleteDelayMs
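The names in this list look like PascalCase forms of Kafka's topic-level configuration keys. The lookup table below is a small sketch of that correspondence for a few entries; the right-hand keys are standard Kafka topic configs, but the mapping itself is an assumption based on the naming and is not taken from the library's source.

```csharp
using System;
using System.Collections.Generic;

// Assumed correspondence between the listed names and Kafka topic config keys.
// Only a handful of entries are shown.
var kafkaKeys = new Dictionary<string, string>
{
    ["CleanupPolicy"] = "cleanup.policy",
    ["MinInSyncReplicas"] = "min.insync.replicas",
    ["RetentionMs"] = "retention.ms",
    ["MaxMessageBytes"] = "max.message.bytes",
    ["SegmentBytes"] = "segment.bytes",
    ["Preallocate"] = "preallocate",
};

foreach (var pair in kafkaKeys)
{
    Console.WriteLine($"{pair.Key} -> {pair.Value}");
}
```

The Kafka documentation for each key describes the accepted values and defaults.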
Each of the configuration entries listed above has its own method, so it can be specified inline:
services.AddKafkaCurator(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers("localhost:9092")
        .AddTopic(topic => topic
            .WithMinCompactionLagMs(10)
            .WithMinInSyncReplicas(3)
            .WithPreallocate(false)
        )
    ));
Alternatively, you can specify the entire configuration in a JSON file, add that file to the curator, and from then on update only that file:
{
  "Topics": [
    {
      "Name": "Rick.Sanchez",
      "ReplicationFactor": 3,
      "Partitions": 10,
      "CleanupPolicy": "compact",
      "MaxMessageBytes": "10485880"
    },
    {
      "Name": "Morty.Smith",
      "ReplicationFactor": 3,
      "Partitions": 3,
      "CleanupPolicy": "delete",
      "Preallocate": false
    }
  ]
}
And then add that file to the curator:
services.AddKafkaCurator(kafka => kafka
    .UseConsoleLog()
    .AddCluster(cluster => cluster
        .WithBrokers("localhost:9092")
        .AddTopicsJsonFile("topics.json")
    ));
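Because the JSON file becomes the single place where topics are edited, a quick sanity check before running the curator can catch simple mistakes. The following is a minimal sketch using `System.Text.Json` (available from .NET Core 3.0), assuming the file shape shown in this README; the two validation rules are illustrative and are not something KafkaCurator is known to enforce.

```csharp
using System;
using System.Text.Json;

// Inline copy of a topics.json-style document; in practice the text would
// come from File.ReadAllText("topics.json").
const string json = @"{
  ""Topics"": [
    { ""Name"": ""Rick.Sanchez"", ""ReplicationFactor"": 3, ""Partitions"": 10 },
    { ""Name"": ""Morty.Smith"", ""ReplicationFactor"": 3, ""Partitions"": 3 }
  ]
}";

using var doc = JsonDocument.Parse(json);
var checkedTopics = 0;

foreach (var topic in doc.RootElement.GetProperty("Topics").EnumerateArray())
{
    var name = topic.GetProperty("Name").GetString();
    var partitions = topic.GetProperty("Partitions").GetInt32();

    // Hypothetical sanity rules, chosen only for this example.
    if (string.IsNullOrWhiteSpace(name))
        throw new InvalidOperationException("A topic entry is missing its name.");
    if (partitions < 1)
        throw new InvalidOperationException($"Topic '{name}' needs at least one partition.");

    checkedTopics++;
}

Console.WriteLine($"Checked {checkedTopics} topics.");
```

A check like this could run in CI on every change to the file, so that a malformed entry fails fast instead of reaching the cluster.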
kafka-curator is a free and open source project, released under the permissive MIT License.