Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: merlimat/managed-ledger
base: df05d54ee0
...
head fork: merlimat/managed-ledger
compare: 6c4af2ce15
Checking mergeability… Don't worry, you can still create the pull request.
  • 2 commits
  • 9 files changed
  • 0 commit comments
  • 2 contributors
Commits on Apr 20, 2012
mmerli [bug 5505343] Updated API javadocs
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/platform/yxms/trunk/managed-ledger@34 b48032ab-c6b0-43d1-8dc6-875f9f7cdcf0
1309cfd
@merlimat Merge branch 'trunk'
6c4af2c
View
5 src/main/java/com/yahoo/messaging/bookkeeper/ledger/AsyncCallbacks.java
@@ -2,7 +2,12 @@
import java.util.List;
+/**
+ * Definition of all the callbacks used for the ManagedLedger asynchronous API.
+ *
+ */
public interface AsyncCallbacks {
+
public interface OpenLedgerCallback {
public void openLedgerComplete(Throwable status, ManagedLedger ledger, Object ctx);
}
View
10 src/main/java/com/yahoo/messaging/bookkeeper/ledger/Entry.java
@@ -1,7 +1,17 @@
package com.yahoo.messaging.bookkeeper.ledger;
+/**
+ * An Entry represent a ledger entry data and its associated position.
+ */
public interface Entry {
+
+ /**
+ * @return the data
+ */
byte[] getData();
+ /**
+ * @return the position at which the entry was stored
+ */
Position getPosition();
}
View
50 src/main/java/com/yahoo/messaging/bookkeeper/ledger/ManagedCursor.java
@@ -6,15 +6,16 @@
import com.yahoo.messaging.bookkeeper.ledger.AsyncCallbacks.ReadEntriesCallback;
/**
- * A ManangedCursor is a persisted cursor
- *
- * If the cursor is not marking entries
- *
+ * A ManangedCursor is a persisted cursor inside a ManagedLedger.
+ * <p>
+ * The ManagedCursor is used to read from the ManagedLedger and to signal when
+ * the consumer is done with the messages that it has read before.
*/
public interface ManagedCursor {
/**
- *
+ * Read entries from the ManagedLedger, up to the specified number. The
+ * returned list can be smaller.
*
* @param numberOfEntriesToRead
* maximum number of entries to return
@@ -23,27 +24,50 @@
*/
public List<Entry> readEntries(int numberOfEntriesToRead) throws Exception;
+ /**
+ * Asynchronously read entries from the ManagedLedger.
+ *
+ * @see #readEntries(int)
+ * @param numberOfEntriesToRead
+ * maximum number of entries to return
+ * @param callback
+ * callback object
+ * @param ctx
+ * opaque context
+ */
public void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx);
/**
* Tells whether this cursor has already consumed all the available entries.
+ * <p>
+ * This method is not blocking.
*
* @return true if there are pending entries to read, false otherwise
*/
public boolean hasMoreEntries();
/**
+ * This signals that the reader is done with all the entries up to
+ * "position" (included). This can potentially trigger a ledger deletion, if
+ * all the other cursors are done too with the underlying ledger.
*
- * This signals that the reader is done with all the entries up to "entry"
- * (included). This can potentially trigger a ledger deletion, if all the
- * other cursors are done too with the underlying ledger.
- *
- * @param entry
- * the last entry that has been successfully processed
+ * @param position
+ * the last position that have been successfully consumed
* @throws Exception
*/
- public void markDelete(Entry entry) throws Exception;
+ public void markDelete(Position position) throws Exception;
- public void asyncMarkDelete(Entry entry, MarkDeleteCallback callback, Object ctx);
+ /**
+ * Asynchronous mark delete
+ *
+ * @see #markDelete(Position)
+ * @param position
+ * the last position that have been successfully consumed
+ * @param callback
+ * callback object
+ * @param ctx
+ * opaque context
+ */
+ public void asyncMarkDelete(Position position, MarkDeleteCallback callback, Object ctx);
}
View
65 src/main/java/com/yahoo/messaging/bookkeeper/ledger/ManagedLedger.java
@@ -5,12 +5,27 @@
import com.yahoo.messaging.bookkeeper.ledger.AsyncCallbacks.OpenCursorCallback;
/**
- * A ManagedLedger it's a superset of a BookKeeper ledger concept. These are the
- * differences :
+ * A ManagedLedger it's a superset of a BookKeeper ledger concept.
+ * <p>
+ * It mimics the concept of an appender log that:
*
* <ul>
- * <li>ManagedLedger has a unique name by which it can be created/reopened</li>
- * <li>xxx</li>
+ * <li>has a unique name (chosen by clients) by which it can be
+ * created/opened/deleted</li>
+ * <li>is always writable: if a writer process crashes, a new writer can re-open
+ * the ManagedLedger and continue writing into it</li>
+ * <li>has multiple persisted consumers (see {@link ManagedCursor}), each of
+ * them with an associated position</li>
+ * <li>when all the consumers have processed all the entries contained in a
+ * Bookkeeper ledger, the ledger is deleted</li>
+ * </ul>
+ * <p>
+ * Caveats:
+ * <ul>
+ * <li>A single ManagedLedger can only be open once at any time. Implementation
+ * can protect double access from the same VM, but accesses from different
+ * machines to the same ManagedLedger need to be avoided through an external
+ * source of coordination.</li>
* </ul>
*/
public interface ManagedLedger {
@@ -24,10 +39,23 @@
* Append a new entry to the end of a managed ledger.
*
* @param data
- * to be added to the managed ledger
+ * data entry to be persisted
+ * @throws Exception
*/
public void addEntry(byte[] data) throws Exception;
+ /**
+ * Append a new entry asynchronously
+ *
+ * @see #addEntry(byte[])
+ * @param data
+ * data entry to be persisted
+ *
+ * @param callback
+ * callback object
+ * @param ctx
+ * opaque context
+ */
public void asyncAddEntry(byte[] data, AddEntryCallback callback, Object ctx);
/**
@@ -43,6 +71,17 @@
*/
public ManagedCursor openCursor(String name) throws Exception;
+ /**
+ * Open a ManagedCursor asynchronously.
+ *
+ * @see #openCursor(String)
+ * @param name
+ * the name associated with the ManagedCursor
+ * @param callback
+ * callback object
+ * @param ctx
+ * opaque context
+ */
public void asyncOpenCursor(String name, OpenCursorCallback callback, Object ctx);
/**
@@ -50,6 +89,8 @@
* <p>
* This is defined by the number of entries in all the BookKeeper ledgers
* that are being maintained by this ManagedLedger.
+ * <p>
+ * This method is non-blocking.
*
* @return the number of entries
*/
@@ -61,19 +102,31 @@
* <p>
* This is defined by the sizes of all the BookKeeper ledgers that are being
* maintained by this ManagedLedger.
+ * <p>
+ * This method is non-blocking.
*
* @return total size in bytes
*/
public long getTotalSize();
/**
- * Close the current virtual ledger.
+ * Close the ManagedLedger.
* <p>
* This will close all the underlying BookKeeper ledgers. All the
* ManagedCursors associated will be invalidated.
*
+ * @throws Exception
*/
public void close() throws Exception;
+ /**
+ * Close the ManagedLedger asynchronously.
+ *
+ * @see #close()
+ * @param callback
+ * callback object
+ * @param ctx
+ * opaque context
+ */
public void asyncClose(CloseCallback callback, Object ctx);
}
View
10 src/main/java/com/yahoo/messaging/bookkeeper/ledger/Position.java
@@ -8,12 +8,22 @@
/**
* A Position is a pointer to a specific entry into the managed ledger.
+ * <p>
+ * Specifically a Position is composed of a (LedgerId,EntryId) pair.
*/
public class Position implements Comparable<Position> {
private final long ledgerId;
private final long entryId;
+ /**
+ *
+ * @param text
+ * string serialized position
+ *
+ * @throws IllegalArgumentException
+ * if text is not a valid serialized position
+ */
public Position(String text) {
checkNotNull(text);
View
2  src/main/java/com/yahoo/messaging/bookkeeper/ledger/impl/LedgerStat.java
@@ -12,7 +12,7 @@
import com.google.common.primitives.Longs;
/**
- *
+ * LedgerStat holds a tuple of (LedgerId, EntriesCount, Size)
*/
public class LedgerStat implements Comparable<LedgerStat> {
private final long ledgerId;
View
19 src/main/java/com/yahoo/messaging/bookkeeper/ledger/impl/ManagedCursorImpl.java
@@ -34,6 +34,9 @@
this.ledger = ledger;
this.name = name;
this.acknowledgedPosition = position;
+
+ // The read position has to ahead of the acknowledged position, by at
+ // least 1, since it refers to the next entry that has to be read.
this.readPosition = new Position(position.getLedgerId(), position.getEntryId() + 1);
ledger.getStore().updateConsumer(ledger.getName(), name, acknowledgedPosition);
@@ -96,14 +99,14 @@ public boolean hasMoreEntries() {
* (non-Javadoc)
*
* @see
- * com.yahoo.messaging.bookkeeper.ledger.ManagedCursor#acknowledge(Entry)
+ * com.yahoo.messaging.bookkeeper.ledger.ManagedCursor#acknowledge(Position)
*/
@Override
- public void markDelete(Entry entry) throws Exception {
- checkNotNull(entry);
+ public void markDelete(Position position) throws Exception {
+ checkNotNull(position);
- log.debug("[{}] Mark delete up to position: {}", ledger.getName(), entry.getPosition());
- acknowledgedPosition = entry.getPosition();
+ log.debug("[{}] Mark delete up to position: {}", ledger.getName(), position);
+ acknowledgedPosition = position;
ledger.getStore().updateConsumer(ledger.getName(), name, acknowledgedPosition);
}
@@ -112,19 +115,19 @@ public void markDelete(Entry entry) throws Exception {
*
* @see
* com.yahoo.messaging.bookkeeper.ledger.ManagedCursor#asyncMarkDelete(com
- * .yahoo.messaging.bookkeeper.ledger.Entry,
+ * .yahoo.messaging.bookkeeper.ledger.Position,
* com.yahoo.messaging.bookkeeper.ledger.AsyncCallbacks.MarkDeleteCallback,
* java.lang.Object)
*/
@Override
- public void asyncMarkDelete(final Entry entry, final MarkDeleteCallback callback,
+ public void asyncMarkDelete(final Position position, final MarkDeleteCallback callback,
final Object ctx) {
ledger.getExecutor().execute(new Runnable() {
public void run() {
Exception error = null;
try {
- markDelete(entry);
+ markDelete(position);
} catch (Exception e) {
log.warn("[{}] Got exception when mark deleting entry: {} {}",
va(ledger.getName(), name, e));
View
39 src/main/java/com/yahoo/messaging/bookkeeper/ledger/impl/MetaStore.java
@@ -5,12 +5,51 @@
import com.yahoo.messaging.bookkeeper.ledger.Position;
import com.yahoo.messaging.bookkeeper.ledger.util.Pair;
+/**
+ * Interface that describes the operations that the ManagedLedger need to do on
+ * the metadata store.
+ */
public interface MetaStore {
+
+ /**
+ * Get the list of ledgers used by the ManagedLedger
+ *
+ * @param ledgerName
+ * the name of the ManagedLedger
+ * @return a list of LedgerStats
+ * @throws Exception
+ */
List<LedgerStat> getLedgerIds(String ledgerName) throws Exception;
+ /**
+ * Update the list of LedgerStats associated with a ManagedLedger
+ *
+ * @param ledgerName
+ * the name of the ManagedLedger
+ * @param ledgerIds
+ * a sequence of LedgerStats
+ * @throws Exception
+ */
void updateLedgersIds(String ledgerName, Iterable<LedgerStat> ledgerIds) throws Exception;
+ /**
+ * Get the list of consumer registered on a ManagedLedger.
+ *
+ * @param ledgerName
+ * the name of the ManagedLedger
+ * @return a list of Pair<ConsumerId,Position> for the consumers
+ * @throws Exception
+ */
List<Pair<String, Position>> getConsumers(String ledgerName) throws Exception;
+ /**
+ * Update the persisted position of a consumer
+ *
+ * @param ledgerName
+ * the name of the ManagedLedger
+ * @param consumerName
+ * @param position
+ * @throws Exception
+ */
void updateConsumer(String ledgerName, String consumerName, Position position) throws Exception;
}
View
6 src/test/java/com/yahoo/messaging/bookkeeper/ledger/impl/ManagedLedgerTest.java
@@ -59,7 +59,7 @@ public void managedLedgerApi() throws Exception {
// Acknowledge only on last entry
Entry lastEntry = entries.get(entries.size() - 1);
- cursor.markDelete(lastEntry);
+ cursor.markDelete(lastEntry.getPosition());
log.info("-----------------------");
}
@@ -149,7 +149,7 @@ public void acknowledge1() throws Exception {
assertEquals(entries.size(), 2);
assertEquals(cursor.hasMoreEntries(), false);
- cursor.markDelete(entries.get(0));
+ cursor.markDelete(entries.get(0).getPosition());
ledger.close();
@@ -208,7 +208,7 @@ public void readEntriesComplete(Throwable status,
Entry entry = entries.get(0);
assertEquals(new String(entry.getData(), Encoding), "test");
- cursor.asyncMarkDelete(entry, new MarkDeleteCallback() {
+ cursor.asyncMarkDelete(entry.getPosition(), new MarkDeleteCallback() {
public void markDeleteComplete(Throwable status,
Object ctx) {
assertNull(status);

No commit comments for this range

Something went wrong with that request. Please try again.