Skip to content

Commit

Permalink
Revert "Speed up the rebuildinding of RocksDB index (apache#3458)"
Browse files Browse the repository at this point in the history
This reverts commit a6a4074.
  • Loading branch information
zymap committed Feb 21, 2023
1 parent acfa520 commit d93069d
Showing 1 changed file with 2 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
Expand All @@ -56,8 +55,6 @@ public LocationsIndexRebuildOp(ServerConfiguration conf) {
this.conf = conf;
}

private static final int BATCH_COMMIT_SIZE = 10_000;

public void initiate() throws IOException {
LOG.info("Starting locations index rebuilding");

Expand Down Expand Up @@ -85,8 +82,6 @@ public void initiate() throws IOException {
int totalEntryLogs = entryLogs.size();
int completedEntryLogs = 0;
LOG.info("Scanning {} entry logs", totalEntryLogs);
AtomicReference<KeyValueStorage.Batch> batch = new AtomicReference<>(newIndex.newBatch());
AtomicInteger count = new AtomicInteger();

for (long entryLogId : entryLogs) {
entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
Expand All @@ -105,15 +100,7 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio
// Update the ledger index page
LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId);
LongWrapper value = LongWrapper.get(location);
batch.get().put(key.array, value.array);

if (count.incrementAndGet() > BATCH_COMMIT_SIZE) {
batch.get().flush();
batch.get().close();

batch.set(newIndex.newBatch());
count.set(0);
}
newIndex.put(key.array, value.array);
}

@Override
Expand All @@ -125,8 +112,6 @@ public boolean accept(long ledgerId) {
++completedEntryLogs;
LOG.info("Completed scanning of log {}.log -- {} / {}", Long.toHexString(entryLogId), completedEntryLogs,
totalEntryLogs);
batch.get().flush();
batch.get().close();
}

newIndex.sync();
Expand Down

0 comments on commit d93069d

Please sign in to comment.