
Kafka to Jetstream : How it works


InboundKafkaChannel

InboundKafkaChannel is the most important class. It acts as both a KafkaConsumer and an InboundChannel.

As a KafkaConsumer, it implements the methods defined in the KafkaConsumer interface, such as calcRebalance(), takePartitions(), and releasePartitions(). It takes partitions, puts them into a blocking queue, and then schedules several consuming threads to consume those partitions. For more details, see the description in Main Flow below.
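
The exact interface is not reproduced on this page; as a rough sketch, and assuming partitions are identified by name, it might look like the following (only the method names come from this page, the parameter types are assumptions):

```java
import java.util.Set;

// Sketch only: the method names are taken from this page; the parameter
// types are assumptions, not the actual Jetstream signatures.
public interface KafkaConsumer {
    // Decide what this consumer should own after the next rebalance round.
    void calcRebalance();

    // Claim ownership of the given partitions and queue them for consumption.
    void takePartitions(Set<String> partitions);

    // Give up ownership so other consumers in the group can claim them.
    void releasePartitions(Set<String> partitions);
}
```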

As an InboundChannel, it implements all the features that an InboundChannel should have, such as pause/resume, graceful shutdown, and hot deploy.

KafkaController

KafkaController is a controller shared by all Kafka consumers; it manages rebalancing and the ZooKeeper connection for them. It has two functions: it schedules a rebalance task that periodically calls the rebalance of each Kafka consumer, and its embedded ZkConnector is responsible for the ZooKeeper connection. All Kafka consumers rely on it to interact with ZooKeeper and to listen to the connection state.
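
A minimal sketch of the first responsibility, assuming a single-threaded scheduler and illustrative class names (this is not the actual Jetstream code):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of a shared controller: one rebalance timer shared
// by all registered Kafka consumers.
public class SharedRebalanceController {
    private final List<Runnable> consumers = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Each consumer registers the task that runs its own rebalance.
    public void register(Runnable rebalanceTask) {
        consumers.add(rebalanceTask);
    }

    // Periodically trigger a rebalance check on every registered consumer.
    public void start(long intervalSeconds) {
        scheduler.scheduleAtFixedRate(
                () -> consumers.forEach(Runnable::run),
                intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}
```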

ZkCoordinator

ZkCoordinator is used by InboundKafkaChannel to coordinate with ZooKeeper: it registers the consumerId, calculates how many partitions to take or release, and marks a partition as taken or released. Three kinds of ZkCoordinator are provided: dynamic, fixedCount, and static. DynamicCoordinator distributes the partitions evenly across the consumer nodes. FixedCountCoordinator makes the consumer take a fixed number of partitions. StaticCoordinator takes only the partitions you configured for this consumer node. The details of configuration are here.
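
As a rough illustration of how the three strategies differ, the partition target for a node could be computed roughly as below; the helper class and its parameters are hypothetical, not the actual coordinator code:

```java
import java.util.List;

// Hypothetical illustration of the three allocation strategies described above.
public final class PartitionTargets {

    // Dynamic: split all partitions evenly across the consumer nodes,
    // giving the remainder to the first few nodes.
    public static int dynamicTarget(int totalPartitions, int consumerCount, int myIndex) {
        int base = totalPartitions / consumerCount;
        int remainder = totalPartitions % consumerCount;
        return base + (myIndex < remainder ? 1 : 0);
    }

    // FixedCount: always aim for a configured number of partitions.
    public static int fixedCountTarget(int configuredCount) {
        return configuredCount;
    }

    // Static: take exactly the partitions listed in the configuration.
    public static List<String> staticTarget(List<String> configuredPartitions) {
        return configuredPartitions;
    }
}
```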

SimpleKafkaProcessor

SimpleKafkaProcessor is an implementation of AbstractBatchEventProcessor. It supports the simple use case that does not care about offset handling: the processor simply passes the events to its downstream BatchEventSink or EventSinks. For EventSinks, the events in each batch are sent one by one, so the downstream event sinks have no way to know when the offset advancement happens.

The SimpleKafkaProcessor implements auto-advance of the offset and provides two modes, "periodic" and "every-batch". For more details on the features and configuration of the two modes, refer to here. For the "noDuplication" feature: if noDuplication is set to true, the processor caches the last batch and sends it out only after the offset is advanced.
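
A minimal sketch of the cache-and-send-after-advance idea behind noDuplication, using simplified stand-in types rather than Jetstream's actual batch and sink classes:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the noDuplication behavior, for illustration only.
public class NoDuplicationBuffer<E> {
    private List<E> pendingBatch = new ArrayList<>();

    // When a new batch arrives, hold it instead of forwarding it immediately.
    public void onBatch(List<E> batch) {
        pendingBatch = new ArrayList<>(batch);
    }

    // Only after the offset for the cached batch has been advanced is the
    // batch handed downstream, so a replay cannot resend events that were
    // already forwarded.
    public List<E> onOffsetAdvanced() {
        List<E> toSend = pendingBatch;
        pendingBatch = new ArrayList<>();
        return toSend;
    }
}
```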

Main Flow


The InboundKafkaChannel creates m threads to handle n partitions from Kafka, where m is configurable and n is balanced across the cluster. It holds the partitions in a queue. A consumer thread takes a partition from the queue, processes one batch of events from it, and returns the partition to the queue; this is done round by round. In each round the consumer thread first reads a batch of events from Kafka and sends them to the downstream eventSink. The downstream eventSink can return a BatchResponse that tells the InboundKafkaChannel whether to advance the offset or revert it, and the consumer thread then performs the advance or the revert. In the BatchResponse you can also set the next read offset or tell the partition to wait for a while before the next fetch.
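
A simplified sketch of one consumer thread's round; the class and interfaces below are illustrative stand-ins, not the actual Jetstream classes, and the real channel adds error handling, waiting, and offset bookkeeping:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Illustrative consumer-thread loop: take a partition, process one batch,
// apply the sink's decision, and put the partition back for the next round.
class ConsumerRound<P, E> implements Runnable {
    interface BatchSink<P, E> {
        // Returns true if the offset should be advanced, false to revert.
        boolean sendBatch(P partition, List<E> events);
    }
    interface PartitionReader<P, E> {
        List<E> readBatch(P partition);
        void advanceOffset(P partition);
        void revertOffset(P partition);
    }

    private final BlockingQueue<P> partitions;
    private final PartitionReader<P, E> reader;
    private final BatchSink<P, E> sink;
    private volatile boolean running = true;

    ConsumerRound(BlockingQueue<P> partitions, PartitionReader<P, E> reader, BatchSink<P, E> sink) {
        this.partitions = partitions;
        this.reader = reader;
        this.sink = sink;
    }

    @Override
    public void run() {
        while (running) {
            try {
                P partition = partitions.take();          // borrow a partition
                List<E> batch = reader.readBatch(partition);
                if (sink.sendBatch(partition, batch)) {   // downstream decides
                    reader.advanceOffset(partition);
                } else {
                    reader.revertOffset(partition);
                }
                partitions.put(partition);                // return for next round
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }
}
```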

See the interface description of AbstractBatchEventProcessor for more details: batch-event-processor-interface

Note that different exceptions will be delivered via onException(), such as OffsetOutOfRangeException and (ZooKeeper) ConnectionLossException. When an OffsetOutOfRangeException is delivered, the next read offset you set will not take effect; the InboundKafkaChannel will ask Kafka which offset is actually "in range".
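
As an illustration only, a processor might branch on the exception type roughly as below; the enclosing class and the callback's parameter are placeholders, since the real callback is defined by AbstractBatchEventProcessor:

```java
// Placeholder class for illustration; the real onException() callback is
// defined by AbstractBatchEventProcessor (see the linked interface description).
public class ExceptionAwareProcessor {
    public void onException(Exception cause) {
        String name = cause.getClass().getSimpleName();
        if ("OffsetOutOfRangeException".equals(name)) {
            // Do not set a next read offset here: the channel will ask Kafka
            // which offset is actually in range and resume from there.
        } else if ("ConnectionLossException".equals(name)) {
            // ZooKeeper connection lost: the shared KafkaController owns the
            // connection, so typically just log and wait for reconnection.
        }
    }
}
```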

Rebalance

The InboundKafkaChannel checks for and performs rebalancing at a fixed interval. The rebalance process is done entirely automatically and internally. The only thing a processor implementation should take care of is that onStreamTermination() can be called in any round of rebalance when a partition is about to be released; the processor should do whatever cleanup it needs in its onStreamTermination().
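
For example, a processor that keeps per-partition state might flush it when a partition is released. This is only a sketch, and the parameter type of onStreamTermination() is an assumption:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: a processor keeping per-partition state and flushing it
// when a partition is released during rebalance. onStreamTermination is the
// callback named above; its parameter type here is an assumption.
public class PerPartitionStateProcessor {
    private final Map<String, StringBuilder> stateByPartition = new ConcurrentHashMap<>();

    public void onStreamTermination(String partition) {
        // The partition may be handed to another consumer node after this
        // call, so flush or discard any buffered state for it now.
        StringBuilder buffered = stateByPartition.remove(partition);
        if (buffered != null) {
            flush(partition, buffered.toString());
        }
    }

    private void flush(String partition, String state) {
        System.out.println("flushing state for " + partition + ": " + state);
    }
}
```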

Case Handling

Exception Handling

InboundKafkaChannel also implements the methods defined in KafkaConsumer to deal with the cases caused by ZooKeeper exceptions.

Zookeeper down and reconnected

It has been observed that ZooKeeper's ephemeral nodes can disappear after ZooKeeper goes down and the client reconnects. zkReconnected() is called when a reconnection happens; it rewrites ephemeral nodes such as the consumerId node and the partition owner path to avoid losing them. Our implementation handles this case and avoids duplicate events after ZooKeeper goes down and reconnects.
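
The idea of rewriting ephemeral nodes can be sketched with the plain ZooKeeper client API; the surrounding class and the paths passed in are assumptions, only the re-create-missing-ephemerals behavior comes from this page:

```java
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Illustrative sketch of re-creating ephemeral nodes after a reconnect.
public class EphemeralRewriter {
    private final ZooKeeper zk;

    public EphemeralRewriter(ZooKeeper zk) {
        this.zk = zk;
    }

    // Called from something like zkReconnected(): ephemeral nodes created in
    // the old session may be gone, so re-create any that are missing.
    public void rewrite(List<String> ephemeralPaths, byte[] data)
            throws KeeperException, InterruptedException {
        for (String path : ephemeralPaths) {
            if (zk.exists(path, false) == null) {
                zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            }
        }
    }
}
```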

Coordination

To guarantee that the partitions taken by this consumer node exactly match those recorded in ZooKeeper (although a mismatch happens with very low probability), coordinate() is called before each round's rebalance. It takes the partitions that should be taken and releases, without advancing their offsets, those that should not be taken.
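
The reconciliation behind coordinate() boils down to two set differences; a minimal sketch, assuming partitions are identified by name:

```java
import java.util.HashSet;
import java.util.Set;

// Illustration of the reconciliation idea: compare what this node owns
// locally with what ZooKeeper records, then take the missing partitions
// and release the extra ones (without advancing their offsets).
public final class PartitionReconciler {
    public static Set<String> toTake(Set<String> recordedInZk, Set<String> ownedLocally) {
        Set<String> missing = new HashSet<>(recordedInZk);
        missing.removeAll(ownedLocally);
        return missing;
    }

    public static Set<String> toRelease(Set<String> recordedInZk, Set<String> ownedLocally) {
        Set<String> extra = new HashSet<>(ownedLocally);
        extra.removeAll(recordedInZk);
        return extra;
    }
}
```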

Kafka Broker Down

The InboundKafkaChannel can deal with the case where one of the Kafka brokers goes down. If that broker is the leader for any partition's replicas, an error occurs when trying to read events from it. The InboundKafkaChannel handles the error, reads the new leader broker from ZooKeeper, and retries the read. Consumption will not be blocked by a Kafka broker going down.
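
A sketch of the retry-on-leader-change idea; the lookup and reader interfaces are placeholders for whatever the channel actually uses to find the leader in ZooKeeper and fetch from a broker:

```java
import java.util.List;

// Illustration only: on a read error, refresh the leader and retry, so a
// single broker failure does not block consumption.
public class LeaderFailoverReader<E> {
    interface BrokerLookup {
        String currentLeaderFor(String partition); // e.g. looked up in ZooKeeper
    }
    interface BrokerReader<E> {
        List<E> read(String broker, String partition) throws Exception;
    }

    private final BrokerLookup lookup;
    private final BrokerReader<E> reader;

    public LeaderFailoverReader(BrokerLookup lookup, BrokerReader<E> reader) {
        this.lookup = lookup;
        this.reader = reader;
    }

    public List<E> readWithFailover(String partition, int maxRetries) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            String leader = lookup.currentLeaderFor(partition);
            try {
                return reader.read(leader, partition);
            } catch (Exception e) {
                last = e; // the leader may have moved; look it up again and retry
            }
        }
        throw last;
    }
}
```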

Pause/Resume

As an InboundChannel, the InboundKafkaChannel also supports pause and resume. When the channel is paused, all scheduled consuming threads stop reading, block, and wait; when the channel is resumed, all the threads start working again. The channel does not release its partitions while it is paused.
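
The pause/resume behavior can be pictured as a gate that every consuming thread checks at the start of a round; this is an illustrative sketch, not the actual InboundKafkaChannel code:

```java
// Illustrative pause gate checked by consuming threads once per round.
public class PauseGate {
    private boolean paused = false;

    public synchronized void pause() {
        paused = true;
    }

    public synchronized void resume() {
        paused = false;
        notifyAll(); // wake every consuming thread blocked in awaitRunning()
    }

    // Consuming threads call this at the start of each round; they block
    // here while the channel is paused and continue once it is resumed.
    public synchronized void awaitRunning() throws InterruptedException {
        while (paused) {
            wait();
        }
    }
}
```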

Graceful Shutdown

The InboundChannel can be gracefully shut down without losing or duplicating any events, because all partitions are "gracefully" released, with their offsets advanced and saved to ZooKeeper first. All waiting or blocked threads are interrupted when graceful shutdown is executed, so the shutdown does not take long.

Hot Deploy

Both InboundKafkaChannel and KafkaController support hot deploy of their configuration; the configuration is separated from the implementation. Since a configuration change may affect the behavior of the consuming threads, the channel first unsubscribes the consumer node and releases all partitions gracefully. After that, it resubscribes, takes partitions, and restarts consuming according to the new configuration.

Back to Kafka-to-Jetstream
