Skip to content

Commit

Permalink
Reproduce out-of-order delivery issue in PR 12456
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Oct 23, 2021
1 parent 9ee6656 commit 1b0ad24
Showing 1 changed file with 75 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.client.api;

import static org.mockito.ArgumentMatchers.any;
Expand All @@ -24,16 +25,24 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -70,6 +79,7 @@ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws
// method calls on the interface.
Mockito.withSettings().defaultAnswer(AdditionalAnswers.delegatesTo(internalExecutorService)));
}

@Override
public ExecutorService getInternalExecutorService() {
return internalExecutorServiceDelegate;
Expand Down Expand Up @@ -119,4 +129,69 @@ public void testMultiTopicsConsumerCloses() throws Exception {
verify(internalExecutorServiceDelegate, times(0))
.schedule(any(Runnable.class), anyLong(), any());
}

// test that reproduces the issue that PR https://github.com/apache/pulsar/pull/12456 fixes
// where MultiTopicsConsumerImpl has a data race that causes out-of-order delivery of messages
@Test
public void testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
throws PulsarAdminException, PulsarClientException, ExecutionException, InterruptedException,
TimeoutException {
String topicName = newTopicName();
int numPartitions = 2;
int numMessages = 100000;
admin.topics().createPartitionedTopic(topicName, numPartitions);

Producer<Long>[] producers = new Producer[numPartitions];

for (int i = 0; i < numPartitions; i++) {
producers[i] = pulsarClient.newProducer(Schema.INT64)
// produce to each partition directly so that order can be maintained in sending
.topic(topicName + "-partition-" + i)
.enableBatching(true)
.maxPendingMessages(30000)
.maxPendingMessagesAcrossPartitions(60000)
.batchingMaxMessages(10000)
.batchingMaxPublishDelay(5, TimeUnit.SECONDS)
.batchingMaxBytes(4 * 1024 * 1024)
.blockIfQueueFull(true)
.create();
}

@Cleanup
Consumer<Long> consumer = pulsarClient
.newConsumer(Schema.INT64)
// consume on the partitioned topic
.topic(topicName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.receiverQueueSize(numMessages)
.subscriptionName(methodName)
.subscribe();

// produce sequence numbers to each partition topic
long sequenceNumber = 1L;
for (int i = 0; i < numMessages; i++) {
for (Producer<Long> producer : producers) {
producer.newMessage()
.value(sequenceNumber)
.sendAsync();
}
sequenceNumber++;
}
for (Producer<Long> producer : producers) {
producer.close();
}

// receive and validate sequences in the partitioned topic
Map<String, AtomicLong> receivedSequences = new HashMap<>();
int receivedCount = 0;
while (receivedCount < numPartitions * numMessages) {
Message<Long> message = consumer.receiveAsync().get(5, TimeUnit.SECONDS);
consumer.acknowledge(message);
receivedCount++;
AtomicLong receivedSequenceCounter =
receivedSequences.computeIfAbsent(message.getTopicName(), k -> new AtomicLong(1L));
Assert.assertEquals(message.getValue().longValue(), receivedSequenceCounter.getAndIncrement());
}
Assert.assertEquals(numPartitions * numMessages, receivedCount);
}
}

0 comments on commit 1b0ad24

Please sign in to comment.