Skip to content

Commit

Permalink
[bk-server] Clean up overriplicated ledgers owned by different bookies
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Jun 28, 2019
1 parent b34162a commit 67a375c
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
Expand All @@ -45,6 +47,7 @@
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
Expand Down Expand Up @@ -149,6 +152,7 @@ public void gc(GarbageCleaner garbageCleaner) {
long start;
long end = -1;
boolean done = false;
AtomicBoolean isValidEnsemble = new AtomicBoolean(true);
while (!done) {
start = end + 1;
if (ledgerRangeIterator.hasNext()) {
Expand All @@ -169,9 +173,11 @@ public void gc(GarbageCleaner garbageCleaner) {
for (Long bkLid : subBkActiveLedgers) {
if (!ledgersInMetadata.contains(bkLid)) {
if (verifyMetadataOnGc) {
isValidEnsemble.set(false);
int rc = BKException.Code.OK;
Versioned<LedgerMetadata> metadata = null;
try {
result(ledgerManager.readLedgerMetadata(bkLid), zkOpTimeoutMs, TimeUnit.MILLISECONDS);
metadata = result(ledgerManager.readLedgerMetadata(bkLid), zkOpTimeoutMs, TimeUnit.MILLISECONDS);
} catch (BKException | TimeoutException e) {
if (e instanceof BKException) {
rc = ((BKException) e).getCode();
Expand All @@ -182,7 +188,18 @@ public void gc(GarbageCleaner garbageCleaner) {
continue;
}
}
if (rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
// check bookie is part of ensembles for one of the
// segment
if (metadata != null && metadata.getValue() != null) {
metadata.getValue().getAllEnsembles().forEach((entryId, ensembles) -> {
if (ensembles.contains(selfBookieAddress)) {
isValidEnsemble.set(true);
}
});
if (isValidEnsemble.get()) {
continue;
}
} else if (rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
LOG.warn("Ledger {} Missing in metadata list, but ledgerManager returned rc: {}.",
bkLid, rc);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
Expand Down Expand Up @@ -93,12 +94,17 @@ public GcLedgersTest(Class<? extends LedgerManagerFactory> lmFactoryCls) {
super(lmFactoryCls);
}

private void createLedgers(int numLedgers, final Set<Long> createdLedgers) throws IOException {
BookieSocketAddress selfBookie = Bookie.getBookieAddress(baseConf);
createLedgers(numLedgers, createdLedgers, selfBookie);
}

/**
* Create ledgers.
*/
private void createLedgers(int numLedgers, final Set<Long> createdLedgers) throws IOException {
private void createLedgers(int numLedgers, final Set<Long> createdLedgers, BookieSocketAddress selfBookie) throws IOException {
final AtomicInteger expected = new AtomicInteger(numLedgers);
List<BookieSocketAddress> ensemble = Lists.newArrayList(new BookieSocketAddress("192.0.2.1", 1234));
List<BookieSocketAddress> ensemble = Lists.newArrayList(selfBookie);

for (int i = 0; i < numLedgers; i++) {
getLedgerIdGenerator().generateLedgerId(new GenericCallback<Long>() {
Expand Down Expand Up @@ -692,4 +698,55 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
return null;
}
}

/**
* Verifies that gc should cleaned up overreplicatd ledgers which is not
* owned by the bookie anymore.
*
* @throws Exception
*/
@Test
public void testGcLedgersForOverreplicated() throws Exception {
baseConf.setVerifyMetadataOnGc(true);
final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
final SortedSet<Long> cleaned = Collections.synchronizedSortedSet(new TreeSet<Long>());

// Create few ledgers
final int numLedgers = 5;

BookieSocketAddress bookieAddress = new BookieSocketAddress("192.0.0.1", 1234);
createLedgers(numLedgers, createdLedgers, bookieAddress);

LedgerManager mockLedgerManager = new CleanupLedgerManager(getLedgerManager()) {
@Override
public LedgerRangeIterator getLedgerRanges(long zkOpTimeout) {
return new LedgerRangeIterator() {
@Override
public LedgerRange next() throws IOException {
return null;
}

@Override
public boolean hasNext() throws IOException {
return false;
}
};
}
};

final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(mockLedgerManager,
new MockLedgerStorage(), baseConf, NullStatsLogger.INSTANCE);
GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
@Override
public void clean(long ledgerId) {
LOG.info("Cleaned {}", ledgerId);
cleaned.add(ledgerId);
}
};

validateLedgerRangeIterator(createdLedgers);

garbageCollector.gc(cleaner);
assertEquals("Should have cleaned all ledgers", cleaned.size(), numLedgers);
}
}

0 comments on commit 67a375c

Please sign in to comment.