KAFKA-14752: Kafka examples improvements - processor changes (apache#13516)

Reviewers: Luke Chen <showuon@gmail.com>
fvaleri committed May 11, 2023
1 parent a263627 commit ee41328
Showing 2 changed files with 120 additions and 106 deletions.
211 changes: 111 additions & 100 deletions examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
@@ -20,58 +20,65 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import static java.time.Duration.ofMillis;
import static java.util.Collections.emptyList;
import static java.util.Collections.singleton;

/**
* A demo class for how to write a customized EOS app. It takes a consume-process-produce loop.
* Important configurations and APIs are commented.
* This class implements a read-process-write application.
*/
public class ExactlyOnceMessageProcessor extends Thread {

private static final boolean READ_COMMITTED = true;

public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener {
private final String bootstrapServers;
private final String inputTopic;
private final String outputTopic;
private final String transactionalId;
private final String groupInstanceId;
private final CountDownLatch latch;
private final String transactionalId;
private volatile boolean closed;

private final KafkaProducer<Integer, String> producer;
private final KafkaConsumer<Integer, String> consumer;

private final CountDownLatch latch;

public ExactlyOnceMessageProcessor(final String inputTopic,
final String outputTopic,
final int instanceIdx,
final CountDownLatch latch) {
public ExactlyOnceMessageProcessor(String threadName,
String bootstrapServers,
String inputTopic,
String outputTopic,
CountDownLatch latch) {
super(threadName);
this.bootstrapServers = bootstrapServers;
this.inputTopic = inputTopic;
this.outputTopic = outputTopic;
this.transactionalId = "Processor-" + instanceIdx;
this.transactionalId = "tid-" + threadName;
// It is recommended to have a relatively short txn timeout in order to clear pending offsets faster.
final int transactionTimeoutMs = 10000;
int transactionTimeoutMs = 10_000;
// A unique transactional.id must be provided in order to properly use EOS.
producer = new Producer(
"processor-producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null)
.createKafkaProducer();
// Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
// Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances.
this.groupInstanceId = "Txn-consumer-" + instanceIdx;
this.groupInstanceId = "giid-" + threadName;
boolean readCommitted = true;
consumer = new Consumer(
"processor-consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,

@Override
public void run() {
// Init transactions call should always happen first in order to clear zombie transactions from previous generation.
producer.initTransactions();

final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);

consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
printWithTxnId("Received partition assignment after rebalancing: " + partitions);
messageRemaining.set(messagesRemaining(consumer));
}
});

int messageProcessed = 0;
while (messageRemaining.get() > 0) {
try {
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
if (records.count() > 0) {
// Begin a new transaction session.
producer.beginTransaction();
for (ConsumerRecord<Integer, String> record : records) {
// Process the record and send to downstream.
ProducerRecord<Integer, String> customizedRecord = transform(record);
producer.send(customizedRecord);
int processedRecords = 0;
long remainingRecords = Long.MAX_VALUE;
// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
int transactionTimeoutMs = 10_000;
// consumer must be in read_committed mode, which means it won't be able to read uncommitted data
boolean readCommitted = true;
try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
"processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
// called first and once to fence zombies and abort any pending transaction
producer.initTransactions();

consumer.subscribe(singleton(inputTopic), this);

Utils.printOut("Processing new records");
while (!closed && remainingRecords > 0) {
try {
ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
if (!records.isEmpty()) {
// begin a new transaction session
producer.beginTransaction();

for (ConsumerRecord<Integer, String> record : records) {
// process the record and send downstream
ProducerRecord<Integer, String> newRecord =
new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
producer.send(newRecord);
}

// checkpoint the progress by sending offsets to group coordinator broker
// note that this API is only available for broker >= 2.5
producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());

// commit the transaction including offsets
producer.commitTransaction();
processedRecords += records.count();
}
} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
| FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
// we can't recover from these exceptions
Utils.printErr(e.getMessage());
shutdown();
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
// invalid or no offset found without auto.reset.policy
Utils.printOut("Invalid or no offset found, using latest");
consumer.seekToEnd(emptyList());
consumer.commitSync();
} catch (KafkaException e) {
// abort the transaction and try to continue
Utils.printOut("Aborting transaction: %s", e);
producer.abortTransaction();
}
remainingRecords = getRemainingRecords(consumer);
if (remainingRecords != Long.MAX_VALUE) {
Utils.printOut("Remaining records: %d", remainingRecords);
}
}
} catch (Throwable e) {
Utils.printOut("Unhandled exception");
e.printStackTrace();
}
Utils.printOut("Processed %d records", processedRecords);
shutdown();
}

Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
Utils.printOut("Revoked partitions: %s", partitions);
}

// Checkpoint the progress by sending offsets to group coordinator broker.
// Note that this API is only available for broker >= 2.5.
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
Utils.printOut("Assigned partitions: %s", partitions);
}

// Finish the transaction. All sent records should be visible for consumption now.
producer.commitTransaction();
messageProcessed += records.count();
}
} catch (ProducerFencedException e) {
throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
} catch (FencedInstanceIdException e) {
throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
} catch (KafkaException e) {
// If we have not been fenced, try to abort the transaction and continue. This will raise immediately
// if the producer has hit a fatal error.
producer.abortTransaction();

// The consumer fetch position needs to be restored to the committed offset
// before the transaction started.
resetToLastCommittedPositions(consumer);
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
Utils.printOut("Lost partitions: %s", partitions);
}

messageRemaining.set(messagesRemaining(consumer));
printWithTxnId("Message remaining: " + messageRemaining);
public void shutdown() {
if (!closed) {
closed = true;
latch.countDown();
}

printWithTxnId("Finished processing " + messageProcessed + " records");
latch.countDown();
}

private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition topicPartition : consumer.assignment()) {
offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
}
return offsets;
}

private void printWithTxnId(final String message) {
System.out.println(transactionalId + ": " + message);
}

private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
}

private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
// If we couldn't detect any end offset, that means we are still not able to fetch offsets.
// if we can't detect any end offset, that means we are still not able to fetch offsets
if (fullEndOffsets.isEmpty()) {
return Long.MAX_VALUE;
}

return consumer.assignment().stream().mapToLong(partition -> {
long currentPosition = consumer.position(partition);
printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
if (fullEndOffsets.containsKey(partition)) {
return fullEndOffsets.get(partition) - currentPosition;
} else {
return 0;
}
return 0;
}).sum();
}

private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {
final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
consumer.assignment().forEach(tp -> {
OffsetAndMetadata offsetAndMetadata = committed.get(tp);
if (offsetAndMetadata != null)
consumer.seek(tp, offsetAndMetadata.offset());
else
consumer.seekToBeginning(Collections.singleton(tp));
});
}
}
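
For readers skimming the diff, the core of the new processor is a transactional consume-process-produce loop. Below is a minimal sketch of that pattern, assuming a producer configured with a transactional.id and a consumer in read_committed mode with auto-commit disabled (as in the diff); the class and topic names are placeholders, and error handling, rebalancing, and shutdown are omitted.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class EosLoopSketch {
    static void copyLoop(KafkaConsumer<Integer, String> consumer,
                         KafkaProducer<Integer, String> producer,
                         String outputTopic) {
        // Called once: fences zombie producers with the same transactional.id
        // and aborts any transaction they left pending.
        producer.initTransactions();
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
            if (records.isEmpty()) {
                continue;
            }
            producer.beginTransaction();
            for (ConsumerRecord<Integer, String> record : records) {
                // Process the record and send it downstream within the transaction.
                producer.send(new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"));
            }
            // Commit the consumed offsets atomically with the produced records
            // (sendOffsetsToTransaction with group metadata requires brokers >= 2.5).
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition tp : consumer.assignment()) {
                offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
            }
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        }
    }
}
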
15 changes: 9 additions & 6 deletions examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -31,6 +31,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* This exactly once demo driver takes 3 arguments:
@@ -71,7 +73,7 @@
* {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
*/
public class KafkaExactlyOnceDemo {

public static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String INPUT_TOPIC = "input-topic";
private static final String OUTPUT_TOPIC = "output-topic";

@@ -102,11 +104,12 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc
CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);

/* Stage 3: transactionally process all messages */
for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(
INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch);
messageProcessor.start();
}
CountDownLatch processorsLatch = new CountDownLatch(numInstances);
List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances)
.mapToObj(id -> new ExactlyOnceMessageProcessor(
"processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
.collect(Collectors.toList());
processors.forEach(ExactlyOnceMessageProcessor::start);

if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
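
As a rough illustration of the stage-3 fan-out in the demo driver, here is a minimal sketch of starting several worker threads and waiting on a shared CountDownLatch with a timeout; the thread bodies and names are placeholders, not the actual processor logic.

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class LatchFanOutSketch {
    public static void main(String[] args) throws InterruptedException {
        int numInstances = 2;
        CountDownLatch latch = new CountDownLatch(numInstances);
        List<Thread> processors = IntStream.range(0, numInstances)
            .mapToObj(id -> new Thread(() -> {
                try {
                    // ... transactional copy work would go here ...
                } finally {
                    // Each processor releases the latch exactly once when it finishes.
                    latch.countDown();
                }
            }, "processor-" + id))
            .collect(Collectors.toList());
        processors.forEach(Thread::start);
        // Fail fast if the processors do not complete within the demo timeout.
        if (!latch.await(5, TimeUnit.MINUTES)) {
            throw new IllegalStateException("Timeout waiting for message copy");
        }
    }
}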