Skip to content

Commit

Permalink
Merge pull request #459 from mesos/branch/waitForESToCluster
Browse files Browse the repository at this point in the history
Wait until first ES node is operational before attempting clustering.
  • Loading branch information
Phil Winder committed Jan 14, 2016
2 parents bd60f5f + cb515bd commit 003e8e7
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 7 deletions.
1 change: 1 addition & 0 deletions executor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
compile "org.elasticsearch:elasticsearch:${elasticsearchVersion}"
compile "log4j:log4j:1.2.16"
compile "com.beust:jcommander:1.48"
compile 'com.jayway.awaitility:awaitility:1.6.3'

testCompile 'com.mashape.unirest:unirest-java:1.4.5'
testCompile 'com.jayway.awaitility:awaitility:1.6.3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.mesos.Protos;
import org.apache.mesos.elasticsearch.executor.elasticsearch.ElasticsearchLauncher;
import org.apache.mesos.elasticsearch.executor.elasticsearch.Launcher;
import org.apache.mesos.elasticsearch.executor.elasticsearch.NodeUtil;
import org.apache.mesos.elasticsearch.executor.mesos.ElasticsearchExecutor;
import org.apache.mesos.elasticsearch.executor.mesos.TaskStatus;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -19,7 +20,7 @@ private Main() {

public static void main(String[] args) throws Exception {
Launcher launcher = new ElasticsearchLauncher(Settings.builder());
MesosExecutorDriver driver = new MesosExecutorDriver(new ElasticsearchExecutor(launcher, new TaskStatus()));
MesosExecutorDriver driver = new MesosExecutorDriver(new ElasticsearchExecutor(launcher, new TaskStatus(), new NodeUtil()));
System.exit(driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.mesos.elasticsearch.executor.elasticsearch;

import org.apache.log4j.Logger;
import org.elasticsearch.node.Node;

import java.util.concurrent.ExecutionException;

/**
*/
public class NodeUtil {
Logger LOG = Logger.getLogger(NodeUtil.class);

public String getNodeStatus(Node node) {
try {
String status = node.client().admin().cluster().prepareHealth().execute().get().getStatus().toString().toLowerCase();
LOG.debug("ES status: " + status);
return status;
} catch (InterruptedException | ExecutionException e) {
LOG.warn("ES not started. Retrying...", e);
return "";
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package org.apache.mesos.elasticsearch.executor.mesos;

import com.google.protobuf.InvalidProtocolBufferException;
import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.core.ConditionTimeoutException;
import org.apache.log4j.Logger;
import org.apache.mesos.Executor;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.elasticsearch.executor.Configuration;
import org.apache.mesos.elasticsearch.executor.elasticsearch.Launcher;
import org.apache.mesos.elasticsearch.executor.elasticsearch.NodeUtil;
import org.apache.mesos.elasticsearch.executor.model.HostsModel;
import org.apache.mesos.elasticsearch.executor.model.PortsModel;
import org.apache.mesos.elasticsearch.executor.model.RunTimeSettings;
Expand All @@ -16,21 +19,26 @@
import java.security.InvalidParameterException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Executor for Elasticsearch.
*/
@SuppressWarnings("PMD.TooManyMethods")
public class ElasticsearchExecutor implements Executor {
public static final long ES_TIMEOUT = 120L;
public static final String ES_STATUS_GREEN = "green";
private final Launcher launcher;
public static final Logger LOGGER = Logger.getLogger(ElasticsearchExecutor.class.getCanonicalName());
private final TaskStatus taskStatus;
private final NodeUtil nodeUtil;
private Configuration configuration;
private Node node;

public ElasticsearchExecutor(Launcher launcher, TaskStatus taskStatus) {
public ElasticsearchExecutor(Launcher launcher, TaskStatus taskStatus, NodeUtil nodeUtil) {
this.launcher = launcher;
this.taskStatus = taskStatus;
this.nodeUtil = nodeUtil;
}

@Override
Expand Down Expand Up @@ -88,8 +96,19 @@ public void launchTask(final ExecutorDriver driver, final Protos.TaskInfo task)
// Launch Node
node = launcher.launch();

// Send status update, running
driver.sendStatusUpdate(taskStatus.running());
try {
Awaitility.await()
.atMost(ES_TIMEOUT, TimeUnit.SECONDS)
.pollInterval(1L, TimeUnit.SECONDS)
.until(() -> nodeUtil.getNodeStatus(node).equals(ES_STATUS_GREEN));

// Send status update, running
driver.sendStatusUpdate(taskStatus.running());
} catch (ConditionTimeoutException e) {
LOGGER.error("ES node was not green within " + ES_TIMEOUT + "s.");
driver.sendStatusUpdate(taskStatus.failed());
killTask(driver, taskID);
}
} catch (InvalidParameterException e) {
driver.sendStatusUpdate(taskStatus.failed());
LOGGER.error(e);
Expand Down
4 changes: 4 additions & 0 deletions executor/src/main/resources/elasticsearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ discovery.type: zen
#
# discovery.zen.ping.timeout: 3s

discovery.zen.fd.ping_timeout: 30s
discovery.zen.fd.ping_interval: 1s
discovery.zen.fd.ping_retries: 30

# For more information, see
# <http://elasticsearch.org/guide/en/elasticsearch/reference/current/modules-discovery-zen.html>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.mesos.Protos;
import org.apache.mesos.elasticsearch.common.Discovery;
import org.apache.mesos.elasticsearch.executor.elasticsearch.Launcher;
import org.apache.mesos.elasticsearch.executor.elasticsearch.NodeUtil;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.junit.Before;
Expand All @@ -13,6 +14,8 @@
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import java.util.concurrent.ExecutionException;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;

Expand All @@ -35,10 +38,14 @@ public class ElasticsearchExecutorTest {
@Mock
private ExecutorDriver driver;

@Mock
private NodeUtil nodeUtil;

@Before
public void setupLauncher() {
Node node = mock(Node.class);
public void setupLauncher() throws ExecutionException, InterruptedException {
Node node = mock(Node.class, RETURNS_DEEP_STUBS);
when(launcher.launch()).thenReturn(node);
when(nodeUtil.getNodeStatus(eq(node))).thenReturn(ElasticsearchExecutor.ES_STATUS_GREEN);
}

@Test(expected = NullPointerException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class OfferStrategy {
private List<OfferRule> acceptanceRules = asList(
new OfferRule("Host already running task", this::isHostAlreadyRunningTask),
new OfferRule("Hostname is unresolveable", offer -> !isHostnameResolveable(offer.getHostname())),
new OfferRule("First ES node is not responding", offer -> !isAtLeastOneESNodeRunning()),
new OfferRule("Cluster size already fulfilled", offer -> clusterState.getTaskList().size() >= configuration.getElasticsearchNodes()),
new OfferRule("Offer did not have 2 ports", offer -> !containsTwoPorts(offer.getResourcesList())),
new OfferRule("The offer does not contain the user specified ports", offer -> !containsUserSpecifiedPorts(offer.getResourcesList())),
Expand All @@ -36,6 +37,16 @@ private boolean isHostnameResolveable(String hostname) {
return !address.isUnresolved();
}

private boolean isAtLeastOneESNodeRunning() {
// If this is the first, do not check
List<Protos.TaskInfo> taskList = clusterState.getTaskList();
if (taskList.size() == 0) {
return true;
} else {
return clusterState.getStatus(taskList.get(0).getTaskId()).getStatus().getState().equals(Protos.TaskState.TASK_RUNNING);
}
}

public OfferStrategy(Configuration configuration, ClusterState clusterState) {
this.clusterState = clusterState;
this.configuration = configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.mesos.Protos;
import org.apache.mesos.elasticsearch.scheduler.state.ClusterState;
import org.apache.mesos.elasticsearch.scheduler.state.ESTaskStatus;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -16,6 +17,8 @@
import static java.util.Collections.singletonList;
import static org.apache.mesos.elasticsearch.scheduler.Resources.*;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
Expand All @@ -37,8 +40,11 @@ public class OfferStrategyTest {
@Before
public void setUp() throws Exception {
when(configuration.getFrameworkRole()).thenReturn("testRole");
ESTaskStatus esTaskStatus = mock(ESTaskStatus.class);
when(esTaskStatus.getStatus()).thenReturn(taskStatus());
when(clusterState.getStatus(any(Protos.TaskID.class))).thenReturn(esTaskStatus);
}

@Test
public void willDeclineIfHostIsAlreadyRunningTask() throws Exception {
when(clusterState.getTaskList()).thenReturn(singletonList(createTask("host1")));
Expand Down Expand Up @@ -224,4 +230,11 @@ private Protos.Offer.Builder baseOfferBuilder(String slaveId) {
.setHostname("localhost")
.setSlaveId(Protos.SlaveID.newBuilder().setValue(slaveId).build());
}

private Protos.TaskStatus taskStatus() {
return Protos.TaskStatus.newBuilder()
.setTaskId(Protos.TaskID.newBuilder().setValue("TestId").build())
.setState(Protos.TaskState.TASK_RUNNING)
.build();
}
}

0 comments on commit 003e8e7

Please sign in to comment.