Skip to content

Commit

Permalink
Actually invoke the BadKeyOrphanReader. Also committing a bunch of
Browse files Browse the repository at this point in the history
TODOs for later cleanup.
  • Loading branch information
jayjwylie committed Mar 20, 2013
1 parent f573b53 commit 1d115da
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -1606,6 +1606,9 @@ public Iterator<Pair<ByteArray, Versioned<byte[]>>> fetchEntries(int nodeId,
return fetchEntries(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0);
}

// TODO: The use of "Pair" in the return for a fundamental type is
// awkward. We should have a core KeyValue type that effectively wraps
// up a ByteArray and a Versioned<byte[]>.
/**
* Fetch key/value tuples belonging to this map of replica type to
* partition list
Expand Down
25 changes: 20 additions & 5 deletions src/java/voldemort/utils/ConsistencyFix.java
Expand Up @@ -46,6 +46,9 @@
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;

// TODO: Move to new directory voldemort/tools. Also move ConsistencyCheck,
// Rebalance, and possibly other tools (shells and so on). This would reduce the
// amount of different stuff in the utils directory.
public class ConsistencyFix {

private static final Logger logger = Logger.getLogger(ConsistencyFix.class);
Expand Down Expand Up @@ -135,6 +138,10 @@ public String execute(int parallelism,
ExecutorService badKeyWriterService;
ExecutorService consistencyFixWorkers;

// TODO: Add ThreadFactory usage to ExecutorService usage so that
// threads have sane names. Figure out if any parameters currently
// passed from object-to-object could be given directly to factories.

// Create BadKeyWriter thread
BlockingQueue<BadKeyResult> badKeyQOut = new ArrayBlockingQueue<BadKeyResult>(parallelism * 10);
badKeyWriterService = Executors.newSingleThreadExecutor();
Expand All @@ -155,11 +162,19 @@ public String execute(int parallelism,
// Create BadKeyReader thread
CountDownLatch allBadKeysReadLatch = new CountDownLatch(1);
badKeyReaderService = Executors.newSingleThreadExecutor();
badKeyReaderService.submit(new BadKeyReader(allBadKeysReadLatch,
badKeyFileIn,
this,
consistencyFixWorkers,
badKeyQOut));
if(!orphanFormat) {
badKeyReaderService.submit(new BadKeyReader(allBadKeysReadLatch,
badKeyFileIn,
this,
consistencyFixWorkers,
badKeyQOut));
} else {
badKeyReaderService.submit(new BadKeyOrphanReader(allBadKeysReadLatch,
badKeyFileIn,
this,
consistencyFixWorkers,
badKeyQOut));
}
logger.info("Created badKeyReader.");

try {
Expand Down
1 change: 1 addition & 0 deletions src/java/voldemort/utils/ConsistencyFixCLI.java
Expand Up @@ -38,6 +38,7 @@ public static void printUsage(String errMessage) {
private static class Options {

public final static int defaultParallelism = 8;
// TODO: change name to progressPeriodMs (or S, as case may be)
public final static long defaultProgressBar = 1000;
public final static long defaultPerServerIOPSLimit = 100;

Expand Down

0 comments on commit 1d115da

Please sign in to comment.