Skip to content

Commit

Permalink
Added per-server throttling to the Consistency Fixer.
Browse files Browse the repository at this point in the history
Added a map of EventThrottle objects such that repair traffic to each server can be throttled. We care about throttling write rate because of its potential impact on GC and cleaning.
  • Loading branch information
jayjwylie committed Mar 20, 2013
1 parent e5ddb34 commit 8cb8efa
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 2 deletions.
21 changes: 20 additions & 1 deletion src/java/voldemort/utils/ConsistencyFix.java
Expand Up @@ -25,6 +25,8 @@
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -48,8 +50,10 @@ public class ConsistencyFix {
private final AdminClient adminClient;
private final StoreInstance storeInstance;
private final Stats stats;
private final long perServerIOPSLimit;
private final ConcurrentMap<Integer, EventThrottler> putThrottlers;

ConsistencyFix(String url, String storeName, long progressBar) {
ConsistencyFix(String url, String storeName, long progressBar, long perServerIOPSLimit) {
this.storeName = storeName;
logger.info("Connecting to bootstrap server: " + url);
this.adminClient = new AdminClient(url, new AdminClientConfig(), 0);
Expand All @@ -65,6 +69,9 @@ public class ConsistencyFix {
storeInstance = new StoreInstance(cluster, storeDefinition);

stats = new Stats(progressBar);

this.perServerIOPSLimit = perServerIOPSLimit;
this.putThrottlers = new ConcurrentHashMap<Integer, EventThrottler>();
}

public String getStoreName() {
Expand All @@ -83,6 +90,18 @@ public Stats getStats() {
return stats;
}

/**
* Throttle put (repair) activity per server.
*
* @param nodeId The node for which to possibly throttle put activity.
*/
public void maybePutThrottle(int nodeId) {
if(!putThrottlers.containsKey(nodeId)) {
putThrottlers.putIfAbsent(nodeId, new EventThrottler(perServerIOPSLimit));
}
putThrottlers.get(nodeId).maybeThrottle(1);
}

/**
* Status of the repair of a specific "bad key"
*/
Expand Down
14 changes: 13 additions & 1 deletion src/java/voldemort/utils/ConsistencyFixCLI.java
Expand Up @@ -39,13 +39,15 @@ private static class Options {

public final static int defaultParallelism = 8;
public final static long defaultProgressBar = 1000;
public final static long defaultPerServerIOPSLimit = 100;

public String url = null;
public String storeName = null;
public String badKeyFileIn = null;
public String badKeyFileOut = null;
public int parallelism = defaultParallelism;
public long progressBar = defaultProgressBar;
public long perServerIOPSLimit = defaultPerServerIOPSLimit;
}

/**
Expand Down Expand Up @@ -86,6 +88,12 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) {
.describedAs("Number of operations between 'info' progress messages. "
+ "[Default value: " + Options.defaultProgressBar + "]")
.ofType(Long.class);
parser.accepts("per-server-iops-limit")
.withRequiredArg()
.describedAs("Number of operations that the consistency fixer will issue into any individual server in one second. "
+ "[Default value: " + Options.defaultPerServerIOPSLimit + "]")
.ofType(Long.class);

OptionSet optionSet = parser.parse(args);

if(optionSet.hasArgument("help")) {
Expand Down Expand Up @@ -122,6 +130,9 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) {
if(optionSet.has("progress-bar")) {
options.progressBar = (Long) optionSet.valueOf("progress-bar");
}
if(optionSet.has("per-server-iops-limit")) {
options.perServerIOPSLimit = (Long) optionSet.valueOf("per-server-iops-limit");
}

return options;
}
Expand All @@ -131,7 +142,8 @@ public static void main(String[] args) throws Exception {

ConsistencyFix consistencyFix = new ConsistencyFix(options.url,
options.storeName,
options.progressBar);
options.progressBar,
options.perServerIOPSLimit);

String summary = consistencyFix.execute(options.parallelism,
options.badKeyFileIn,
Expand Down
1 change: 1 addition & 0 deletions src/java/voldemort/utils/ConsistencyFixWorker.java
Expand Up @@ -265,6 +265,7 @@ public Status doRepairPut(final List<NodeValue<ByteArray, byte[]>> toReadRepair)
boolean allRepairsSuccessful = true;
for(NodeValue<ByteArray, byte[]> nodeKeyValue: toReadRepair) {
try {
consistencyFix.maybePutThrottle(nodeKeyValue.getNodeId());
consistencyFix.getAdminClient().storeOps.putNodeKeyValue(consistencyFix.getStoreName(),
nodeKeyValue);
} catch(ObsoleteVersionException ove) {
Expand Down
10 changes: 10 additions & 0 deletions src/java/voldemort/utils/EventThrottler.java
Expand Up @@ -46,7 +46,17 @@ public EventThrottler(Time time, long ratePerSecond, long intervalMs) {
this.startTime = 0L;
}

/**
* Sleeps if necessary to slow down the caller.
*
* @param eventsSeen Number of events seen since last invocation. Basis for
* determining whether its necessary to sleep.
*/
public synchronized void maybeThrottle(int eventsSeen) {
// TODO: This implements "bang bang" control. This is OK. But, this
// permits unbounded bursts of activity within the intervalMs. A
// controller that has more memory and explicitly bounds peak activity
// within the intervalMs may be better.
long rateLimit = getRate();

if(logger.isDebugEnabled())
Expand Down

0 comments on commit 8cb8efa

Please sign in to comment.