WHIRR-282. Set number of Hadoop slots based on hardware (tomwhite via asavu)

git-svn-id: https://svn.apache.org/repos/asf/incubator/whirr/trunk@1092518 13f79535-47bb-0310-9956-ffa450edef68
Andrei Savu committed Apr 14, 2011
1 parent 1bf7fa1 commit 1aed340
Showing 8 changed files with 102 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
@@ -31,6 +31,8 @@ Trunk (unreleased changes)

WHIRR-277. Support multiple versions of ZooKeeper (asavu)

WHIRR-282. Set number of Hadoop slots based on hardware (tomwhite via asavu)

BUG FIXES

WHIRR-253. ZooKeeper service should only authorize ingress to ZooKeeper
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/whirr/Cluster.java
@@ -34,6 +34,7 @@

import org.apache.whirr.Cluster.Instance;
import org.apache.whirr.util.DnsUtil;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.domain.Credentials;

/**
@@ -55,9 +56,10 @@ public static class Instance {
private final String privateIp;
private String privateHostName;
private final String id;
private final NodeMetadata nodeMetadata;

public Instance(Credentials loginCredentials, Set<String> roles, String publicIp,
- String privateIp, String id) {
+ String privateIp, String id, NodeMetadata nodeMetadata) {
this.loginCredentials = checkNotNull(loginCredentials, "loginCredentials");
this.roles = checkNotNull(roles, "roles");
this.publicIp = checkNotNull(publicIp, "publicIp");
@@ -67,6 +69,7 @@ public Instance(Credentials loginCredentials, Set<String> roles, String publicIp
checkArgument(InetAddresses.isInetAddress(privateIp),
"invalid IP address: %s", privateIp);
this.id = checkNotNull(id, "id");
this.nodeMetadata = nodeMetadata;
}

public Credentials getLoginCredentials() {
@@ -116,12 +119,17 @@ public String getId() {
return id;
}

public NodeMetadata getNodeMetadata() {
return nodeMetadata;
}

public String toString() {
return Objects.toStringHelper(this)
.add("roles", roles)
.add("publicIp", publicIp)
.add("privateIp", privateIp)
.add("id", id)
.add("nodeMetadata", nodeMetadata)
.toString();
}

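The NodeMetadata now carried by each Instance lets service handlers inspect the provider-reported hardware of a node at configuration time. A minimal sketch of the pattern (the coresOf helper is illustrative, not part of this patch; it assumes the Instance was built with a non-null NodeMetadata):

    // Sum the cores jclouds reports for the node backing this instance.
    // Processor.getCores() returns a double, so truncate after summing.
    static int coresOf(Instance instance) {
      Hardware hardware = instance.getNodeMetadata().getHardware();
      double cores = 0;
      for (Processor processor : hardware.getProcessors()) {
        cores += processor.getCores();
      }
      return (int) cores;
    }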
@@ -207,7 +207,7 @@ public Instance apply(NodeMetadata node) {
return new Instance(node.getCredentials(), roles,
Iterables.get(node.getPublicAddresses(), 0),
Iterables.get(node.getPrivateAddresses(), 0),
- node.getId());
+ node.getId(), node);
}
}));
}
@@ -34,7 +34,7 @@ public class CassandraClusterActionHandlerTest {

private Instance getInstance(String id) throws UnknownHostException {
return new Instance(new Credentials("", ""), Sets.newHashSet(""),
"127.0.0.1", "127.0.0.1", id);
"127.0.0.1", "127.0.0.1", id, null);
}

@Test()
4 changes: 4 additions & 0 deletions services/hadoop/pom.xml
@@ -64,6 +64,10 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
@@ -21,8 +21,11 @@
import static org.apache.whirr.RolePredicates.role;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Functions;
import com.google.common.collect.Iterables;

import java.io.IOException;
import java.util.Set;

import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
@@ -31,6 +34,8 @@
import org.apache.whirr.Cluster;
import org.apache.whirr.Cluster.Instance;
import org.apache.whirr.ClusterSpec;
import org.jclouds.compute.domain.Hardware;
import org.jclouds.compute.domain.Processor;
import org.jclouds.scriptbuilder.domain.Statement;

public class HadoopConfigurationBuilder {
@@ -87,18 +92,46 @@ static Configuration buildHdfsConfiguration(ClusterSpec clusterSpec,
Cluster cluster, Configuration defaults) throws ConfigurationException {
return build(clusterSpec, cluster, defaults, "hadoop-hdfs");
}

@VisibleForTesting
static Configuration buildMapReduceConfiguration(ClusterSpec clusterSpec,
Cluster cluster, Configuration defaults) throws ConfigurationException, IOException {
Configuration config = build(clusterSpec, cluster, defaults,
"hadoop-mapreduce");

Set<Instance> taskTrackers = cluster
.getInstancesMatching(role(HadoopTaskTrackerClusterActionHandler.ROLE));

if (!taskTrackers.isEmpty()) {

Hardware hardware = Iterables.getFirst(taskTrackers, null)
.getNodeMetadata().getHardware();
int coresPerNode = 0;
for (Processor processor : hardware.getProcessors()) {
coresPerNode += processor.getCores();
}
int mapTasksPerNode = (int) Math.ceil(coresPerNode * 1.0);
int reduceTasksPerNode = (int) Math.ceil(coresPerNode * 0.75);

setIfAbsent(config, "mapred.tasktracker.map.tasks.maximum", mapTasksPerNode + "");
setIfAbsent(config, "mapred.tasktracker.reduce.tasks.maximum", reduceTasksPerNode + "");

int clusterReduceSlots = taskTrackers.size() * reduceTasksPerNode;
setIfAbsent(config, "mapred.reduce.tasks", clusterReduceSlots + "");

}

Instance jobtracker = cluster
.getInstanceMatching(role(HadoopJobTrackerClusterActionHandler.ROLE));
config.setProperty("mapred.job.tracker", String.format("%s:8021",
jobtracker.getPublicAddress().getHostName()));
return config;
}

private static void setIfAbsent(Configuration config, String property, String value) {
if (!config.containsKey(property)) {
config.setProperty(property, value);
}
}

}
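To make the slot arithmetic above concrete, here is a worked example using the same figures as the new unit test (4-core task trackers, 5 workers); the snippet is illustrative only:

    int coresPerNode = 4;
    int mapTasksPerNode = (int) Math.ceil(coresPerNode * 1.0);      // 4 map slots per node
    int reduceTasksPerNode = (int) Math.ceil(coresPerNode * 0.75);  // 3 reduce slots per node
    int clusterReduceSlots = 5 * reduceTasksPerNode;                // 15 reducers cluster-wide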
@@ -47,6 +47,3 @@ hadoop-mapreduce.mapred.compress.map.output=true
hadoop-mapreduce.mapred.output.compression.type=BLOCK
hadoop-mapreduce.mapred.child.java.opts=-Xmx550m
hadoop-mapreduce.mapred.child.ulimit=1126400
- hadoop-mapreduce.mapred.tasktracker.map.tasks.maximum=2
- hadoop-mapreduce.mapred.tasktracker.reduce.tasks.maximum=1
- hadoop-mapreduce.mapred.reduce.tasks=10
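These static slot defaults are removed because the values are now derived from the task tracker hardware. Since the builder uses setIfAbsent, a user-supplied value still wins; for example, a cluster properties file could pin the reducer count (mirroring the new override test below):

    hadoop-mapreduce.mapred.reduce.tasks=7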
@@ -20,17 +20,30 @@

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.whirr.Cluster;
import org.apache.whirr.Cluster.Instance;
import org.apache.whirr.ClusterSpec;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.jclouds.compute.domain.Hardware;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.Processor;
import org.jclouds.compute.domain.Volume;
import org.jclouds.compute.domain.internal.HardwareImpl;
import org.jclouds.domain.Credentials;
import org.junit.Before;
import org.junit.Test;
@@ -71,11 +84,31 @@ public void setUp() throws Exception {
defaults.addProperty("hadoop-mapreduce.p1", "mapred1");

clusterSpec = ClusterSpec.withTemporaryKeys();
cluster = newCluster(1);
}

private Cluster newCluster(int numberOfWorkers) {
NodeMetadata node = mock(NodeMetadata.class);
List<Processor> processors = Lists.newArrayList(new Processor(4, 1.0));
Hardware hardware = new HardwareImpl(null, null, "id", null, null,
Maps.<String,String>newHashMap(), processors, 1024,
Lists.<Volume>newArrayList(), null);
when(node.getHardware()).thenReturn(hardware);

Set<Instance> instances = Sets.newLinkedHashSet();
Instance master = new Instance(new Credentials("", ""),
Sets.newHashSet(HadoopNameNodeClusterActionHandler.ROLE,
HadoopJobTrackerClusterActionHandler.ROLE),
"10.0.0.1", "10.0.0.1", "id");
cluster = new Cluster(Sets.newHashSet(master));
"10.0.0.1", "10.0.0.1", "1", node);
instances.add(master);
for (int i = 0; i < numberOfWorkers; i++) {
int id = i + 2;
instances.add(new Instance(new Credentials("", ""),
Sets.newHashSet(HadoopDataNodeClusterActionHandler.ROLE,
HadoopTaskTrackerClusterActionHandler.ROLE),
"10.0.0." + id, "10.0.0." + id, id + "", node));
}
return new Cluster(instances);
}

@Test
@@ -114,11 +147,25 @@ public void testHdfs() throws Exception {

@Test
public void testMapReduce() throws Exception {
Cluster cluster = newCluster(5);
Configuration conf = HadoopConfigurationBuilder
- .buildMapReduceConfiguration(clusterSpec, cluster, defaults);
- assertThat(Iterators.size(conf.getKeys()), is(2));
+ .buildMapReduceConfiguration(clusterSpec, cluster, defaults);
assertThat(conf.getString("p1"), is("mapred1"));
assertThat(conf.getString("mapred.job.tracker"), matches(".+:8021"));
assertThat(conf.getString("mapred.tasktracker.map.tasks.maximum"), is("4"));
assertThat(conf.getString("mapred.tasktracker.reduce.tasks.maximum"), is("3"));
assertThat(conf.getString("mapred.reduce.tasks"), is("15"));
}

@Test
public void testOverridesNumberOfReducers() throws Exception {
Configuration overrides = new PropertiesConfiguration();
overrides.addProperty("hadoop-mapreduce.mapred.reduce.tasks", "7");
clusterSpec = ClusterSpec.withNoDefaults(overrides);
Configuration conf = HadoopConfigurationBuilder.buildMapReduceConfiguration(
clusterSpec, cluster, defaults);
assertThat(conf.getString("mapred.reduce.tasks"), is("7"));
}


}
