Merge pull request #502 from mesos/refactoring/scalingSystemTestStability

Improve scaling system test stability. Increase ram. Add awaits. Add …
Phil Winder committed Feb 18, 2016
2 parents 2dc64e7 + 47eb935 commit b03c4ff
Showing 5 changed files with 104 additions and 62 deletions.
@@ -11,7 +11,7 @@ public class Configuration {
private String schedulerName = "elasticsearch-scheduler";
private int schedulerGuiPort = 31100;
private int elasticsearchNodesCount = getPortRanges().length;
- private int elasticsearchMemorySize = 256;
+ private int elasticsearchMemorySize = 500;
private String elasticsearchJobName = "esdemo";
private final Integer clusterTimeout = 60;

@@ -45,9 +45,9 @@ public String getElasticsearchJobName() {

public String[] getPortRanges() {
return new String[]{
"ports(*):[9200-9200,9300-9300]",
"ports(*):[9201-9201,9301-9301]",
"ports(*):[9202-9202,9302-9302]"
"ports(*):[9200-9200,9300-9300]; cpus(*):1.0; mem(*):550; disk(*):200",
"ports(*):[9201-9201,9301-9301]; cpus(*):1.0; mem(*):550; disk(*):200",
"ports(*):[9202-9202,9302-9302]; cpus(*):1.0; mem(*):550; disk(*):200"
};
}

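Each offer string now bundles explicit cpu, memory, and disk resources with the port ranges, in the `name(role):value` syntax Mesos uses for resource specifications, so every simulated agent advertises enough headroom for the larger 500 MB nodes. A minimal sketch of how such a string is composed (`buildAgentResources` is a hypothetical helper, not part of this commit):

    // Illustrative helper (not in this commit): builds an agent resource
    // string in the same "name(role):value; ..." syntax used by getPortRanges().
    static String buildAgentResources(int httpPort, int transportPort,
                                      double cpus, int memMb, int diskMb) {
        return String.format(java.util.Locale.ROOT,
                "ports(*):[%d-%d,%d-%d]; cpus(*):%.1f; mem(*):%d; disk(*):%d",
                httpPort, httpPort, transportPort, transportPort, cpus, memMb, diskMb);
    }

For example, `buildAgentResources(9200, 9300, 1.0, 550, 200)` reproduces the first entry above.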
@@ -13,6 +13,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.apache.mesos.elasticsearch.common.elasticsearch.ElasticsearchParser.HTTP_ADDRESS;
@@ -35,12 +36,21 @@ public HttpResponse<JsonNode> getResponse() throws UnirestException {
return Unirest.get(tasksEndPoint).asJson();
}

- public List<JSONObject> getTasks() throws UnirestException {
+ public List<JSONObject> getTasks() {
List<JSONObject> tasks = new ArrayList<>();
LOGGER.debug("Fetching tasks on " + tasksEndPoint);
- HttpResponse<JsonNode> response = Unirest.get(tasksEndPoint).asJson();
- for (int i = 0; i < response.getBody().getArray().length(); i++) {
-     JSONObject jsonObject = response.getBody().getArray().getJSONObject(i);
+ final AtomicReference<HttpResponse<JsonNode>> response = new AtomicReference<>();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> { // This can take some time, sometimes.
+     try {
+         response.set(Unirest.get(tasksEndPoint).asJson());
+         return true;
+     } catch (UnirestException e) {
+         LOGGER.debug(e);
+         return false;
+     }
+ });
+ for (int i = 0; i < response.get().getBody().getArray().length(); i++) {
+     JSONObject jsonObject = response.get().getBody().getArray().getJSONObject(i);
// If the ports are exposed on the docker adaptor, then force the http_addresses to point to the docker adaptor IP address.
// This is a nasty hack, much like `if (testing) doSomething();`. This means we are no longer testing a
// real-life network setup.
@@ -56,38 +66,43 @@ public List<JSONObject> getTasks() throws UnirestException {
}
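The `AtomicReference` plus Awaitility idiom introduced in `getTasks()` recurs throughout this PR (see `getNumberOfNodes()` below). A generic sketch of the pattern, assuming the same Awaitility 1.x and Unirest 1.x APIs already imported in this file (`getJsonWithRetry` is a hypothetical helper, not part of the commit):

    // Hypothetical helper (not in this commit) showing the retry idiom that
    // getTasks() introduces: poll until the HTTP call succeeds, publishing the
    // response through an AtomicReference because variables captured by a Java
    // lambda must be effectively final.
    private static HttpResponse<JsonNode> getJsonWithRetry(String url) {
        final AtomicReference<HttpResponse<JsonNode>> result = new AtomicReference<>();
        Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
            try {
                result.set(Unirest.get(url).asJson());
                return true;  // success: stop polling
            } catch (UnirestException e) {
                return false; // endpoint not ready yet: poll again
            }
        });
        return result.get();
    }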

// TODO (pnw): I shouldn't have to prepend http everywhere. Add here instead.
- public List<String> getEsHttpAddressList() throws UnirestException {
+ public List<String> getEsHttpAddressList() {
return getTasks().stream().map(ElasticsearchParser::parseHttpAddress).collect(Collectors.toList());
}

- public void waitForGreen() {
-     LOGGER.debug("Waiting for green.");
+ public void waitForGreen(Integer numNodes) {
+     LOGGER.debug("Waiting for green and " + numNodes + " nodes.");
Awaitility.await().atMost(5, TimeUnit.MINUTES).pollInterval(1, TimeUnit.SECONDS).until(() -> { // This can take some time, sometimes.
try {
List<String> esAddresses = getEsHttpAddressList();
// This may throw a JSONException if we call before the JSON has been generated. Hence, catch exception.
- String body = Unirest.get("http://" + esAddresses.get(0) + "/_cluster/health").asString().getBody();
- LOGGER.debug(body);
- return body.contains("green");
+ final JSONObject body = Unirest.get("http://" + esAddresses.get(0) + "/_cluster/health").asJson().getBody().getObject();
+ final boolean numberOfNodes = body.getInt("number_of_nodes") == numNodes;
+ final boolean green = body.getString("status").equals("green");
+ final boolean initializingShards = body.getInt("initializing_shards") == 0;
+ final boolean unassignedShards = body.getInt("unassigned_shards") == 0;
+ LOGGER.debug(green + " and " + numberOfNodes + " and " + initializingShards + " and " + unassignedShards + ": " + body);
+ return green && numberOfNodes && initializingShards && unassignedShards;
} catch (Exception e) {
LOGGER.debug(e);
return false;
}
});
}
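All four fields checked above (`status`, `number_of_nodes`, `initializing_shards`, `unassigned_shards`) are standard members of Elasticsearch's `_cluster/health` response, so waiting on node count and shard states as well as the colour avoids declaring readiness while the cluster is still settling. The same predicate as a one-shot check, handy when debugging a wait that never goes green (`isClusterSettled` is illustrative, not part of this commit):

    // Illustrative one-shot version of the readiness predicate polled above,
    // using the same Unirest and org.json APIs imported in this file.
    static boolean isClusterSettled(String esHttpAddress, int expectedNodes) throws UnirestException {
        JSONObject health = Unirest.get("http://" + esHttpAddress + "/_cluster/health")
                .asJson().getBody().getObject();
        return "green".equals(health.getString("status"))
                && health.getInt("number_of_nodes") == expectedNodes
                && health.getInt("initializing_shards") == 0
                && health.getInt("unassigned_shards") == 0;
    }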

public Integer getDocumentCount(String httpAddress) throws UnirestException {
JSONArray responseElements = Unirest.get("http://" + httpAddress + "/_count").asJson().getBody().getArray();
- LOGGER.info(responseElements);
+ LOGGER.debug(responseElements);
return responseElements.getJSONObject(0).getInt("count");
}

public void waitForCorrectDocumentCount(Integer docCount) throws UnirestException {
List<String> esAddresses = getEsHttpAddressList();
- Awaitility.await().atMost(2, TimeUnit.MINUTES).pollDelay(2, TimeUnit.SECONDS).until(() -> {
+ Awaitility.await().atMost(1, TimeUnit.MINUTES).pollDelay(2, TimeUnit.SECONDS).until(() -> {
for (String httpAddress : esAddresses) {
try {
Integer count = getDocumentCount(httpAddress);
- if (count == 0 || count % docCount != 0) { // This allows for repeated testing.
+ if (docCount != 0 && (count == 0 || count % docCount != 0)) { // This allows for repeated testing. Only run if docCount != 0.
return false;
}
} catch (Exception e) {
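The reworked guard makes the document-count wait idempotent across repeated runs. Written as the condition under which the wait succeeds (an illustrative inversion of the guard above, not code from this commit):

    // Passes when docCount == 0 (a pure connectivity probe) or when the observed
    // count is a non-zero multiple of docCount: with docCount = 10, counts of
    // 10, 20, 30 ... all pass, so re-running the suite against a cluster that
    // still holds an earlier run's documents converges instead of timing out.
    static boolean documentCountCorrect(int count, int docCount) {
        return docCount == 0 || (count != 0 && count % docCount == 0);
    }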
@@ -20,7 +20,7 @@ public class DataRetrievableAllNodesSystemTest extends SchedulerTestBase {
@Test
public void testDataConsistency() throws Exception {
ESTasks esTasks = new ESTasks(TEST_CONFIG, getScheduler().getIpAddress(), true);
- esTasks.waitForGreen();
+ esTasks.waitForGreen(3);

List<String> esAddresses = esTasks.getEsHttpAddressList();
pusher = new DataPusherContainer(CLUSTER_ARCHITECTURE.dockerClient, esAddresses.get(0));
@@ -134,7 +134,9 @@ ZookeeperCLIParameter.ZOOKEEPER_MESOS_URL, getZookeeperMesosUrl(),
Configuration.EXECUTOR_HEALTH_DELAY, "99",
Configuration.EXECUTOR_TIMEOUT, "100", // This timeout is valid, but will always timeout, because of delays in receiving healthchecks.
ElasticsearchCLIParameter.ELASTICSEARCH_NODES, "3",
Configuration.ELASTICSEARCH_RAM, "256"
Configuration.ELASTICSEARCH_RAM, Integer.toString(TEST_CONFIG.getElasticsearchMemorySize()),
Configuration.ELASTICSEARCH_CPU, "0.1",
Configuration.ELASTICSEARCH_DISK, "150"
);
}
}
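The scheduler is now asked for 0.1 cpu, the configured 500 MB of memory, and 150 MB of disk per node, which must fit inside the per-agent offers declared in `Configuration.getPortRanges()` (1.0 cpu, 550 MB mem, 200 MB disk); requests that exceeded the offers could never be placed. A sanity-check sketch of that invariant (illustrative only, using the constants from this PR):

    // Illustrative invariant (not in this commit): each task's requested
    // resources must fit within a single agent's advertised offer.
    static void assertTasksFitOffers() {
        double offeredCpus = 1.0, requestedCpus = 0.1;
        int offeredMemMb = 550, requestedMemMb = 500;  // elasticsearchMemorySize
        int offeredDiskMb = 200, requestedDiskMb = 150;
        if (requestedCpus > offeredCpus || requestedMemMb > offeredMemMb
                || requestedDiskMb > offeredDiskMb) {
            throw new IllegalStateException("Task resources exceed agent offers");
        }
    }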
@@ -1,7 +1,8 @@
package org.apache.mesos.elasticsearch.systemtest;

import com.containersol.minimesos.mesos.DockerClientFactory;
import com.github.dockerjava.api.DockerClient;
+ import com.jayway.awaitility.Awaitility;
import com.mashape.unirest.http.HttpResponse;
import com.mashape.unirest.http.JsonNode;
import com.mashape.unirest.http.Unirest;
import com.mashape.unirest.http.exceptions.UnirestException;
@@ -14,21 +15,19 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;

/**
* Tests the scaling capabilities. To run multiple times, uncomment the code below.
*/
//@RunWith(Parameterized.class)
public class ScalingSystemTest extends SchedulerTestBase {

private static final Logger LOGGER = Logger.getLogger(ScalingSystemTest.class);

public static final int WEBUI_PORT = 31100;

+ public static final int NUM_TEST_DOCS = 10;
- private static DockerClient dockerClient = DockerClientFactory.build();
private String ipAddress;

private ESTasks esTasks;

// @Parameterized.Parameters
@@ -37,94 +36,120 @@ public class ScalingSystemTest extends SchedulerTestBase {
// }

@Before
- public void before() throws UnirestException {
+ public void before() {
ipAddress = getScheduler().getIpAddress();
esTasks = new ESTasks(TEST_CONFIG, ipAddress, true);
- List<String> esAddresses = esTasks.getEsHttpAddressList();
-
- DataPusherContainer pusher = new DataPusherContainer(CLUSTER_ARCHITECTURE.dockerClient, esAddresses.get(0));
- CLUSTER.addAndStartContainer(pusher, TEST_CONFIG.getClusterTimeout());
- LOGGER.info("Started data push");
}

@Test
- public void shouldScaleDown() throws UnirestException {
+ public void shouldScaleDown() {
scaleNumNodesTo(ipAddress, 3); // Reset to 3 nodes
- esTasks.waitForGreen();
+ esTasks.waitForGreen(3);
LOGGER.debug("Number of nodes: " + getNumberOfNodes(ipAddress));
scaleNumNodesTo(ipAddress, 2);
- esTasks.waitForGreen();
+ esTasks.waitForGreen(2);
}

@Test
- public void shouldScaleUp() throws UnirestException {
+ public void shouldScaleUp() {
scaleNumNodesTo(ipAddress, 2); // Reset to 2 nodes
- esTasks.waitForGreen();
+ esTasks.waitForGreen(2);
LOGGER.debug("Number of nodes: " + getNumberOfNodes(ipAddress));
scaleNumNodesTo(ipAddress, 3);
- esTasks.waitForGreen();
+ esTasks.waitForGreen(3);
}

@Test
public void shouldNotLoseDataWhenScalingDown() throws UnirestException {
// Make sure we have three nodes
scaleNumNodesTo(ipAddress, 3);
- esTasks.waitForGreen();
+ esTasks.waitForGreen(3);
List<String> esAddresses = esTasks.getEsHttpAddressList();

Unirest.delete("http://" + esAddresses.get(0) + "/*").asJson();

+ esTasks.waitForCorrectDocumentCount(0); // Make sure we can actually connect.

LOGGER.info("Addresses: " + esAddresses);

- esTasks.waitForCorrectDocumentCount(DataPusherContainer.CORRECT_NUM_DOCS);
+ seedData("http://" + esAddresses.get(0));
+ LOGGER.info("Started data push");

+ esTasks.waitForCorrectDocumentCount(NUM_TEST_DOCS);

// Scale down to one node
scaleNumNodesTo(ipAddress, 1);
- esTasks.waitForGreen();
+ esTasks.waitForGreen(1);

// Check that the data is still correct
- esTasks.waitForCorrectDocumentCount(DataPusherContainer.CORRECT_NUM_DOCS);
+ esTasks.waitForCorrectDocumentCount(NUM_TEST_DOCS);
}

@Test
public void shouldNotHaveStaleData() throws UnirestException, IOException {
// Make sure we have three nodes
scaleNumNodesTo(ipAddress, 3);
+ esTasks.waitForGreen(3);
+ List<String> esAddresses = esTasks.getEsHttpAddressList();

Unirest.delete("http://" + esAddresses.get(0) + "/*").asJson();

+ esTasks.waitForCorrectDocumentCount(0); // Make sure we can actually connect.

LOGGER.info("Addresses: " + esAddresses);

seedData("http://" + esAddresses.get(0));
LOGGER.info("Started data push");

LOGGER.info("Scaling down to 2 nodes");
scaleNumNodesTo(ipAddress, 2);
- esTasks.waitForGreen();
+ esTasks.waitForGreen(2);
- esTasks.waitForCorrectDocumentCount(DataPusherContainer.CORRECT_NUM_DOCS);
- List<String> esAddresses = esTasks.getEsHttpAddressList();
+ esAddresses = esTasks.getEsHttpAddressList();

// Do the delete on two nodes
JsonNode body = Unirest.delete("http://" + esAddresses.get(0) + "/shakespeare-*").asJson().getBody();
LOGGER.info("Deleting data " + body);

// Scale up to three nodes, there should still be zero docs.
LOGGER.info("Scaling back to 3 nodes");
scaleNumNodesTo(ipAddress, 3);
- esTasks.waitForGreen();
+ esTasks.waitForGreen(3);

- Awaitility.await().atMost(5, TimeUnit.MINUTES).pollInterval(1, TimeUnit.SECONDS).until(() -> { // This can take some time, sometimes.
-     try {
-         List<String> esHttpAddressList = esTasks.getEsHttpAddressList();
-         String body1 = Unirest.get("http://" + esHttpAddressList.get(0) + "/_cluster/health").asString().getBody();
-         LOGGER.debug(body1);
-         int documentCount = Unirest.get("http://" + esAddresses.get(0) + "/_count").asJson().getBody().getArray().getJSONObject(0).getInt("count");
-         return documentCount == 0;
-     } catch (Exception e) {
-         return false;
-     }
- });
+ esTasks.waitForCorrectDocumentCount(0); // Ensure there are zero docs.
}

- public void scaleNumNodesTo(String ipAddress, Integer newNumNodes) throws UnirestException {
-     HttpResponse<JsonNode> response = Unirest.put("http://" + ipAddress + ":" + WEBUI_PORT + "/v1/cluster/elasticsearchNodes").header("Content-Type", "application/json").body(newNumNodes.toString()).asJson();
-     assertEquals(200, response.getStatus());
- }
+ private void seedData(String esAddresses) {
+     try {
+         for (int i = 0; i < NUM_TEST_DOCS; i++) {
+             Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> Unirest.post(esAddresses + "/dummy/data")
+                     .body("{ \"user\" : \"kimchy\", \"post_date\" : \"2009-11-15T14:12:12\", \"message\" : \"trying out Elasticsearch\" }")
+                     .asString().getStatus() == 201);
+         }
+     } catch (Exception e) {
+         LOGGER.debug(e);
+     }
+ }
+
+ public void scaleNumNodesTo(String ipAddress, Integer newNumNodes) {
+     Awaitility.await().atMost(60, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
+         Unirest.put("http://" + ipAddress + ":" + WEBUI_PORT + "/v1/cluster/elasticsearchNodes").header("Content-Type", "application/json").body(newNumNodes.toString()).asJson();
+         String numNodes = getNumberOfNodes(ipAddress);
+         LOGGER.debug("Number of nodes: " + numNodes);
+         return numNodes.equals(newNumNodes.toString());
+     });
+     assertEquals(newNumNodes.toString(), getNumberOfNodes(ipAddress));
+ }
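`seedData` replaces the shared data-pusher container: it posts `NUM_TEST_DOCS` small documents to a `dummy` index, retrying each POST for up to ten seconds until Elasticsearch acknowledges the write with 201 Created. `scaleNumNodesTo` is likewise self-verifying now, repeating the PUT until the scheduler's GET endpoint reports the requested count, so callers no longer assert on the raw HTTP status. A typical call site, as used in the tests above:

    scaleNumNodesTo(ipAddress, 3); // blocks until the scheduler reports 3 nodes
    esTasks.waitForGreen(3);       // then waits for the ES cluster to settle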

- public String getNumberOfNodes(String ipAddress) throws UnirestException {
-     return Unirest.get("http://" + ipAddress + ":" + WEBUI_PORT + "/v1/cluster/elasticsearchNodes").asJson().getBody().getObject().get("value").toString();
+ public String getNumberOfNodes(String ipAddress) {
+     final AtomicReference<String> numNodes = new AtomicReference<>();
+     Awaitility.await().pollInterval(1L, TimeUnit.SECONDS).atMost(30L, TimeUnit.SECONDS).until(() -> {
+         try {
+             numNodes.set(Unirest.get("http://" + ipAddress + ":" + WEBUI_PORT + "/v1/cluster/elasticsearchNodes").asJson().getBody().getObject().get("value").toString());
+             return true;
+         } catch (Exception e) {
+             return false;
+         }
+     });
+     return numNodes.get();
}
}
