Permalink
Browse files

BOOKKEEPER-112: Bookie Recovery on an open ledger will cause LedgerHa…

…ndle#close on that ledger to fail (sijie)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/bookkeeper/trunk@1307743 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent 235bc1f commit 682a23fb23c8de7756d5ef226fd6e97dd8d4e561 @sijie committed Mar 31, 2012
View
@@ -74,6 +74,8 @@ Trunk (unreleased changes)
BOOKKEEPER-198: replaying entries of deleted ledgers would exhaust ledger cache. (sijie)
+ BOOKKEEPER-112: Bookie Recovery on an open ledger will cause LedgerHandle#close on that ledger to fail (sijie)
+
hedwig-server/
BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)
@@ -36,6 +36,7 @@
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -83,7 +84,6 @@
private DigestType DIGEST_TYPE;
private byte[] PASSWD;
-
/**
* Constructor that takes in a ZooKeeper servers connect string so we know
* how to connect to ZooKeeper to retrieve information about the BookKeeper
@@ -426,8 +426,8 @@ private void recoverLedger(final InetSocketAddress bookieSrc, final long lId,
* ledger fragments are stored on. Check if any of the ledger fragments
* for the current ledger are stored on the input dead bookie.
*/
- DigestType digestType = getLedgerDigestType(lId);
- byte[] passwd = getLedgerPasswd(lId);
+ final DigestType digestType = getLedgerDigestType(lId);
+ final byte[] passwd = getLedgerPasswd(lId);
bkc.asyncOpenLedgerNoRecovery(lId, digestType, passwd, new OpenCallback() {
@Override
public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
@@ -436,6 +436,39 @@ public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
ledgerIterCb.processResult(rc, null, null);
return;
}
+
+ LedgerMetadata lm = lh.getLedgerMetadata();
+ if (!lm.isClosed() &&
+ lm.getEnsembles().size() > 0) {
+ Long lastKey = lm.getEnsembles().lastKey();
+ ArrayList<InetSocketAddress> lastEnsemble = lm.getEnsembles().get(lastKey);
+ // the original write has not removed faulty bookie from
+ // current ledger ensemble. to avoid data loss issue in
+ // the case of concurrent updates to the ensemble composition,
+ // the recovery tool should first close the ledger
+ if (lastEnsemble.contains(bookieSrc)) {
+ // close opened non recovery ledger handle
+ try {
+ lh.close();
+ } catch (Exception ie) {
+ LOG.warn("Error closing non recovery ledger handle for ledger " + lId, ie);
+ }
+ bkc.asyncOpenLedger(lId, digestType, passwd, new OpenCallback() {
+ @Override
+ public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
+ if (newrc != Code.OK.intValue()) {
+ LOG.error("BK error close ledger: " + lId, BKException.create(newrc));
+ ledgerIterCb.processResult(newrc, null, null);
+ return;
+ }
+ // do recovery
+ recoverLedger(bookieSrc, lId, ledgerIterCb, availableBookies);
+ }
+ }, null);
+ return;
+ }
+ }
+
/*
* This List stores the ledger fragments to recover indexed by
* the start entry ID for the range. The ensembles TreeMap is
@@ -465,6 +498,12 @@ public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
ledgerFragmentsToRecover.add(entry.getKey());
}
}
+ // add last ensemble otherwise if the failed bookie existed in
+ // the last ensemble of a closed ledger. the entries belonged to
+ // last ensemble would not be replicated.
+ if (curEntryId != null) {
+ ledgerFragmentsRange.put(curEntryId, lh.getLastAddConfirmed());
+ }
/*
* See if this current ledger contains any ledger fragment that
* needs to be re-replicated. If not, then just invoke the
@@ -503,7 +542,6 @@ public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
+ "," + endEntryId + "] of ledger " + lh.getId()
+ " to " + newBookie);
}
-
try {
SingleFragmentCallback cb = new SingleFragmentCallback(
ledgerFragmentsMcb, lh, startEntryId, bookieSrc, newBookie);
@@ -551,6 +589,12 @@ private void recoverLedgerFragment(final InetSocketAddress bookieSrc, final Ledg
cb.processResult(BKException.Code.OK, null, null);
return;
}
+ if (startEntryId > endEntryId) {
+ // for open ledger which there is no entry, the start entry id is 0, the end entry id is -1.
+ // we can return immediately to trigger forward read
+ cb.processResult(BKException.Code.OK, null, null);
+ return;
+ }
ArrayList<InetSocketAddress> curEnsemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
int bookieIndex = 0;
@@ -603,7 +647,7 @@ private void recoverLedgerFragment(final InetSocketAddress bookieSrc, final Ledg
* entries that were stored on the failed bookie.
*/
private void recoverLedgerFragmentEntry(final Long entryId, final LedgerHandle lh,
- final MultiCallback ledgerFragmentEntryMcb,
+ final AsyncCallback.VoidCallback ledgerFragmentEntryMcb,
final InetSocketAddress newBookie) throws InterruptedException {
/*
* Read the ledger entry using the LedgerHandle. This will allow us to
@@ -659,13 +703,14 @@ public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress
* be a multicallback responsible for all fragments in a single ledger
*/
class SingleFragmentCallback implements AsyncCallback.VoidCallback {
- final MultiCallback ledgerFragmentsMcb;
+ final AsyncCallback.VoidCallback ledgerFragmentsMcb;
final LedgerHandle lh;
final long fragmentStartId;
final InetSocketAddress oldBookie;
final InetSocketAddress newBookie;
- SingleFragmentCallback(MultiCallback ledgerFragmentsMcb, LedgerHandle lh,
+ SingleFragmentCallback(AsyncCallback.VoidCallback ledgerFragmentsMcb,
+ LedgerHandle lh,
long fragmentStartId,
InetSocketAddress oldBookie,
InetSocketAddress newBookie) {
@@ -684,6 +729,10 @@ public void processResult(int rc, String path, Object ctx) {
ledgerFragmentsMcb.processResult(rc, null, null);
return;
}
+ writeLedgerConfig();
+ }
+
+ protected void writeLedgerConfig() {
/*
* Update the ledger metadata's ensemble info to point
* to the new bookie.
@@ -693,20 +742,32 @@ public void processResult(int rc, String path, Object ctx) {
int deadBookieIndex = ensemble.indexOf(oldBookie);
ensemble.remove(deadBookieIndex);
ensemble.add(deadBookieIndex, newBookie);
-
-
+
lh.writeLedgerConfig(new WriteCb(), null);
}
private class WriteCb implements AsyncCallback.StatCallback {
@Override
- public void processResult(int rc, String path, Object ctx, Stat stat) {
+ public void processResult(int rc, final String path, Object ctx, Stat stat) {
if (rc == Code.BADVERSION.intValue()) {
LOG.warn("Two fragments attempted update at once; ledger id: " + lh.getId()
+ " startid: " + fragmentStartId);
// try again, the previous success (with which this has conflicted)
// will have updated the stat
- lh.writeLedgerConfig(new WriteCb(), null);
+ // other operations such as (addEnsemble) would update it too.
+ lh.rereadMetadata(new GenericCallback<LedgerMetadata>() {
+ @Override
+ public void operationComplete(int rc, LedgerMetadata newMeta) {
+ if (rc != BKException.Code.OK) {
+ LOG.error("Error reading updated ledger metadata for ledger " + lh.getId(),
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ ledgerFragmentsMcb.processResult(rc, null, null);
+ } else {
+ lh.metadata = newMeta;
+ writeLedgerConfig();
+ }
+ }
+ });
return;
} else if (rc != Code.OK.intValue()) {
LOG.error("ZK error updating ledger config metadata for ledgerId: " + lh.getId(),
@@ -725,4 +786,5 @@ public void processResult(int rc, String path, Object ctx, Stat stat) {
}
};
}
+
}
Oops, something went wrong.

0 comments on commit 682a23f

Please sign in to comment.