KafkaConsumer standard setup (#1349)
jolarsen committed Mar 19, 2024
1 parent 4c3974c commit f7865ea
Showing 5 changed files with 231 additions and 0 deletions.
5 changes: 5 additions & 0 deletions integrasjon/kafka-properties/pom.xml
@@ -19,6 +19,11 @@
<artifactId>felles-konfig</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>no.nav.foreldrepenger.felles</groupId>
<artifactId>felles-feil</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
128 changes: 128 additions & 0 deletions no/nav/vedtak/felles/integrasjon/kafka/KafkaConsumerManager.java (new file)
@@ -0,0 +1,128 @@
package no.nav.vedtak.felles.integrasjon.kafka;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaConsumerManager<K,V> {

private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10);

private final List<KafkaMessageHandler<K,V>> handlers;
private final List<KafkaConsumerLoop<K,V>> consumers = new ArrayList<>();

public KafkaConsumerManager(List<KafkaMessageHandler<K, V>> handlers) {
this.handlers = handlers;
}

public void start(BiConsumer<String, Throwable> errorlogger) {
consumers.addAll(handlers.stream().map(KafkaConsumerLoop::new).toList());
consumers.forEach(c -> {
var ct = new Thread(c, "KC-" + c.handler().groupId());
ct.setUncaughtExceptionHandler((t, e) -> { errorlogger.accept(c.handler().topic(), e); stop(); });
ct.start();
});
Runtime.getRuntime().addShutdownHook(new Thread(new KafkaConsumerCloser<>(consumers)));
}

public void stop() {
consumers.forEach(KafkaConsumerLoop::shutdown);
var timeout = LocalDateTime.now().plus(CLOSE_TIMEOUT).plusSeconds(1);
while (!allStopped() && LocalDateTime.now().isBefore(timeout)) {
try {
Thread.sleep(Duration.ofSeconds(1));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

public boolean allRunning() {
return consumers.stream().allMatch(KafkaConsumerLoop::isRunning);
}

public boolean allStopped() {
return consumers.stream().allMatch(KafkaConsumerLoop::isStopped);
}

public String topicNames() {
return handlers.stream().map(KafkaMessageHandler::topic).collect(Collectors.joining(","));
}

private record KafkaConsumerCloser<K,V>(List<KafkaConsumerLoop<K,V>> consumers) implements Runnable {
@Override
public void run() {
consumers.forEach(KafkaConsumerLoop::shutdown);
}
}

public static class KafkaConsumerLoop<K,V> implements Runnable {

private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);
private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10);
private enum ConsumerState { UNINITIALIZED, RUNNING, STOPPING, STOPPED }
// Track state by enum ordinal: ordinals are stable and distinct, unlike identity hashCodes.
private static final int RUNNING = ConsumerState.RUNNING.ordinal();

private final KafkaMessageHandler<K, V> handler;
private KafkaConsumer<K, V> consumer;
private final AtomicInteger running = new AtomicInteger(ConsumerState.UNINITIALIZED.ordinal());

public KafkaConsumerLoop(KafkaMessageHandler<K,V> handler) {
this.handler = handler;
}
@Override
public void run() {
try(var key = handler.keyDeserializer().get(); var value = handler.valueDeserializer().get()) {
var props = KafkaProperties.forConsumerGenericValue(handler.groupId(), key, value, handler.autoOffsetReset());
consumer = new KafkaConsumer<>(props, key, value);
consumer.subscribe(List.of(handler.topic()));
running.set(RUNNING);
while (running.get() == RUNNING) {
var records = consumer.poll(POLL_TIMEOUT);
// If more sophistication is wanted, both the offset commit and the DB commit can be handled in a transactional handleRecords.
// handleRecords must then accept everything polled (records) plus two callbacks: a) add to consumed and b) commitAsync(consumed).
// See the commented sketch after this inner class.
for (var record : records) {
handler.handleRecord(record.key(), record.value());
}
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
if (consumer != null) {
consumer.close(CLOSE_TIMEOUT);
}
running.set(ConsumerState.STOPPED.ordinal());
}
}

public void shutdown() {
if (running.get() == RUNNING) {
running.set(ConsumerState.STOPPING.ordinal());
} else {
running.set(ConsumerState.STOPPED.ordinal());
}
}
if (consumer != null) {
consumer.wakeup();
}
}

public KafkaMessageHandler<K, V> handler() {
return handler;
}

public boolean isRunning() {
return running.get() == RUNNING;
}

public boolean isStopped() {
return running.get() == ConsumerState.STOPPED.ordinal();
}
}
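// Sketch (hypothetical, not part of this commit): a manual-commit variant of the poll loop above,
// along the lines the comment in run() suggests. Requires enable.auto.commit=false in the properties.
//
//   consumer.subscribe(List.of(handler.topic()));
//   while (running.get() == RUNNING) {
//       var records = consumer.poll(POLL_TIMEOUT);
//       for (var record : records) {
//           handler.handleRecord(record.key(), record.value());
//       }
//       consumer.commitAsync(); // commit offsets only after the whole batch is handled
//   }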
}
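For illustration, a minimal sketch of wiring the manager into an application. ConsumerBootstrap and MyTopicHandler (a KafkaStringMessageHandler, sketched after the interface below) are hypothetical names, and the SLF4J logger is an assumption:

package no.nav.vedtak.felles.integrasjon.kafka; // hypothetical placement

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerBootstrap {

    private static final Logger LOG = LoggerFactory.getLogger(ConsumerBootstrap.class);

    public static void main(String[] args) {
        var manager = new KafkaConsumerManager<String, String>(List.of(new MyTopicHandler()));
        manager.start((topic, error) -> LOG.error("Consumer of topic {} failed", topic, error));
        // start() registers a shutdown hook that stops the loops; manager.stop() may also be called explicitly.
    }
}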
35 changes: 35 additions & 0 deletions no/nav/vedtak/felles/integrasjon/kafka/KafkaMessageHandler.java (new file)
@@ -0,0 +1,35 @@
package no.nav.vedtak.felles.integrasjon.kafka;

import java.util.Optional;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public interface KafkaMessageHandler<K,V> {

void handleRecord(K key, V value);

// Configuration
String topic();
String groupId(); // Keep stable (otherwise the group will re-read the topic according to autoOffsetReset())
default Optional<OffsetResetStrategy> autoOffsetReset() { // Implement if other than default (LATEST). Use NONE to discover low-volume topics
return Optional.empty();
}

// Deserialization - must be provided for Avro topics. Supplied as a Supplier because Deserializer is Closeable
Supplier<Deserializer<K>> keyDeserializer();
Supplier<Deserializer<V>> valueDeserializer();

// Implement KafkaStringMessageHandler for JSON topics. The methods above are meant for Avro topics
interface KafkaStringMessageHandler extends KafkaMessageHandler<String, String> {
@Override
default Supplier<Deserializer<String>> keyDeserializer() {
return StringDeserializer::new;
}

@Override
default Supplier<Deserializer<String>> valueDeserializer() {
return StringDeserializer::new;
}
}
}
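For illustration, a minimal sketch of a JSON handler built on this interface. The class name, topic, and group id are hypothetical:

package no.nav.vedtak.felles.integrasjon.kafka; // hypothetical placement

public class MyTopicHandler implements KafkaMessageHandler.KafkaStringMessageHandler {

    @Override
    public void handleRecord(String key, String value) {
        // Parse the JSON payload and act on it here.
        System.out.println("Received " + key + " -> " + value);
    }

    @Override
    public String topic() {
        return "teamforeldrepenger.example-topic"; // hypothetical topic name
    }

    @Override
    public String groupId() {
        return "example-consumer-v1"; // keep stable across deployments
    }
}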
28 changes: 28 additions & 0 deletions no/nav/vedtak/felles/integrasjon/kafka/KafkaProperties.java
@@ -7,12 +7,15 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
@@ -52,6 +55,31 @@ public static Properties forProducer() {
return props;
}

// For everyone consuming JSON messages
public static Properties forConsumerStringValue(String groupId) {
return forConsumerGenericValue(groupId, new StringDeserializer(), new StringDeserializer(), Optional.empty());
}

public static <K,V> Properties forConsumerGenericValue(String groupId, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Optional<OffsetResetStrategy> offsetReset) {
final Properties props = new Properties();

props.put(CommonClientConfigs.GROUP_ID_CONFIG, groupId);
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, generateClientId());
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getAivenConfig(AivenProperty.KAFKA_BROKERS));
offsetReset.ifPresent(or -> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, or.toString()));

putSecurity(props);

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());

// Polling
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); // Avoid large transactions if all records are processed within the same Tx. Default 500
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100000"); // Allows up to 1s per record. Default works out to 600 ms/record

return props;
}

// For everyone consuming JSON messages
public static Properties forStreamsStringValue(String applicationId) {
return forStreamsGenericValue(applicationId, Serdes.String());
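For illustration, a sketch of using the new consumer factory directly with a plain KafkaConsumer. Group id and topic are hypothetical, and the Aiven environment variables that KafkaProperties reads are assumed to be set:

import java.time.Duration;
import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StringConsumerExample {

    public static void main(String[] args) {
        var props = KafkaProperties.forConsumerStringValue("example-consumer-v1");
        try (var consumer = new KafkaConsumer<String, String>(props)) {
            consumer.subscribe(List.of("teamforeldrepenger.example-topic"));
            // Single poll for demonstration; real consumers loop (see KafkaConsumerManager).
            consumer.poll(Duration.ofSeconds(1)).forEach(r -> System.out.println(r.key() + " -> " + r.value()));
        }
    }
}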
35 changes: 35 additions & 0 deletions no/nav/vedtak/felles/integrasjon/kafka/KafkaSender.java (new file)
@@ -0,0 +1,35 @@
package no.nav.vedtak.felles.integrasjon.kafka;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import no.nav.vedtak.exception.IntegrasjonException;

public class KafkaSender {

private final Producer<String, String> producer;
private final String topic;

public KafkaSender(Producer<String, String> producer, String topic) {
this.producer = producer;
this.topic = topic;
}

public RecordMetadata send(String key, String message) {
try {
var record = new ProducerRecord<>(topic, key, message);
return producer.send(record).get();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw kafkaPubliseringException(e);
}
}

private IntegrasjonException kafkaPubliseringException(Exception e) {
return new IntegrasjonException("F-KAFKA-925475", "Unexpected error when sending message to topic " + topic, e);
}

}
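For illustration, a sketch of constructing and using KafkaSender. The topic is hypothetical, and it is assumed that forProducer() configures String serializers (as the StringSerializer import suggests) and that the Aiven environment variables are set:

import org.apache.kafka.clients.producer.KafkaProducer;

public class SenderExample {

    public static void main(String[] args) {
        try (var producer = new KafkaProducer<String, String>(KafkaProperties.forProducer())) {
            var sender = new KafkaSender(producer, "teamforeldrepenger.example-topic");
            var metadata = sender.send("some-key", "{\"hello\":\"world\"}");
            System.out.println("Delivered to partition " + metadata.partition() + " at offset " + metadata.offset());
        }
    }
}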
