This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[BUG] Offset in send callback is not continuous when batching is enabled #332

Closed
BewareMyPower opened this issue Jan 18, 2021 · 6 comments · Fixed by #443

Comments

@BewareMyPower
Collaborator

BewareMyPower commented Jan 18, 2021

Describe the bug
#296 implemented continuous offsets, but the offset in the producer's send callback is not continuous when batching is enabled.

To Reproduce

Modify KafkaRequestHandlerTest#testProduceCallback to the following code and rerun it.

    public void testProduceCallback() throws Exception {
        final String topic = "test-produce-callback";
        final int numMessages = 20;
        final String messagePrefix = "msg-";

        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "100"); // avoid all messages being in the same batch

        @Cleanup
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);

        Map<Integer, Long> indexToOffset = new ConcurrentHashMap<>();
        final CountDownLatch latch = new CountDownLatch(numMessages);
        for (int i = 0; i < numMessages; i++) {
            final int index = i;
            producer.send(new ProducerRecord<>(topic, i, messagePrefix + i), (recordMetadata, e) -> {
                if (e != null) {
                    log.error("Failed to send {}: {}", index, e);
                    fail("Failed to send " + index + ": " + e.getMessage());
                }
                assertEquals(recordMetadata.topic(), topic);
                assertEquals(recordMetadata.partition(), 0);
                indexToOffset.put(index, recordMetadata.offset());
                latch.countDown();
            }); // asynchronous send
        }
        latch.await();
        indexToOffset.forEach((index, offset) -> {
            log.info("{} => {} (delta = {})", index, offset, offset - index);
        });
    }

The output:

1 => 10 (delta = 9)
2 => 9 (delta = 7)
3 => 10 (delta = 7)
4 => 9 (delta = 5)
5 => 10 (delta = 5)
6 => 9 (delta = 3)
7 => 10 (delta = 3)
8 => 9 (delta = 1)
9 => 10 (delta = 1)
10 => 19 (delta = 9)
11 => 20 (delta = 9)
12 => 19 (delta = 7)
13 => 20 (delta = 7)
14 => 19 (delta = 5)
15 => 20 (delta = 5)
16 => 19 (delta = 3)
17 => 20 (delta = 3)
18 => 19 (delta = 1)
19 => 20 (delta = 1)

We can see the offsets are not continuous, and not even monotonically increasing. Also, the first offset is not zero.

However, if we send synchronously, i.e. producer.send(/* ... */).get(), the first offset is 0 and the offsets are continuous.
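
For reference, a minimal sketch of the synchronous variant mentioned above (it reuses topic, numMessages, messagePrefix and producer from the reproduction snippet; this is an illustration, not part of the original test):

    // Synchronous send: block on each future so only one record is in flight,
    // which avoids batching and yields continuous offsets starting from 0.
    for (int i = 0; i < numMessages; i++) {
        final RecordMetadata metadata =
                producer.send(new ProducerRecord<>(topic, i, messagePrefix + i)).get();
        log.info("{} => {}", i, metadata.offset());
    }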

Expected behavior
The offset in producer's send callback is continuous.

@jiazhai
Copy link
Contributor

jiazhai commented Jan 19, 2021

This is a little strange. How many entries (batched messages) are there in BookKeeper? It would be helpful to get the offset metadata from the entry.

@BewareMyPower
Collaborator Author

It looks like there are two bugs:

  1. The design of PendingProduceQueue has some problems. When I remove the queue and send synchronously, the offsets become continuous:
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
index 57ca22e..c7f63f8 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
@@ -48,6 +48,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -602,10 +603,22 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
             String fullPartitionName = KopTopic.toString(topicPartition);
             PendingProduce pendingProduce = new PendingProduce(partitionResponse, topicManager, fullPartitionName,
                     entryFormatter, records, executor);
+            CountDownLatch latch = new CountDownLatch(1);
+            pendingProduce.whenComplete(() -> {
+                pendingProduce.publishMessages();
+                latch.countDown();
+            });
+            try {
+                latch.await();
+            } catch (InterruptedException ignored) {
+                // TODO:
+            }
+            /*
             PendingProduceQueue queue =
                     pendingProduceQueueMap.computeIfAbsent(topicPartition, ignored -> new PendingProduceQueue());
             queue.add(pendingProduce);
             pendingProduce.whenComplete(queue::sendCompletedProduces);
+             */
         }
 
         CompletableFuture.allOf(responsesFutures.values().toArray(new CompletableFuture<?>[responsesSize]))
  2. However, the base offset is still wrong. I found that the first batch's base offset is N-1, where N is the number of messages in the batch (a sketch of the expected arithmetic follows below).
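
A minimal sketch of the arithmetic implied by the second point; the method name and parameters are illustrative, not KoP's actual code:

// Illustrative only: derive the base offset that the Kafka producer expects in
// PartitionResponse from the last offset of the persisted batch.
static long baseOffset(long lastOffsetOfBatch, int numMessagesInBatch) {
    return lastOffsetOfBatch - numMessagesInBatch + 1;
}

// Example: a new topic receives a single batch of 3 messages, so the last offset
// is 2 and the base offset is 2 - 3 + 1 = 0; the per-record offsets are 0, 1, 2.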

@BewareMyPower
Collaborator Author

BewareMyPower commented Jan 19, 2021

Regarding what I mentioned before: the second bug is easy to fix, but the first bug needs some refactoring because it's caused by a race condition.

Let

  • Persist[i] be the time when message i was persisted;
  • Complete[i] be the time when the callback (MessagePublishContext#completed) was invoked.

Assume there are two records, 0 and 1, and each record contains only one message. Then we have:

  • Persist[0] happens before Persist[1], which guarantees the messaging order.
  • Persist[0] happens before Complete[0].
  • Persist[1] happens before Complete[1].

So there are two possible timelines (the current offset is the offset of the latest message, i.e. LEO - 1 in Kafka):

Event         interceptor's index (current offset)   base offset
Persist[0]    0                                      -
Persist[1]    1                                      -
Complete[0]   1                                      1
Complete[1]   1                                      1

or

Event         interceptor's index (current offset)   base offset
Persist[0]    0                                      -
Complete[0]   0                                      0
Persist[1]    1                                      -
Complete[1]   1                                      1

We can see that in the first case, the callback of message 0 retrieves the wrong current offset, which belongs to message 1, because the interceptor has already been updated twice.
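
A small self-contained sketch of the first timeline (plain Java; the class and field names are illustrative and independent of KoP's actual classes):

import java.util.concurrent.atomic.AtomicLong;

public class OffsetRaceSketch {
    // Shared "current offset", analogous to the interceptor's index in the tables above.
    private static final AtomicLong currentOffset = new AtomicLong(-1);

    public static void main(String[] args) {
        currentOffset.incrementAndGet(); // Persist[0]: current offset becomes 0
        currentOffset.incrementAndGet(); // Persist[1]: current offset becomes 1

        // Complete[0] and Complete[1] both run after the two persists and read the
        // shared current offset, so message 0's callback reports 1 instead of 0.
        System.out.println("Complete[0] sees offset " + currentOffset.get()); // 1 (wrong)
        System.out.println("Complete[1] sees offset " + currentOffset.get()); // 1 (correct)
    }
}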

jiazhai pushed a commit that referenced this issue Jan 20, 2021
This PR is a partial fix of #332.

Before this PR, MessagePublishContext completed with the current offset, which is the latest offset, and PartitionResponse was then filled with that offset. However, the Kafka producer treats the PartitionResponse's offset as the base offset.

For example, when the Kafka producer sends a batch of 3 single messages to a new topic, the offsets in the send callback were (2, 3, 4) before this PR. After this PR, the offsets in the send callback are (0, 1, 2).
@BewareMyPower
Collaborator Author

Another fix from the Pulsar side is WIP, see apache/pulsar#9257.

@BewareMyPower
Collaborator Author

BewareMyPower commented Jan 24, 2021

The fix for the Kafka client's send callback needs:

@BewareMyPower
Collaborator Author

KafkaRequestHandlerTest#testProduceCallback is flaky because this task was not completed but was accidentally closed before.

INFO  io.streamnative.pulsar.handlers.kop.KafkaRequestHandlerTest - Actual offsets: [5, 1, 2, 3, 4, 5, 6, 7, 8, 9]

From the error log, we can see the first message's offset is 5. It's caused by the race condition I mentioned before.
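
For completeness, a sketch of a continuity assertion that makes this failure mode explicit (it assumes the indexToOffset map and numMessages from the reproduction snippet above; this is not the actual test code):

// After all callbacks have fired, every offset should be exactly firstOffset + index;
// the offsets [5, 1, 2, 3, 4, 5, 6, 7, 8, 9] above fail this check at index 1.
final long firstOffset = indexToOffset.get(0);
for (int i = 0; i < numMessages; i++) {
    assertEquals(indexToOffset.get(i).longValue(), firstOffset + i);
}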

BewareMyPower added a commit that referenced this issue Apr 19, 2021
Fixes #332 

The offset in the produce callback may not be accurate when messages are sent in a batch; see #332 (comment) for a detailed explanation.

Since apache/pulsar#9257 introduced a new API that supports fetching metadata from the entry data, we can use this API to get the accurate offset.