diff --git a/src/java/voldemort/utils/ConsistencyFix.java b/src/java/voldemort/utils/ConsistencyFix.java index ef25c53ddd..61d2cf1ce7 100644 --- a/src/java/voldemort/utils/ConsistencyFix.java +++ b/src/java/voldemort/utils/ConsistencyFix.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -48,8 +50,10 @@ public class ConsistencyFix { private final AdminClient adminClient; private final StoreInstance storeInstance; private final Stats stats; + private final long perServerIOPSLimit; + private final ConcurrentMap putThrottlers; - ConsistencyFix(String url, String storeName, long progressBar) { + ConsistencyFix(String url, String storeName, long progressBar, long perServerIOPSLimit) { this.storeName = storeName; logger.info("Connecting to bootstrap server: " + url); this.adminClient = new AdminClient(url, new AdminClientConfig(), 0); @@ -65,6 +69,9 @@ public class ConsistencyFix { storeInstance = new StoreInstance(cluster, storeDefinition); stats = new Stats(progressBar); + + this.perServerIOPSLimit = perServerIOPSLimit; + this.putThrottlers = new ConcurrentHashMap(); } public String getStoreName() { @@ -83,6 +90,18 @@ public Stats getStats() { return stats; } + /** + * Throttle put (repair) activity per server. + * + * @param nodeId The node for which to possibly throttle put activity. + */ + public void maybePutThrottle(int nodeId) { + if(!putThrottlers.containsKey(nodeId)) { + putThrottlers.putIfAbsent(nodeId, new EventThrottler(perServerIOPSLimit)); + } + putThrottlers.get(nodeId).maybeThrottle(1); + } + /** * Status of the repair of a specific "bad key" */ diff --git a/src/java/voldemort/utils/ConsistencyFixCLI.java b/src/java/voldemort/utils/ConsistencyFixCLI.java index 9d4a47e395..2901a18e9e 100644 --- a/src/java/voldemort/utils/ConsistencyFixCLI.java +++ b/src/java/voldemort/utils/ConsistencyFixCLI.java @@ -39,6 +39,7 @@ private static class Options { public final static int defaultParallelism = 8; public final static long defaultProgressBar = 1000; + public final static long defaultPerServerIOPSLimit = 100; public String url = null; public String storeName = null; @@ -46,6 +47,7 @@ private static class Options { public String badKeyFileOut = null; public int parallelism = defaultParallelism; public long progressBar = defaultProgressBar; + public long perServerIOPSLimit = defaultPerServerIOPSLimit; } /** @@ -86,6 +88,12 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) { .describedAs("Number of operations between 'info' progress messages. " + "[Default value: " + Options.defaultProgressBar + "]") .ofType(Long.class); + parser.accepts("per-server-iops-limit") + .withRequiredArg() + .describedAs("Number of operations that the consistency fixer will issue into any individual server in one second. " + + "[Default value: " + Options.defaultPerServerIOPSLimit + "]") + .ofType(Long.class); + OptionSet optionSet = parser.parse(args); if(optionSet.hasArgument("help")) { @@ -122,6 +130,9 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) { if(optionSet.has("progress-bar")) { options.progressBar = (Long) optionSet.valueOf("progress-bar"); } + if(optionSet.has("per-server-iops-limit")) { + options.perServerIOPSLimit = (Long) optionSet.valueOf("per-server-iops-limit"); + } return options; } @@ -131,7 +142,8 @@ public static void main(String[] args) throws Exception { ConsistencyFix consistencyFix = new ConsistencyFix(options.url, options.storeName, - options.progressBar); + options.progressBar, + options.perServerIOPSLimit); String summary = consistencyFix.execute(options.parallelism, options.badKeyFileIn, diff --git a/src/java/voldemort/utils/ConsistencyFixWorker.java b/src/java/voldemort/utils/ConsistencyFixWorker.java index a0adc5d73e..23427746f1 100644 --- a/src/java/voldemort/utils/ConsistencyFixWorker.java +++ b/src/java/voldemort/utils/ConsistencyFixWorker.java @@ -265,6 +265,7 @@ public Status doRepairPut(final List> toReadRepair) boolean allRepairsSuccessful = true; for(NodeValue nodeKeyValue: toReadRepair) { try { + consistencyFix.maybePutThrottle(nodeKeyValue.getNodeId()); consistencyFix.getAdminClient().storeOps.putNodeKeyValue(consistencyFix.getStoreName(), nodeKeyValue); } catch(ObsoleteVersionException ove) { diff --git a/src/java/voldemort/utils/EventThrottler.java b/src/java/voldemort/utils/EventThrottler.java index 770660b345..0375e17a4e 100644 --- a/src/java/voldemort/utils/EventThrottler.java +++ b/src/java/voldemort/utils/EventThrottler.java @@ -46,7 +46,17 @@ public EventThrottler(Time time, long ratePerSecond, long intervalMs) { this.startTime = 0L; } + /** + * Sleeps if necessary to slow down the caller. + * + * @param eventsSeen Number of events seen since last invocation. Basis for + * determining whether its necessary to sleep. + */ public synchronized void maybeThrottle(int eventsSeen) { + // TODO: This implements "bang bang" control. This is OK. But, this + // permits unbounded bursts of activity within the intervalMs. A + // controller that has more memory and explicitly bounds peak activity + // within the intervalMs may be better. long rateLimit = getRate(); if(logger.isDebugEnabled())