Skip to content

Commit

Permalink
Merge pull request #518 from mesos/feature/499-SpecifyESVersion
Browse files Browse the repository at this point in the history
Add ability to specify ES version.
  • Loading branch information
Phil Winder committed Mar 1, 2016
2 parents 6078ddb + ccbb302 commit f23977c
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 10 deletions.
18 changes: 12 additions & 6 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ Usage: (Options preceded by an asterisk are required) [options]
The host data directory used by Docker volumes in the executors. [DOCKER
MODE ONLY]
Default: /var/lib/mesos/slave/elasticsearch
--elasticsearchBinaryUrl
The elasticsearch binary to use (Must be tar.gz format). E.g. 'https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.2.0/elasticsearch-2.2.0.tar.gz'
[JAR MODE ONLY]
Default: <empty string>
--elasticsearchClusterName
Name of the elasticsearch cluster
Default: mesos-ha
Expand All @@ -127,6 +131,10 @@ Usage: (Options preceded by an asterisk are required) [options]
The amount of Disk resource to allocate to the elasticsearch instance
(MB).
Default: 1024.0
--elasticsearchDockerImage
The elasticsearch docker image to use. E.g. 'elasticsearch:latest'
[DOCKER MODE ONLY]
Default: elasticsearch:latest
--elasticsearchExecutorCpu
The amount of CPU resource to allocate to the elasticsearch executor.
Default: 0.1
Expand All @@ -145,16 +153,14 @@ Usage: (Options preceded by an asterisk are required) [options]
(MB).
Default: 256.0
--elasticsearchSettingsLocation
Path or URL to ES yml settings file. [In docker mode file must be in
/tmp/config] E.g. '/tmp/config/elasticsearch.yml' or 'https://gist.githubusercontent.com/mmaloney/5e1da5daa58b70a3a671/raw/elasticsearch.yml'
Path or URL to ES yml settings file. Path example:
'/var/lib/mesos/config/elasticsearch.yml'. URL example: 'https://gist.githubusercontent.com/mmaloney/5e1da5daa58b70a3a671/raw/elasticsearch.yml'.
In Docker mode a volume will be created from /tmp/config in the container to
the directory that contains elasticsearch.yml.
Default: <empty string>
--executorForcePullImage
Option to force pull the executor image. [DOCKER MODE ONLY]
Default: false
--executorImage
The docker executor image to use. E.g. 'elasticsearch:latest' [DOCKER
MODE ONLY]
Default: elasticsearch:latest
--executorName
The name given to the executor task.
Default: elasticsearch-executor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public class Configuration {
// DCOS Certification requirement 13
public static final String FRAMEWORK_ROLE = "--frameworkRole";
public static final String EXECUTOR_TIMEOUT = "--executorTimeout";
public static final String EXECUTOR_IMAGE = "--executorImage";
public static final String EXECUTOR_IMAGE = "--elasticsearchDockerImage";
public static final String EXECUTOR_BINARY = "--elasticsearchBinaryUrl";
public static final String DEFAULT_EXECUTOR_IMAGE = "elasticsearch:latest";
public static final String EXECUTOR_FORCE_PULL_IMAGE = "--executorForcePullImage";
public static final String FRAMEWORK_PRINCIPAL = "--frameworkPrincipal";
Expand Down Expand Up @@ -93,8 +94,11 @@ public class Configuration {
private double frameworkFailoverTimeout = 2592000; // Mesos will kill framework after 1 month if marathon does not restart.
@Parameter(names = {FRAMEWORK_ROLE}, description = "Used to group frameworks for allocation decisions, depending on the allocation policy being used.", validateWith = CLIValidators.NotEmptyString.class)
private String frameworkRole = "*"; // This is the default if none is passed to Mesos
@Parameter(names = {EXECUTOR_IMAGE}, description = "The docker executor image to use. E.g. 'elasticsearch:latest' [DOCKER MODE ONLY]", validateWith = CLIValidators.NotEmptyString.class)
@Parameter(names = {EXECUTOR_IMAGE}, description = "The elasticsearch docker image to use. E.g. 'elasticsearch:latest' [DOCKER MODE ONLY]", validateWith = CLIValidators.NotEmptyString.class)
private String executorImage = DEFAULT_EXECUTOR_IMAGE;
@Parameter(names = {EXECUTOR_BINARY}, description = "The elasticsearch binary to use (Must be tar.gz format). " +
"E.g. 'https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.2.0/elasticsearch-2.2.0.tar.gz' [JAR MODE ONLY]", validateWith = CLIValidators.NotEmptyString.class)
private String executorBinary = "";
@Parameter(names = {EXECUTOR_FORCE_PULL_IMAGE}, arity = 1, description = "Option to force pull the executor image. [DOCKER MODE ONLY]")
private Boolean executorForcePullImage = false;
@Parameter(names = {FRAMEWORK_PRINCIPAL}, description = "The principal to use when registering the framework (username).")
Expand Down Expand Up @@ -201,6 +205,10 @@ public Boolean getIsUseIpAddress() {
return isUseIpAddress;
}

public String getElasticsearchBinary() {
return executorBinary;
}

// ******* Helper methods
public String getMesosStateZKURL() {
ZKFormatter mesosStateZKFormatter = new IpPortsListZKFormatter(new ZKAddressParser());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,12 @@ private Protos.CommandInfo nativeCommand(Configuration configuration, List<Strin
.setShell(true)
.setValue(command)
.setUser("root")
.mergeEnvironment(environment)
.addUris(Protos.CommandInfo.URI.newBuilder().setValue(httpPath));
.mergeEnvironment(environment);
if (configuration.getElasticsearchBinary().isEmpty()) {
builder.addUris(Protos.CommandInfo.URI.newBuilder().setValue(httpPath));
} else {
builder.addUris(Protos.CommandInfo.URI.newBuilder().setValue(configuration.getElasticsearchBinary()));
}
if (!configuration.getElasticsearchSettingsLocation().isEmpty()) {
builder.addUris(Protos.CommandInfo.URI.newBuilder().setValue(configuration.getElasticsearchSettingsLocation()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void before() {
when(configuration.isFrameworkUseDocker()).thenReturn(true);
when(configuration.getElasticsearchPorts()).thenReturn(Collections.emptyList());
when(configuration.taskSpecificHostDir(any())).thenReturn("/var/lib/mesos/slave/elasticsearch/cluster-name/" + SLAVEID);
when(configuration.getElasticsearchBinary()).thenReturn("");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package org.apache.mesos.elasticsearch.systemtest;

import com.containersol.minimesos.cluster.MesosCluster;
import com.containersol.minimesos.mesos.ClusterArchitecture;
import com.containersol.minimesos.mesos.DockerClientFactory;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.CreateContainerCmd;
import com.jayway.awaitility.Awaitility;
import com.mashape.unirest.http.Unirest;
import com.mashape.unirest.http.exceptions.UnirestException;
import org.apache.mesos.elasticsearch.common.cli.ZookeeperCLIParameter;
import org.apache.mesos.elasticsearch.systemtest.callbacks.ElasticsearchNodesResponse;
import org.apache.mesos.elasticsearch.systemtest.containers.ElasticsearchSchedulerContainer;
import org.apache.mesos.elasticsearch.systemtest.containers.MesosMasterTagged;
import org.apache.mesos.elasticsearch.systemtest.containers.MesosSlaveTagged;
import org.apache.mesos.elasticsearch.systemtest.util.DockerUtil;
import org.apache.mesos.elasticsearch.systemtest.util.IpTables;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.security.SecureRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.mesos.elasticsearch.scheduler.Configuration.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* Tests that we can run different versions of ES
*/
public class DifferentESVersionSystemTest {
private static final Logger LOG = LoggerFactory.getLogger(DifferentESVersionSystemTest.class);
public static final String ES_VERSION = "2.0.2";
public static final String ES_IMAGE = "elasticsearch:" + ES_VERSION;
public static final String ES_BINARY = "https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/" + ES_VERSION + "/elasticsearch-" + ES_VERSION + ".tar.gz";
protected static final Configuration TEST_CONFIG = new Configuration();
private final DockerClient dockerClient = DockerClientFactory.build();
private final DockerUtil dockerUtil = new DockerUtil(dockerClient);
private MesosCluster cluster;

@Before
public void before() {
dockerUtil.killAllSchedulers();
dockerUtil.killAllExecutors();
final ClusterArchitecture clusterArchitecture = new ClusterArchitecture.Builder()
.withZooKeeper()
.withMaster(MesosMasterTagged::new)
.withSlave(zooKeeper -> new MesosSlaveTagged(zooKeeper, TEST_CONFIG.getPortRanges().get(0)))
.withSlave(zooKeeper -> new MesosSlaveTagged(zooKeeper, TEST_CONFIG.getPortRanges().get(1)))
.withSlave(zooKeeper -> new MesosSlaveTagged(zooKeeper, TEST_CONFIG.getPortRanges().get(2)))
.build();
cluster = new MesosCluster(clusterArchitecture);
cluster.start(TEST_CONFIG.getClusterTimeout());
}

@After
public void after() {
dockerUtil.killAllSchedulers();
dockerUtil.killAllExecutors();
if (cluster != null) {
cluster.stop();
}
}


@Test
public void shouldStartDockerImage() {
IpTables.apply(dockerClient, cluster, TEST_CONFIG); // Only forward docker traffic

final DockerESVersionScheduler scheduler = new DockerESVersionScheduler(dockerClient, cluster.getZkContainer().getIpAddress(), cluster, ES_IMAGE);
cluster.addAndStartContainer(scheduler, TEST_CONFIG.getClusterTimeout());
LOG.info("Started Elasticsearch scheduler on " + scheduler.getIpAddress() + ":" + TEST_CONFIG.getSchedulerGuiPort());

ESTasks esTasks = new ESTasks(TEST_CONFIG, scheduler.getIpAddress());
new TasksResponse(esTasks, TEST_CONFIG.getElasticsearchNodesCount());

ElasticsearchNodesResponse nodesResponse = new ElasticsearchNodesResponse(esTasks, TEST_CONFIG.getElasticsearchNodesCount());
assertTrue("Elasticsearch nodes did not discover each other within 5 minutes", nodesResponse.isDiscoverySuccessful());

final AtomicReference<String> versionNumber = new AtomicReference<>("");
Awaitility.await().pollInterval(1L, TimeUnit.SECONDS).atMost(TEST_CONFIG.getClusterTimeout(), TimeUnit.SECONDS).until(() -> {
try {
versionNumber.set(Unirest.get("http://" + esTasks.getEsHttpAddressList().get(0)).asJson().getBody().getObject().getJSONObject("version").getString("number"));
return true;
} catch (UnirestException e) {
return false;
}
});
assertEquals("ES version is not the same as requested: " + ES_VERSION + " != " + versionNumber.get(), ES_VERSION, versionNumber.get());
}

@Test
public void shouldStartJar() {
// Don't forward traffic. Jars are actually running on the slaves
final JarESVersionScheduler scheduler = new JarESVersionScheduler(dockerClient, cluster.getZkContainer().getIpAddress(), cluster, ES_BINARY);
cluster.addAndStartContainer(scheduler, TEST_CONFIG.getClusterTimeout());
LOG.info("Started Elasticsearch scheduler on " + scheduler.getIpAddress() + ":" + TEST_CONFIG.getSchedulerGuiPort());

ESTasks esTasks = new ESTasks(TEST_CONFIG, scheduler.getIpAddress());
new TasksResponse(esTasks, TEST_CONFIG.getElasticsearchNodesCount());

ElasticsearchNodesResponse nodesResponse = new ElasticsearchNodesResponse(esTasks, TEST_CONFIG.getElasticsearchNodesCount());
assertTrue("Elasticsearch nodes did not discover each other within 5 minutes", nodesResponse.isDiscoverySuccessful());

final AtomicReference<String> versionNumber = new AtomicReference<>("");
Awaitility.await().pollInterval(1L, TimeUnit.SECONDS).atMost(TEST_CONFIG.getClusterTimeout(), TimeUnit.SECONDS).until(() -> {
try {
versionNumber.set(Unirest.get("http://" + esTasks.getEsHttpAddressList().get(0)).asJson().getBody().getObject().getJSONObject("version").getString("number"));
return true;
} catch (UnirestException e) {
return false;
}
});
assertEquals("ES version is not the same as requested: " + ES_VERSION + " != " + versionNumber.get(), ES_VERSION, versionNumber.get());
}

/**
* Versioned docker scheduler
*/
private static class DockerESVersionScheduler extends ElasticsearchSchedulerContainer {

private final String image;

public DockerESVersionScheduler(DockerClient dockerClient, String zkIp, MesosCluster cluster, String image) {
super(dockerClient, zkIp, cluster);
this.image = image;
}

@Override
public void pullImage() {
super.pullImage();
dockerClient.pullImageCmd(image);
}

@Override
protected CreateContainerCmd dockerCommand() {
return dockerClient
.createContainerCmd(TEST_CONFIG.getSchedulerImageName())
.withName(TEST_CONFIG.getSchedulerName() + "_" + new SecureRandom().nextInt())
.withEnv("JAVA_OPTS=-Xms128m -Xmx256m")
.withCmd(
ZookeeperCLIParameter.ZOOKEEPER_MESOS_URL, getZookeeperMesosUrl(),
ELASTICSEARCH_RAM, Integer.toString(TEST_CONFIG.getElasticsearchMemorySize()),
ELASTICSEARCH_CPU, "0.1",
ELASTICSEARCH_DISK, "150",
EXECUTOR_IMAGE, image,
FRAMEWORK_USE_DOCKER, "true");
}
}

/**
* Versioned Jar Scheduler
*/
private static class JarESVersionScheduler extends ElasticsearchSchedulerContainer {

private final String binaryUrl;

public JarESVersionScheduler(DockerClient dockerClient, String zkIp, MesosCluster cluster, String binaryUrl) {
super(dockerClient, zkIp, cluster);
this.binaryUrl = binaryUrl;
}

@Override
protected CreateContainerCmd dockerCommand() {
return dockerClient
.createContainerCmd(TEST_CONFIG.getSchedulerImageName())
.withName(TEST_CONFIG.getSchedulerName() + "_" + new SecureRandom().nextInt())
.withEnv("JAVA_OPTS=-Xms128m -Xmx256m")
.withCmd(
ZookeeperCLIParameter.ZOOKEEPER_MESOS_URL, getZookeeperMesosUrl(),
ELASTICSEARCH_RAM, Integer.toString(TEST_CONFIG.getElasticsearchMemorySize()),
ELASTICSEARCH_CPU, "0.1",
ELASTICSEARCH_DISK, "150",
EXECUTOR_BINARY, binaryUrl,
FRAMEWORK_USE_DOCKER, "false");
}
}
}

0 comments on commit f23977c

Please sign in to comment.