From 5b65fda7aa32ca14d0ab2e2a657ea97870332196 Mon Sep 17 00:00:00 2001
From: congbo <39078850+congbobo184@users.noreply.github.com>
Date: Thu, 4 Aug 2022 15:04:54 +0800
Subject: [PATCH] [improve][txn] change delete pending ack position from
foreach to firstKey (#16927)
---
.../pendingack/impl/PendingAckHandleImpl.java | 12 +++--
.../PendingAckInMemoryDeleteTest.java | 54 ++++++++++++++++++-
2 files changed, 60 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index c74fb3e921775..b3aec6c67f54f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -95,7 +95,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
*
* If it does not exits the map, the position will be added to the map.
*/
- private Map> individualAckPositions;
+ private ConcurrentSkipListMap> individualAckPositions;
/**
* The map is for transaction with position witch was cumulative acked by this transaction.
@@ -884,12 +884,14 @@ public synchronized void clearIndividualPosition(Position position) {
individualAckPositions.remove(position);
}
- individualAckPositions.forEach((persistentPosition, positionIntegerMutablePair) -> {
- if (persistentPosition.compareTo((PositionImpl) persistentSubscription
+ while (individualAckPositions.firstEntry() != null) {
+ if (individualAckPositions.firstKey().compareTo((PositionImpl) persistentSubscription
.getCursor().getMarkDeletedPosition()) < 0) {
- individualAckPositions.remove(persistentPosition);
+ individualAckPositions.remove(individualAckPositions.firstKey());
+ } else {
+ break;
}
- });
+ }
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
index da2a3a940bd2c..c35d15d96da05 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
@@ -21,10 +21,10 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -40,6 +40,7 @@
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
+import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -47,6 +48,7 @@
import java.lang.reflect.Field;
import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -280,6 +282,56 @@ public void txnAckTestBatchAndSharedSubMemoryDeleteTest() throws Exception {
}
}
+ @Test
+ public void testPendingAckClearPositionIsSmallerThanMarkDelete() throws Exception {
+ String normalTopic = NAMESPACE1 + "/testPendingAckClearPositionIsSmallerThanMarkDelete";
+ String subscriptionName = "test";
+
+ @Cleanup
+ Consumer consumer = pulsarClient.newConsumer()
+ .topic(normalTopic)
+ .subscriptionName(subscriptionName)
+ .enableBatchIndexAcknowledgment(true)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ @Cleanup
+ Producer producer = pulsarClient.newProducer()
+ .topic(normalTopic)
+ .enableBatching(true)
+ .batchingMaxMessages(200)
+ .create();
+
+ // mark delete position
+ producer.send("test1".getBytes());
+
+ Transaction commitTxn = getTxn();
+
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(), commitTxn).get();
+
+ PendingAckHandle pendingAckHandle = Whitebox.getInternalState(getPulsarServiceList().get(0)
+ .getBrokerService().getTopic("persistent://" + normalTopic, false).get().get()
+ .getSubscription(subscriptionName), "pendingAckHandle");
+
+ Map> individualAckPositions =
+ Whitebox.getInternalState(pendingAckHandle, "individualAckPositions");
+ // one message in pending ack state
+ assertEquals(1, individualAckPositions.size());
+
+ // put the PositionImpl.EARLIEST to the map
+ individualAckPositions.put(PositionImpl.EARLIEST, new MutablePair<>(PositionImpl.EARLIEST, 0));
+
+ // put the PositionImpl.LATEST to the map
+ individualAckPositions.put(PositionImpl.LATEST, new MutablePair<>(PositionImpl.EARLIEST, 0));
+
+ // three position in pending ack state
+ assertEquals(3, individualAckPositions.size());
+
+ // commit this txn will delete the received position and PositionImpl.EARLIEST, don't delete PositionImpl.LATEST
+ commitTxn.commit().get();
+ assertEquals(1, individualAckPositions.size());
+ }
+
private Transaction getTxn() throws Exception {
return pulsarClient
.newTransaction()