Skip to content

Commit

Permalink
Make integration tests capable of testing multiple running reaper ins…
Browse files Browse the repository at this point in the history
…tances.

Introducing a fault tolerant reaper (where multiple reaper instances could run and coordinate ongoing repairs on top of a shared Cassandra backend storage) requires extensive testing.
While the UI is by default an eventually consistent experience, the code design is not. By extending the testing framework to support multiple (and unstalbe/flapping) reaper instances,
and parallel and duplicate htp requests to the RESTful interface, it becomes possible to test such coordination and the required 'partition tolerance' on the backend storage.

Changes made:
 - a number of REST http status code responses were corrected, ie better usage of: METHOD_NOT_ALLOWED, NOT_MODIFIED, NOT_FOUND,
 - a number of REST resoure methods were strengthen ensuring the correct error http status code were returned,
 - making JMX connect timeout configurable
 - make all c* requests marked as idempotent,
 - in `CassandraStorage.getSegment(..)` return a random segment which hasn't yet been started, instead of that with the lowest failCount,
 - in BasicSteps reaper instances can be added and removed concurrently, but synchronized by test method,
 - in BasicSteps parallel stream requests through all reaper instances where appropriate, where not pick a random instance to send the request through,
 - in BasicSteps accept a set of possible http status codes from the response, as multiple put/post requests mean all but one with fail in some manner,
 - in BasicSteps append assertions after multiple possible http status codes have been checked to ensure a resulting consistency in furture http status response codes,
 - ReaperCassandraIT is parameterised, via system properties "grim.reaper.min" and "grim.reaper.max", for how many stable and flapping reaper instances are to be used,
 - In ReaperCassandraIt put a timeout on how long we'll keep retrying to drop the test keyspace,
 - ReaperTestJettyRunner needed a little redesign to allow multiple instances per jvm,
 - move TestUtils methods into BasicSteps,
 - put a timeout on the mutex waits in RepairRunnerTest.

ref: #124
  • Loading branch information
michaelsembwever committed Aug 23, 2017
1 parent 1cc8520 commit ccbb31f
Show file tree
Hide file tree
Showing 30 changed files with 1,124 additions and 689 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ public class ReaperApplicationConfiguration extends Configuration {
@JsonProperty
@DefaultValue("false")
private Boolean activateQueryLogger;

@JsonProperty
@DefaultValue("5")
private int jmxConnectionTimeoutInSeconds;

public int getSegmentCount() {
return segmentCount;
Expand Down Expand Up @@ -292,6 +296,15 @@ public int getHangingRepairTimeoutMins() {
return hangingRepairTimeoutMins;
}

@JsonProperty
public void setJmxConnectionTimeoutInSeconds(int jmxConnectionTimeoutInSeconds) {
this.jmxConnectionTimeoutInSeconds = jmxConnectionTimeoutInSeconds;
}

public int getJmxConnectionTimeoutInSeconds() {
return jmxConnectionTimeoutInSeconds;
}

@JsonProperty
public void setHangingRepairTimeoutMins(int hangingRepairTimeoutMins) {
this.hangingRepairTimeoutMins = hangingRepairTimeoutMins;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class JmxConnectionFactory {
private EC2MultiRegionAddressTranslator addressTranslator;
private boolean localMode = false;

public JmxProxy connect(Optional<RepairStatusHandler> handler, String host)
public JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int connectionTimeout)
throws ReaperException {
// use configured jmx port for host if provided
if(localMode) {
Expand All @@ -56,14 +56,14 @@ public JmxProxy connect(Optional<RepairStatusHandler> handler, String host)
username = jmxAuth.getUsername();
password = jmxAuth.getPassword();
}
return JmxProxy.connect(handler, host, username, password, addressTranslator);
return JmxProxy.connect(handler, host, username, password, addressTranslator, connectionTimeout);
}

public final JmxProxy connect(String host) throws ReaperException {
return connect(Optional.<RepairStatusHandler>absent(), host);
public final JmxProxy connect(String host, int connectionTimeout) throws ReaperException {
return connect(Optional.<RepairStatusHandler>absent(), host, connectionTimeout);
}

public final JmxProxy connectAny(Optional<RepairStatusHandler> handler, Collection<String> hosts)
public final JmxProxy connectAny(Optional<RepairStatusHandler> handler, Collection<String> hosts, int connectionTimeout)
throws ReaperException {
if (hosts == null || hosts.isEmpty()) {
throw new ReaperException("no hosts given for connectAny");
Expand All @@ -75,7 +75,7 @@ public final JmxProxy connectAny(Optional<RepairStatusHandler> handler, Collecti
while (hostIterator.hasNext()) {
try {
String host = hostIterator.next();
return connect(handler, host);
return connect(handler, host, connectionTimeout);
} catch(Exception e) {
LOG.debug("Unreachable host", e);
}
Expand All @@ -84,13 +84,13 @@ public final JmxProxy connectAny(Optional<RepairStatusHandler> handler, Collecti
throw new ReaperException("no host could be reached through JMX");
}

public final JmxProxy connectAny(Cluster cluster)
public final JmxProxy connectAny(Cluster cluster, int connectionTimeout)
throws ReaperException {
Set<String> hosts = cluster.getSeedHosts();
if (hosts == null || hosts.isEmpty()) {
throw new ReaperException("no seeds in cluster with name: " + cluster.getName());
}
return connectAny(Optional.<RepairStatusHandler>absent(), hosts);
return connectAny(Optional.<RepairStatusHandler>absent(), hosts, connectionTimeout);
}

public void setJmxPorts(Map<String, Integer> jmxPorts) {
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,17 @@ private JmxProxy(Optional<RepairStatusHandler> handler, String host, JMXServiceU
* @see JmxProxy#connect(Optional, String, int, String, String, EC2MultiRegionAddressTranslator)
*/
static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, String username,
String password, final EC2MultiRegionAddressTranslator addressTranslator)
String password, final EC2MultiRegionAddressTranslator addressTranslator, int connectionTimeout)
throws ReaperException {
if(host == null) {
throw new ReaperException("Null host given to JmxProxy.connect()");
}

String[] parts = host.split(":");
if (parts.length == 2) {
return connect(handler, parts[0], Integer.valueOf(parts[1]), username, password, addressTranslator);
return connect(handler, parts[0], Integer.valueOf(parts[1]), username, password, addressTranslator, connectionTimeout);
} else {
return connect(handler, host, JMX_PORT, username, password, addressTranslator);
return connect(handler, host, JMX_PORT, username, password, addressTranslator, connectionTimeout);
}
}

Expand All @@ -157,7 +157,7 @@ static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, Stri
* @param addressTranslator if EC2MultiRegionAddressTranslator isn't null it will be used to translate addresses
*/
static JmxProxy connect(Optional<RepairStatusHandler> handler, String originalHost, int port,
String username, String password, final EC2MultiRegionAddressTranslator addressTranslator)
String username, String password, final EC2MultiRegionAddressTranslator addressTranslator, int connectionTimeout)
throws ReaperException {
ObjectName ssMbeanName;
ObjectName cmMbeanName;
Expand Down Expand Up @@ -187,8 +187,7 @@ static JmxProxy connect(Optional<RepairStatusHandler> handler, String originalHo
env.put(JMXConnector.CREDENTIALS, creds);
}
env.put("com.sun.jndi.rmi.factory.socket", getRMIClientSocketFactory());

JMXConnector jmxConn = connectWithTimeout(jmxUrl, JMX_CONNECTION_TIMEOUT, JMX_CONNECTION_TIMEOUT_UNIT, env);
JMXConnector jmxConn = connectWithTimeout(jmxUrl, connectionTimeout, TimeUnit.SECONDS, env);
MBeanServerConnection mbeanServerConn = jmxConn.getMBeanServerConnection();
Object ssProxy =
JMX.newMBeanProxy(mbeanServerConn, ssMbeanName, StorageServiceMBean.class);
Expand Down
13 changes: 8 additions & 5 deletions src/main/java/com/spotify/reaper/resources/ClusterResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public Cluster createClusterWithSeedHost(String seedHostInput)
Optional<List<String>> liveNodes = Optional.absent();
Set<String> seedHosts = CommonTools.parseSeedHosts(seedHostInput);
for(String seedHost:seedHosts) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(seedHost)) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(seedHost, context.config.getJmxConnectionTimeoutInSeconds())) {
clusterName = Optional.of(jmxProxy.getClusterName());
partitioner = Optional.of(jmxProxy.getPartitioner());
liveNodes = Optional.of(jmxProxy.getLiveNodes());
Expand Down Expand Up @@ -239,7 +239,7 @@ public Response modifyClusterSeed(

if(context.config.getEnableDynamicSeedList()) {
for(String seed:newSeeds) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(seed)) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(seed, context.config.getJmxConnectionTimeoutInSeconds())) {
liveNodes = Optional.of(jmxProxy.getLiveNodes());
newSeeds = liveNodes.get().stream().collect(Collectors.toSet());
break;
Expand Down Expand Up @@ -312,7 +312,7 @@ public Response deleteCluster(
*/
Callable<Optional<NodesStatus>> getEndpointState(String seedHost) {
return () -> {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(seedHost)) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(seedHost, context.config.getJmxConnectionTimeoutInSeconds())) {
Optional<String> allEndpointsState = Optional.fromNullable(jmxProxy.getAllEndpointsState());
Optional<Map<String, String>> simpleStates = Optional.fromNullable(jmxProxy.getSimpleStates());
return Optional.of(new NodesStatus(seedHost, allEndpointsState.or(""), simpleStates.or(new HashMap<String, String>())));
Expand Down Expand Up @@ -348,9 +348,12 @@ public Optional<NodesStatus> getNodesStatus(Optional<Cluster> cluster){
.map(seedHost -> getEndpointState(seedHost))
.collect(Collectors.toList());


try {
nodesStatus = clusterStatusExecutor.invokeAny(endpointStateTasks, JmxProxy.JMX_CONNECTION_TIMEOUT, JmxProxy.JMX_CONNECTION_TIMEOUT_UNIT);
nodesStatus = clusterStatusExecutor.invokeAny(
endpointStateTasks,
JmxProxy.JMX_CONNECTION_TIMEOUT,
JmxProxy.JMX_CONNECTION_TIMEOUT_UNIT);

} catch (Exception e) {
// TODO Auto-generated catch block
LOG.debug("failed grabbing nodes status", e);
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/spotify/reaper/resources/CommonTools.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private static List<RingRange> generateSegments(AppContext context, Cluster targ
throw new ReaperException(errMsg);
}
for (String host : seedHosts) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(host)) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(host, context.config.getJmxConnectionTimeoutInSeconds())) {
List<BigInteger> tokens = jmxProxy.getTokens();
segments = sg.generateSegments(segmentCount, tokens, incrementalRepair);
break;
Expand Down Expand Up @@ -198,7 +198,7 @@ private static Map<String, RingRange> getClusterNodes(AppContext context, Clust

Map<List<String>, List<String>> rangeToEndpoint = Maps.newHashMap();
for (String host : seedHosts) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(host)) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(host, context.config.getJmxConnectionTimeoutInSeconds())) {
rangeToEndpoint = jmxProxy.getRangeToEndpointMap(repairUnit.getKeyspaceName());
break;
} catch (ReaperException e) {
Expand Down Expand Up @@ -282,7 +282,7 @@ public static Set<String> getTableNamesBasedOnParam(
AppContext context, Cluster cluster, String keyspace, Optional<String> tableNamesParam)
throws ReaperException {
Set<String> knownTables;
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connectAny(cluster)) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connectAny(cluster, context.config.getJmxConnectionTimeoutInSeconds())) {
knownTables = jmxProxy.getTableNamesForKeyspace(keyspace);
if (knownTables.isEmpty()) {
LOG.debug("no known tables for keyspace {} in cluster {}", keyspace, cluster.getName());
Expand Down Expand Up @@ -312,7 +312,7 @@ public static RepairUnit getNewOrExistingRepairUnit(AppContext context, Cluster

Optional<String> cassandraVersion = Optional.absent();
for (String host : cluster.getSeedHosts()) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(host)) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(host, context.config.getJmxConnectionTimeoutInSeconds())) {
cassandraVersion = Optional.fromNullable(jmxProxy.getCassandraVersion());
break;
} catch (ReaperException e) {
Expand Down
69 changes: 36 additions & 33 deletions src/main/java/com/spotify/reaper/resources/RepairRunResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,10 @@ public Response modifyRunState(
RepairRun.RunState oldState = repairRun.get().getRunState();

if (oldState == newState) {
return Response.ok("given \"state\" is same as the current run state").build();
return Response
.status(Response.Status.NOT_MODIFIED)
.entity("given \"state\" is same as the current run state")
.build();
}

if (isStarting(oldState, newState)) {
Expand All @@ -319,7 +322,7 @@ public Response modifyRunState(
String errMsg = String.format("Transition %s->%s not supported.", oldState.toString(),
newState.toString());
LOG.error(errMsg);
return Response.status(Response.Status.BAD_REQUEST).entity(errMsg).build();
return Response.status(Response.Status.METHOD_NOT_ALLOWED).entity(errMsg).build();
}
}

Expand Down Expand Up @@ -523,38 +526,38 @@ public Response deleteRepairRun(@PathParam("id") UUID runId,
"required query parameter \"owner\" is missing").build();
}
Optional<RepairRun> runToDelete = context.storage.getRepairRun(runId);
if (!runToDelete.isPresent()) {
return Response.status(Response.Status.NOT_FOUND).entity(
"Repair run with id \"" + runId + "\" not found").build();
}
if (runToDelete.get().getRunState() == RepairRun.RunState.RUNNING) {
return Response.status(Response.Status.FORBIDDEN).entity(
"Repair run with id \"" + runId
+ "\" is currently running, and must be stopped before deleting").build();
}
if (!runToDelete.get().getOwner().equalsIgnoreCase(owner.get())) {
return Response.status(Response.Status.FORBIDDEN).entity(
"Repair run with id \"" + runId + "\" is not owned by the user you defined: "
+ owner.get()).build();
}
if (context.storage.getSegmentAmountForRepairRunWithState(runId, RepairSegment.State.RUNNING) > 0) {
return Response.status(Response.Status.FORBIDDEN).entity(
"Repair run with id \"" + runId
+ "\" has a running segment, which must be waited to finish before deleting").build();
}
// Need to get the RepairUnit before it's possibly deleted.
Optional<RepairUnit> unitPossiblyDeleted =
context.storage.getRepairUnit(runToDelete.get().getRepairUnitId());
int segmentsRepaired =
context.storage.getSegmentAmountForRepairRunWithState(runId, RepairSegment.State.DONE);
Optional<RepairRun> deletedRun = context.storage.deleteRepairRun(runId);
if (deletedRun.isPresent()) {
RepairRunStatus repairRunStatus =
new RepairRunStatus(deletedRun.get(), unitPossiblyDeleted.get(), segmentsRepaired);
return Response.ok().entity(repairRunStatus).build();
if (runToDelete.isPresent()) {
if (runToDelete.get().getRunState() == RepairRun.RunState.RUNNING) {
return Response.status(Response.Status.FORBIDDEN).entity(
"Repair run with id \"" + runId
+ "\" is currently running, and must be stopped before deleting").build();
}
if (!runToDelete.get().getOwner().equalsIgnoreCase(owner.get())) {
return Response.status(Response.Status.FORBIDDEN).entity(
"Repair run with id \"" + runId + "\" is not owned by the user you defined: "
+ owner.get()).build();
}
if (context.storage.getSegmentAmountForRepairRunWithState(runId, RepairSegment.State.RUNNING) > 0) {
return Response.status(Response.Status.FORBIDDEN).entity(
"Repair run with id \"" + runId
+ "\" has a running segment, which must be waited to finish before deleting").build();
}
// Need to get the RepairUnit before it's possibly deleted.
Optional<RepairUnit> unitPossiblyDeleted =
context.storage.getRepairUnit(runToDelete.get().getRepairUnitId());
int segmentsRepaired =
context.storage.getSegmentAmountForRepairRunWithState(runId, RepairSegment.State.DONE);
Optional<RepairRun> deletedRun = context.storage.deleteRepairRun(runId);
if (deletedRun.isPresent()) {
RepairRunStatus repairRunStatus =
new RepairRunStatus(deletedRun.get(), unitPossiblyDeleted.get(), segmentsRepaired);
return Response.ok().entity(repairRunStatus).build();
}
}
return Response.serverError().entity("delete failed for repair run with id \""
+ runId + "\"").build();
return Response
.status(Response.Status.NOT_FOUND)
.entity("Repair run with id \"" + runId + "\" not found")
.build();
}

}
Loading

0 comments on commit ccbb31f

Please sign in to comment.