Skip to content

Commit

Permalink
Merge pull request #458 from mesos/bug/executorHostMode
Browse files Browse the repository at this point in the history
Reverted to host mode so ES can cluster. LIBPROCESS_IP is now set by …
  • Loading branch information
Phil Winder committed Jan 14, 2016
2 parents 6b4db64 + 6087015 commit bd60f5f
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 17 deletions.
7 changes: 6 additions & 1 deletion executor/start-executor.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
#!/bin/bash
LIBPROCESS_IP=$(ifconfig eth0 | grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}')
if [ -z "$LIBPROCESS_IP" ]; then
# Get the first non local IP address. Make sure you /etc/hosts file is correctly set.
export LIBPROCESS_IP=$(hostname -I | awk '{ print $1}' | tr -d "\n")
fi
echo "LIBPROCESS_IP is set to '$LIBPROCESS_IP'";

java $JAVA_OPTS -Djava.library.path=/usr/local/lib -jar /tmp/elasticsearch-mesos-executor.jar $@
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.apache.mesos.elasticsearch.common.elasticsearch.ElasticsearchSettings.CONTAINER_DATA_VOLUME;
Expand Down Expand Up @@ -84,7 +83,7 @@ public Protos.TaskInfo createTask(Configuration configuration, FrameworkState fr
.setSlaveId(offer.getSlaveId())
.addAllResources(acceptedResources)
.setDiscovery(discovery)
.setExecutor(newExecutorInfo(configuration, discoveryPorts.build())).build();
.setExecutor(newExecutorInfo(configuration)).build();
}

public ByteString toData(String hostname, String ipAddress, ZonedDateTime zonedDateTime) {
Expand All @@ -102,22 +101,17 @@ public ByteString toData(String hostname, String ipAddress, ZonedDateTime zonedD
return ByteString.copyFromUtf8(writer.getBuffer().toString());
}

private Protos.ExecutorInfo.Builder newExecutorInfo(Configuration configuration, Protos.Ports discoveryPorts) {
private Protos.ExecutorInfo.Builder newExecutorInfo(Configuration configuration) {
Protos.ExecutorInfo.Builder executorInfoBuilder = Protos.ExecutorInfo.newBuilder()
.setExecutorId(Protos.ExecutorID.newBuilder().setValue(UUID.randomUUID().toString()))
.setFrameworkId(frameworkState.getFrameworkID())
.setName("elasticsearch-executor-" + UUID.randomUUID().toString())
.setCommand(newCommandInfo(configuration));
if (configuration.isFrameworkUseDocker()) {

List<Protos.ContainerInfo.DockerInfo.PortMapping> portMappings = discoveryPorts.getPortsList().stream().map(
port -> Protos.ContainerInfo.DockerInfo.PortMapping.newBuilder().setHostPort(port.getNumber()).setContainerPort(port.getNumber()).build()
).collect(Collectors.toList());
Protos.ContainerInfo.DockerInfo.Builder containerBuilder = Protos.ContainerInfo.DockerInfo.newBuilder()
.setImage(configuration.getExecutorImage())
.setForcePullImage(configuration.getExecutorForcePullImage())
.setNetwork(Protos.ContainerInfo.DockerInfo.Network.BRIDGE)
.addAllPortMappings(portMappings);
.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST);

executorInfoBuilder.setContainer(Protos.ContainerInfo.newBuilder()
.setType(Protos.ContainerInfo.Type.DOCKER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,20 +215,19 @@ public void shouldAllowUserSpecifiedPorts() {
when(configuration.getElasticsearchPorts()).thenReturn(Arrays.asList(123, 456));
TaskInfoFactory factory = new TaskInfoFactory(clusterState);
Protos.TaskInfo taskInfo = factory.createTask(configuration, frameworkState, getOffer(frameworkState.getFrameworkID()), new Clock());
assertFalse(taskInfo.getContainer().isInitialized());
assertTrue(taskInfo.getExecutor().getCommand().isInitialized());
assertTrue(taskInfo.getExecutor().toString().contains("123"));
assertTrue(taskInfo.getExecutor().toString().contains("456"));
assertTrue(taskInfo.isInitialized());
assertTrue(taskInfo.toString().contains("123"));
assertTrue(taskInfo.toString().contains("456"));
}

@Test
public void shouldUseMesosProvidedPorts() {
TaskInfoFactory factory = new TaskInfoFactory(clusterState);
Protos.TaskInfo taskInfo = factory.createTask(configuration, frameworkState, getOffer(frameworkState.getFrameworkID()), new Clock());
assertFalse(taskInfo.getContainer().isInitialized());
assertTrue(taskInfo.getExecutor().getCommand().isInitialized());
assertTrue(taskInfo.getExecutor().toString().contains("9200"));
assertTrue(taskInfo.getExecutor().toString().contains("9300"));
assertTrue(taskInfo.isInitialized());
assertTrue(taskInfo.toString().contains("9200"));
assertTrue(taskInfo.toString().contains("9300"));
}

private Protos.TaskInfo createTaskInfo(Protos.TaskID taskId, ByteString data) {
Expand Down

0 comments on commit bd60f5f

Please sign in to comment.