Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Compute the backlog messages stat from the mark-delete position
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/platform/cloud_messaging/trunk@2657 3b44db21-3a6d-4d5f-afd4-25f0118656d7
  • Loading branch information
mmerli committed Jun 27, 2013
1 parent 6b5d64c commit 904591d
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 11 deletions.
11 changes: 11 additions & 0 deletions src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
Expand Up @@ -78,6 +78,17 @@ public interface ManagedCursor {
*/
public long getNumberOfEntries();

/**
* Return the number of non-deleted messages on this cursor.
*
* This will also include messages that have already been read from the cursor but not deleted or mark-deleted yet.
*
* This method has linear time complexity on the number of ledgers included in the managed ledger.
*
* @return the number of entries
*/
public long getNumberOfEntriesInBacklog();

/**
* This signals that the reader is done with all the entries up to "position" (included). This can potentially
* trigger a ledger deletion, if all the other cursors are done too with the underlying ledger.
Expand Down
Expand Up @@ -249,7 +249,16 @@ public boolean hasMoreEntries() {

@Override
public long getNumberOfEntries() {
PositionImpl fromPosition = readPosition.get();
return getNumberOfEntries(readPosition.get());
}

@Override
public long getNumberOfEntriesInBacklog() {
PositionImpl ackPosition = acknowledgedPosition.get();
return getNumberOfEntries(new PositionImpl(ackPosition.getLedgerId(), ackPosition.getEntryId() + 1));
}

private long getNumberOfEntries(PositionImpl fromPosition) {
long allEntries = ledger.getNumberOfEntries(fromPosition);
Range<PositionImpl> accountedEntriesRange = Range.atLeast(fromPosition);

Expand Down
Expand Up @@ -191,7 +191,7 @@ public long getNumberOfMessagesInBacklog() {
long count = 0;

for (ManagedCursor cursor : managedLedger.getCursors()) {
count += cursor.getNumberOfEntries();
count += cursor.getNumberOfEntriesInBacklog();
}

return count;
Expand Down
Expand Up @@ -64,6 +64,11 @@ public long getNumberOfEntries() {
return 0;
}

@Override
public long getNumberOfEntriesInBacklog() {
return 0;
}

@Override
public void markDelete(Position position) throws ManagedLedgerException {
this.position = position;
Expand Down
Expand Up @@ -1006,24 +1006,29 @@ void testFilteringReadEntries() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(3));
ManagedCursor cursor = ledger.openCursor("c1");

Position p1 = ledger.addEntry("entry1".getBytes());
Position p2 = ledger.addEntry("entry2".getBytes());
Position p3 = ledger.addEntry("entry3".getBytes());
Position p4 = ledger.addEntry("entry4".getBytes());
/* Position p1 = */ledger.addEntry("entry1".getBytes());
/* Position p2 = */ledger.addEntry("entry2".getBytes());
/* Position p3 = */ledger.addEntry("entry3".getBytes());
/* Position p4 = */ledger.addEntry("entry4".getBytes());
Position p5 = ledger.addEntry("entry5".getBytes());
Position p6 = ledger.addEntry("entry6".getBytes());
/* Position p6 = */ledger.addEntry("entry6".getBytes());

assertEquals(cursor.getNumberOfEntries(), 6);
assertEquals(cursor.getNumberOfEntriesInBacklog(), 6);

assertEquals(cursor.readEntries(3).size(), 3);

assertEquals(cursor.getNumberOfEntries(), 3);
assertEquals(cursor.getNumberOfEntriesInBacklog(), 6);

log.info("Deleting {}", p5);
cursor.delete(p5);

assertEquals(cursor.getNumberOfEntries(), 2);
assertEquals(cursor.getNumberOfEntriesInBacklog(), 5);
assertEquals(cursor.readEntries(3).size(), 2);
assertEquals(cursor.getNumberOfEntries(), 0);
assertEquals(cursor.getNumberOfEntriesInBacklog(), 5);
}

@Test(timeOut = 20000)
Expand All @@ -1033,15 +1038,16 @@ void testCountingWithDeletedEntries() throws Exception {
ManagedCursor cursor = ledger.openCursor("c1");

Position p1 = ledger.addEntry("entry1".getBytes());
Position p2 = ledger.addEntry("entry2".getBytes());
Position p3 = ledger.addEntry("entry3".getBytes());
Position p4 = ledger.addEntry("entry4".getBytes());
/* Position p2 = */ledger.addEntry("entry2".getBytes());
/* Position p3 = */ledger.addEntry("entry3".getBytes());
/* Position p4 = */ledger.addEntry("entry4".getBytes());
Position p5 = ledger.addEntry("entry5".getBytes());
Position p6 = ledger.addEntry("entry6".getBytes());
Position p7 = ledger.addEntry("entry7".getBytes());
Position p8 = ledger.addEntry("entry8".getBytes());

assertEquals(cursor.getNumberOfEntries(), 8);

cursor.delete(p8);
assertEquals(cursor.getNumberOfEntries(), 7);

Expand Down
Expand Up @@ -106,7 +106,7 @@ public void simple() throws Exception {
assertTrue(mbean.getReadEntriesRate() > 0.0);
assertEquals(mbean.getReadEntriesSucceeded(), 1);
assertEquals(mbean.getReadEntriesErrors(), 0);
assertEquals(mbean.getNumberOfMessagesInBacklog(), 0);
assertEquals(mbean.getNumberOfMessagesInBacklog(), 2);
}

}

0 comments on commit 904591d

Please sign in to comment.