Added 'early termination' to the streaming slop pusher job

commit 9b32149f849f88908b7e36343ec026c5e3316a78 (parent dab95e2)
Authored by @rsumbaly
src/java/voldemort/server/VoldemortConfig.java (13 lines changed)
@@ -131,6 +131,7 @@
private final long slopFrequencyMs;
private long slopMaxWriteBytesPerSec, slopMaxReadBytesPerSec;
private int slopBatchSize;
+ private int slopZonesDownToTerminate;
private int adminCoreThreads;
private int adminMaxThreads;
@@ -262,6 +263,7 @@ public VoldemortConfig(Props props) {
this.slopFrequencyMs = props.getLong("slop.frequency.ms", 5 * 60 * 1000);
this.slopBatchSize = props.getInt("slop.batch.size", 100);
this.pusherType = props.getString("pusher.type", StreamingSlopPusherJob.TYPE_NAME);
+ this.slopZonesDownToTerminate = props.getInt("slop.zone.terminate", 1);
this.schedulerThreads = props.getInt("scheduler.threads", 6);
@@ -755,6 +757,17 @@ public void setPusherType(String pusherType) {
}
/**
+ * Number of zones declared down before we terminate the pusher job
+ */
+ public int getSlopZonesDownToTerminate() {
+ return this.slopZonesDownToTerminate;
+ }
+
+ public void setSlopZonesDownToTerminate(int slopZonesDownToTerminate) {
+ this.slopZonesDownToTerminate = slopZonesDownToTerminate;
+ }
+
+ /**
* Returns the size of the batch used while streaming slops
*/
public int getSlopBatchSize() {
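For operators, the new knob is read from the server configuration using the key and default shown above; per checkZonesDown in the pusher job below, a value of 0 (or one greater than the number of zones) effectively disables the check. A sketch of the corresponding server.properties entry (only the key name and default are taken from the diff):

    # Terminate the slop sweep once every node in this many zones is
    # marked down; 0 disables early termination
    slop.zone.terminate=1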
src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java (104 lines changed)
@@ -3,6 +3,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -39,6 +40,7 @@
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
public class StreamingSlopPusherJob implements Runnable {
@@ -55,10 +57,11 @@
private final ExecutorService consumerExecutor;
private final EventThrottler writeThrottler;
private final EventThrottler readThrottler;
- private final AdminClient adminClient;
+ private AdminClient adminClient;
private final Cluster cluster;
private final List<Future> consumerResults;
private final VoldemortConfig voldemortConfig;
+ private final Map<Integer, Set<Integer>> zoneMapping;
private ConcurrentHashMap<Integer, Long> attemptedByNode, succeededByNode;
public StreamingSlopPusherJob(StoreRepository storeRepo,
@@ -83,13 +86,38 @@ public Thread newThread(Runnable r) {
});
this.writeThrottler = new EventThrottler(voldemortConfig.getSlopMaxWriteBytesPerSec());
this.readThrottler = new EventThrottler(voldemortConfig.getSlopMaxReadBytesPerSec());
- this.adminClient = new AdminClient(cluster,
- new AdminClientConfig().setMaxThreads(cluster.getNumberOfNodes())
- .setMaxConnectionsPerNode(1));
+ this.adminClient = null;
this.consumerResults = Lists.newArrayList();
this.attemptedByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes());
this.succeededByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes());
+ /**
+ * Required for early termination - since the slop pusher job runs
+ * periodically, it does full BDB sweeps resulting in random disk seeks
+ * (BDB still doesn't support a cursor which reads key/values in disk
+ * order). This can slow down incoming requests, so we add a new
+ * strategy which terminates the sweep once we find that all nodes in
+ * 'n' zones are down (where n = getSlopZonesDownToTerminate()). We
+ * keep a mapping of zone ids to their node ids.
+ */
+ this.zoneMapping = Maps.newHashMap();
+
+ }
+
+ private boolean checkZonesDown() {
+ int zonesDown = 0;
+ for(Integer zoneId: zoneMapping.keySet()) {
+ if(zoneMapping.get(zoneId).size() == 0)
+ zonesDown++;
+ }
+
+ if(voldemortConfig.getSlopZonesDownToTerminate() > 0
+ && voldemortConfig.getSlopZonesDownToTerminate() <= zoneMapping.size()
+ && zonesDown >= voldemortConfig.getSlopZonesDownToTerminate()) {
+ return true;
+ }
+ return false;
}
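To make the rule above concrete, here is a minimal, hypothetical sketch (made-up zone and node ids; Guava's Maps and Sets as in the imports above) of how the mapping drives termination:

    // Hypothetical two-zone cluster: zone 0 -> nodes {0, 1}, zone 1 -> nodes {2, 3}
    Map<Integer, Set<Integer>> zoneMapping = Maps.newHashMap();
    zoneMapping.put(0, Sets.newHashSet(0, 1));
    zoneMapping.put(1, Sets.newHashSet(2, 3));

    // The failure detector declares both nodes of zone 1 down
    zoneMapping.get(1).remove(2);
    zoneMapping.get(1).remove(3);

    // Count zones whose live-node sets are now empty
    int zonesDown = 0;
    for(Set<Integer> liveNodes: zoneMapping.values())
        if(liveNodes.isEmpty())
            zonesDown++;

    // With the default slop.zone.terminate = 1: 1 > 0, 1 <= 2 zones,
    // and zonesDown (1) >= 1, so the sweep terminates early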
public void run() {
@@ -100,16 +128,35 @@ public void run() {
return;
}
- // Allow only one job to run at one time - Two jobs may get started if
- // someone disables and enables (through JMX) immediately during a run
+ /**
+ * Allow only one job to run at one time - Two jobs may get started if
+ * someone disables and enables (through JMX) immediately during a run
+ */
synchronized(lock) {
- boolean exceptionOccurred = false;
+ boolean terminatedEarly = false;
Date startTime = new Date();
logger.info("Started streaming slop pusher job at " + startTime);
SlopStorageEngine slopStorageEngine = storeRepo.getSlopStore();
ClosableIterator<Pair<ByteArray, Versioned<Slop>>> iterator = null;
+ if(adminClient == null) {
+ adminClient = new AdminClient(cluster,
+ new AdminClientConfig().setMaxThreads(cluster.getNumberOfNodes())
+ .setMaxConnectionsPerNode(1));
+ }
+
+ // Populating the zone mapping
+ zoneMapping.clear();
+ for(Node n: cluster.getNodes()) {
+ Set<Integer> nodes = zoneMapping.get(n.getZoneId());
+ if(nodes == null) {
+ nodes = Sets.newHashSet();
+ zoneMapping.put(n.getZoneId(), nodes);
+ }
+ nodes.add(n.getId());
+ }
+
// Clearing the statistics
AtomicLong attemptedPushes = new AtomicLong(0);
for(Node node: cluster.getNodes()) {
@@ -137,10 +184,12 @@ public void run() {
if(attemptedPushes.get() % 10000 == 0)
logger.info("Attempted pushing " + attemptedPushes + " slops");
- logger.trace("On slop = " + versioned.getValue().getNodeId() + " => "
- + new String(versioned.getValue().getKey().get()));
+ if(logger.isTraceEnabled())
+ logger.trace("Pushing slop for " + versioned.getValue().getNodeId()
+ + " and store " + versioned.getValue().getStoreName());
if(failureDetector.isAvailable(node)) {
+ zoneMapping.get(node.getZoneId()).add(node.getId());
SynchronousQueue<Versioned<Slop>> slopQueue = slopQueues.get(nodeId);
if(slopQueue == null) {
// No previous slop queue, add one
@@ -155,6 +204,13 @@ public void run() {
TimeUnit.MILLISECONDS);
readThrottler.maybeThrottle(nBytesRead(keyAndVal));
} else {
+ zoneMapping.get(node.getZoneId()).remove(node.getId());
+
+ // Check if we can terminate early
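+ // (the early return below still passes through the finally block,
+ // so the poison pill reaches each consumer queue and cleanUp() runs)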
+ if(checkZonesDown()) {
+ terminatedEarly = true;
+ return;
+ }
logger.trace(node + " declared down, won't push slop");
}
} catch(RejectedExecutionException e) {
@@ -164,10 +220,10 @@ public void run() {
} catch(InterruptedException e) {
logger.warn("Interrupted exception", e);
- exceptionOccurred = true;
+ terminatedEarly = true;
} catch(Exception e) {
logger.error(e, e);
- exceptionOccurred = true;
+ terminatedEarly = true;
} finally {
try {
if(iterator != null)
@@ -176,7 +232,7 @@ public void run() {
logger.warn("Failed to close iterator cleanly as database might be closed", e);
}
- // Adding to poison pill
+ // Adding the poison pill
for(SynchronousQueue<Versioned<Slop>> slopQueue: slopQueues.values()) {
try {
slopQueue.offer(END,
@@ -196,27 +252,41 @@ public void run() {
}
// Only if exception didn't take place do we update the counts
- if(!exceptionOccurred) {
+ if(!terminatedEarly) {
Map<Integer, Long> outstanding = Maps.newHashMapWithExpectedSize(cluster.getNumberOfNodes());
for(int nodeId: succeededByNode.keySet()) {
- logger.debug("Slops pushed to node " + nodeId + " - "
- + succeededByNode.get(nodeId));
+ logger.info("Slops to node " + nodeId + " - Succeeded - "
+ + succeededByNode.get(nodeId) + " - Attempted - "
+ + attemptedByNode.get(nodeId));
outstanding.put(nodeId, attemptedByNode.get(nodeId)
- succeededByNode.get(nodeId));
}
-
slopStorageEngine.resetStats(outstanding);
+ logger.info("Completed streaming slop pusher job which started at " + startTime);
+ } else {
+ for(int nodeId: succeededByNode.keySet()) {
+ logger.info("Slops to node " + nodeId + " - Succeeded - "
+ + succeededByNode.get(nodeId) + " - Attempted - "
+ + attemptedByNode.get(nodeId));
+ }
+ logger.info("Completed early streaming slop pusher job which started at "
+ + startTime);
}
// Shut down admin client so as not to waste connections
consumerResults.clear();
slopQueues.clear();
+ cleanUp();
}
- logger.info("Completed streaming slop pusher job which started at " + startTime);
}
}
+ private void cleanUp() {
+ adminClient.stop();
+ adminClient = null;
+ }
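Together with the constructor change above, adminClient is no longer final: it starts out null, and each run follows a lazy create/stop cycle so connections are only held while a sweep is in flight. A condensed, hypothetical view of that flow (not literal source):

    // Per run(): create the admin client on demand...
    if(adminClient == null)
        adminClient = new AdminClient(cluster,
                                      new AdminClientConfig().setMaxThreads(cluster.getNumberOfNodes())
                                                             .setMaxConnectionsPerNode(1));
    try {
        // ... full BDB sweep, pushing slops to available nodes ...
    } finally {
        cleanUp(); // adminClient.stop(); adminClient = null
    }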
+
private int nBytesRead(Pair<ByteArray, Versioned<Slop>> keyAndVal) {
return keyAndVal.getFirst().length() + slopSize(keyAndVal.getSecond());
}