Skip to content

Commit

Permalink
BOOKKEEPER-946: Provide an option to delay auto recovery of lost bookies
Browse files Browse the repository at this point in the history
If auto recovery is enabled and a bookie goes down for an upgrade, or even if it loses its ZooKeeper
connection intermittently, the auditor detects it as a lost bookie and starts under-replication detection,
and the replication workers on other bookie nodes start replicating the under-replicated ledgers. All
of this stops once the bookie comes back up, but by then a few ledgers will already have been replicated.
Given that we have multiple copies of the data, it is probably not necessary to start the recovery as
soon as a bookie goes down. We can wait for an hour or so and then start recovery. This should
cover cases like planned upgrades, intermittent network connectivity loss, etc.

This change:
    1) Provides a bookie option 'lostBookieRecoveryDelay', in seconds, which, when set to a non-zero value,
       will delay the start of recovery by that number of seconds. By default, this option is set to 0,
       which means the audit is not delayed.
    2) If another bookie were to go down in this interval, the recovery is immediately started and the
       one scheduled for future is canceled.
    3) Adds counters to track how many audits were delayed (NUM_BOOKIE_AUDITS_DELAYED) and how many
       scheduled audits were canceled due to multiple bookie failures (NUM_DELAYED_BOOKIE_AUDITS_CANCELLED).
    4) Four junit tests to verify the new feature.
  • Loading branch information
rithin-shetty committed Oct 11, 2016
1 parent 9dc05fc commit fd40702
Show file tree
Hide file tree
Showing 5 changed files with 350 additions and 31 deletions.
3 changes: 3 additions & 0 deletions bookkeeper-server/conf/bk_server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ zkTimeout=10000
# The interval is specified in seconds.
#auditorPeriodicBookieCheckInterval=86400

# How long to wait, in seconds, before starting auto recovery of a lost bookie
#lostBookieRecoveryDelay=0

# number of threads that should handle write requests. if zero, the writes would
# be handled by netty threads directly.
# numAddWorkerThreads=1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class ServerConfiguration extends AbstractConfiguration {
protected final static String AUDITOR_PERIODIC_CHECK_INTERVAL = "auditorPeriodicCheckInterval";
protected final static String AUDITOR_PERIODIC_BOOKIE_CHECK_INTERVAL = "auditorPeriodicBookieCheckInterval";
protected final static String AUTO_RECOVERY_DAEMON_ENABLED = "autoRecoveryDaemonEnabled";
protected final static String LOST_BOOKIE_RECOVERY_DELAY = "lostBookieRecoveryDelay";

// Worker Thread parameters.
protected final static String NUM_ADD_WORKER_THREADS = "numAddWorkerThreads";
Expand Down Expand Up @@ -1339,6 +1340,22 @@ public boolean isAutoRecoveryDaemonEnabled() {
return getBoolean(AUTO_RECOVERY_DAEMON_ENABLED, false);
}

/**
 * Returns the delay applied before recovering the ledgers of a lost bookie.
 *
 * @return delay interval in seconds; 0 is used as the default when reading the setting
 */
public int getLostBookieRecoveryDelay() {
    final int delaySecs = getInt(LOST_BOOKIE_RECOVERY_DELAY, 0);
    return delaySecs;
}

/**
 * Sets how long to wait before starting recovery of a lost bookie.
 *
 * @param interval
 *            the delay interval to store under the lostBookieRecoveryDelay key
 */
public void setLostBookieRecoveryDelay(int interval) {
    // Box explicitly; this is the same conversion autoboxing would perform.
    final Integer delayValue = Integer.valueOf(interval);
    setProperty(LOST_BOOKIE_RECOVERY_DELAY, delayValue);
}

/**
* Sets that whether force start a bookie in readonly mode
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ public class Auditor implements BookiesListener {
private final Counter numLedgersChecked;
private final OpStatsLogger numFragmentsPerLedger;
private final OpStatsLogger numBookiesPerLedger;
private final Counter numBookieAuditsDelayed;
private final Counter numDelayedBookieAuditsCancelled;
private volatile Future<?> auditTask;
private Set<String> bookiesToBeAudited = Sets.newHashSet();

public Auditor(final String bookieIdentifier, ServerConfiguration conf,
ZooKeeper zkc, StatsLogger statsLogger) throws UnavailableException {
Expand All @@ -106,12 +110,17 @@ public Auditor(final String bookieIdentifier, ServerConfiguration conf,
this.statsLogger = statsLogger;

numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
uRLPublishTimeForLostBookies = this.statsLogger.getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE);
bookieToLedgersMapCreationTime = this.statsLogger.getOpStatsLogger(ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME);
uRLPublishTimeForLostBookies = this.statsLogger
.getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE);
bookieToLedgersMapCreationTime = this.statsLogger
.getOpStatsLogger(ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME);
checkAllLedgersTime = this.statsLogger.getOpStatsLogger(ReplicationStats.CHECK_ALL_LEDGERS_TIME);
numLedgersChecked = this.statsLogger.getCounter(ReplicationStats.NUM_LEDGERS_CHECKED);
numFragmentsPerLedger = statsLogger.getOpStatsLogger(ReplicationStats.NUM_FRAGMENTS_PER_LEDGER);
numBookiesPerLedger = statsLogger.getOpStatsLogger(ReplicationStats.NUM_BOOKIES_PER_LEDGER);
numBookieAuditsDelayed = this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
numDelayedBookieAuditsCancelled = this.statsLogger
.getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);

initialize(conf, zkc);

Expand Down Expand Up @@ -189,27 +198,65 @@ public void run() {
Collection<String> newBookies = CollectionUtils.subtract(
availableBookies, knownBookies);
knownBookies.addAll(newBookies);
if (!bookiesToBeAudited.isEmpty() && knownBookies.containsAll(bookiesToBeAudited)) {
// The bookie that went down earlier, and for which an audit was scheduled,
// has come back up, so stop tracking it and cancel the audit. Since
// audits are delayed only when a single bookie has failed,
// bookiesToBeAudited should contain just one element, so the containsAll
// check is sufficient.
if (auditTask != null && auditTask.cancel(false)) {
auditTask = null;
numDelayedBookieAuditsCancelled.inc();
}
bookiesToBeAudited.clear();
}

// find lost bookies(if any)
Collection<String> lostBookies = CollectionUtils.subtract(
knownBookies, availableBookies);

if (lostBookies.size() > 0) {
knownBookies.removeAll(lostBookies);

auditBookies();
bookiesToBeAudited.addAll(CollectionUtils.subtract(knownBookies, availableBookies));

if (bookiesToBeAudited.size() > 0) {
knownBookies.removeAll(bookiesToBeAudited);
if (conf.getLostBookieRecoveryDelay() == 0) {
startAudit(false);
bookiesToBeAudited.clear();
} else {
if (bookiesToBeAudited.size() > 1) {
// if more than one bookie is down, start the audit immediately;
LOG.info("Multiple bookie failure; not delaying bookie audit. Bookies lost now: "
+ CollectionUtils.subtract(knownBookies, availableBookies)
+"; All lost bookies: " + bookiesToBeAudited.toString());
if (auditTask != null && auditTask.cancel(false)) {
auditTask = null;
numDelayedBookieAuditsCancelled.inc();
}
startAudit(false);
bookiesToBeAudited.clear();
} else if (auditTask == null) {
// if there is no scheduled audit, schedule one
auditTask = executor.schedule( new Runnable() {
public void run() {
startAudit(false);
auditTask = null;
bookiesToBeAudited.clear();
}
}, conf.getLostBookieRecoveryDelay(), TimeUnit.SECONDS);
numBookieAuditsDelayed.inc();
LOG.info("Delaying bookie audit by " + conf.getLostBookieRecoveryDelay()
+ "secs for " + bookiesToBeAudited.toString());
} else {
// there is only one bookie that is down and an audit is already scheduled
LOG.info("Audit already scheduled; not scheduling another for "
+ bookiesToBeAudited.toString());
}
}
}
} catch (BKException bke) {
LOG.error("Exception getting bookie list", bke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while watching available bookies ", ie);
} catch (BKAuditException bke) {
LOG.error("Exception while watching available bookies", bke);
} catch (UnavailableException ue) {
LOG.error("Exception while watching available bookies", ue);
} catch (KeeperException ke) {
LOG.error("Exception reading bookie list", ke);
}
}
});
Expand All @@ -231,8 +278,6 @@ public void start() {
+ " 'auditorPeriodicCheckInterval' {} seconds", interval);
executor.scheduleAtFixedRate(new Runnable() {
public void run() {
LOG.info("Running periodic check");

try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
LOG.info("Ledger replication disabled, skipping");
Expand Down Expand Up @@ -310,6 +355,35 @@ private void notifyBookieChanges() throws BKException {
admin.notifyReadOnlyBookiesChanged(this);
}

/**
 * Runs the bookie audit once. Checked failures raised by the audit are
 * logged here instead of being propagated to the caller.
 *
 * @param shutDownTask
 *            when true, a failed audit also submits the shutdown task;
 *            when false, a failure is only logged
 */
private void startAudit(boolean shutDownTask) {
    // Assume failure until auditBookies() returns normally.
    boolean auditFailed = true;
    try {
        auditBookies();
        auditFailed = false;
    } catch (BKException bke) {
        LOG.error("Exception getting bookie list", bke);
    } catch (InterruptedException ie) {
        // Restore the interrupt flag before logging.
        Thread.currentThread().interrupt();
        LOG.error("Interrupted while watching available bookies ", ie);
    } catch (BKAuditException bke) {
        LOG.error("Exception while watching available bookies", bke);
    } catch (KeeperException ke) {
        LOG.error("Exception reading bookie list", ke);
    }

    if (auditFailed && shutDownTask) {
        submitShutdownTask();
    }
}

@SuppressWarnings("unchecked")
private void auditBookies()
throws BKAuditException, KeeperException,
Expand Down Expand Up @@ -585,21 +659,14 @@ public boolean isRunning() {

private final Runnable BOOKIE_CHECK = new Runnable() {
public void run() {
try {
auditBookies();
} catch (BKException bke) {
LOG.error("Couldn't get bookie list, exiting", bke);
submitShutdownTask();
} catch (KeeperException ke) {
LOG.error("Exception while watching available bookies", ke);
submitShutdownTask();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while watching available bookies ", ie);
submitShutdownTask();
} catch (BKAuditException bke) {
LOG.error("Exception while watching available bookies", bke);
submitShutdownTask();
if (auditTask == null) {
startAudit(true);
} else {
// If an audit was already scheduled because of a lost bookie, do not
// run this periodic bookie check now. If we went ahead, we would
// report under-replication, which is what the user wanted to avoid
// (with the lostBookieRecoveryDelay option).
LOG.info("Audit already scheduled; skipping periodic bookie check");
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public interface ReplicationStats {
public final static String NUM_FRAGMENTS_PER_LEDGER = "NUM_FRAGMENTS_PER_LEDGER";
public final static String NUM_BOOKIES_PER_LEDGER = "NUM_BOOKIES_PER_LEDGER";
public final static String NUM_LEDGERS_CHECKED = "NUM_LEDGERS_CHECKED";
public final static String NUM_BOOKIE_AUDITS_DELAYED = "NUM_BOOKIE_AUDITS_DELAYED";
public final static String NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED = "NUM_DELAYED_BOOKIE_AUDITS_CANCELLED";

public final static String REPLICATION_WORKER_SCOPE = "replication_worker";
public final static String REREPLICATE_OP = "rereplicate";
Expand Down
Loading

0 comments on commit fd40702

Please sign in to comment.