Vadim Chekan edited this page Apr 21, 2015 · 4 revisions

Concurrency

Kafka4net uses an asynchronous, single-threaded execution loop internally. There are no long-running computations inside the driver. The most intensive work the driver has to do is grouping messages by brokerId and sending them to a socket. All network operations are asynchronous, so as soon as "start sending" completes, the event loop is free to execute the next command, such as handling the server response to a previous command.

This design was chosen because the driver has to manipulate many structures: queues of messages grouped by broker (different partitions of different topics can go to the same broker) and metadata structures (lists of brokers, topics, and partitions, and which partition belongs to which broker). As the driver works, it discovers requested topics and partitions. As brokers fail and recover, the association of partitions to brokers may change.

Using locks to protect these structures would lead to quite complicated code. The driver developer would have to remember which lock protects what and avoid deadlocks. Unit testing would provide very little confidence that the code is well tested, because locking errors tend to be highly elusive. To work around those problems, the developer would reach for coarser and coarser locks, which would start causing contention and compromise the initial goal of being multi-threaded.

With asynchronous operations and EventLoopScheduler, it is possible to avoid any protection on data structures, because all execution happens on a single thread.
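
The idea can be illustrated with a minimal, language-neutral sketch in Python. It is not kafka4net code: `EventLoop`, `schedule`, and `run_demo` are hypothetical names, and a plain queue plus one worker thread stands in for EventLoopScheduler. Any thread may schedule work, but the shared dictionary is only ever touched by the loop thread, so it needs no lock.

```python
import queue
import threading


class EventLoop:
    """Hypothetical stand-in for an event loop scheduler: one thread runs all actions."""

    def __init__(self):
        self._actions = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def schedule(self, action):
        # Safe to call from any thread; the action runs later on the loop thread.
        self._actions.put(action)

    def stop(self):
        # The sentinel is queued behind all pending actions, so they run first.
        self._actions.put(None)
        self._thread.join()

    def _run(self):
        while True:
            action = self._actions.get()
            if action is None:
                break
            action()


def run_demo():
    loop = EventLoop()
    leaders = {}  # partition -> broker id; mutated only on the loop thread

    def update(partition, broker_id):
        leaders[partition] = broker_id

    # Many threads request updates concurrently, but none touches `leaders` directly.
    callers = [
        threading.Thread(target=loop.schedule,
                         args=(lambda p=p: update(p, p % 3),))
        for p in range(100)
    ]
    for t in callers:
        t.start()
    for t in callers:
        t.join()
    loop.stop()
    return leaders
```

The only synchronized object is the action queue itself; everything the actions touch stays lock-free.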

Callback execution context

The picture above of single-threaded asynchronous processing is all rosy until you have to pass the result of the driver's work to the hosting application. If you call an application's OnMessage or OnError callback with a batch of received messages, the app may decide to dump the batch into SQL or send it to an HTTP server, and do so synchronously. That means the driver's internal event loop thread would be held up for tens of seconds, and no server response or failure would be processed in the meantime.

To prevent this, all calls from the driver to the application are sent through a SynchronizationContext.
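
A rough sketch of the effect, in Python rather than .NET: a single-thread executor stands in for the SynchronizationContext, and `CallbackDispatcher`, `post`, and `on_message` are hypothetical names chosen for illustration. The driver side posts the callback and immediately moves on, even though the application's handler is still blocked.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class CallbackDispatcher:
    """Hypothetical stand-in for a synchronization context: runs app callbacks off the loop thread."""

    def __init__(self):
        self._executor = ThreadPoolExecutor(max_workers=1)

    def post(self, callback, *args):
        # Returns immediately; the callback runs on the executor's thread.
        return self._executor.submit(callback, *args)

    def shutdown(self):
        self._executor.shutdown(wait=True)


def run_demo():
    dispatcher = CallbackDispatcher()
    release = threading.Event()
    log = []

    def on_message(batch):
        release.wait()  # stands in for a slow synchronous SQL or HTTP call
        log.append("app finished batch of %d" % len(batch))

    # Driver side: hand the batch over, then keep processing.
    dispatcher.post(on_message, ["m1", "m2"])
    log.append("driver handled next response")

    release.set()          # let the slow handler complete
    dispatcher.shutdown()  # wait for outstanding callbacks
    return log
```

In this sketch the driver's log entry always comes first, because the handler cannot finish until the driver has already moved on.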

Cluster

Cluster is responsible for collecting and updating metadata, i.e. the list of brokers, topics, and partitions, their relations, and their up/down state. Cluster is the owner of the EventLoopScheduler and of PartitionRecoveryMonitor, the component responsible for "pinging" failed brokers. Cluster internally exposes a number of IObservable streams for events such as partition failed or recovered, new brokers discovered, metadata changed, etc.

Consumer and Producer use Cluster.

Producer

Most of what Producer does is resolving which broker a given message should be routed to. Once the message key is calculated, the partitioner decides which partition the message will go into. Then the message is enqueued into a queue-per-partition structure.
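
As a minimal sketch of that routing step (Python, for illustration only): `choose_partition` and `enqueue` are hypothetical names, and CRC32 modulo the partition count is an assumed stand-in hash, not necessarily what kafka4net's partitioner does.

```python
import zlib
from collections import defaultdict, deque


def choose_partition(key, partition_count):
    # Assumption: any stable hash works for the illustration; CRC32 is used here.
    return zlib.crc32(key) % partition_count


def enqueue(queues, topic, key, value, partition_count):
    """Append the message to the queue of the partition selected by its key."""
    partition = choose_partition(key, partition_count)
    queues[(topic, partition)].append((key, value))
    return partition


# One queue per (topic, partition), created on first use.
queues = defaultdict(deque)
first = enqueue(queues, "events", b"user-1", b"login", 4)
second = enqueue(queues, "events", b"user-1", b"logout", 4)
```

Messages with the same key land in the same queue, in the order they were produced.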

The queue-per-partition design was dictated by several considerations. It might be tempting to build a queue per broker and, once the partition is calculated from the key, figure out the broker and enqueue there. But if a broker fails and a new leader broker is elected for the partition, we would have to re-scan the failed broker's queue and re-queue its messages into another queue. We cannot just put the failed messages at the front of the new queue, because it may contain messages "in progress" that are being sent. So we would need to put them in the middle of the queue or at the end.

If they are put at the end, that would be "unfair" to those messages, because they are now at the end of the queue even though they were sent first. While this does not violate the specification (the order of messages with the same key does not change), it does produce a strange order and timing of producing, especially if memory buffers are full for some reason. It is also more complicated from an algorithmic point of view.

When a queue per partition is used, grouping by broker happens right before a produce message is sent to Kafka. And when a broker fails or recovers, no changes to the queues are required.
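
The following sketch shows why a leader change costs nothing with this layout. It is illustrative Python, not the driver's code: `group_by_broker` and the `leaders` map are hypothetical, and skipping partitions without a known leader is an assumption made for the example.

```python
from collections import defaultdict, deque


def group_by_broker(queues, leaders):
    """Build per-broker batches from per-partition queues at send time.

    queues:  {(topic, partition): deque of messages}
    leaders: {(topic, partition): broker id}
    """
    batches = defaultdict(dict)
    for topic_partition, messages in queues.items():
        if not messages:
            continue
        broker_id = leaders.get(topic_partition)
        if broker_id is None:
            continue  # assumed: no known leader, messages simply stay queued
        batches[broker_id][topic_partition] = list(messages)
    return dict(batches)


queues = {("t", 0): deque([b"a"]), ("t", 1): deque([b"b"])}
leaders = {("t", 0): 1, ("t", 1): 2}
before = group_by_broker(queues, leaders)

# Broker 2 fails and broker 1 becomes leader of partition 1:
# only the metadata changes, the queues are untouched.
leaders[("t", 1)] = 1
after = group_by_broker(queues, leaders)
```

Because the grouping is recomputed on every send, re-routing after failover is just a different lookup result.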

Another goal achieved by the queue structures is limiting the size of the message set sent to a broker. The send algorithm walks the queues from head towards tail until the size limit is hit. This prevents sending unreasonably large produce messages if brokers were offline and a large number of messages accumulated in memory buffers.
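
A simplified sketch of such a walk over a single queue, in Python: `take_batch` and `max_bytes` are hypothetical names, message size is approximated by payload length, and the handling of a single message larger than the limit is deliberately left out.

```python
from collections import deque


def take_batch(messages, max_bytes):
    """Collect messages from the head of the queue until the size limit would be exceeded.

    The queue is only read, not modified.
    """
    batch = []
    size = 0
    for message in messages:  # iterates head to tail
        if size + len(message) > max_bytes:
            break
        batch.append(message)
        size += len(message)
    return batch


backlog = deque([b"aaaa", b"bbbb", b"cc"])
batch = take_batch(backlog, max_bytes=9)
```

Here the first two messages (8 bytes) fit, and the third stays queued for the next send.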

Once sent, a message is not removed from the queue until the produce message to Kafka returns a success result. If a failure happens, the partition is marked as failed and PartitionRecoveryMonitor starts. Once the partition is back online, the messages are sent again. This makes kafka4net an "at least once" sender, meaning duplicates are possible.
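
The acknowledge-then-remove behaviour can be sketched as follows. This is illustrative Python with hypothetical names (`PartitionQueue`, `peek_batch`, `on_produce_result`, `on_recovered`); the recovery monitor is reduced to a single method call.

```python
from collections import deque
from itertools import islice


class PartitionQueue:
    """Hypothetical per-partition queue that keeps messages until they are acknowledged."""

    def __init__(self):
        self.messages = deque()
        self.failed = False

    def peek_batch(self, count):
        # Read the head of the queue without removing anything.
        return list(islice(self.messages, count))

    def on_produce_result(self, sent_count, success):
        if success:
            # Only now is it safe to drop the acknowledged messages.
            for _ in range(sent_count):
                self.messages.popleft()
        else:
            # Keep everything; sending resumes after recovery.
            self.failed = True

    def on_recovered(self):
        self.failed = False


q = PartitionQueue()
q.messages.extend([b"a", b"b", b"c"])

first_try = q.peek_batch(2)
q.on_produce_result(len(first_try), success=False)
state_after_failure = (q.failed, len(q.messages))

q.on_recovered()
retry = q.peek_batch(2)  # the same messages go out again
q.on_produce_result(len(retry), success=True)
```

If the broker had actually stored the first attempt before the failure was reported, the retry would write those messages a second time, which is where the duplicates come from.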

Shutdown

Consumer

TBD, I plan on using Dataflow blocks instead of Rx for data fetching in v-2.0

Flow control

Positioning

Shutdown

Integration testing

An important part of the kafka4net development process is integration-testing-driven development (is there an acronym for it yet?). It is implemented with a Vagrant script which configures one Zookeeper VM and a cluster of 3 Kafka VMs. There are no unit tests in the kafka4net project; all tests are integration tests which run against real Kafka servers. Many of the tests simulate broker failure and recovery.