diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java index 55c5c90c31e..e15457ca8ef 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java @@ -32,7 +32,8 @@ import java.util.Date; import java.util.Set; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.bookie.BookieImpl; import org.apache.bookkeeper.bookie.EntryLogger; import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; @@ -55,6 +56,8 @@ public LocationsIndexRebuildOp(ServerConfiguration conf) { this.conf = conf; } + private static final int BATCH_COMMIT_SIZE = 10_000; + public void initiate() throws IOException { LOG.info("Starting locations index rebuilding"); @@ -82,6 +85,8 @@ public void initiate() throws IOException { int totalEntryLogs = entryLogs.size(); int completedEntryLogs = 0; LOG.info("Scanning {} entry logs", totalEntryLogs); + AtomicReference batch = new AtomicReference<>(newIndex.newBatch()); + AtomicInteger count = new AtomicInteger(); for (long entryLogId : entryLogs) { entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() { @@ -100,7 +105,15 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio // Update the ledger index page LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId); LongWrapper value = LongWrapper.get(location); - newIndex.put(key.array, value.array); + batch.get().put(key.array, value.array); + + if (count.incrementAndGet() > BATCH_COMMIT_SIZE) { + batch.get().flush(); + batch.get().close(); + + batch.set(newIndex.newBatch()); + count.set(0); + } } @Override @@ -112,6 +125,8 @@ public boolean accept(long ledgerId) { ++completedEntryLogs; LOG.info("Completed scanning of log {}.log -- {} / {}", Long.toHexString(entryLogId), completedEntryLogs, totalEntryLogs); + batch.get().flush(); + batch.get().close(); } newIndex.sync();