Kafka to Jetstream: How to use it

Specify a Kafka processor bean

This is the Spring configuration for declaring a Kafka processor bean:

<bean id="yourKafkaProcessorId" class="full.name.of.your.KafkaProcessorClass">
</bean>

Users can implement a custom Kafka processor with full functionality, or simply use the built-in SimpleKafkaProcessor that ships with Jetstream.

Custom Kafka processor

Users can extend AbstractBatchEventProcessor to write their own Kafka processors. Please read this page first for a detailed description of Jetstream batch processing and the related interfaces and classes.

An implementation of AbstractBatchEventProcessor lets a user choose between at-least-once and at-most-once semantics by processing the data before or after the offset commit. To process data before the offset is committed, the processor should do so in the onNextBatch() method; this guarantees no event loss, though duplicates are possible after a failure. To process events after the offset is committed, the processor should do so in the onBatchProcessed() method; this guarantees that your processor will never see a duplicate event, though events may be lost on failure.

Also pay attention to any exceptions thrown by your implementations of the onNextBatch() and onIdle() methods. When one is thrown, the InboundKafkaChannel notifies the downstream of the exception by calling onException(). You should therefore implement your own onException() and deal with the failure there, for example by telling the upstream to revert the offset.
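
To make this concrete, here is a minimal sketch of a custom processor. It is not a definitive implementation: the exact method signatures, and the BatchSource and BatchResponse types used below, are assumptions; check the batch-processing page linked above for the real contract.

import java.util.Collection;

// Illustrative sketch only; Jetstream imports are omitted, and any type or
// method not documented on this page is an assumption.
public class MyKafkaProcessor extends AbstractBatchEventProcessor {

    @Override
    public BatchResponse onNextBatch(BatchSource source, Collection<JetstreamEvent> events) {
        // Runs before the offset commit: at-least-once semantics.
        // No event is lost, but duplicates are possible after a failure.
        for (JetstreamEvent event : events) {
            // process the event here
        }
        return BatchResponse.advanceAndGetNextBatch(); // assumed "advance" response
    }

    @Override
    public void onBatchProcessed(BatchSource source) {
        // Runs after the offset commit: at-most-once semantics.
        // The processor never sees a duplicate, but events may be lost on failure.
    }

    @Override
    public void onException(Exception ex) {
        // Called by the InboundKafkaChannel when onNextBatch()/onIdle() throws;
        // handle the failure here, e.g. tell the upstream to revert the offset.
    }
}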

SimpleKafkaProcessor

SimpleKafkaProcessor is a built-in Jetstream bean that can be used directly. Before using it, first specify a processor config bean like this:

<bean id="yourKafkaProcessorConfigId" class="com.ebay.jetstream.event.processor.kafka.SimpleKafkaProcessorConfig">
    <!-- config mode as periodic or every-batch, default is periodic, for details go to the following description -->
    <property name="autoAdvanceMode" value="periodic" />
    <!-- useful when advanceMode=periodic, the interval of periodical advancement, default is 30s -->
    <property name="autoAdvanceInterval" value="30000" />
    <!-- the rate to pass events, the count to be passed in 1s, works when value>0, default is -1 -->
    <property name="maxReadRate" value="-1" />
    <!-- default is false, can only be set to true olny when autoAdvanceMode=every-batch, it ensures every batch will be sent after advance to avoid duplication -->
    <property name="noDuplication" value="false" />
</bean>

Then you can configure a SimpleKafkaProcessor bean like this:

<bean id="yourKafkaProcessorId" class="com.ebay.jetstream.event.processor.kafka.SimpleKafkaProcessor">
    <property name="config" ref="yourKafkaProcessorConfigId" />
    <property name="eventSinks">
        <list>
            <ref bean="yourDownstreamSinks" />
        </list>
    </property>
</bean>

This processor simply passes events to its downstream EventSinks or BatchEventSinks. (When the downstream is a BatchEventSink, the EventMetaInfo received by the SimpleKafkaProcessor is not passed to the downstream.) SimpleKafkaProcessor provides three additional features:

  1. Auto advance offset: In SimpleKafkaProcessor the offset can only be advanced automatically. Two auto-advance modes are provided, "periodic" and "every-batch". When autoAdvanceMode is set to "periodic", the offset is advanced periodically at the interval you configure with autoAdvanceInterval (default 30 seconds). When it is set to "every-batch", the offset is advanced as each batch is sent.
  2. noDuplication: This property must be set to true if your use case cannot tolerate duplicate events (see the config snippet after this list).
  3. Rate control: When maxReadRate is configured > 0, events are delivered to the processor at the maxReadRate. This feature is disabled by default.
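
For example, a pipeline that cannot tolerate duplicate events would combine the two relevant properties; as noted above, noDuplication can be true only with the every-batch mode:

<bean id="yourKafkaProcessorConfigId" class="com.ebay.jetstream.event.processor.kafka.SimpleKafkaProcessorConfig">
    <property name="autoAdvanceMode" value="every-batch" />
    <property name="noDuplication" value="true" />
</bean>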

Specify a Kafka consumer config

This bean contains all the properties required by an InboundKafkaChannel. Each InboundKafkaChannel maps to one consumer instance in Kafka terminology; we call it a 'consumer' for short in the rest of this doc. A property is optional when its description gives a default value.

 <bean id="yourKafkaConsumerConfigId"
        class="com.ebay.jetstream.event.channel.kafka.KafkaConsumerConfig">
    <!-- whether the consumer is enabled -->
    <property name="enabled" value="true" />
    <!-- if the consumer starts to consume the data right after the init, or after a resume -->
    <property name="subscribeOnInit" value="true" />
    <!-- the number of threads to do the cosuming task -->
    <property name="poolSize" value="6" />
    <!-- the batch size, note it's in bytes, not in event count, default is 1024 * 1024 -->
    <property name="batchSizeBytes" value="1048576" />
    <!-- whether the partitions are dynamically allocated among nodes or static allocated, default is true, the dynamically allocation is supported by rebalance. The static allocation is hard configured. -->
    <property name="dynamicAllocatePartition" value="true" />
    <!-- how many partitions each node should take. This is meaningful only when the dynamicAllocatePartition is false -->
    <property name="fixedPartitionCountPerTopic" value="5" />
    <!-- which partitions of each topic this node should take. This is meaningful only when the dynamicAllocatePartition is false. -->
    <property name="allocatedPartitions" value="1,3,5" />
    <!-- the consumer group id, required -->
    <property name="groupId" value="soj-bl" />
    <!-- max retries when the rebalance fails, default: 4 -->
    <property name="rebalanceMaxRetries" value="4" />
    <!-- when the the consumer run the first time, or the offset read from zookeeper is out of range, whether the consumer should take the 'smallest' offset (the beginning data) or 'largest' offset (the latest data), default: largest -->
    <property name="autoOffsetReset" value="largest" />
    <!-- max ms to wait when there is no data in the partition, default: 1000 -->
    <property name="fetchWaitMaxMs" value="1000" />
    <!-- network config, socket timeout, default: 30000 -->
    <property name="socketTimeoutMs" value="30000" />
    <!-- network config, socket receive buffer size, default: 65536 -->
    <property name="socketReceiveBufferBytes" value="65536" />
</bean>

Specify the KafkaController bean

The KafkaController bean controls rebalancing and the ZooKeeper connection. It embeds a ZkConnector, which handles data communication between the InboundKafkaChannel and ZooKeeper; the InboundKafkaChannel depends on ZooKeeper for offset storage, partition allocation, etc. Each InboundKafkaChannel is connected to a KafkaController, and a single KafkaController bean can be shared among multiple InboundKafkaChannels.

To use KafkaController, you should first specify a config bean:

<bean id="yourKafkaControllerConfig" class="com.ebay.jetstream.event.channel.kafka.KafkaControllerConfig">
    <!-- the interval of each round of rebalance controlling -->
    <property name="rebalanceInterval" value="60000" />
    <!-- zookeeper endpoints, can be list of host:port. required -->
    <property name="zkConnect" value="10.9.223.78:2181,10.9.209.66:2181" />
    <!-- zookeeper connection timeout, zookeeper client  config, default: 20000 -->
    <property name="zkConnectionTimeoutMs" value="20000" />
    <!-- zookeeper session timeout, zookeeper client config, default: 30000 -->
    <property name="zkSessionTimeoutMs" value="30000" />
    <!-- retry times when the zookeeper can't be connected, default: 2-->
    <property name="retryTimes" value="2" />
    <!-- sleep time between retries, default: 500ms -->
    <property name="sleepMsBetweenRetries" value="500" />
</bean>

Then you can specify a KafkaController bean:

<bean id="yourKafkaController" class="com.ebay.jetstream.event.channel.kafka.KafkaController">
    <property name="config" ref="yourKafkaControllerConfig" />
</bean>

Specify the address bean

This is where you specify a list of Kafka topics that this channel subscribes to.

<bean id="yourKafkaChannelAddressId" class="com.ebay.jetstream.event.channel.kafka.KafkaChannelAddress">
    <property name="channelTopics">
        <list>
            <value>someKafkaTopics</value>
        </list>
    </property>
</bean>

Specify a serializer bean

This bean specifies the serializer class used by Kafka for serializing messages received from the Kafka broker. The Jetstream framework provides some default serializers, such as the JSON serializer, which converts JSON messages into java.util.Map objects. Users can implement their own serializers. Remember that all messages flowing into Jetstream must be serialized to Maps.

 <bean id="yourEventSerializerId"
        class="com.ebay.jetstream.event.channel.kafka.support.JSONMessageSerializer">
 </bean>
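
If you implement your own serializer (remember it must still produce Maps), wire it in the same way. The class name below is a placeholder for your own implementation:

<bean id="yourEventSerializerId" class="full.name.of.your.MessageSerializerClass">
</bean>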

Specify the InboundKafkaChannel bean

The properties of this bean tie together the beans defined in the previous sections.

<bean id="yourInboundKafkaChannelId"
        class="com.ebay.jetstream.event.channel.kafka.InboundKafkaChannel"
        depends-on="MessageService">
    <property name="address" ref="yourKafkaChannelAddressId" />
    <property name="config" ref="yourKafkaConsumerConfigId" />
    <property name="serializer" ref="yourEventSerializerId" />
    <property name="kafkaController" ref="yourKafkaController" />
    <property name="eventSinks">
        <list>
            <ref bean="yourKafkaProcessor" /> 
        </list>
    </property>
</bean>

This bean takes a reference to an address bean containing all the topics to subscribe to, a consumer config bean that contains all the Kafka consumer properties, and a serializer bean that specifies which serializer to use for message serialization.

The list of event sinks receiving events from the InboundKafkaChannel is specified by the "eventSinks" property; the bean ref of your Kafka processor goes here. There is one limitation compared with other inbound channels: there must be only one BatchProcessor in this list, otherwise an exception is thrown during initialization.

Specify Channel Binder for InboundKafkaChannel

A channel binder is a container for the channel; it performs lifecycle management and flow control for channels.

<bean id="yourInboundKafkaChannelBinderId" class="com.ebay.jetstream.event.support.channel.ChannelBinding">
    <property name="channel" ref="yourInboundKafkaChannelId" />
</bean>

Metadata injected into JetstreamEvent

Metadata of Kafka events, such as topic, partition, and offset, is injected into the JetstreamEvent under a reserved key. The key is listed in the enum JetstreamReservedKeys as EventKafkaMeta("js_km"). The code below shows how to read the metadata.

// The metadata is stored as an encoded string under the reserved key "js_km"
String str = (String) event.get(JetstreamReservedKeys.EventKafkaMeta.toString());
// Decode the string into an EventKafkaMetadata instance
EventKafkaMetadata meta = EventKafkaMetadata.decodeInstance(str);
String topic = meta.getTopic();      // topic the event was read from
int partition = meta.getPartition(); // partition within that topic
long offset = meta.getOffset();      // offset of the event within the partition

This metadata can be used by downstream sinks in scenarios such as replay of events.

Back to Kafka-to-Jetstream
