Skip to content

Commit

Permalink
[BUGFIX] cannot cleanup expired data after managed-ledger restart (ap…
Browse files Browse the repository at this point in the history
…ache#10087)

This pull request resolves apache#10086 

## Motivation

Although apache#7111 and apache#9136 solved the problems of 
1. current ledger is full and cannot be rolled
2. cursor has subscribed to an expired ledger, which makes the cleaning thread unable to clean up

respectively. However, when we close and open a managed-ledger(such like brokers shutdown unexpectedly or topics unload due to rebalance), the managed-ledger will create an empty ledger after initialization causing the current cleanup logic failed to take effect. Therefore I think we need a more unified entrance to solve the cursor update problem, and then further solve the problem of clearing expired ledgers.

## Expected Behavior

1. Able to cleanup expired data after managed-ledger re-open
2. A more unified entrance to implement cursor update

## Modification

1. move the cursor update logic to `internalTrimConsumedLedgers` from `ManagedLedgerImpl`'s callback `createComplete`
2. update `lastConfirmedEntry` if necessary when updating cursors
  • Loading branch information
wuzhanpeng authored and yangl committed Jun 23, 2021
1 parent 5d5767a commit 6dfc478
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1654,13 +1654,28 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
}

if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]",
ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
boolean shouldCursorMoveForward = false;
try {
long ledgerEntries = ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries();
Long nextValidLedger = ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId());
shouldCursorMoveForward = (markDeletePosition.getEntryId() + 1 >= ledgerEntries)
&& (newPosition.getLedgerId() == nextValidLedger);
} catch (Exception e) {
log.warn("Failed to get ledger entries while setting mark-delete-position", e);
}

if (shouldCursorMoveForward) {
log.info("[{}] move mark-delete-position from {} to {} since all the entries have been consumed",
ledger.getName(), markDeletePosition, newPosition);
} else {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]",
ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
}
callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
return;
}
callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
return;
}

lock.writeLock().lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,9 @@ public void initializeComplete() {
log.info("[{}] Successfully initialize managed ledger", name);
pendingInitializeLedgers.remove(name, pendingLedger);
future.complete(newledger);

// May need to update the cursor position
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1397,8 +1397,6 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct
} else {
log.info("[{}] Created new ledger {}", name, lh.getId());
ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build());
final long previousEntries = currentLedgerEntries;
final long previousLedgerId = currentLedger.getId();
currentLedger = lh;
currentLedgerEntries = 0;
currentLedgerSize = 0;
Expand All @@ -1416,14 +1414,9 @@ public void operationComplete(Void v, Stat stat) {
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp,
TimeUnit.MILLISECONDS);
}
// Move cursor read point to new ledger
for (ManagedCursor cursor : cursors) {
PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
if (markDeletedPosition.getLedgerId() == previousLedgerId && markDeletedPosition.getEntryId() + 1 >= previousEntries) {
// All entries in last ledger are marked delete, move read point to the new ledger
updateCursor((ManagedCursorImpl) cursor, PositionImpl.get(currentLedger.getId(), -1));
}
}

// May need to update the cursor position
maybeUpdateCursorBeforeTrimmingConsumedLedger();
}

@Override
Expand Down Expand Up @@ -2167,6 +2160,39 @@ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
this.waitingEntryCallBacks.add(cb);
}

public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
for (ManagedCursor cursor : cursors) {
PositionImpl lastAckedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
.map(Map.Entry::getValue).orElse(null);

if (currPointedLedger != null) {
if (nextPointedLedger != null) {
if (lastAckedPosition.getEntryId() != -1 &&
lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) {
lastAckedPosition = new PositionImpl(nextPointedLedger.getLedgerId(), -1);
}
} else {
log.debug("No need to reset cursor: {}, current ledger is the last ledger.", cursor);
}
} else {
log.warn("Cursor: {} does not exist in the managed-ledger.", cursor);
}

if (!lastAckedPosition.equals((PositionImpl) cursor.getMarkDeletedPosition())) {
try {
log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition);
updateCursor((ManagedCursorImpl) cursor, lastAckedPosition);
} catch (Exception e) {
log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.",
cursor, cursor.getMarkDeletedPosition(), lastAckedPosition);
log.warn("Caused by", e);
}
}
}
}

private void trimConsumedLedgersInBackground() {
trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ void testPropertiesClose() throws Exception {
ledger.addEntry("entry-1".getBytes());
ledger.addEntry("entry-2".getBytes());
Position p3 = ledger.addEntry("entry-3".getBytes());
ledger.addEntry("entry-4".getBytes());

Map<String, Long> properties = new TreeMap<>();
properties.put("a", 1L);
Expand Down Expand Up @@ -81,6 +82,7 @@ void testPropertiesRecoveryAfterCrash() throws Exception {
ledger.addEntry("entry-1".getBytes());
ledger.addEntry("entry-2".getBytes());
Position p3 = ledger.addEntry("entry-3".getBytes());
ledger.addEntry("entry-4".getBytes());

Map<String, Long> properties = new TreeMap<>();
properties.put("a", 1L);
Expand Down Expand Up @@ -111,6 +113,7 @@ void testPropertiesOnDelete() throws Exception {
ledger.addEntry("entry-1".getBytes());
Position p2 = ledger.addEntry("entry-2".getBytes());
Position p3 = ledger.addEntry("entry-3".getBytes());
ledger.addEntry("entry-4".getBytes());

Map<String, Long> properties = new TreeMap<>();
properties.put("a", 1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.commons.lang3.exception.ExceptionUtils;
Expand Down Expand Up @@ -2920,6 +2921,48 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
Assert.assertEquals(ledger.getTotalSize(), 0);
}

@Test
public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionTime(1, TimeUnit.SECONDS);
config.setMaxEntriesPerLedger(2);
config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
config.setMaximumRolloverTime(500, TimeUnit.MILLISECONDS);

ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) factory.open("ml_restart_ledger", config);
ManagedCursor cursor = managedLedger.openCursor("c1");

for (int i = 0; i < 3; i++) {
managedLedger.addEntry(new byte[1024 * 1024]);
}

// we have 2 ledgers at the beginning [{entries=2}, {entries=1}]
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 2);
List<Entry> entries = cursor.readEntries(3);

for (Entry entry : entries) {
cursor.markDelete(entry.getPosition());
}
entries.forEach(e -> e.release());

// managed-ledger restart
managedLedger.close();
managedLedger = (ManagedLedgerImpl) factory.open("ml_restart_ledger", config);

// then we have one more empty ledger after managed-ledger initialization
// and now ledgers are [{entries=2}, {entries=1}, {entries=0}]
Assert.assertTrue(managedLedger.getLedgersInfoAsList().size() >= 2);

// Now we update the cursors that are still subscribing to ledgers that has been consumed completely
managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
managedLedger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
Thread.sleep(100);

// We only have one empty ledger at last [{entries=0}]
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
Assert.assertEquals(managedLedger.getTotalSize(), 0);
}

@Test(timeOut = 20000)
public void testAsyncTruncateLedgerRetention() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
Expand Down

0 comments on commit 6dfc478

Please sign in to comment.