OutboundKafkaChannel

The Outbound Kafka Channel is a specialized Jetstream Outbound Channel implementation that enables event streams to be published to Kafka topics on Kafka brokers. The channel serializes each JetstreamEvent into its JSON or Avro form and publishes that form to the brokers. The Jetstream framework provides a JSON serializer and an Avro serializer that should be used with this channel. The channel points to one of the Kafka broker nodes.

There is no software development required to use this channel. All that is needed is to provide the bean specification for the channel and its supporting beans.

This section shows you how to provision an Outbound Kafka Channel. Before you read further, it is recommended that you familiarize yourself with Apache Kafka Configuration.

Specify the address bean.

This is where you specify the list of Kafka topics that this channel publishes to.

<bean id="KafkaChannelAddress"
        class="com.ebay.jetstream.event.channel.kafka.KafkaChannelAddress">
        <property name="channelTopics">
            <list>
                <value>someKafkaTopic</value>
            </list>
        </property>
    </bean>

Specify a serializer bean.

This bean specifies the serializer class used to serialize JetstreamEvents before they are published to the Kafka brokers.

JSON Serializer

The Jetstream framework provides a JSON serializer that converts between JSON messages and java.util.Map, in both directions. Remember that all messages flowing into Jetstream must be serialized into a JetstreamEvent.

 <bean id="jsonEventSerializer"
        class="com.ebay.jetstream.event.channel.kafka.support.JSONMessageSerializer">
 </bean>
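
For illustration only, here is a minimal sketch of what serializing a map-shaped event to JSON looks like. It uses Jackson directly rather than the framework's JSONMessageSerializer (whose API is not shown on this page), and the attribute names are made up:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.HashMap;
    import java.util.Map;

    public class JsonEventSketch {
        public static void main(String[] args) throws Exception {
            // Stand-in for a JetstreamEvent, which carries key/value attributes.
            Map<String, Object> event = new HashMap<>();
            event.put("guid", "1234-5678");   // hypothetical attributes
            event.put("clickCount", 42L);

            // The outbound channel publishes the JSON form of the event to the Kafka topic.
            String json = new ObjectMapper().writeValueAsString(event);
            System.out.println(json);         // e.g. {"clickCount":42,"guid":"1234-5678"}
        }
    }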

Avro Serializer

The Avro serializer uses a generic Avro map schema to serialize the events. Here is the schema:

{"namespace": "jetstream", "name": "event","type": "record","fields": [ {"name": "attr", "type": {"type": "map", "values" : ["null","boolean","int","long","float","double","bytes","string", {"type": "array", "items": ["boolean","int","long","float","double","string"]}]}}]}
   <bean id="avroEventSerializer"
        class="com.ebay.jetstream.event.channel.kafka.support.AvroMessageSerializer">
        <property name="keyName" value="signatureAffinityKey" />
    </bean>

Because the generic Avro map schema cannot represent nested maps, any nested map value is serialized to a JSON string; on deserialization that JSON string is not converted back into a nested map.
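
As a minimal sketch of what this limitation means in practice (the attribute names are made up and the framework classes are not used here):

    import java.util.HashMap;
    import java.util.Map;

    public class NestedMapSketch {
        public static void main(String[] args) {
            Map<String, Object> event = new HashMap<>();
            event.put("pageId", "home");          // simple values map cleanly onto the generic Avro map schema

            Map<String, Object> nested = new HashMap<>();
            nested.put("browser", "firefox");
            nested.put("os", "linux");
            event.put("clientInfo", nested);      // nested map: not representable by the generic map schema

            // On the wire, "clientInfo" is carried as a JSON string such as
            // {"browser":"firefox","os":"linux"}; after deserialization the consumer
            // receives that string, not a Map, and must parse it itself if needed.
        }
    }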

  • For both the JSON and Avro serializers, Jetstream uses the message key only for Kafka partitioning. If keyName is set, the value of that key in the event must be of type byte[] or String. The built-in Jetstream inbound channel does not decode the key, because the key's data is already present in the JetstreamEvent. If keyName is null, events are distributed to Kafka partitions round-robin using a random key (see the sketch below).
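
A minimal sketch of a key-carrying event, assuming the keyName configured above ("signatureAffinityKey") and treating the event as a plain map; the other attribute name is made up:

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    public class PartitionKeySketch {
        public static void main(String[] args) {
            Map<String, Object> event = new HashMap<>();
            event.put("guid", "1234-5678");   // hypothetical payload attribute

            // The attribute named by the serializer's keyName property is used as the
            // Kafka message key, so events sharing the same key land on the same
            // partition. Its value must be of type byte[] or String.
            event.put("signatureAffinityKey", "user-42".getBytes(StandardCharsets.UTF_8));

            // If keyName is null, the channel uses a random key instead, which spreads
            // events round-robin across the topic's partitions.
        }
    }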

Specify a Kafka Producer Config bean.

This bean contains all the properties required by the Kafka producer library.

<bean id="kafkaProducerConfig" class="com.ebay.jetstream.event.channel.kafka.KafkaProducerConfig">
        <property name="enabled" value="true" />
        <property name="poolSize" value="2" />
        <property name="props">
            <props>
                <prop key="metadata.broker.list">10.9.248.197:9092</prop>
                <prop key="request.required.acks">0</prop>
                <prop key="producer.type">async</prop>
                <prop key="serializer.class">kafka.serializer.DefaultEncoder</prop>
                <prop key="key.serializer.class">kafka.serializer.DefaultEncoder</prop>
                <prop key="partitioner.class">kafka.producer.DefaultPartitioner</prop>
                <prop key="compression.codec">0</prop>
                <prop key="compressed.topics">null</prop>
                <prop key="message.send.max.retries">2</prop>
                <prop key="retry.backoff.ms">100</prop>
                <prop key="topic.metadata.refresh.interval.ms">600000</prop>
                <prop key="queue.buffering.max.ms">1000</prop>
                <prop key="queue.buffering.max.messages">20000</prop>
                <prop key="queue.enqueue.timeout.ms">-1</prop>
                <prop key="batch.num.messages">300</prop>
                <prop key="send.buffer.bytes">1048576</prop>
                <prop key="client.id"></prop>
                <prop key="request.timeout.ms">1500</prop>
            </props>
        </property>
    </bean>

This is where you specify the host address and port of one of the broker nodes (metadata.broker.list). Set the remaining properties to values appropriate for your use case.

Specify the OutboundKafkaChannel bean

<bean id="OutboundKafkaChannel"
        class="com.ebay.jetstream.event.channel.kafka.OutboundKafkaChannel"
        depends-on="MessageService">
        <property name="address" ref="KafkaChannelAddress" />
        <property name="config" ref="kafkaProducerConfig" />
        <property name="serializer" ref="jsonEventSerializer" />
    </bean>

This bean takes a reference to an address bean containing all the topics to publish to, a producer config bean that contains all the Kafka producer properties, and a serializer bean that specifies which serializer to use for message serialization.

Specify Channel Binder for OutboundKafkaChannel

The channel binder is the container for the channel; it manages the channel's lifecycle and provides flow control for the channel.

 <bean id="OutboundKafkaChannelBinder" class="com.ebay.jetstream.event.support.channel.ChannelBinding"
        depends-on="MessageService">
        <property name="channel" ref="OutboundKafkaChannel" />
    </bean>