Skip to content

Commit

Permalink
Speed up the rebuildinding of RocksDB index (apache#3458)
Browse files Browse the repository at this point in the history
* Speed up the rebuildinding of RocksDB index

* fix check style

Co-authored-by: chenhang <chenhang@apache.org>
(cherry picked from commit 7004d99)
  • Loading branch information
merlimat authored and zymap committed Feb 16, 2023
1 parent 540bc14 commit a6a4074
Showing 1 changed file with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
Expand All @@ -55,6 +56,8 @@ public LocationsIndexRebuildOp(ServerConfiguration conf) {
this.conf = conf;
}

private static final int BATCH_COMMIT_SIZE = 10_000;

public void initiate() throws IOException {
LOG.info("Starting locations index rebuilding");

Expand Down Expand Up @@ -82,6 +85,8 @@ public void initiate() throws IOException {
int totalEntryLogs = entryLogs.size();
int completedEntryLogs = 0;
LOG.info("Scanning {} entry logs", totalEntryLogs);
AtomicReference<KeyValueStorage.Batch> batch = new AtomicReference<>(newIndex.newBatch());
AtomicInteger count = new AtomicInteger();

for (long entryLogId : entryLogs) {
entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
Expand All @@ -100,7 +105,15 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio
// Update the ledger index page
LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId);
LongWrapper value = LongWrapper.get(location);
newIndex.put(key.array, value.array);
batch.get().put(key.array, value.array);

if (count.incrementAndGet() > BATCH_COMMIT_SIZE) {
batch.get().flush();
batch.get().close();

batch.set(newIndex.newBatch());
count.set(0);
}
}

@Override
Expand All @@ -112,6 +125,8 @@ public boolean accept(long ledgerId) {
++completedEntryLogs;
LOG.info("Completed scanning of log {}.log -- {} / {}", Long.toHexString(entryLogId), completedEntryLogs,
totalEntryLogs);
batch.get().flush();
batch.get().close();
}

newIndex.sync();
Expand Down

0 comments on commit a6a4074

Please sign in to comment.