Skip to content

Commit

Permalink
a) Modified proximity based handoff strategy to use client zones instead of origin node's zone to reorder the list
Browse files Browse the repository at this point in the history

b) Made early termination of pusher job better
  • Loading branch information
rsumbaly authored and afeinberg committed Dec 23, 2010
1 parent 536215d commit d5062d6
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 72 deletions.
4 changes: 2 additions & 2 deletions .classpath
Expand Up @@ -51,6 +51,6 @@
<classpathentry kind="lib" path="contrib/hadoop/lib/pig-0.7.1-dev-core.jar"/>
<classpathentry kind="lib" path="contrib/krati/lib/krati-0.3.4.jar"/>
<classpathentry kind="lib" path="lib/jna.jar"/>
<classpathentry kind="lib" path="lib/mockito-all-1.8.5.jar" />
<classpathentry kind="output" path="classes"/>
<classpathentry kind="lib" path="lib/mockito-all-1.8.5.jar" />
<classpathentry kind="output" path="classes"/>
</classpath>
2 changes: 1 addition & 1 deletion src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -265,7 +265,7 @@ public VoldemortConfig(Props props) {
this.slopFrequencyMs = props.getLong("slop.frequency.ms", 5 * 60 * 1000);
this.slopBatchSize = props.getInt("slop.batch.size", 100);
this.pusherType = props.getString("pusher.type", StreamingSlopPusherJob.TYPE_NAME);
this.slopZonesDownToTerminate = props.getInt("slop.zone.terminate", 1);
this.slopZonesDownToTerminate = props.getInt("slop.zones.terminate", 0);

this.schedulerThreads = props.getInt("scheduler.threads", 6);

Expand Down
Expand Up @@ -21,6 +21,7 @@
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.cluster.Zone;
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
Expand Down Expand Up @@ -92,35 +93,10 @@ public Thread newThread(Runnable r) {
this.attemptedByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes());
this.succeededByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes());

/**
* Required for early termination - Since the slop pusher job runs
* periodically it does full bdb sweeps resulting in random disk seeks
* (since bdb still doesn't support a cursor which reads k/vs in disk
* order). This can definitely slow the incoming requests. So to avoid
* this we include a new strategy which terminates the sweep when we
* find that all nodes in 'n' zones are down ( where n =
* getSlopZonesDownToTerminate ). We keep a mapping of zone ids to their
* node ids
*/
this.zoneMapping = Maps.newHashMap();

}

/**
 * Reports whether enough zones are fully down for the slop sweep to stop
 * early.
 * <p>
 * A zone counts as down when its entry in {@code zoneMapping} (zone id to
 * node ids) holds no node ids. The check only fires when the configured
 * threshold ({@code getSlopZonesDownToTerminate()}) is positive and does
 * not exceed the number of zones currently tracked in the mapping.
 *
 * @return {@code true} if the number of down zones has reached the
 *         configured threshold, {@code false} otherwise
 */
private boolean checkZonesDown() {
    // Count the zones whose node set is currently empty
    int emptyZones = 0;
    for(Set<Integer> nodeIds: zoneMapping.values()) {
        if(nodeIds.isEmpty()) {
            emptyZones++;
        }
    }

    final int threshold = voldemortConfig.getSlopZonesDownToTerminate();

    // A non-positive threshold, or one larger than the number of tracked
    // zones, disables early termination
    return threshold > 0 && threshold <= zoneMapping.size() && emptyZones >= threshold;
}

public void run() {

// don't try to run slop pusher job when rebalancing
Expand All @@ -147,15 +123,36 @@ public void run() {
.setMaxConnectionsPerNode(1));
}

// Populating the zone mapping
zoneMapping.clear();
for(Node n: cluster.getNodes()) {
Set<Integer> nodes = zoneMapping.get(n.getZoneId());
if(nodes == null) {
nodes = Sets.newHashSet();
zoneMapping.put(n.getZoneId(), nodes);
if(voldemortConfig.getSlopZonesDownToTerminate() > 0) {
// Populating the zone mapping for early termination
zoneMapping.clear();
for(Node n: cluster.getNodes()) {
if(failureDetector.isAvailable(n)) {
Set<Integer> nodes = zoneMapping.get(n.getZoneId());
if(nodes == null) {
nodes = Sets.newHashSet();
zoneMapping.put(n.getZoneId(), nodes);
}
nodes.add(n.getId());
}
}

// Check how many zones are down
int zonesDown = 0;
for(Zone zone: cluster.getZones()) {
if(zoneMapping.get(zone.getId()) == null
|| zoneMapping.get(zone.getId()).size() == 0)
zonesDown++;
}

// Terminate early
if(voldemortConfig.getSlopZonesDownToTerminate() <= zoneMapping.size()
&& zonesDown >= voldemortConfig.getSlopZonesDownToTerminate()) {
logger.info("Completed streaming slop pusher job at " + startTime
+ " early because " + zonesDown + " zones are down");
cleanUp();
return;
}
nodes.add(n.getId());
}

// Clearing the statistics
Expand Down Expand Up @@ -190,7 +187,6 @@ public void run() {
+ " and store " + versioned.getValue().getStoreName());

if(failureDetector.isAvailable(node)) {
zoneMapping.get(node.getZoneId()).add(node.getId());
SynchronousQueue<Versioned<Slop>> slopQueue = slopQueues.get(nodeId);
if(slopQueue == null) {
// No previous slop queue, add one
Expand All @@ -205,13 +201,6 @@ public void run() {
TimeUnit.MILLISECONDS);
readThrottler.maybeThrottle(nBytesRead(keyAndVal));
} else {
zoneMapping.get(node.getZoneId()).remove(node.getId());

// Check if we can terminate early
if(checkZonesDown()) {
terminatedEarly = true;
return;
}
logger.trace(node + " declared down, won't push slop");
}
} catch(RejectedExecutionException e) {
Expand Down
Expand Up @@ -44,7 +44,7 @@ public HintedHandoffStrategy updateHintedHandoffStrategy(StoreDefinition storeDe
} else if(HintedHandoffStrategyType.PROXIMITY_STRATEGY.toDisplay()
.compareTo(storeDef.getHintedHandoffStrategyType()
.toDisplay()) == 0) {
return new ProximityHandoffStrategy(cluster);
return new ProximityHandoffStrategy(cluster, clientZoneId);
} else {
throw new VoldemortException("HintedHandoffStrategyType:"
+ storeDef.getHintedHandoffStrategyType()
Expand Down
Expand Up @@ -16,6 +16,7 @@

package voldemort.store.slop.strategy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -31,18 +32,22 @@
*/
public class ProximityHandoffStrategy implements HintedHandoffStrategy {

private final Map<Integer, List<Node>> zoneMapping;
private final Cluster cluster;
private final int clientZoneId;
private Map<Integer, List<Node>> zoneMapping;

/**
* Constructor which makes zone based mapping
*
* @param cluster The cluster
* @param clientZoneId Client zone id
*/
public ProximityHandoffStrategy(Cluster cluster) {
public ProximityHandoffStrategy(Cluster cluster, int clientZoneId) {
this.cluster = cluster;
this.zoneMapping = Maps.newHashMap();
this.clientZoneId = clientZoneId;

// Generate a mapping from zone to nodes
zoneMapping = Maps.newHashMap();
for(Node node: cluster.getNodes()) {
List<Node> nodes = zoneMapping.get(node.getZoneId());
if(nodes == null) {
Expand All @@ -51,20 +56,24 @@ public ProximityHandoffStrategy(Cluster cluster) {
}
nodes.add(node);
}

}

public List<Node> routeHint(Node origin) {
List<Node> prefList = Lists.newArrayList();
prefList.addAll(zoneMapping.get(origin.getZoneId()));
Collections.shuffle(prefList);
if(cluster.getNumberOfZones() > 1) {
for(Integer zoneId: cluster.getZoneById(origin.getZoneId()).getProximityList()) {
List<Node> nodesInZone = zoneMapping.get(zoneId);
Collections.shuffle(nodesInZone);
prefList.addAll(nodesInZone);
}
List<Node> proximityList = new ArrayList<Node>();

// Add the client zone id
Collections.shuffle(zoneMapping.get(clientZoneId));
proximityList.addAll(zoneMapping.get(clientZoneId));

for(Integer zoneId: cluster.getZoneById(clientZoneId).getProximityList()) {
Collections.shuffle(zoneMapping.get(zoneId));
proximityList.addAll(zoneMapping.get(zoneId));
}
return prefList;

// Remove the origin node
proximityList.remove(origin);
return proximityList;
}

@Override
Expand Down
Expand Up @@ -17,8 +17,6 @@ public class ProximityHandoffStrategyTest {
@Test
public void testTwoZones() {
Cluster cluster = VoldemortTestConstants.getEightNodeClusterWithZones();
ProximityHandoffStrategy handoffStrategy = new ProximityHandoffStrategy(cluster);

List<Node> zone0Nodes = Lists.newArrayList();
List<Node> zone1Nodes = Lists.newArrayList();
for(Node node: cluster.getNodes()) {
Expand All @@ -29,29 +27,50 @@ public void testTwoZones() {
}
}

// Try with node from zone 0
for(Node zone0Node: zone0Nodes) {
List<Node> prefList = handoffStrategy.routeHint(zone0Node);
ProximityHandoffStrategy handoffStrategy = new ProximityHandoffStrategy(cluster, 0);

for(Node node: cluster.getNodes()) {
List<Node> prefList = handoffStrategy.routeHint(node);
for(int i = 0; i < prefList.size(); i++) {
if(i < zone0Nodes.size()) {
assertEquals(prefList.get(i).getZoneId(), 0);
if(node.getZoneId() == 0) {
if(i < zone0Nodes.size() - 1) {
assertEquals(prefList.get(i).getZoneId(), 0);
} else {
assertEquals(prefList.get(i).getZoneId(), 1);
}
} else {
assertEquals(prefList.get(i).getZoneId(), 1);
if(i < zone0Nodes.size()) {
assertEquals(prefList.get(i).getZoneId(), 0);
} else {
assertEquals(prefList.get(i).getZoneId(), 1);
}
}

}
}

// Try with node from zone 1
for(Node zone1Node: zone1Nodes) {
List<Node> prefList = handoffStrategy.routeHint(zone1Node);
handoffStrategy = new ProximityHandoffStrategy(cluster, 1);

for(Node node: cluster.getNodes()) {
List<Node> prefList = handoffStrategy.routeHint(node);
for(int i = 0; i < prefList.size(); i++) {
if(i < zone1Nodes.size()) {
assertEquals(prefList.get(i).getZoneId(), 1);
if(node.getZoneId() == 0) {
if(i < zone0Nodes.size()) {
assertEquals(prefList.get(i).getZoneId(), 1);
} else {
assertEquals(prefList.get(i).getZoneId(), 0);
}
} else {
assertEquals(prefList.get(i).getZoneId(), 0);
if(i < zone0Nodes.size() - 1) {
assertEquals(prefList.get(i).getZoneId(), 1);
} else {
assertEquals(prefList.get(i).getZoneId(), 0);
}
}

}
}

}

}

0 comments on commit d5062d6

Please sign in to comment.