Browse files

Improved code coverage

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/platform/yxms/trunk/managed-ledger@333 b48032ab-c6b0-43d1-8dc6-875f9f7cdcf0
  • Loading branch information...
1 parent 60cafa4 commit 71b9c185ce5df9dd28260ea4e05ddfc22419ab4c mmerli committed Jul 12, 2012
View
4 src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -138,7 +138,7 @@ public PositionImpl getSlowestReaderPosition() {
* the node to push
*/
private void pushTowardHead(Node node) {
- while (node != null && node.previous != null) {
+ while (node.previous != null) {
// While this node is "bigger" than its previous, swap the two.
long currentId = ((PositionImpl) node.data.getMarkDeletedPosition()).getLedgerId();
long previousId = ((PositionImpl) node.previous.data.getMarkDeletedPosition()).getLedgerId();
@@ -165,7 +165,7 @@ private void pushTowardHead(Node node) {
* the node to push
*/
private void pushTowardTail(Node node) {
- while (node != null && node.next != null) {
+ while (node.next != null) {
// While this node is "bigger" than its previous, swap the two.
long current = ((PositionImpl) node.data.getMarkDeletedPosition()).getLedgerId();
long next = ((PositionImpl) node.next.data.getMarkDeletedPosition()).getLedgerId();
View
7 src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -48,8 +48,7 @@
private final AtomicReference<PositionImpl> acknowledgedPosition = new AtomicReference<PositionImpl>();
private final AtomicReference<PositionImpl> readPosition = new AtomicReference<PositionImpl>();
- ManagedCursorImpl(ManagedLedgerImpl ledger, String name, PositionImpl position) throws InterruptedException,
- ManagedLedgerException {
+ ManagedCursorImpl(ManagedLedgerImpl ledger, String name, PositionImpl position) {
this.ledger = ledger;
this.name = name;
this.acknowledgedPosition.set(position);
@@ -157,8 +156,8 @@ public void run() {
@Override
public synchronized String toString() {
- return Objects.toStringHelper(this).add("name", name).add("ackPos", acknowledgedPosition)
- .add("readPos", readPosition).toString();
+ return Objects.toStringHelper(this).add("ledger", ledger.getName()).add("name", name)
+ .add("ackPos", acknowledgedPosition).add("readPos", readPosition).toString();
}
@Override
View
59 src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -90,6 +90,7 @@
ClosedLedger, // Current ledger has been closed and there's no pending
// operation
CreatingLedger, // Creating a new ledger
+ Closed, // ManagedLedger has been closed
Fenced, // A managed ledger is fenced when there is some concurrent
// access from a different session/machine. In this state the
// managed ledger will throw exception for all operations, since
@@ -221,19 +222,9 @@ private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callb
store.getConsumers(name, new MetaStoreCallback<List<Pair<String, Position>>>() {
public void operationComplete(List<Pair<String, Position>> result, Version v) {
// Load existing cursors
- try {
- for (Pair<String, Position> pair : result) {
- log.debug("[{}] Loading cursor {}", name, pair);
- cursors.add(new ManagedCursorImpl(ManagedLedgerImpl.this, pair.first,
- (PositionImpl) pair.second));
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- callback.initializeFailed(new ManagedLedgerException(e));
- return;
- } catch (ManagedLedgerException e) {
- callback.initializeFailed(e);
- return;
+ for (Pair<String, Position> pair : result) {
+ log.debug("[{}] Loading cursor {}", name, pair);
+ cursors.add(new ManagedCursorImpl(ManagedLedgerImpl.this, pair.first, (PositionImpl) pair.second));
}
// Calculate total entries and size
@@ -288,11 +279,13 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
@Override
public synchronized void asyncAddEntry(final byte[] data, final AddEntryCallback callback, final Object ctx) {
- checkArgument(state != State.None);
log.debug("[{}] asyncAddEntry size={} state={}", va(name, data.length, state));
if (state == State.Fenced) {
callback.addFailed(new ManagedLedgerFencedException(), ctx);
return;
+ } else if (state == State.Closed) {
+ callback.addFailed(new ManagedLedgerException("Managed ledger was already closed"), ctx);
+ return;
}
OpAddEntry addOperation = new OpAddEntry(this, data, callback, ctx);
@@ -331,6 +324,7 @@ public synchronized void asyncAddEntry(final byte[] data, final AddEntryCallback
@Override
public synchronized ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException {
+ checkManagedLedgerIsOpen();
checkFenced();
ManagedCursor cursor = cursors.get(cursorName);
@@ -392,6 +386,7 @@ public synchronized void close() throws InterruptedException, ManagedLedgerExcep
ledgerCache.invalidateAll();
log.info("Invalidated {} ledgers in cache", ledgerCache.size());
factory.close(this);
+ state = State.Closed;
}
@Override
@@ -502,18 +497,6 @@ synchronized void asyncReadEntries(OpReadEntry opReadEntry) {
LedgerHandle ledger = null;
- if (opReadEntry.readPosition.getLedgerId() == -1) {
- if (ledgers.isEmpty()) {
- // The ManagedLedger is completely empty
- opReadEntry.emptyResponse();
- return;
- }
-
- // Initialize the position on the first entry for the first ledger
- // in the set
- opReadEntry.readPosition = new PositionImpl(ledgers.firstKey(), 0);
- }
-
long id = opReadEntry.readPosition.getLedgerId();
if (id == currentLedger.getId()) {
@@ -610,6 +593,7 @@ public synchronized void openComplete(int rc, LedgerHandle ledger, Object ctx) {
}
log.debug("[{}] Successfully opened ledger {} for reading", name, ledger.getId());
+ ledgerCache.put(ledger.getId(), ledger);
internalReadFromLedger(ledger, opReadEntry);
}
@@ -662,6 +646,7 @@ synchronized void updateCursor(ManagedCursorImpl cursor, PositionImpl newPositio
void trimConsumedLedgersInBackground() {
executor.execute(new Runnable() {
public void run() {
+ // Ensure only one trimming operation is active
synchronized (trimmerMutex) {
internalTrimConsumedLedgers();
}
@@ -676,7 +661,7 @@ public void run() {
* @throws Exception
*/
void internalTrimConsumedLedgers() {
- // Ensure only one trimming operation is active
+
List<LedgerStat> ledgersToDelete = Lists.newArrayList();
synchronized (this) {
@@ -777,8 +762,7 @@ synchronized long getNumberOfEntries(PositionImpl position) {
long count = 0;
// First count the number of unread entries in the ledger pointed by
// position
- if (position.getLedgerId() >= 0)
- count += ledgers.get(position.getLedgerId()).getEntriesCount() - position.getEntryId();
+ count += ledgers.get(position.getLedgerId()).getEntriesCount() - position.getEntryId();
// Then, recur all the next ledgers and sum all the entries they contain
for (LedgerStat ls : ledgers.tailMap(position.getLedgerId(), false).values()) {
@@ -808,16 +792,11 @@ synchronized PositionImpl skipEntries(PositionImpl startPosition, int entriesToS
entriesToSkip += startPosition.getEntryId();
while (entriesToSkip > 0) {
- if (currentLedger != null && ledgerId == currentLedger.getId()) {
+ if (ledgerId == currentLedger.getId()) {
checkArgument(entriesToSkip <= (currentLedger.getLastAddConfirmed() + 1));
return new PositionImpl(ledgerId, entriesToSkip);
} else {
LedgerStat ledger = ledgers.get(ledgerId);
- if (ledger == null) {
- checkArgument(!ledgers.isEmpty());
- ledgerId = ledgers.ceilingKey(ledgerId);
- continue;
- }
if (entriesToSkip < ledger.getEntriesCount()) {
return new PositionImpl(ledgerId, entriesToSkip);
@@ -842,14 +821,14 @@ synchronized PositionImpl skipEntries(PositionImpl startPosition, int entriesToS
*/
synchronized boolean isValidPosition(PositionImpl position) {
if (position.getLedgerId() == currentLedger.getId()) {
- return position.getEntryId() <= currentLedger.getLastAddConfirmed();
+ return position.getEntryId() <= (currentLedger.getLastAddConfirmed() + 1);
} else {
// Look in the ledgers map
LedgerStat ls = ledgers.get(position.getLedgerId());
if (ls == null)
return false;
- return position.getEntryId() < ls.getEntriesCount();
+ return position.getEntryId() <= ls.getEntriesCount();
}
}
@@ -874,6 +853,12 @@ private void checkFenced() throws ManagedLedgerException {
}
}
+ private void checkManagedLedgerIsOpen() throws ManagedLedgerException {
+ if (state == State.Closed) {
+ throw new ManagedLedgerException("ManagedLedger " + name + " has already been closed");
+ }
+ }
+
synchronized void setFenced() {
state = State.Fenced;
}
View
110 src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -56,6 +56,11 @@ void readFromEmptyLedger() throws Exception {
entries = c1.readEntries(10);
assertEquals(entries.size(), 0);
+
+ // Test string representation
+ assertEquals(c1.toString(), "ManagedCursorImpl{ledger=my_test_ledger, name=c1, ackPos=3:-1, readPos=3:1}");
+
+ factory.shutdown();
}
@Test(timeOut = 3000)
@@ -134,6 +139,32 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
counter.await();
}
+ @Test(timeOut = 3000, expectedExceptions = IllegalArgumentException.class)
+ void asyncReadWithInvalidParameter() throws Exception {
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+ ManagedLedger ledger = factory.open("my_test_ledger");
+ ManagedCursor cursor = ledger.openCursor("c1");
+
+ ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+
+ final CountDownLatch counter = new CountDownLatch(1);
+
+ stopBKCluster();
+
+ cursor.asyncReadEntries(0, new ReadEntriesCallback() {
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ fail("async-call should have failed");
+ }
+
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ counter.countDown();
+ }
+
+ }, null);
+
+ counter.await();
+ }
+
@Test(timeOut = 3000)
void asyncMarkDeleteWithErrors() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
@@ -207,6 +238,22 @@ void skipEntries2() throws Exception {
cursor.readEntries(2);
cursor.skip(1);
assertEquals(cursor.getNumberOfEntries(), 3);
+
+ try {
+ cursor.skip(4);
+ fail("should have failed");
+ } catch (IllegalArgumentException e) {
+ // ok
+ }
+
+ cursor.skip(3);
+
+ try {
+ cursor.skip(1);
+ fail("should have failed");
+ } catch (IllegalArgumentException e) {
+ // ok
+ }
}
@Test(expectedExceptions = IllegalArgumentException.class)
@@ -263,54 +310,37 @@ void seekPosition3() throws Exception {
cursor.seek(new PositionImpl(seekPosition.getLedgerId(), seekPosition.getEntryId()));
}
- @Test(expectedExceptions = IllegalArgumentException.class)
- void seekPositionEmpty() throws Exception {
- ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
- ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
- ManagedCursor cursor = ledger.openCursor("c1");
-
- Position currentPosition = cursor.getReadPosition();
- cursor.seek(currentPosition);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- void seekPositionWithError() throws Exception {
- ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
- ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
- ManagedCursor cursor = ledger.openCursor("c1");
- ledger.addEntry("dummy-entry-1".getBytes(Encoding));
- ledger.addEntry("dummy-entry-2".getBytes(Encoding));
-
- Position p = cursor.getReadPosition();
- List<Entry> entries = cursor.readEntries(1);
- assertEquals(entries.size(), 1);
- cursor.markDelete(entries.get(0).getPosition());
- cursor.seek(p);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- void seekPositionWithError2() throws Exception {
- ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
- ManagedLedger ledger = factory.open("my_test_ledger");
- ManagedCursor cursor = ledger.openCursor("c1");
- ledger.addEntry("dummy-entry-1".getBytes(Encoding));
- ledger.addEntry("dummy-entry-2".getBytes(Encoding));
-
- PositionImpl p = (PositionImpl) cursor.getReadPosition();
- cursor.seek(new PositionImpl(p.getLedgerId(), 2));
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
+ @Test
void seekPositionWithError3() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
- ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
+ ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
ManagedCursor cursor = ledger.openCursor("c1");
- ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ PositionImpl firstPosition = (PositionImpl) ledger.addEntry("dummy-entry-1".getBytes(Encoding));
ledger.addEntry("dummy-entry-2".getBytes(Encoding));
ledger.addEntry("dummy-entry-3".getBytes(Encoding));
PositionImpl lastPosition = (PositionImpl) ledger.addEntry("dummy-entry-4".getBytes(Encoding));
cursor.seek(new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() + 1));
+ try {
+ cursor.seek(new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() + 2));
+ fail("Should have failed");
+ } catch (IllegalArgumentException e) {
+ // Ok
+ }
+
+ try {
+ cursor.seek(new PositionImpl(firstPosition.getLedgerId(), 1000));
+ fail("Should have failed");
+ } catch (IllegalArgumentException e) {
+ // Ok
+ }
+
+ try {
+ cursor.seek(new PositionImpl(firstPosition.getLedgerId() + 1000, 0));
+ fail("Should have failed");
+ } catch (IllegalArgumentException e) {
+ // Ok
+ }
}
@Test(timeOut = 3000)
View
183 src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -125,6 +125,7 @@ public void simple() throws Exception {
assertEquals(entries.size(), 0);
ledger.close();
+ factory.shutdown();
}
@Test(timeOut = 3000)
@@ -704,7 +705,7 @@ public void testEmptyManagedLedgerContent() throws Exception {
assertEquals(ledger.getNumberOfEntries(), 1);
}
- @Test(timeOut = 5000, enabled = false)
+ @Test(timeOut = 5000)
public void testProducerAndNoConsumer() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
@@ -726,6 +727,34 @@ public void testProducerAndNoConsumer() throws Exception {
Thread.sleep(10);
}
+ @Test(timeOut = 5000)
+ public void testTrimmer() throws Exception {
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+ ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
+ ManagedLedger ledger = factory.open("my_test_ledger", config);
+ ManagedCursor cursor = ledger.openCursor("c1");
+
+ assertEquals(ledger.getNumberOfEntries(), 0);
+
+ ledger.addEntry("entry-1".getBytes(Encoding));
+ ledger.addEntry("entry-2".getBytes(Encoding));
+ ledger.addEntry("entry-3".getBytes(Encoding));
+ ledger.addEntry("entry-4".getBytes(Encoding));
+ assertEquals(ledger.getNumberOfEntries(), 4);
+
+ cursor.readEntries(1);
+ cursor.readEntries(1);
+ Position lastPosition = cursor.readEntries(1).get(0).getPosition();
+
+ assertEquals(ledger.getNumberOfEntries(), 4);
+
+ cursor.markDelete(lastPosition);
+
+ while (ledger.getNumberOfEntries() != 2) {
+ Thread.sleep(10);
+ }
+ }
+
@Test(timeOut = 3000)
public void testAsyncAddEntryAndSyncClose() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
@@ -855,6 +884,12 @@ public void fenceManagedLedger() throws Exception {
}
try {
+ ledger1.addEntry("entry-2".getBytes(Encoding));
+ fail("Expecting exception");
+ } catch (ManagedLedgerFencedException e) {
+ }
+
+ try {
cursor1.readEntries(10);
fail("Expecting exception");
} catch (ManagedLedgerFencedException e) {
@@ -871,4 +906,150 @@ public void fenceManagedLedger() throws Exception {
assertEquals(cursor2.getNumberOfEntries(), 2);
}
+ @Test
+ public void forceCloseLedgers() throws Exception {
+ ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+ ManagedLedger ledger1 = factory1.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+ ledger1.openCursor("c1");
+ ManagedCursor c2 = ledger1.openCursor("c2");
+ ledger1.addEntry("entry-1".getBytes(Encoding));
+ ledger1.addEntry("entry-2".getBytes(Encoding));
+ ledger1.addEntry("entry-3".getBytes(Encoding));
+
+ c2.readEntries(1);
+ c2.readEntries(1);
+ c2.readEntries(1);
+
+ ledger1.close();
+
+ try {
+ ledger1.addEntry("entry-3".getBytes(Encoding));
+ fail("should not have reached this point");
+ } catch (ManagedLedgerException e) {
+ // ok
+ }
+
+ try {
+ ledger1.openCursor("new-cursor");
+ fail("should not have reached this point");
+ } catch (ManagedLedgerException e) {
+ // ok
+ }
+ }
+
+ @Test
+ public void closeLedgerWithError() throws Exception {
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+ ManagedLedger ledger = factory.open("my_test_ledger");
+ ledger.addEntry("entry-1".getBytes(Encoding));
+
+ stopZKCluster();
+ stopBKCluster();
+
+ try {
+ ledger.close();
+ // fail("should have thrown exception");
+ } catch (ManagedLedgerException e) {
+ // Ok
+ }
+ }
+
+ @Test(timeOut = 3000)
+ public void deleteWithErrors1() throws Exception {
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+
+ ManagedLedger ledger = factory.open("my_test_ledger");
+
+ PositionImpl position = (PositionImpl) ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ assertEquals(ledger.getNumberOfEntries(), 1);
+ ledger.close();
+
+ // Force delete a ledger and test that deleting the ML still happens
+ // without errors
+ bkc.deleteLedger(position.getLedgerId());
+ factory.delete("my_test_ledger");
+ }
+
+ @Test(timeOut = 3000)
+ public void deleteWithErrors2() throws Exception {
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+
+ ManagedLedger ledger = factory.open("my_test_ledger");
+ ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ ledger.close();
+
+ stopZKCluster();
+
+ try {
+ factory.delete("my_test_ledger");
+ fail("should have failed");
+ } catch (ManagedLedgerException e) {
+ // ok
+ }
+ }
+
+ @Test(timeOut = 3000)
+ public void readWithErrors1() throws Exception {
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+
+ ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+ ManagedCursor cursor = ledger.openCursor("c1");
+ ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+
+ stopZKCluster();
+
+ try {
+ cursor.readEntries(10);
+ fail("should have failed");
+ } catch (ManagedLedgerException e) {
+ // ok
+ }
+
+ try {
+ ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+ fail("should have failed");
+ } catch (ManagedLedgerException e) {
+ // ok
+ }
+ }
+
+ @Test(timeOut = 3000, enabled = false)
+ void concurrentAsyncOpen() throws Exception {
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+
+ final CountDownLatch counter = new CountDownLatch(2);
+
+ class Result {
+ ManagedLedger instance1 = null;
+ ManagedLedger instance2 = null;
+ }
+
+ final Result result = new Result();
+ factory.asyncOpen("my-test-ledger", new OpenLedgerCallback() {
+
+ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ result.instance1 = ledger;
+ counter.countDown();
+ }
+
+ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
+ }
+ }, null);
+
+ factory.asyncOpen("my-test-ledger", new OpenLedgerCallback() {
+
+ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ result.instance2 = ledger;
+ counter.countDown();
+ }
+
+ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
+ }
+ }, null);
+
+ counter.await();
+ assertEquals(result.instance1, result.instance2);
+ assertNotNull(result.instance1);
+ }
}

0 comments on commit 71b9c18

Please sign in to comment.