Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Bookkeeper replication worker should choose bookies from cluster rath…
Browse files Browse the repository at this point in the history
…er than using local bookie

- make ledger fragment act as ensemble, to replicate failed bookies at the same time.
- remove bookie address from replication worker, so the replication worker would choose bookies from cluster rather than using local bookie.

RB_ID=673942
  • Loading branch information
Sijie Guo committed Jun 12, 2015
1 parent c152f18 commit fc7e171
Show file tree
Hide file tree
Showing 12 changed files with 332 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.base.Preconditions;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
Expand Down Expand Up @@ -766,35 +768,10 @@ public void processResult(int rc, String path, Object ctx) {
for (final Long startEntryId : ledgerFragmentsToRecover) {
Long endEntryId = ledgerFragmentsRange.get(startEntryId);
ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
ArrayList<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>();
// Construct a new ensemble
newEnsemble.addAll(ensemble);
// Get bookies to replace
Set<Integer> bookieIndexesReplaced= new HashSet<Integer>();
Set<BookieSocketAddress> targetBookieAddresses = new HashSet<BookieSocketAddress>();
Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookieMap =
new HashMap<BookieSocketAddress, BookieSocketAddress>();
Map<Integer, BookieSocketAddress> targetBookieAddresses;
try {
Set<BookieSocketAddress> bookiesToExclude = new HashSet<BookieSocketAddress>();
bookiesToExclude.addAll(bookiesSrc);
for (int bookieIndex = 0; bookieIndex < ensemble.size(); bookieIndex++) {
BookieSocketAddress bookieInEnsemble = ensemble.get(bookieIndex);
if (bookiesSrc.contains(bookieInEnsemble)) {
BookieSocketAddress newBookie =
bkc.getPlacementPolicy().replaceBookie(lh.getLedgerMetadata().getEnsembleSize(),
lh.getLedgerMetadata().getWriteQuorumSize(),
lh.getLedgerMetadata().getAckQuorumSize(),
ensemble,
bookieInEnsemble,
bookiesToExclude);
newEnsemble.set(bookieIndex, newBookie);
bookieIndexesReplaced.add(bookieIndex);
targetBookieAddresses.add(newBookie);
oldBookie2NewBookieMap.put(bookieInEnsemble, newBookie);
// exclude new bookie for following allocation
bookiesToExclude.add(newBookie);
}
}
targetBookieAddresses = getReplacedBookies(lh, ensemble, bookiesSrc);
} catch (BKException.BKNotEnoughBookiesException e) {
if (!dryrun) {
ledgerFragmentsMcb.processResult(BKException.Code.NotEnoughBookiesException, null, null);
Expand All @@ -806,6 +783,8 @@ public void processResult(int rc, String path, Object ctx) {
}

if (dryrun) {
ArrayList<BookieSocketAddress> newEnsemble =
replaceBookiesInEnsemble(ensemble, targetBookieAddresses);
System.out.println(" Fragment [" + startEntryId + " - " + endEntryId + " ] : ");
System.out.println(" old ensemble : " + formatEnsemble(ensemble, bookiesSrc, '*'));
System.out.println(" new ensemble : " + formatEnsemble(newEnsemble, bookiesSrc, '*'));
Expand All @@ -819,10 +798,10 @@ public void processResult(int rc, String path, Object ctx) {
}
try {
LedgerFragmentReplicator.SingleFragmentCallback cb = new LedgerFragmentReplicator.SingleFragmentCallback(
ledgerFragmentsMcb, lh, startEntryId, oldBookie2NewBookieMap);
ledgerFragmentsMcb, lh, startEntryId, getReplacedBookiesMap(ensemble, targetBookieAddresses));
LedgerFragment ledgerFragment = new LedgerFragment(lh,
startEntryId, endEntryId, bookieIndexesReplaced);
asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieAddresses);
startEntryId, endEntryId, targetBookieAddresses.keySet());
asyncRecoverLedgerFragment(lh, ledgerFragment, cb, Sets.newHashSet(targetBookieAddresses.values()));
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
return;
Expand Down Expand Up @@ -876,28 +855,90 @@ private void asyncRecoverLedgerFragment(final LedgerHandle lh,
lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookies);
}

private Map<Integer, BookieSocketAddress> getReplacedBookies(
LedgerHandle lh,
List<BookieSocketAddress> ensemble,
Set<BookieSocketAddress> bookiesToRereplicate)
throws BKException.BKNotEnoughBookiesException {
Set<Integer> bookieIndexesToRereplicate = Sets.newHashSet();
for (int bookieIndex = 0; bookieIndex < ensemble.size(); bookieIndex++) {
BookieSocketAddress bookieInEnsemble = ensemble.get(bookieIndex);
if (bookiesToRereplicate.contains(bookieInEnsemble)) {
bookieIndexesToRereplicate.add(bookieIndex);
}
}
return getReplacedBookiesByIndexes(
lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate));
}

private Map<Integer, BookieSocketAddress> getReplacedBookiesByIndexes(
LedgerHandle lh,
List<BookieSocketAddress> ensemble,
Set<Integer> bookieIndexesToRereplicate,
Optional<Set<BookieSocketAddress>> excludedBookies)
throws BKException.BKNotEnoughBookiesException {
// target bookies to replicate
Map<Integer, BookieSocketAddress> targetBookieAddresses =
Maps.newHashMapWithExpectedSize(bookieIndexesToRereplicate.size());
// bookies to exclude for ensemble allocation
Set<BookieSocketAddress> bookiesToExclude = Sets.newHashSet();
if (excludedBookies.isPresent()) {
bookiesToExclude.addAll(excludedBookies.get());
}

// excluding bookies that need to be replicated
for (Integer bookieIndex : bookieIndexesToRereplicate) {
BookieSocketAddress bookie = ensemble.get(bookieIndex);
bookiesToExclude.add(bookie);
}

// allocate bookies
for (Integer bookieIndex : bookieIndexesToRereplicate) {
BookieSocketAddress oldBookie = ensemble.get(bookieIndex);
BookieSocketAddress newBookie =
bkc.getPlacementPolicy().replaceBookie(
lh.getLedgerMetadata().getEnsembleSize(),
lh.getLedgerMetadata().getWriteQuorumSize(),
lh.getLedgerMetadata().getAckQuorumSize(),
ensemble,
oldBookie,
bookiesToExclude);
targetBookieAddresses.put(bookieIndex, newBookie);
bookiesToExclude.add(newBookie);
}

return targetBookieAddresses;
}

private ArrayList<BookieSocketAddress> replaceBookiesInEnsemble(
List<BookieSocketAddress> ensemble,
Map<Integer, BookieSocketAddress> replacedBookies) {
ArrayList<BookieSocketAddress> newEnsemble = Lists.newArrayList(ensemble);
for (Map.Entry<Integer, BookieSocketAddress> entry : replacedBookies.entrySet()) {
newEnsemble.set(entry.getKey(), entry.getValue());
}
return newEnsemble;
}

/**
* Replicate the Ledger fragment to target Bookie passed.
*
* @param lh
* - ledgerHandle
* @param ledgerFragment
* - LedgerFragment to replicate
* @param targetBookieAddress
* - target Bookie, to where entries should be replicated.
*/
public void replicateLedgerFragment(LedgerHandle lh,
final LedgerFragment ledgerFragment,
final BookieSocketAddress targetBookieAddress)
final LedgerFragment ledgerFragment)
throws InterruptedException, BKException {
Preconditions.checkArgument(ledgerFragment.getBookiesIndexes().size() == 1);
Optional<Set<BookieSocketAddress>> excludedBookies = Optional.absent();
Map<Integer, BookieSocketAddress> targetBookieAddresses =
new HashMap<Integer, BookieSocketAddress>();
targetBookieAddresses.put(ledgerFragment.getBookiesIndexes().iterator().next(), targetBookieAddress);
getReplacedBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
ledgerFragment.getBookiesIndexes(), excludedBookies);
replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses);
}

public void replicateLedgerFragment(LedgerHandle lh,
private void replicateLedgerFragment(LedgerHandle lh,
final LedgerFragment ledgerFragment,
final Map<Integer, BookieSocketAddress> targetBookieAddresses)
throws InterruptedException, BKException {
Expand All @@ -915,9 +956,24 @@ public void replicateLedgerFragment(LedgerHandle lh,
}
}

private static Map<BookieSocketAddress, BookieSocketAddress> getReplacedBookiesMap(LedgerFragment ledgerFragment,
private static Map<BookieSocketAddress, BookieSocketAddress> getReplacedBookiesMap(
ArrayList<BookieSocketAddress> ensemble,
Map<Integer, BookieSocketAddress> targetBookieAddresses) {
Map<BookieSocketAddress, BookieSocketAddress> bookiesMap =
new HashMap<BookieSocketAddress, BookieSocketAddress>();
for (Map.Entry<Integer, BookieSocketAddress> entry : targetBookieAddresses.entrySet()) {
BookieSocketAddress oldBookie = ensemble.get(entry.getKey());
BookieSocketAddress newBookie = entry.getValue();
bookiesMap.put(oldBookie, newBookie);
}
return bookiesMap;
}

private static Map<BookieSocketAddress, BookieSocketAddress> getReplacedBookiesMap(
LedgerFragment ledgerFragment,
Map<Integer, BookieSocketAddress> targetBookieAddresses) {
Map<BookieSocketAddress, BookieSocketAddress> bookiesMap = new HashMap<BookieSocketAddress, BookieSocketAddress>();
Map<BookieSocketAddress, BookieSocketAddress> bookiesMap =
new HashMap<BookieSocketAddress, BookieSocketAddress>();
for (Integer bookieIndex : ledgerFragment.getBookiesIndexes()) {
BookieSocketAddress oldBookie = ledgerFragment.getAddress(bookieIndex);
BookieSocketAddress newBookie = targetBookieAddresses.get(bookieIndex);
Expand Down

0 comments on commit fc7e171

Please sign in to comment.