From 3e0112505381b3eb922f21de1f18daf302002201 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Sun, 26 Aug 2018 19:51:08 -0700 Subject: [PATCH] Provide async version of markLedgerUnderreplicated for LedgerUnderreplicationManager Descriptions of the changes in this PR: ### Motivation Auditor has multiple places calling sync methods in async callbacks. This raises the possibility hitting deadlock. Issue #1578 is one of the examples. After looking into the `LedgerUnderreplicationManager`, `markLedgerUnderreplicated` is the only interface that will be called in async callbacks. This change is to provide an async version of `markLedgerUnderreplicated`. ### Changes - add `markLedgerUnderreplicatedAsync` interface in `LedgerUnderreplicationManager`. - implement the logic of `markLedgerUnderreplicated` using async callbacks - use `markLedgerUnderreplicatedAsync` in the Auditor Related Issues: #1578 Master Issue: #1617 Author: Sijie Guo Reviewers: Charan Reddy Guttapalem , Enrico Olivelli , Matteo Merli This closes #1619 from sijie/async_sync_autorecovery --- .../apache/bookkeeper/client/BKException.java | 15 ++ .../meta/LedgerUnderreplicationManager.java | 22 +- .../meta/ZkLedgerUnderreplicationManager.java | 123 +++++---- .../bookkeeper/replication/Auditor.java | 238 +++++++----------- .../replication/ReplicationException.java | 11 + .../TestLedgerUnderreplicationManager.java | 4 +- 6 files changed, 224 insertions(+), 189 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java index 1a15ebf370c..ddfc795acbc 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java @@ -20,6 +20,8 @@ */ package org.apache.bookkeeper.client; +import java.util.function.Function; + /** * Class the enumerates all the possible error conditions. 
* @@ -28,6 +30,19 @@ @SuppressWarnings("serial") public abstract class BKException extends org.apache.bookkeeper.client.api.BKException { + public static final Function HANDLER = cause -> { + if (cause == null) { + return null; + } + if (cause instanceof BKException) { + return (BKException) cause; + } else { + BKException ex = new BKUnexpectedConditionException(); + ex.initCause(cause); + return ex; + } + }; + BKException(int code) { super(code); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java index 21c6a4f70fa..5bad3cc7cf2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java @@ -17,11 +17,15 @@ */ package org.apache.bookkeeper.meta; +import com.google.common.collect.Lists; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; import java.util.function.Predicate; +import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.replication.ReplicationException; @@ -29,12 +33,26 @@ * Interface for marking ledgers which need to be rereplicated. */ public interface LedgerUnderreplicationManager extends AutoCloseable { + /** * Mark a ledger as underreplicated. 
The replication should * then check which fragments are underreplicated and rereplicate them */ - void markLedgerUnderreplicated(long ledgerId, String missingReplica) - throws ReplicationException.UnavailableException; + default void markLedgerUnderreplicated(long ledgerId, String missingReplica) throws ReplicationException { + FutureUtils.result( + markLedgerUnderreplicatedAsync( + ledgerId, Lists.newArrayList(missingReplica)), ReplicationException.EXCEPTION_HANDLER); + } + + /** + * Mark a ledger as underreplicated with missing bookies. The replication should then + * check which fragments are underreplicated and rereplicate them. + * + * @param ledgerId ledger id + * @param missingReplicas missing replicas + * @return a future representing the result of the mark operation. + */ + CompletableFuture markLedgerUnderreplicatedAsync(long ledgerId, Collection missingReplicas); /** * Mark a ledger as fully replicated. If the ledger is not diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java index 4289bcd81a6..db09df97b28 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java @@ -24,21 +24,25 @@ import com.google.common.base.Joiner; import com.google.protobuf.TextFormat; +import com.google.protobuf.TextFormat.ParseException; import java.net.UnknownHostException; import java.util.AbstractMap; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.function.Predicate; import
java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.conf.AbstractConfiguration; import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; import org.apache.bookkeeper.net.DNS; @@ -54,6 +58,7 @@ import org.apache.bookkeeper.util.ZkUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; @@ -254,55 +259,89 @@ public UnderreplicatedLedgerFormat getLedgerUnreplicationInfo(long ledgerId) } @Override - public void markLedgerUnderreplicated(long ledgerId, String missingReplica) - throws ReplicationException.UnavailableException { + public CompletableFuture markLedgerUnderreplicatedAsync(long ledgerId, Collection missingReplicas) { if (LOG.isDebugEnabled()) { - LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplica); - } - try { - List zkAcls = ZkUtils.getACLs(conf); - String znode = getUrLedgerZnode(ledgerId); - while (true) { - UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder(); + LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplicas); + } + final List zkAcls = ZkUtils.getACLs(conf); + final String znode = getUrLedgerZnode(ledgerId); + final CompletableFuture createFuture = new CompletableFuture<>(); + tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, createFuture); + return createFuture; + } + + private void tryMarkLedgerUnderreplicatedAsync(final String znode, + final Collection missingReplicas, + final List zkAcls, + final CompletableFuture finalFuture) { + final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder(); + missingReplicas.forEach(builder::addReplica); + final byte[] urLedgerData = 
TextFormat.printToString(builder.build()).getBytes(UTF_8); + ZkUtils.asyncCreateFullPathOptimistic( + zkc, znode, urLedgerData, zkAcls, CreateMode.PERSISTENT, + (rc, path, ctx, name) -> { + if (Code.OK.intValue() == rc) { + FutureUtils.complete(finalFuture, null); + } else if (Code.NODEEXISTS.intValue() == rc) { + // we need to handle the case where the ledger has been marked as underreplicated + handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture); + } else { + FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(rc))); + } + }, null); + } + + + private void handleLedgerUnderreplicatedAlreadyMarked(final String znode, + final Collection missingReplicas, + final List zkAcls, + final CompletableFuture finalFuture) { + // get the existing underreplicated ledger data + zkc.getData(znode, false, (getRc, getPath, getCtx, existingUrLedgerData, getStat) -> { + if (Code.OK.intValue() == getRc) { + // deserialize existing underreplicated ledger data + final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder(); try { - builder.addReplica(missingReplica); - ZkUtils.createFullPathOptimistic(zkc, znode, TextFormat - .printToString(builder.build()).getBytes(UTF_8), - zkAcls, CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException nee) { - Stat s = zkc.exists(znode, false); - if (s == null) { + TextFormat.merge(new String(existingUrLedgerData, UTF_8), builder); + } catch (ParseException e) { + // corrupted metadata in zookeeper + FutureUtils.completeExceptionally(finalFuture, + new ReplicationException.UnavailableException( + "Invalid underreplicated ledger data for ledger " + znode, e)); + return; + } + UnderreplicatedLedgerFormat existingUrLedgerFormat = builder.build(); + boolean replicaAdded = false; + for (String missingReplica : missingReplicas) { + if (existingUrLedgerFormat.getReplicaList().contains(missingReplica)) { continue; - } - try { - byte[] bytes = 
zkc.getData(znode, false, s); - builder.clear(); - TextFormat.merge(new String(bytes, UTF_8), builder); - UnderreplicatedLedgerFormat data = builder.build(); - if (data.getReplicaList().contains(missingReplica)) { - return; // nothing to add - } + } else { builder.addReplica(missingReplica); - zkc.setData(znode, - TextFormat.printToString(builder.build()).getBytes(UTF_8), - s.getVersion()); - } catch (KeeperException.NoNodeException nne) { - continue; - } catch (KeeperException.BadVersionException bve) { - continue; - } catch (TextFormat.ParseException pe) { - throw new ReplicationException.UnavailableException( - "Invalid data found", pe); + replicaAdded = true; } } - break; + if (!replicaAdded) { // no new missing replica is added + FutureUtils.complete(finalFuture, null); + return; + } + final byte[] newUrLedgerData = TextFormat.printToString(builder.build()).getBytes(UTF_8); + zkc.setData(znode, newUrLedgerData, getStat.getVersion(), (setRc, setPath, setCtx, setStat) -> { + if (Code.OK.intValue() == setRc) { + FutureUtils.complete(finalFuture, null); + } else if (Code.NONODE.intValue() == setRc) { + tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture); + } else if (Code.BADVERSION.intValue() == setRc) { + handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture); + } else { + FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(setRc))); + } + }, null); + } else if (Code.NONODE.intValue() == getRc) { + tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture); + } else { + FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(getRc))); } - } catch (KeeperException ke) { - throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie); - } + 
}, null); } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index 8578a5b148c..f93763203d1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; @@ -30,20 +31,21 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BKException.Code; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperAdmin; import org.apache.bookkeeper.client.LedgerChecker; import org.apache.bookkeeper.client.LedgerFragment; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory; @@ -366,8 +368,6 @@ public void run() { } catch (InterruptedException ie) { Thread.currentThread().interrupt(); LOG.error("Interrupted while running periodic check", ie); - } catch (BKAuditException 
bkae) { - LOG.error("Exception while running periodic check", bkae); } catch (BKException bke) { LOG.error("Exception running periodic check", bke); } catch (IOException ioe) { @@ -467,8 +467,6 @@ private void startAudit(boolean shutDownTask) { LOG.error("Interrupted while watching available bookies ", ie); } catch (BKAuditException bke) { LOG.error("Exception while watching available bookies", bke); - } catch (KeeperException ke) { - LOG.error("Exception reading bookie list", ke); } if (shutDownTask) { submitShutdownTask(); @@ -477,8 +475,7 @@ private void startAudit(boolean shutDownTask) { @SuppressWarnings("unchecked") private void auditBookies() - throws BKAuditException, KeeperException, - InterruptedException, BKException { + throws BKAuditException, InterruptedException, BKException { try { waitIfLedgerReplicationDisabled(); } catch (UnavailableException ue) { @@ -512,7 +509,12 @@ private void auditBookies() bookieToLedgersMapCreationTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); if (lostBookies.size() > 0) { - handleLostBookies(lostBookies, ledgerDetails); + try { + FutureUtils.result( + handleLostBookiesAsync(lostBookies, ledgerDetails), ReplicationException.EXCEPTION_HANDLER); + } catch (ReplicationException e) { + throw new BKAuditException(e.getMessage(), e.getCause()); + } uRLPublishTimeForLostBookies.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); } @@ -524,38 +526,33 @@ private Map> generateBookie2LedgersIndex() return bookieLedgerIndexer.getBookieToLedgerIndex(); } - private void handleLostBookies(Collection lostBookies, - Map> ledgerDetails) throws BKAuditException { - LOG.info("Following are the failed bookies: " + lostBookies - + " and searching its ledgers for re-replication"); - - for (String bookieIP : lostBookies) { - // identify all the ledgers in bookieIP and publishing these ledgers - // as under-replicated. 
- publishSuspectedLedgers(bookieIP, ledgerDetails.get(bookieIP)); - } + private CompletableFuture handleLostBookiesAsync(Collection lostBookies, + Map> ledgerDetails) { + LOG.info("Following are the failed bookies: {}," + + " and searching its ledgers for re-replication", lostBookies); + + return FutureUtils.processList( + Lists.newArrayList(lostBookies), + bookieIP -> publishSuspectedLedgersAsync( + Lists.newArrayList(bookieIP), ledgerDetails.get(bookieIP)), + null + ); } - private void publishSuspectedLedgers(String bookieIP, Set ledgers) - throws BKAuditException { + private CompletableFuture publishSuspectedLedgersAsync(Collection missingBookies, Set ledgers) { if (null == ledgers || ledgers.size() == 0) { // there is no ledgers available for this bookie and just // ignoring the bookie failures - LOG.info("There is no ledgers for the failed bookie: {}", bookieIP); - return; + LOG.info("There is no ledgers for the failed bookie: {}", missingBookies); + return FutureUtils.Void(); } - LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, bookieIP); + LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, missingBookies); numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size()); - for (Long ledgerId : ledgers) { - try { - ledgerUnderreplicationManager.markLedgerUnderreplicated( - ledgerId, bookieIP); - } catch (UnavailableException ue) { - throw new BKAuditException( - "Failed to publish underreplicated ledger: " + ledgerId - + " of bookie: " + bookieIP, ue); - } - } + return FutureUtils.processList( + Lists.newArrayList(ledgers), + ledgerId -> ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, missingBookies), + null + ); } /** @@ -571,36 +568,31 @@ private class ProcessLostFragmentsCb implements GenericCallback fragments) { - try { - if (rc == BKException.Code.OK) { - Set bookies = Sets.newHashSet(); - for (LedgerFragment f : fragments) { - 
bookies.addAll(f.getAddresses()); - } - for (BookieSocketAddress bookie : bookies) { - publishSuspectedLedgers(bookie.toString(), Sets.newHashSet(lh.getId())); - } - } - lh.close(); - } catch (BKException bke) { - LOG.error("Error closing lh", bke); - if (rc == BKException.Code.OK) { - rc = BKException.Code.ReplicationException; - } - } catch (InterruptedException ie) { - LOG.error("Interrupted publishing suspected ledger", ie); - Thread.currentThread().interrupt(); - if (rc == BKException.Code.OK) { - rc = BKException.Code.InterruptedException; - } - } catch (BKAuditException bkae) { - LOG.error("Auditor exception publishing suspected ledger", bkae); - if (rc == BKException.Code.OK) { - rc = BKException.Code.ReplicationException; + if (rc == BKException.Code.OK) { + Set bookies = Sets.newHashSet(); + for (LedgerFragment f : fragments) { + bookies.addAll(f.getAddresses()); } + publishSuspectedLedgersAsync( + bookies.stream().map(BookieSocketAddress::toString).collect(Collectors.toList()), + Sets.newHashSet(lh.getId()) + ).whenComplete((result, cause) -> { + if (null != cause) { + LOG.error("Auditor exception publishing suspected ledger {} with lost bookies {}", + lh.getId(), bookies, cause); + callback.processResult(Code.ReplicationException, null, null); + } else { + callback.processResult(Code.OK, null, null); + } + }); + } else { + callback.processResult(rc, null, null); } - - callback.processResult(rc, null, null); + lh.closeAsync().whenComplete((result, cause) -> { + if (null != cause) { + LOG.warn("Error closing ledger {} : {}", lh.getId(), cause.getMessage()); + } + }); } } @@ -608,8 +600,7 @@ public void operationComplete(int rc, Set fragments) { * List all the ledgers and check them individually. This should not * be run very often. 
*/ - void checkAllLedgers() throws BKAuditException, BKException, - IOException, InterruptedException, KeeperException { + void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException { ZooKeeper newzk = ZooKeeperClient.newBuilder() .connectString(ZKMetadataDriverBase.resolveZkServers(conf)) .sessionTimeoutMs(conf.getZkTimeout()) @@ -622,91 +613,54 @@ void checkAllLedgers() throws BKAuditException, BKException, try { final LedgerChecker checker = new LedgerChecker(client); - final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK); - final CountDownLatch processDone = new CountDownLatch(1); + final CompletableFuture processFuture = new CompletableFuture<>(); - Processor checkLedgersProcessor = new Processor() { - @Override - public void process(final Long ledgerId, - final AsyncCallback.VoidCallback callback) { - try { - if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) { - LOG.info("Ledger rereplication has been disabled, aborting periodic check"); - processDone.countDown(); - return; - } - } catch (ReplicationException.UnavailableException ue) { - LOG.error("Underreplication manager unavailable running periodic check", ue); - processDone.countDown(); + Processor checkLedgersProcessor = (ledgerId, callback) -> { + try { + if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) { + LOG.info("Ledger rereplication has been disabled, aborting periodic check"); + FutureUtils.complete(processFuture, null); return; } + } catch (UnavailableException ue) { + LOG.error("Underreplication manager unavailable running periodic check", ue); + FutureUtils.complete(processFuture, null); + return; + } - // Do not perform blocking calls that involve making ZK calls from within the ZK - // event thread. Jump to background thread instead to avoid deadlock. 
- ForkJoinPool.commonPool().execute(() -> { - LedgerHandle lh = null; - try { - lh = admin.openLedgerNoRecovery(ledgerId); - checker.checkLedger(lh, - new ProcessLostFragmentsCb(lh, callback), - conf.getAuditorLedgerVerificationPercentage()); - // we collect the following stats to get a measure of the - // distribution of a single ledger within the bk cluster - // the higher the number of fragments/bookies, the more distributed it is - numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments()); - numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies()); - numLedgersChecked.inc(); - } catch (BKException.BKNoSuchLedgerExistsException bknsle) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ledger was deleted before we could check it", bknsle); - } - callback.processResult(BKException.Code.OK, - null, null); - return; - } catch (BKException bke) { - LOG.error("Couldn't open ledger " + ledgerId, bke); - callback.processResult(BKException.Code.BookieHandleNotAvailableException, - null, null); - return; - } catch (InterruptedException ie) { - LOG.error("Interrupted opening ledger", ie); - Thread.currentThread().interrupt(); - callback.processResult(BKException.Code.InterruptedException, null, null); - return; - } finally { - if (lh != null) { - try { - lh.close(); - } catch (BKException bke) { - LOG.warn("Couldn't close ledger " + ledgerId, bke); - } catch (InterruptedException ie) { - LOG.warn("Interrupted closing ledger " + ledgerId, ie); - Thread.currentThread().interrupt(); - } - } + admin.asyncOpenLedgerNoRecovery(ledgerId, (rc, lh, ctx) -> { + if (Code.OK == rc) { + checker.checkLedger(lh, + // the ledger handle will be closed after checkLedger is done. 
+ new ProcessLostFragmentsCb(lh, callback), + conf.getAuditorLedgerVerificationPercentage()); + // we collect the following stats to get a measure of the + // distribution of a single ledger within the bk cluster + // the higher the number of fragments/bookies, the more distributed it is + numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments()); + numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies()); + numLedgersChecked.inc(); + } else if (Code.NoSuchLedgerExistsException == rc) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ledger {} was deleted before we could check it", ledgerId); } - }); - } + callback.processResult(Code.OK, null, null); + } else { + LOG.error("Couldn't open ledger {} to check : {}", ledgerId, BKException.getMessage(rc)); + callback.processResult(rc, null, null); + } + }, null); }; ledgerManager.asyncProcessLedgers(checkLedgersProcessor, - new AsyncCallback.VoidCallback() { - @Override - public void processResult(int rc, String s, Object obj) { - returnCode.set(rc); - processDone.countDown(); - } - }, null, BKException.Code.OK, BKException.Code.ReadException); - try { - processDone.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new BKAuditException( - "Exception while checking ledgers", e); - } - if (returnCode.get() != BKException.Code.OK) { - throw BKException.create(returnCode.get()); - } + (rc, path, ctx) -> { + if (Code.OK == rc) { + FutureUtils.complete(processFuture, null); + } else { + FutureUtils.completeExceptionally(processFuture, BKException.create(rc)); + } + }, null, BKException.Code.OK, BKException.Code.ReadException); + FutureUtils.result(processFuture, BKException.HANDLER); } finally { admin.close(); client.close(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java index 1afee699bf0..733f63bde80 100644 --- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java @@ -18,10 +18,21 @@ package org.apache.bookkeeper.replication; +import java.util.function.Function; + /** * Exceptions for use within the replication service. */ public abstract class ReplicationException extends Exception { + + public static final Function EXCEPTION_HANDLER = cause -> { + if (cause instanceof ReplicationException) { + return (ReplicationException) cause; + } else { + return new UnavailableException(cause.getMessage(), cause); + } + }; + protected ReplicationException(String message, Throwable cause) { super(message, cause); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java index 229af16775d..214777e4258 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java @@ -52,7 +52,6 @@ import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat; -import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; import org.apache.bookkeeper.test.ZooKeeperUtil; import org.apache.bookkeeper.util.BookKeeperConstants; @@ -724,8 +723,7 @@ public void run() { } private void verifyMarkLedgerUnderreplicated(Collection missingReplica) - throws KeeperException, InterruptedException, - CompatibilityException, UnavailableException { + throws KeeperException, InterruptedException, 
ReplicationException { Long ledgerA = 0xfeadeefdacL; String znodeA = getUrLedgerZnode(ledgerA); LedgerUnderreplicationManager replicaMgr = lmf1