diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index d51b48bdda526..e011bf3e6d73e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2850,15 +2850,14 @@ void advanceCursorsIfNecessary(List ledgersToDelete) throws LedgerNo return; } - // need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion - // calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return - // incorrect results - Long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId()); - if (firstNonDeletedLedger == null) { - throw new LedgerNotExistException("First non deleted Ledger is not found"); + // Just ack messages like a consumer. Normally, consumers will not confirm a position that does not exist, so + // find the latest existing position to ack. + PositionImpl highestPositionToDelete = calculateLastEntryInLedgerList(ledgersToDelete); + if (highestPositionToDelete == null) { + log.warn("[{}] The ledgers to be trim are all empty, skip to advance non-durable cursors: {}", + name, ledgersToDelete); + return; } - PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1); - cursors.forEach(cursor -> { // move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be @@ -2882,6 +2881,19 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { }); } + /** + * @return null if all ledgers is empty. + */ + private PositionImpl calculateLastEntryInLedgerList(List ledgersToDelete) { + for (int i = ledgersToDelete.size() - 1; i >= 0; i--) { + LedgerInfo ledgerInfo = ledgersToDelete.get(i); + if (ledgerInfo != null && ledgerInfo.hasEntries() && ledgerInfo.getEntries() > 0) { + return PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1); + } + } + return null; + } + /** * Delete this ManagedLedger completely from the system. * diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java index b5d00ac012add..20407295ccb0e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java @@ -33,6 +33,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; @@ -55,7 +56,7 @@ @Test(groups = "broker-api") @Slf4j -public class NonDurableSubscriptionTest extends ProducerConsumerBase { +public class NonDurableSubscriptionTest extends ProducerConsumerBase { private final AtomicInteger numFlow = new AtomicInteger(0); @@ -316,7 +317,7 @@ private void switchLedgerManually(final String tpName) throws Exception { } @Test - public void testTrimLedgerIfNoDurableCursor() throws Exception { + public void testHasMessageAvailableIfIncomingQueueNotEmpty() throws Exception { final String nonDurableCursor = "non-durable-cursor"; final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); Reader reader = pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1) @@ -557,4 +558,114 @@ public void testReaderInitAtDeletedPosition() throws Exception { producer.close(); admin.topics().delete(topicName, false); } + + @Test + public void testTrimLedgerIfNoDurableCursor() throws Exception { + final String nonDurableCursor = "non-durable-cursor"; + final String durableCursor = "durable-cursor"; + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(topicName); + Reader reader = pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1) + .subscriptionName(nonDurableCursor).startMessageId(MessageIdImpl.earliest).create(); + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).receiverQueueSize(1) + .subscriptionName(durableCursor).subscribe(); + consumer.close(); + + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + producer.send("1"); + producer.send("2"); + producer.send("3"); + producer.send("4"); + MessageIdImpl msgIdInDeletedLedger5 = (MessageIdImpl) producer.send("5"); + + Message msg1 = reader.readNext(2, TimeUnit.SECONDS); + assertEquals(msg1.getValue(), "1"); + Message msg2 = reader.readNext(2, TimeUnit.SECONDS); + assertEquals(msg2.getValue(), "2"); + Message msg3 = reader.readNext(2, TimeUnit.SECONDS); + assertEquals(msg3.getValue(), "3"); + + // Unsubscribe durable cursor. + // Trigger a trim ledgers task, and verify trim ledgers successful. + admin.topics().unload(topicName); + Thread.sleep(3 * 1000); + admin.topics().deleteSubscription(topicName, durableCursor); + // Trim ledgers after release durable cursor. + trimLedgers(topicName); + List ledgers = admin.topics().getInternalStats(topicName).ledgers; + assertEquals(ledgers.size(), 1); + assertNotEquals(ledgers.get(0).ledgerId, msgIdInDeletedLedger5.getLedgerId()); + + // Verify backlog and markDeletePosition is correct. + Awaitility.await().untilAsserted(() -> { + SubscriptionStats subscriptionStats = admin.topics().getStats(topicName, true, true, true) + .getSubscriptions().get(nonDurableCursor); + log.info("backlog size: {}", subscriptionStats.getMsgBacklog()); + assertEquals(subscriptionStats.getMsgBacklog(), 0); + ManagedLedgerInternalStats.CursorStats cursorStats = + admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor); + String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":"); + PositionImpl actMarkDeletedPos = + PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1])); + PositionImpl expectedMarkDeletedPos = + PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId()); + log.info("Expected mark deleted position: {}", expectedMarkDeletedPos); + log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition); + Assert.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0); + }); + + // Clear the incoming queue of the reader for next test. + while (true) { + Message msg = reader.readNext(2, TimeUnit.SECONDS); + if (msg == null) { + break; + } + log.info("clear msg: {}", msg.getValue()); + } + + // The following tests are designed to verify the api "getNumberOfEntries" and "consumedEntries" still work + // after changes.See the code-description added with the PR https://github.com/apache/pulsar/pull/10667. + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(nonDurableCursor); + + // Verify "getNumberOfEntries" if there is no entries to consume. + assertEquals(0, cursor.getNumberOfEntries()); + assertEquals(0, ml.getNumberOfEntries()); + + // Verify "getNumberOfEntries" if there is 1 entry to consume. + producer.send("6"); + producer.send("7"); + Awaitility.await().untilAsserted(() -> { + assertEquals(2, ml.getNumberOfEntries()); + // Since there is one message has been pulled into the incoming queue of reader. There is only one messages + // waiting to cursor read. + assertEquals(1, cursor.getNumberOfEntries()); + }); + + // Verify "consumedEntries" is correct. + ManagedLedgerInternalStats.CursorStats cursorStats = + admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor); + // "messagesConsumedCounter" should be 0 after unload the topic. + // Note: "topic_internal_stat.cursor.messagesConsumedCounter" means how many messages were acked on this + // cursor. The similar one "topic_stats.lastConsumedTimestamp" means the last time of sending messages to + // the consumer. + assertEquals(0, cursorStats.messagesConsumedCounter); + Message msg6 = reader.readNext(2, TimeUnit.SECONDS); + assertEquals(msg6.getValue(), "6"); + Message msg7 = reader.readNext(2, TimeUnit.SECONDS); + assertEquals(msg7.getValue(), "7"); + Awaitility.await().untilAsserted(() -> { + // "messagesConsumedCounter" should be 2 after consumed 2 message. + ManagedLedgerInternalStats.CursorStats cStat = + admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor); + assertEquals(2, cStat.messagesConsumedCounter); + }); + + // cleanup. + reader.close(); + producer.close(); + admin.topics().delete(topicName, false); + } }