konnectkode/kafka-cdi

Kafka-CDI - An extension for Apache Kafka

Getting started

Configuration is supplied by implementing the KafkaConfigProperties interface and annotating the implementation with the @KafkaConfig qualifier. The interface has three methods: commonClientConfigs(), producerConfig() and consumerConfig(). Each method should return only the settings defined by org.apache.kafka.clients.CommonClientConfigs, org.apache.kafka.clients.producer.ProducerConfig and org.apache.kafka.clients.consumer.ConsumerConfig, respectively.

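import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// KafkaConfig and KafkaConfigProperties are provided by Kafka-CDI.
@KafkaConfig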
public class KafkaConfigPropertiesImpl implements KafkaConfigProperties {
    
    @Override
    public Properties commonClientConfigs() {
        Properties properties = new Properties();
        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, "client.id");

        return properties;
    }
    
    
    @Override
    public Properties producerConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional.id");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        // transaction.timeout.ms is an int setting; 60000 ms is Kafka's default.
        properties.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60_000);
        
        return properties;
    }

    @Override
    public Properties consumerConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.id");
        properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    
        return properties;
    }

}
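
The three methods return separate Properties objects, so at some point the common settings have to be combined with the producer or consumer settings before a client can be built. How Kafka-CDI does this is not described here. The following is only a minimal sketch of one plausible approach, assuming that client-specific values override common ones. The class EffectiveConfigSketch and its merge method are hypothetical and not part of the library. The keys are written as plain strings so the sketch runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka-CDI: shows one plausible way the
// common settings and the client-specific settings could be combined.
final class EffectiveConfigSketch {

    // Copies the common settings first, then the specific ones, so a key that
    // appears in both ends up with the client-specific value (an assumption).
    static Properties merge(Properties common, Properties specific) {
        Properties merged = new Properties();
        merged.putAll(common);
        merged.putAll(specific);
        return merged;
    }

    public static void main(String[] args) {
        Properties common = new Properties();
        common.put("bootstrap.servers", "localhost:9092");
        common.put("client.id", "client.id");

        Properties producer = new Properties();
        producer.put("transactional.id", "transactional.id");

        // The merged view holds both the shared and the producer-only keys.
        Properties effective = merge(common, producer);
        System.out.println(effective.getProperty("bootstrap.servers"));
        System.out.println(effective.getProperty("transactional.id"));
    }
}
```

Keeping the shared settings in commonClientConfigs() means values such as the bootstrap servers are declared once and reused by both producers and consumers.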

Creating a Kafka Consumer

The API provides the @Consumer annotation for registering consumer methods. Internally, each annotated method is backed by a vanilla KafkaConsumer.

public class MyConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(MyConsumer.class);

    @Consumer(topics = {"topic.one", "topic.two"}, groupId = "my-consumer-group")
    public void consume(@Value String payload) {
        LOG.info("Received message: {}", payload);
    }
}

Additional information can be received with the @Key and @Header annotations, which inject the record key and the record headers.

import org.apache.kafka.common.header.Headers;

public class MyConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(MyConsumer.class);

    @Consumer(topics = {"topic.one", "topic.two"}, groupId = "my-consumer-group")
    public void consume(@Value String payload, @Key String key, @Header Headers headers) {
        LOG.info("Received message: {}, key: {}, header: {}", payload, key, headers);
    }
}

The groupId attribute is optional if the KafkaConfigProperties.consumerConfig() method returns the ConsumerConfig.GROUP_ID_CONFIG property. If both are present, the value given in the @Consumer annotation takes precedence.
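
The precedence rule above can be sketched in plain Java. This is an illustration only, not the library's code: GroupIdResolutionSketch and resolveGroupId are hypothetical names, and treating an empty string as "attribute not set" is an assumption (annotation attributes cannot default to null). The key "group.id" is the value of ConsumerConfig.GROUP_ID_CONFIG.

```java
import java.util.Properties;

// Hypothetical illustration of the documented rule: the annotation value wins,
// and the configured "group.id" is only a fallback.
final class GroupIdResolutionSketch {

    static String resolveGroupId(String annotationGroupId, Properties consumerConfig) {
        // Assumption: an empty string means the annotation attribute was omitted.
        if (annotationGroupId != null && !annotationGroupId.isEmpty()) {
            return annotationGroupId;
        }
        // Returns null when neither source provides a group id.
        return consumerConfig.getProperty("group.id");
    }

    public static void main(String[] args) {
        Properties consumerConfig = new Properties();
        consumerConfig.put("group.id", "configured-group");

        // Both present: the annotation value is used.
        System.out.println(resolveGroupId("my-consumer-group", consumerConfig));
        // Annotation attribute omitted: the configured value is used.
        System.out.println(resolveGroupId("", consumerConfig));
    }
}
```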

Deserializer

By default, a consumer uses org.apache.kafka.common.serialization.StringDeserializer for both the key and the value. Either can be changed in the @Consumer annotation to any class that implements the org.apache.kafka.common.serialization.Deserializer interface.

public class MyConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(MyConsumer.class);

    @Consumer(topics = {"topic.one", "topic.two"}, groupId = "my-consumer-group", keyDeserializer = IntegerDeserializer.class)
    public void consume(@Value String payload, @Key Integer key) {
        LOG.info("Received message: {}", payload);
    }
}
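
To make the role of a key deserializer concrete, the sketch below shows the decoding step an integer deserializer has to perform: Kafka's IntegerSerializer writes a key as 4 big-endian bytes, and the deserializer turns them back into an Integer. The class IntegerKeyDecodingSketch is hypothetical and deliberately does not implement the Deserializer interface, so that it runs without the Kafka client library. A real implementation would put the same logic in Deserializer.deserialize(String topic, byte[] data).

```java
import java.nio.ByteBuffer;

// Hypothetical stand-alone illustration of what an integer key deserializer does.
final class IntegerKeyDecodingSketch {

    static Integer decode(byte[] data) {
        // Records without a key carry null bytes; pass that through.
        if (data == null) {
            return null;
        }
        if (data.length != 4) {
            throw new IllegalArgumentException("Expected 4 bytes but got " + data.length);
        }
        // ByteBuffer reads in big-endian order by default.
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] encoded = ByteBuffer.allocate(4).putInt(42).array();
        System.out.println(decode(encoded));
    }
}
```

The same reasoning explains why the key type in the consumer method signature has to match the configured deserializer: with IntegerDeserializer the @Key parameter is an Integer, not a String.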

Injecting a KafkaProducer

For flexibility and compatibility, the API provides a fully configured vanilla KafkaProducer, without wrappers or decorators. If the KafkaConfigProperties.producerConfig() method returns the ProducerConfig.TRANSACTIONAL_ID_CONFIG property, the API also takes care of calling KafkaProducer.initTransactions() before any other method is invoked on the producer.

public class MyProducer {

    @Inject
    @Producer
    private KafkaProducer<Integer, String> producer;

    public void myProducerMethod(String message) {
        producer.send(new ProducerRecord<>("kafka.topic", message));
    }
}

KafkaProducer is thread-safe, so the producer is declared as @ApplicationScoped.
