From d5062d641342cd1b7b4a639248b11838f0f3e644 Mon Sep 17 00:00:00 2001 From: Roshan Sumbaly Date: Thu, 2 Dec 2010 19:48:39 -0800 Subject: [PATCH] a) Modified proximity based handoff strategy to use client zones instead of origin node's zone to reorder the list b) Made early termination of pusher job better --- .classpath | 4 +- .../voldemort/server/VoldemortConfig.java | 2 +- .../slop/StreamingSlopPusherJob.java | 71 ++++++++----------- .../HintedHandoffStrategyFactory.java | 2 +- .../strategy/ProximityHandoffStrategy.java | 35 +++++---- .../ProximityHandoffStrategyTest.java | 47 ++++++++---- 6 files changed, 89 insertions(+), 72 deletions(-) diff --git a/.classpath b/.classpath index 7f9a3d6225..599a692919 100644 --- a/.classpath +++ b/.classpath @@ -51,6 +51,6 @@ - - + + diff --git a/src/java/voldemort/server/VoldemortConfig.java b/src/java/voldemort/server/VoldemortConfig.java index 9b87012f21..93cfe7695b 100644 --- a/src/java/voldemort/server/VoldemortConfig.java +++ b/src/java/voldemort/server/VoldemortConfig.java @@ -265,7 +265,7 @@ public VoldemortConfig(Props props) { this.slopFrequencyMs = props.getLong("slop.frequency.ms", 5 * 60 * 1000); this.slopBatchSize = props.getInt("slop.batch.size", 100); this.pusherType = props.getString("pusher.type", StreamingSlopPusherJob.TYPE_NAME); - this.slopZonesDownToTerminate = props.getInt("slop.zone.terminate", 1); + this.slopZonesDownToTerminate = props.getInt("slop.zones.terminate", 0); this.schedulerThreads = props.getInt("scheduler.threads", 6); diff --git a/src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java b/src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java index 0985f21aa5..ba236a24fe 100644 --- a/src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java +++ b/src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java @@ -21,6 +21,7 @@ import voldemort.client.protocol.admin.AdminClientConfig; import voldemort.cluster.Cluster; import voldemort.cluster.Node; +import voldemort.cluster.Zone; import voldemort.cluster.failuredetector.FailureDetector; import voldemort.server.StoreRepository; import voldemort.server.VoldemortConfig; @@ -92,35 +93,10 @@ public Thread newThread(Runnable r) { this.attemptedByNode = new ConcurrentHashMap(cluster.getNumberOfNodes()); this.succeededByNode = new ConcurrentHashMap(cluster.getNumberOfNodes()); - /** - * Required for early termination - Since the slop pusher job runs - * periodically it does full bdb sweeps resulting in random disk seeks - * (since bdb still doesn't support a cursor which reads k/vs in disk - * order). This can definitely slow the incoming requests. So to avoid - * this we include a new strategy which terminates the sweep when we - * find that all nodes in 'n' zones are down ( where n = - * getSlopZonesDownToTerminate ). We keep a mapping of zone ids to their - * node ids - */ this.zoneMapping = Maps.newHashMap(); } - private boolean checkZonesDown() { - int zonesDown = 0; - for(Integer zoneId: zoneMapping.keySet()) { - if(zoneMapping.get(zoneId).size() == 0) - zonesDown++; - } - - if(voldemortConfig.getSlopZonesDownToTerminate() > 0 - && voldemortConfig.getSlopZonesDownToTerminate() <= zoneMapping.size() - && zonesDown >= voldemortConfig.getSlopZonesDownToTerminate()) { - return true; - } - return false; - } - public void run() { // don't try to run slop pusher job when rebalancing @@ -147,15 +123,36 @@ public void run() { .setMaxConnectionsPerNode(1)); } - // Populating the zone mapping - zoneMapping.clear(); - for(Node n: cluster.getNodes()) { - Set nodes = zoneMapping.get(n.getZoneId()); - if(nodes == null) { - nodes = Sets.newHashSet(); - zoneMapping.put(n.getZoneId(), nodes); + if(voldemortConfig.getSlopZonesDownToTerminate() > 0) { + // Populating the zone mapping for early termination + zoneMapping.clear(); + for(Node n: cluster.getNodes()) { + if(failureDetector.isAvailable(n)) { + Set nodes = zoneMapping.get(n.getZoneId()); + if(nodes == null) { + nodes = Sets.newHashSet(); + zoneMapping.put(n.getZoneId(), nodes); + } + nodes.add(n.getId()); + } + } + + // Check how many zones are down + int zonesDown = 0; + for(Zone zone: cluster.getZones()) { + if(zoneMapping.get(zone.getId()) == null + || zoneMapping.get(zone.getId()).size() == 0) + zonesDown++; + } + + // Terminate early + if(voldemortConfig.getSlopZonesDownToTerminate() <= zoneMapping.size() + && zonesDown >= voldemortConfig.getSlopZonesDownToTerminate()) { + logger.info("Completed streaming slop pusher job at " + startTime + + " early because " + zonesDown + " zones are down"); + cleanUp(); + return; } - nodes.add(n.getId()); } // Clearing the statistics @@ -190,7 +187,6 @@ public void run() { + " and store " + versioned.getValue().getStoreName()); if(failureDetector.isAvailable(node)) { - zoneMapping.get(node.getZoneId()).add(node.getId()); SynchronousQueue> slopQueue = slopQueues.get(nodeId); if(slopQueue == null) { // No previous slop queue, add one @@ -205,13 +201,6 @@ public void run() { TimeUnit.MILLISECONDS); readThrottler.maybeThrottle(nBytesRead(keyAndVal)); } else { - zoneMapping.get(node.getZoneId()).remove(node.getId()); - - // Check if we can terminate early - if(checkZonesDown()) { - terminatedEarly = true; - return; - } logger.trace(node + " declared down, won't push slop"); } } catch(RejectedExecutionException e) { diff --git a/src/java/voldemort/store/slop/strategy/HintedHandoffStrategyFactory.java b/src/java/voldemort/store/slop/strategy/HintedHandoffStrategyFactory.java index ae657274e7..9e4a9fe4bf 100644 --- a/src/java/voldemort/store/slop/strategy/HintedHandoffStrategyFactory.java +++ b/src/java/voldemort/store/slop/strategy/HintedHandoffStrategyFactory.java @@ -44,7 +44,7 @@ public HintedHandoffStrategy updateHintedHandoffStrategy(StoreDefinition storeDe } else if(HintedHandoffStrategyType.PROXIMITY_STRATEGY.toDisplay() .compareTo(storeDef.getHintedHandoffStrategyType() .toDisplay()) == 0) { - return new ProximityHandoffStrategy(cluster); + return new ProximityHandoffStrategy(cluster, clientZoneId); } else { throw new VoldemortException("HintedHandoffStrategyType:" + storeDef.getHintedHandoffStrategyType() diff --git a/src/java/voldemort/store/slop/strategy/ProximityHandoffStrategy.java b/src/java/voldemort/store/slop/strategy/ProximityHandoffStrategy.java index fab4c483ea..51b607b440 100644 --- a/src/java/voldemort/store/slop/strategy/ProximityHandoffStrategy.java +++ b/src/java/voldemort/store/slop/strategy/ProximityHandoffStrategy.java @@ -16,6 +16,7 @@ package voldemort.store.slop.strategy; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -31,8 +32,9 @@ */ public class ProximityHandoffStrategy implements HintedHandoffStrategy { - private final Map> zoneMapping; private final Cluster cluster; + private final int clientZoneId; + private Map> zoneMapping; /** * Constructor which makes zone based mapping @@ -40,9 +42,12 @@ public class ProximityHandoffStrategy implements HintedHandoffStrategy { * @param cluster The cluster * @param clientZoneId Client zone id */ - public ProximityHandoffStrategy(Cluster cluster) { + public ProximityHandoffStrategy(Cluster cluster, int clientZoneId) { this.cluster = cluster; - this.zoneMapping = Maps.newHashMap(); + this.clientZoneId = clientZoneId; + + // Generate a mapping from zone to nodes + zoneMapping = Maps.newHashMap(); for(Node node: cluster.getNodes()) { List nodes = zoneMapping.get(node.getZoneId()); if(nodes == null) { @@ -51,20 +56,24 @@ public ProximityHandoffStrategy(Cluster cluster) { } nodes.add(node); } + } public List routeHint(Node origin) { - List prefList = Lists.newArrayList(); - prefList.addAll(zoneMapping.get(origin.getZoneId())); - Collections.shuffle(prefList); - if(cluster.getNumberOfZones() > 1) { - for(Integer zoneId: cluster.getZoneById(origin.getZoneId()).getProximityList()) { - List nodesInZone = zoneMapping.get(zoneId); - Collections.shuffle(nodesInZone); - prefList.addAll(nodesInZone); - } + List proximityList = new ArrayList(); + + // Add the client zone id + Collections.shuffle(zoneMapping.get(clientZoneId)); + proximityList.addAll(zoneMapping.get(clientZoneId)); + + for(Integer zoneId: cluster.getZoneById(clientZoneId).getProximityList()) { + Collections.shuffle(zoneMapping.get(zoneId)); + proximityList.addAll(zoneMapping.get(zoneId)); } - return prefList; + + // Remove the origin node + proximityList.remove(origin); + return proximityList; } @Override diff --git a/test/unit/voldemort/store/slop/strategy/ProximityHandoffStrategyTest.java b/test/unit/voldemort/store/slop/strategy/ProximityHandoffStrategyTest.java index 348fb08c2a..82918abb45 100644 --- a/test/unit/voldemort/store/slop/strategy/ProximityHandoffStrategyTest.java +++ b/test/unit/voldemort/store/slop/strategy/ProximityHandoffStrategyTest.java @@ -17,8 +17,6 @@ public class ProximityHandoffStrategyTest { @Test public void testTwoZones() { Cluster cluster = VoldemortTestConstants.getEightNodeClusterWithZones(); - ProximityHandoffStrategy handoffStrategy = new ProximityHandoffStrategy(cluster); - List zone0Nodes = Lists.newArrayList(); List zone1Nodes = Lists.newArrayList(); for(Node node: cluster.getNodes()) { @@ -29,29 +27,50 @@ public void testTwoZones() { } } - // Try with node from zone 0 - for(Node zone0Node: zone0Nodes) { - List prefList = handoffStrategy.routeHint(zone0Node); + ProximityHandoffStrategy handoffStrategy = new ProximityHandoffStrategy(cluster, 0); + + for(Node node: cluster.getNodes()) { + List prefList = handoffStrategy.routeHint(node); for(int i = 0; i < prefList.size(); i++) { - if(i < zone0Nodes.size()) { - assertEquals(prefList.get(i).getZoneId(), 0); + if(node.getZoneId() == 0) { + if(i < zone0Nodes.size() - 1) { + assertEquals(prefList.get(i).getZoneId(), 0); + } else { + assertEquals(prefList.get(i).getZoneId(), 1); + } } else { - assertEquals(prefList.get(i).getZoneId(), 1); + if(i < zone0Nodes.size()) { + assertEquals(prefList.get(i).getZoneId(), 0); + } else { + assertEquals(prefList.get(i).getZoneId(), 1); + } } + } } - // Try with node from zone 1 - for(Node zone1Node: zone1Nodes) { - List prefList = handoffStrategy.routeHint(zone1Node); + handoffStrategy = new ProximityHandoffStrategy(cluster, 1); + + for(Node node: cluster.getNodes()) { + List prefList = handoffStrategy.routeHint(node); for(int i = 0; i < prefList.size(); i++) { - if(i < zone1Nodes.size()) { - assertEquals(prefList.get(i).getZoneId(), 1); + if(node.getZoneId() == 0) { + if(i < zone0Nodes.size()) { + assertEquals(prefList.get(i).getZoneId(), 1); + } else { + assertEquals(prefList.get(i).getZoneId(), 0); + } } else { - assertEquals(prefList.get(i).getZoneId(), 0); + if(i < zone0Nodes.size() - 1) { + assertEquals(prefList.get(i).getZoneId(), 1); + } else { + assertEquals(prefList.get(i).getZoneId(), 0); + } } + } } + } }