allow StreamingSlopPusherJob to pick up cluster.xml change dynamically and a unit test
commit b44fb9475e6419c4f8a53b5656cdc92b03ec5b92 1 parent 56c71c6
Lei Gao (leigao) authored
44 src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java
@@ -57,16 +57,17 @@
57 57 private final MetadataStore metadataStore;
58 58 private final StoreRepository storeRepo;
59 59 private final FailureDetector failureDetector;
60   - private final ConcurrentMap<Integer, SynchronousQueue<Versioned<Slop>>> slopQueues;
61   - private final ExecutorService consumerExecutor;
  60 + private ConcurrentMap<Integer, SynchronousQueue<Versioned<Slop>>> slopQueues;
  61 + private ExecutorService consumerExecutor;
62 62 private final EventThrottler readThrottler;
63 63 private AdminClient adminClient;
64   - private final Cluster cluster;
  64 + private Cluster cluster;
65 65
66 66 private final List<Future> consumerResults;
67 67 private final VoldemortConfig voldemortConfig;
68 68 private final Map<Integer, Set<Integer>> zoneMapping;
69   - private final ConcurrentHashMap<Integer, Long> attemptedByNode, succeededByNode;
  69 + private ConcurrentHashMap<Integer, Long> attemptedByNode;
  70 + private ConcurrentHashMap<Integer, Long> succeededByNode;
70 71 private final Semaphore repairPermits;
71 72
72 73 public StreamingSlopPusherJob(StoreRepository storeRepo,
@@ -79,30 +80,15 @@ public StreamingSlopPusherJob(StoreRepository storeRepo,
79 80 this.failureDetector = failureDetector;
80 81 this.voldemortConfig = voldemortConfig;
81 82 this.repairPermits = Utils.notNull(repairPermits);
82   -
83   - this.cluster = metadataStore.getCluster();
84   - this.slopQueues = new ConcurrentHashMap<Integer, SynchronousQueue<Versioned<Slop>>>(cluster.getNumberOfNodes());
85   - this.consumerExecutor = Executors.newFixedThreadPool(cluster.getNumberOfNodes(),
86   - new ThreadFactory() {
87   -
88   - public Thread newThread(Runnable r) {
89   - Thread thread = new Thread(r);
90   - thread.setName("slop-pusher");
91   - return thread;
92   - }
93   - });
94   -
95 83 this.readThrottler = new EventThrottler(voldemortConfig.getSlopMaxReadBytesPerSec());
96 84 this.adminClient = null;
97 85 this.consumerResults = Lists.newArrayList();
98   - this.attemptedByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes());
99   - this.succeededByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes());
100   -
101 86 this.zoneMapping = Maps.newHashMap();
102   -
103 87 }
104 88
105 89 public void run() {
  90 + // load the metadata before each run, in case the cluster has changed
  91 + loadMetadata();
106 92
107 93 // don't try to run slop pusher job when rebalancing
108 94 if(metadataStore.getServerState()
@@ -277,6 +263,22 @@ public void run() {
277 263
278 264 }
279 265
  266 + private void loadMetadata() {
  267 + this.cluster = metadataStore.getCluster();
  268 + this.slopQueues = new ConcurrentHashMap<Integer, SynchronousQueue<Versioned<Slop>>>(cluster.getNumberOfNodes());
  269 + this.consumerExecutor = Executors.newFixedThreadPool(cluster.getNumberOfNodes(),
  270 + new ThreadFactory() {
  271 +
  272 + public Thread newThread(Runnable r) {
  273 + Thread thread = new Thread(r);
  274 + thread.setName("slop-pusher");
  275 + return thread;
  276 + }
  277 + });
  278 + this.attemptedByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes());
  279 + this.succeededByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes());
  280 + }
  281 +
280 282 private void stopAdminClient() {
281 283 if(adminClient != null) {
282 284 adminClient.stop();
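
The shape of the change: the constructor no longer builds anything that depends on the cluster; instead each run() calls loadMetadata() first, so a cluster.xml update written to the metadata store is picked up on the next scheduled run without restarting the server. A minimal sketch of that refresh-on-run pattern, with simplified class and field names (an illustration, not the actual Voldemort job):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import voldemort.cluster.Cluster;
import voldemort.store.metadata.MetadataStore;

public class RefreshOnRunJob implements Runnable {

    private final MetadataStore metadataStore; // injected once at construction
    private Cluster cluster;                   // deliberately not final: refreshed per run
    private ExecutorService consumerExecutor;  // sized to the current node count

    public RefreshOnRunJob(MetadataStore metadataStore) {
        // nothing cluster-dependent is created here anymore
        this.metadataStore = metadataStore;
    }

    public void run() {
        loadMetadata(); // re-read the cluster before doing any work
        // ... push slops using the freshly loaded cluster ...
    }

    private void loadMetadata() {
        this.cluster = metadataStore.getCluster();
        this.consumerExecutor = Executors.newFixedThreadPool(cluster.getNumberOfNodes());
    }
}
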
42 test/common/voldemort/ServerTestUtils.java
@@ -25,9 +25,11 @@
25 25 import java.util.Collections;
26 26 import java.util.Date;
27 27 import java.util.HashMap;
  28 +import java.util.HashSet;
28 29 import java.util.LinkedList;
29 30 import java.util.List;
30 31 import java.util.Properties;
  32 +import java.util.Set;
31 33
32 34 import org.apache.commons.httpclient.HttpClient;
33 35 import org.apache.commons.io.FileUtils;
@@ -282,6 +284,46 @@ public static Cluster getLocalCluster(int numberOfNodes, int[] ports, int[][] pa
282 284 }
283 285
284 286 /**
  287 + * Update a cluster by replacing the specified servers with new hosts, i.e.
  288 + * new ports, since they are all localhost
  289 + *
  290 + * @param original The original cluster to be updated
  291 + * @param serverIds The ids of the servers to be replaced with new hosts
  292 + * @return updated cluster
  293 + */
  294 + public static Cluster updateClusterWithNewHost(Cluster original, int... serverIds) {
  295 + int highestPortInuse = 0;
  296 +
  297 + for(Node node: original.getNodes()) {
  298 + int nodeMaxPort = 0;
  299 + nodeMaxPort = Math.max(nodeMaxPort, node.getAdminPort());
  300 + nodeMaxPort = Math.max(nodeMaxPort, node.getHttpPort());
  301 + nodeMaxPort = Math.max(nodeMaxPort, node.getSocketPort());
  302 + highestPortInuse = Math.max(highestPortInuse, nodeMaxPort);
  303 + }
  304 +
  305 + Set<Integer> newNodesSet = new HashSet<Integer>(serverIds.length);
  306 + for(int id: serverIds) {
  307 + newNodesSet.add(id);
  308 + }
  309 +
  310 + List<Node> newNodeList = new ArrayList<Node>(serverIds.length);
  311 + for(Node node: original.getNodes()) {
  312 + if(newNodesSet.contains(node.getId())) {
  313 + node = new Node(node.getId(),
  314 + "localhost",
  315 + ++highestPortInuse,
  316 + ++highestPortInuse,
  317 + ++highestPortInuse,
  318 + node.getPartitionIds());
  319 + }
  320 + newNodeList.add(node);
  321 + }
  322 +
  323 + return new Cluster(original.getName(), newNodeList);
  324 + }
  325 +
  326 + /**
285 327 * Returns a list of zones with their proximity list being in increasing
286 328 * order
287 329 *
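
How the helper is presumably meant to be used, based on the test below: swap out a node for fresh localhost ports, then publish the new cluster to the metadata stores of the servers that kept running, so their next slop-pusher run sees it. A hedged usage sketch (the server and cluster variables are illustrative):

// replace node 1 with a "new host" (same localhost, new ports)
Cluster updated = ServerTestUtils.updateClusterWithNewHost(originalCluster, 1);

// publish the new cluster to the servers that stayed up; their next
// StreamingSlopPusherJob run reloads it via loadMetadata()
server0.getMetadataStore().put(MetadataStore.CLUSTER_KEY, updated);
server2.getMetadataStore().put(MetadataStore.CLUSTER_KEY, updated);
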
126 test/unit/voldemort/scheduled/StreamingSlopPusherTest.java
@@ -509,4 +509,130 @@ public void testNormalPushBothWays() throws InterruptedException, IOException {
509 509
510 510 stopServers(0, 1);
511 511 }
  512 +
  513 + @Test
  514 + public void testServerReplacementWithoutBounce() throws IOException, InterruptedException {
  515 + startServers(0, 2);
  516 +
  517 + // Put into slop store 0
  518 + StorageEngine<ByteArray, Slop, byte[]> slopStoreNode0 = getVoldemortServer(0).getStoreRepository()
  519 + .getSlopStore()
  520 + .asSlopStore();
  521 +
  522 + // Generate slops for 1
  523 + final List<Versioned<Slop>> entrySet1 = ServerTestUtils.createRandomSlops(1,
  524 + 50,
  525 + "test-replication-memory",
  526 + "users",
  527 + "test-replication-persistent",
  528 + "test-readrepair-memory",
  529 + "test-consistent",
  530 + "test-consistent-with-pref-list");
  531 + // Generate slops for 2
  532 + final List<Versioned<Slop>> entrySet2 = ServerTestUtils.createRandomSlops(2,
  533 + 50,
  534 + "test-replication-memory",
  535 + "users",
  536 + "test-replication-persistent",
  537 + "test-readrepair-memory",
  538 + "test-consistent",
  539 + "test-consistent-with-pref-list");
  540 +
  541 + populateSlops(0, slopStoreNode0, entrySet1, entrySet2);
  542 +
  543 + StreamingSlopPusherJob pusher = new StreamingSlopPusherJob(getVoldemortServer(0).getStoreRepository(),
  544 + getVoldemortServer(0).getMetadataStore(),
  545 + new BannagePeriodFailureDetector(new FailureDetectorConfig().setNodes(cluster.getNodes())
  546 + .setStoreVerifier(new ServerStoreVerifier(socketStoreFactory,
  547 + metadataStore,
  548 + configs[0]))),
  549 + configs[0],
  550 + new Semaphore(1));
  551 +
  552 + pusher.run();
  553 +
  554 + // Give some time for the slops to go over
  555 + Thread.sleep(10000);
  556 +
  557 + // Now check if the slops went through and also got deleted
  558 + Iterator<Versioned<Slop>> entryIterator = entrySet2.listIterator();
  559 + while(entryIterator.hasNext()) {
  560 + Versioned<Slop> versionedSlop = entryIterator.next();
  561 + Slop nextSlop = versionedSlop.getValue();
  562 + StorageEngine<ByteArray, byte[], byte[]> store = getVoldemortServer(2).getStoreRepository()
  563 + .getStorageEngine(nextSlop.getStoreName());
  564 + if(nextSlop.getOperation().equals(Slop.Operation.PUT)) {
  565 + assertNotSame("entry should be present at store", 0, store.get(nextSlop.getKey(),
  566 + null).size());
  567 + assertEquals("entry value should match",
  568 + new String(nextSlop.getValue()),
  569 + new String(store.get(nextSlop.getKey(), null).get(0).getValue()));
  570 + } else if(nextSlop.getOperation().equals(Slop.Operation.DELETE)) {
  571 + assertEquals("entry value should match", 0, store.get(nextSlop.getKey(), null)
  572 + .size());
  573 + }
  574 +
  575 + // did it get deleted correctly
  576 + assertEquals("slop should have gone", 0, slopStoreNode0.get(nextSlop.makeKey(), null)
  577 + .size());
  578 + }
  579 +
  580 + entryIterator = entrySet1.listIterator();
  581 + while(entryIterator.hasNext()) {
  582 + Versioned<Slop> versionedSlop = entryIterator.next();
  583 + Slop nextSlop = versionedSlop.getValue();
  584 + // did it get deleted correctly
  585 + assertNotSame("slop should be there", 0, slopStoreNode0.get(nextSlop.makeKey(), null)
  586 + .size());
  587 + }
  588 +
  589 + // Check counts
  590 + SlopStorageEngine slopEngine = getVoldemortServer(0).getStoreRepository().getSlopStore();
  591 + assertEquals(slopEngine.getOutstandingTotal(), 50);
  592 + assertEquals(slopEngine.getOutstandingByNode().get(1), new Long(50));
  593 + assertEquals(slopEngine.getOutstandingByNode().get(2), new Long(0));
  594 +
  595 + // now replace server 1 with a new host and start it
  596 + cluster = ServerTestUtils.updateClusterWithNewHost(cluster, 1);
  597 + startServers(1);
  598 +
  599 + // update the metadata store with the new cluster on nodes 0 and 2 (the
  600 + // two servers that are running)
  601 + servers[0].getMetadataStore().put(MetadataStore.CLUSTER_KEY, cluster);
  602 + servers[2].getMetadataStore().put(MetadataStore.CLUSTER_KEY, cluster);
  603 +
  604 + // Give some time for the pusher job to figure out that server1 is up
  605 + Thread.sleep(35000);
  606 +
  607 + // start the pusher job again
  608 + pusher.run();
  609 +
  610 + // Give some time for the slops to go over
  611 + Thread.sleep(10000);
  612 +
  613 + // make sure the slops for server 1 are pushed to the new host
  614 + // Now check if the slops went through and also got deleted
  615 + entryIterator = entrySet1.listIterator();
  616 + while(entryIterator.hasNext()) {
  617 + Versioned<Slop> versionedSlop = entryIterator.next();
  618 + Slop nextSlop = versionedSlop.getValue();
  619 + StorageEngine<ByteArray, byte[], byte[]> store = getVoldemortServer(1).getStoreRepository()
  620 + .getStorageEngine(nextSlop.getStoreName());
  621 + if(nextSlop.getOperation().equals(Slop.Operation.PUT)) {
  622 + assertNotSame("entry should be present at store", 0, store.get(nextSlop.getKey(),
  623 + null).size());
  624 + assertEquals("entry value should match",
  625 + new String(nextSlop.getValue()),
  626 + new String(store.get(nextSlop.getKey(), null).get(0).getValue()));
  627 + } else if(nextSlop.getOperation().equals(Slop.Operation.DELETE)) {
  628 + assertEquals("entry value should match", 0, store.get(nextSlop.getKey(), null)
  629 + .size());
  630 + }
  631 +
  632 + // did it get deleted correctly
  633 + assertEquals("slop should have gone", 0, slopStoreNode0.get(nextSlop.makeKey(), null)
  634 + .size());
  635 + }
  636 +
  637 + }
512 638 }
