Skip to content

Commit 93e15d9

Browse files
Technoboy-codelipenghui
authored andcommitted
Fix lost message issue due to ledger rollover. (apache#14664)
(cherry picked from commit ad2cc2d)
1 parent c8a0dff commit 93e15d9

File tree

5 files changed

+48
-10
lines changed

5 files changed

+48
-10
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -774,8 +774,8 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
774774
}
775775
} else if (state == State.ClosedLedger) {
776776
// No ledger and no pending operations. Create a new ledger
777-
log.info("[{}] Creating a new ledger", name);
778777
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
778+
log.info("[{}] Creating a new ledger", name);
779779
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
780780
mbean.startDataLedgerCreateOp();
781781
asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
@@ -1644,7 +1644,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
16441644

16451645
synchronized void createLedgerAfterClosed() {
16461646
if (isNeededCreateNewLedgerAfterCloseLedger()) {
1647-
log.info("[{}] Creating a new ledger", name);
1647+
log.info("[{}] Creating a new ledger after closed", name);
16481648
STATE_UPDATER.set(this, State.CreatingLedger);
16491649
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
16501650
mbean.startDataLedgerCreateOp();
@@ -1667,8 +1667,8 @@ boolean isNeededCreateNewLedgerAfterCloseLedger() {
16671667
@Override
16681668
public void rollCurrentLedgerIfFull() {
16691669
log.info("[{}] Start checking if current ledger is full", name);
1670-
if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
1671-
STATE_UPDATER.set(this, State.ClosingLedger);
1670+
if (currentLedgerEntries > 0 && currentLedgerIsFull()
1671+
&& STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) {
16721672
currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
16731673
@Override
16741674
public void closeComplete(int rc, LedgerHandle lh, Object o) {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2240,6 +2240,9 @@ void testFindNewestMatchingAfterLedgerRollover() throws Exception {
22402240
// roll a new ledger
22412241
int numLedgersBefore = ledger.getLedgersInfo().size();
22422242
ledger.getConfig().setMaxEntriesPerLedger(1);
2243+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
2244+
stateUpdater.setAccessible(true);
2245+
stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
22432246
ledger.rollCurrentLedgerIfFull();
22442247
Awaitility.await().atMost(20, TimeUnit.SECONDS)
22452248
.until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1969,6 +1969,9 @@ public void testDeletionAfterLedgerClosedAndRetention() throws Exception {
19691969
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
19701970
c2.skipEntries(1, IndividualDeletedEntries.Exclude);
19711971
// let current ledger close
1972+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
1973+
stateUpdater.setAccessible(true);
1974+
stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
19721975
ml.rollCurrentLedgerIfFull();
19731976
// let retention expire
19741977
Thread.sleep(1500);
@@ -2238,6 +2241,9 @@ public void testGetPositionAfterN() throws Exception {
22382241
managedCursor.markDelete(positionMarkDelete);
22392242

22402243
//trigger ledger rollover and wait for the new ledger created
2244+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
2245+
stateUpdater.setAccessible(true);
2246+
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
22412247
managedLedger.rollCurrentLedgerIfFull();
22422248
Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3));
22432249
assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
@@ -3096,7 +3102,7 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
30963102
ledger.addEntry(new byte[1024 * 1024]);
30973103
}
30983104

3099-
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2);
3105+
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2));
31003106
List<Entry> entries = cursor.readEntries(msgNum);
31013107
Assert.assertEquals(msgNum, entries.size());
31023108

@@ -3107,9 +3113,12 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
31073113

31083114
// all the messages have benn acknowledged
31093115
// and all the ledgers have been removed except the last ledger
3110-
Thread.sleep(1000);
3111-
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1);
3112-
Assert.assertEquals(ledger.getTotalSize(), 0);
3116+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
3117+
stateUpdater.setAccessible(true);
3118+
stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
3119+
ledger.rollCurrentLedgerIfFull();
3120+
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
3121+
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
31133122
}
31143123

31153124
@Test
@@ -3127,6 +3136,26 @@ public void testLedgerReachMaximumRolloverTime() throws Exception {
31273136
.until(() -> firstLedgerId != ml.addEntry("test".getBytes()).getLedgerId());
31283137
}
31293138

3139+
@Test
3140+
public void testLedgerNotRolloverWithoutOpenState() throws Exception {
3141+
ManagedLedgerConfig config = new ManagedLedgerConfig();
3142+
config.setMaxEntriesPerLedger(2);
3143+
3144+
ManagedLedgerImpl ml = spy((ManagedLedgerImpl)factory.open("ledger-not-rollover-without-open-state", config));
3145+
ml.addEntry("test1".getBytes()).getLedgerId();
3146+
long ledgerId2 = ml.addEntry("test2".getBytes()).getLedgerId();
3147+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
3148+
stateUpdater.setAccessible(true);
3149+
// Set state to CreatingLedger to avoid rollover
3150+
stateUpdater.set(ml, ManagedLedgerImpl.State.CreatingLedger);
3151+
ml.rollCurrentLedgerIfFull();
3152+
Field currentLedger = ManagedLedgerImpl.class.getDeclaredField("currentLedger");
3153+
currentLedger.setAccessible(true);
3154+
LedgerHandle lh = (LedgerHandle) currentLedger.get(ml);
3155+
Awaitility.await()
3156+
.until(() -> ledgerId2 == lh.getId());
3157+
}
3158+
31303159
@Test
31313160
public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception {
31323161
ManagedLedgerConfig config = new ManagedLedgerConfig();
@@ -3488,5 +3517,4 @@ public void testOffloadTaskCancelled() throws Exception {
34883517
Assert.assertFalse(ledgerInfo.get(100, TimeUnit.MILLISECONDS).getOffloadContext().getComplete());
34893518
});
34903519
}
3491-
34923520
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import java.lang.reflect.Field;
2122
import java.time.Duration;
2223
import java.util.concurrent.TimeUnit;
2324
import lombok.Cleanup;
@@ -98,6 +99,9 @@ public void testCurrentLedgerRolloverIfFull() throws Exception {
9899
});
99100

100101
// trigger a ledger rollover
102+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
103+
stateUpdater.setAccessible(true);
104+
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
101105
managedLedger.rollCurrentLedgerIfFull();
102106

103107
// the last ledger will be closed and removed and we have one ledger for empty

pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,11 @@ public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws E
164164
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(mlTransactionLog);
165165
Position position = managedLedger.getLastConfirmedEntry();
166166
if (isUseManagedLedgerProperties) {
167+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
168+
stateUpdater.setAccessible(true);
169+
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
170+
managedLedger.rollCurrentLedgerIfFull();
167171
Awaitility.await().until(() -> {
168-
managedLedger.rollCurrentLedgerIfFull();
169172
return !managedLedger.ledgerExists(position.getLedgerId());
170173
});
171174
}

0 commit comments

Comments
 (0)