Permalink
Browse files

Created a way to automatically generate targetCluster.xml.

  • Loading branch information...
1 parent 09037a9 commit e601a7caeb137555514dccfb9a49389cdd777eb1 @afeinberg afeinberg committed Jan 22, 2010
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+#
+# Copyright 2008-2010 LinkedIn, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+base_dir=$(dirname $0)/..
+
+$base_dir/bin/run-class.sh voldemort.ClusterViewer $@
@@ -1,29 +1,209 @@
package voldemort.client.rebalance;
-import java.io.IOException;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import voldemort.ClusterViewer;
+import voldemort.cluster.Cluster;
+import voldemort.cluster.Node;
+import voldemort.store.StoreDefinition;
+import voldemort.utils.CmdUtils;
+import voldemort.utils.Utils;
+import voldemort.xml.ClusterMapper;
+import voldemort.xml.StoreDefinitionsMapper;
+
+import java.io.*;
+import java.util.List;
+import java.util.Set;
/**
* @author afeinberg
*/
public class RebalanceClusterBuilder {
-
- private final String clusterXmlFile;
- private final String storesXmlFile;
-
- public RebalanceClusterBuilder(String clusterXmlFile, String storesXmlFile) {
- this.clusterXmlFile = clusterXmlFile;
- this.storesXmlFile = storesXmlFile;
+ private final Cluster cluster;
+ private final List<StoreDefinition> stores;
+ private final int maxRemappedReplicas;
+ private final int minNumPartitions;
+ private final int desiredNumPartitions;
+
+ private RebalanceClusterBuilder(Cluster cluster,
+ List<StoreDefinition> stores,
+ int maxRemappedReplicas,
+ int minNumPartitions,
+ int desiredNumPartitions) {
+ this.cluster = cluster;
+ this.stores = stores;
+ this.maxRemappedReplicas = maxRemappedReplicas;
+ this.minNumPartitions = minNumPartitions;
+ this.desiredNumPartitions = desiredNumPartitions;
}
- public void build(String targetClusterXml,
+ public static RebalanceClusterBuilder create(String clusterXmlFile,
+ String storesXmlFile,
+ int maxRemappedReplicas,
+ int minNumPartitions,
+ int desiredNumPartitions) throws IOException {
+ Cluster cluster = new ClusterMapper().readCluster(new BufferedReader(new FileReader(clusterXmlFile)));
+ List<StoreDefinition> stores = new StoreDefinitionsMapper().readStoreList(new BufferedReader(new FileReader(storesXmlFile)));
+
+ if (desiredNumPartitions < minNumPartitions)
+ desiredNumPartitions = cluster.getNumberOfPartitions() / (cluster.getNumberOfNodes() + 1);
+
+ if (maxRemappedReplicas < 0)
+ maxRemappedReplicas = cluster.getNumberOfPartitions() / 2;
+
+ return new RebalanceClusterBuilder(cluster,
+ stores,
+ maxRemappedReplicas,
+ minNumPartitions,
+ desiredNumPartitions);
+ }
+
+ public void build(String targetClusterXmlFile,
String host,
- String httpPort,
+ int httpPort,
int socketPort,
int adminPort) throws IOException {
+ // First find the store with the highest N
+ StoreDefinition store = stores.get(0);
+ for (int i=1; i < stores.size(); i++) {
+ StoreDefinition curr = stores.get(i);
+ if (store.getReplicationFactor() > curr.getReplicationFactor())
+ store = curr;
+ }
+
+ RebalanceClusterTool clusterTool = new RebalanceClusterTool(cluster, store);
+ ClusterViewer clusterViewer = new ClusterViewer(cluster, store);
+
+ System.out.println("Original layout: ");
+ clusterViewer.viewMasterToReplica();
+
+ Node template = new Node(cluster.getNumberOfNodes(),
+ host,
+ httpPort,
+ socketPort,
+ adminPort,
+ ImmutableList.<Integer>of());
+
+ System.out.println("Inserting " + template + "\n");
+ System.out.println("Configuration " + Joiner.on(" ")
+ .withKeyValueSeparator("=")
+ .join(ImmutableMap.<String, Integer>builder()
+ .put("maxRemappedReplicas", maxRemappedReplicas)
+ .put("minNumPartitions", minNumPartitions)
+ .put("desiredNumPartitions", desiredNumPartitions)
+ .build()));
+
+ Cluster targetCluster = clusterTool.insertNode(template,
+ minNumPartitions,
+ desiredNumPartitions,
+ maxRemappedReplicas);
+ if (targetCluster == null)
+ Utils.croak("Unable to insert " + template + " into " + cluster);
+ System.out.println("Created target cluster layout: ");
+ clusterViewer.viewMasterToReplica(targetCluster);
+ clusterViewer.compareToCluster(targetCluster);
+
+ String clusterXmlString = new ClusterMapper().writeCluster(targetCluster);
+ if (targetClusterXmlFile == null) {
+ System.err.println("Warning: target-cluster not specified, printing to STDOUT instead\n");
+ System.out.println(clusterXmlString);
+ } else {
+ BufferedWriter out = new BufferedWriter(new FileWriter(targetClusterXmlFile));
+ try {
+ out.write(clusterXmlString);
+ } finally {
+ out.close();
+ }
+ System.out.println("Wrote target cluster.xml to " + targetClusterXmlFile);
+ }
}
public static void main(String [] args) throws IOException {
+ OptionParser parser = new OptionParser();
+
+ parser.accepts("help", "print usage information");
+ parser.accepts("stores", "[REQUIRED] path to the stores xml config file")
+ .withRequiredArg()
+ .describedAs("stores.xml");
+ parser.accepts("cluster", "[REQUIRED] path to the ORIGINAL cluster xml config file")
+ .withRequiredArg()
+ .describedAs("cluster.xml");
+ parser.accepts("target-cluster", "path to the TARGET cluster xml config file")
+ .withRequiredArg()
+ .describedAs("targetCluster.xml");
+
+ parser.accepts("host", "[REQUIRED] new node's host name")
+ .withRequiredArg()
+ .describedAs("host-name");
+
+ parser.accepts("http-port", "[REQUIRED] new node's http port")
+ .withRequiredArg()
+ .ofType(Integer.class)
+ .describedAs("http-port");
+ parser.accepts("socket-port", "[REQUIRED] new node's socket port")
+ .withRequiredArg()
+ .ofType(Integer.class)
+ .describedAs("socket-port");
+ parser.accepts("admin-port", "new node's admin port")
+ .withRequiredArg()
+ .ofType(Integer.class)
+ .describedAs("admin-port");
+
+ parser.accepts("max-remaps", "Maximum number of replication mappings that may change")
+ .withRequiredArg()
+ .ofType(Integer.class);
+ parser.accepts("desired-partitions", "Desired number of partitions per node")
+ .withRequiredArg()
+ .ofType(Integer.class);
+ parser.accepts("min-partitions", "Minimum number of partitions per node")
+ .withRequiredArg()
+ .ofType(Integer.class);
+
+ OptionSet options = parser.parse(args);
+
+ if (options.has("help")) {
+ parser.printHelpOn(System.out);
+ System.exit(0);
+ }
+
+ Set<String> missing = CmdUtils.missing(options,
+ "stores",
+ "cluster",
+ "host",
+ "http-port",
+ "socket-port");
+ if (missing.size() > 0) {
+ System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing));
+ parser.printHelpOn(System.err);
+ System.exit(1);
+ }
+
+ String storesXmlFile = (String) options.valueOf("stores");
+ String clusterXmlFile = (String) options.valueOf("cluster");
+ String targetClusterXmlFile = (String) options.valueOf("target-cluster");
+
+ int maxRemappedReplicas = CmdUtils.valueOf(options, "max-remaps", -1);
+ int minNumPartitions = CmdUtils.valueOf(options, "min-partitions", 1);
+ int desiredNumPartitions = CmdUtils.valueOf(options, "desired-partitions", -1);
+
+ String host = (String) options.valueOf("host");
+ int httpPort = (Integer) options.valueOf("http-port");
+ int socketPort = (Integer) options.valueOf("socket-port");
+ int adminPort = CmdUtils.valueOf(options, "admin-port", socketPort + 1);
+ RebalanceClusterBuilder rebalanceClusterBuilder = create(clusterXmlFile,
+ storesXmlFile,
+ maxRemappedReplicas,
+ minNumPartitions,
+ desiredNumPartitions);
+ rebalanceClusterBuilder.build(targetClusterXmlFile,
+ host,
+ httpPort,
+ socketPort,
+ adminPort);
}
}
@@ -79,10 +79,129 @@ public RebalanceClusterTool(Cluster cluster, StoreDefinition storeDefinition) {
* @return If successful, a new cluster containing the template node; otherwise null.
*/
public Cluster insertNode(Node template,
- int minNumberOfPartitions,
- int desiredNumberOfPartitions) {
- // TODO: complete implementation
- return null;
+ int minPartitions,
+ int desiredParitions,
+ int maxRemap) {
+ List<Node> nodes = new ArrayList<Node>();
+ nodes.addAll(cluster.getNodes());
+ nodes.add(template);
+ Cluster templateCluster = new Cluster(cluster.getName(), nodes);
+ Cluster targetCluster = null;
+ boolean foundBest = false;
+
+ for (int i = desiredParitions; i >= minPartitions && !foundBest; i--) {
+ System.out.println("Trying to move " + i + " partitions to the new node");
+ Cluster candidateCluster = createTargetCluster(templateCluster,
+ i,
+ maxRemap,
+ ImmutableSet.<Integer>of(),
+ masterToReplicas.keySet());
+
+ // If we were able to successfully move partitions to the new node
+ if (candidateCluster.getNodeById(template.getId()).getNumberOfPartitions() > template.getNumberOfPartitions()) {
+ targetCluster = candidateCluster;
+ System.out.println("Success moving " + i + " partitions");
+ foundBest = isGoodEnough(candidateCluster, i);
+ if (foundBest) {
+ System.out.println("Moving " + i + " partitions " + "found to be \"optimal\"");
+ } else
+ System.out.println("Correct but suboptimal cluster, trying to move a smaller number of partitions");
+ }
+ }
+
+ return targetCluster;
+ }
+
+ private Cluster createTargetCluster(Cluster candidate,
+ int minPartitions,
+ int maxRemap,
+ Set<Integer> partitionsMoved,
+ Set<Integer> allPartitions) {
+ // This is pretty much a *brute force* method
+
+ // Base case: if we've tried all the partitions, return
+ Set<Integer> partitionsNotMoved = Sets.difference(allPartitions, partitionsMoved);
+ if (partitionsNotMoved.isEmpty())
+ return candidate;
+
+ // If the candidate is already good enough, return
+ if (isGoodEnough(candidate, minPartitions))
+ return candidate;
+
+ // Otherwise try the highest numbered partition that we haven't yet tried.
+ for (int i=candidate.getNumberOfPartitions() - 1; i >= 0; i--) {
+ if (!partitionsMoved.contains(i)) {
+ Cluster attempt = moveToLastNode(candidate, i, maxRemap);
+ if (attempt != null) {
+ // If that succeeds, recur with partitionsMoved now containing the new partition
+ return createTargetCluster(attempt,
+ minPartitions,
+ maxRemap,
+ Sets.union(partitionsMoved, ImmutableSet.of(i)),
+ allPartitions);
+ }
+ }
+ }
+
+ return candidate;
+ }
+
+ private Cluster moveToLastNode(Cluster candidate,
+ int partition,
+ int maxRemap) {
+ Node lastNode = candidate.getNodeById(candidate.getNumberOfNodes() - 1);
+ if (lastNode.getPartitionIds().contains(partition))
+ return null;
+
+ List<Node> nodes = new ArrayList<Node>();
+ for (int i = 0; i < candidate.getNumberOfNodes() - 1; i++) {
+ Node currNode = candidate.getNodeById(i);
+ if (currNode.getPartitionIds().contains(partition)) {
+ List<Integer> currNodePartitions = new ArrayList<Integer>();
+ for (int oldPartition: currNode.getPartitionIds()) {
+ if (oldPartition != partition)
+ currNodePartitions.add(oldPartition);
+ }
+ nodes.add(new Node(i,
+ currNode.getHost(),
+ currNode.getHttpPort(),
+ currNode.getSocketPort(),
+ currNode.getAdminPort(),
+ currNodePartitions));
+ } else
+ nodes.add(currNode);
+ }
+
+ List<Integer> lastNodePartitions = new ArrayList<Integer>();
+ lastNodePartitions.addAll(lastNode.getPartitionIds());
+ lastNodePartitions.add(partition);
+ Collections.sort(lastNodePartitions);
+ nodes.add(new Node(lastNode.getId(),
+ lastNode.getHost(),
+ lastNode.getHttpPort(),
+ lastNode.getSocketPort(),
+ lastNode.getAdminPort(),
+ lastNodePartitions));
+
+ Cluster attempt = new Cluster(candidate.getName(), nodes);
+ if (hasMultipleCopies(attempt) || getRemappedReplicaCount(attempt) > maxRemap)
+ return null;
+
+ return attempt;
+ }
+
+ public boolean isGoodEnough(Cluster candidate, int minPartitions) {
+ Node lastNode = candidate.getNodeById(candidate.getNumberOfNodes()-1);
+ if (lastNode.getNumberOfPartitions() != minPartitions)
+ return false;
+
+ for (int i=0; i < candidate.getNumberOfNodes() - 1; i++) {
+ Node curr = candidate.getNodeById(i);
+ if (curr.getNumberOfPartitions() < minPartitions)
+ return false;
+ }
+
+ return true;
}
/**

0 comments on commit e601a7c

Please sign in to comment.