Skip to content

Commit

Permalink
Add REST stages that run in background
Browse files Browse the repository at this point in the history
* useful for resilience tests where we need to kill some servers from stages running in foreground
  • Loading branch information
mgencur authored and jmarkos committed Jan 18, 2017
1 parent 681fd95 commit 471da10
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 19 deletions.
Expand Up @@ -61,6 +61,10 @@ public void check() {
}

protected Report.Test getTest(boolean allowExisting) {
return getTest(allowExisting, testName);
}

protected Report.Test getTest(boolean allowExisting, String testName) {
if (testName == null || testName.isEmpty()) {
log.warn("No test name - results are not recorded");
return null;
Expand Down
Expand Up @@ -70,14 +70,19 @@ public abstract class LegacyTestStage extends BaseTestStage {
@InjectTrait
protected Transactional transactional;

private CountDownLatch finishCountDown;
private Completion completion;
private OperationSelector operationSelector;

protected volatile boolean started = false;
protected volatile boolean finished = false;
protected volatile boolean terminated = false;

protected StressorsManager stressorsManager;

public StressorsManager getStressorsManager() {
return stressorsManager;
}

@Init
public void init() {
if (totalThreads <= 0 && numThreadsPerNode <= 0)
Expand All @@ -96,10 +101,11 @@ public DistStageAck executeOnSlave() {
try {
long startNanos = TimeService.nanoTime();
log.info("Starting test " + testName);
List<LegacyStressor> stressors = execute();
stressorsManager = setUpAndStartStressors();
waitForStressorsToFinish(stressorsManager);
destroy();
log.info("Finished test. Test duration is: " + Utils.getNanosDurationString(TimeService.nanoTime() - startNanos));
return newStatisticsAck(stressors);
return newStatisticsAck(stressorsManager.getStressors());
} catch (Exception e) {
return errorResponse("Exception while initializing the test", e);
}
Expand All @@ -118,10 +124,14 @@ protected void destroy() {
}

public StageResult processAckOnMaster(List<DistStageAck> acks) {
return processAckOnMaster(acks, testName);
}

protected StageResult processAckOnMaster(List<DistStageAck> acks, String testNameOverride) {
StageResult result = super.processAckOnMaster(acks);
if (result.isError()) return result;

Report.Test test = getTest(amendTest);
Report.Test test = getTest(amendTest, testNameOverride);
testIteration = test == null ? 0 : test.getIterations().size();
// we cannot use aggregated = createStatistics() since with PeriodicStatistics the merge would fail
List<StatisticsAck> statisticsAcks = instancesOf(acks, StatisticsAck.class);
Expand All @@ -147,10 +157,10 @@ public StageResult processAckOnMaster(List<DistStageAck> acks) {
}
}

public List<LegacyStressor> execute() {
protected StressorsManager setUpAndStartStressors() {
long startTime = TimeService.currentTimeMillis();
completion = createCompletion();
finishCountDown = new CountDownLatch(1);
CountDownLatch finishCountDown = new CountDownLatch(1);
completion.setCompletionHandler(new Runnable() {
@Override
public void run() {
Expand All @@ -164,6 +174,7 @@ public void run() {
operationSelector = wrapOperationSelector(createOperationSelector());

List<LegacyStressor> stressors = startStressors();
started = true;

if (rampUp > 0) {
try {
Expand All @@ -172,29 +183,30 @@ public void run() {
throw new IllegalStateException("Interrupted during ramp-up.", e);
}
}
return new StressorsManager(stressors, startTime, finishCountDown);
}

started = true;

protected void waitForStressorsToFinish(StressorsManager manager) {
try {
if (timeout > 0) {
long waitTime = getWaitTime(startTime);
long waitTime = getWaitTime(manager.getStartTime());
if (waitTime <= 0) {
throw new TestTimeoutException();
} else {
if (!finishCountDown.await(waitTime, TimeUnit.MILLISECONDS)) {
if (!manager.getFinishCountDown().await(waitTime, TimeUnit.MILLISECONDS)) {
throw new TestTimeoutException();
}
}
} else {
finishCountDown.await();
manager.getFinishCountDown().await();
}
} catch (InterruptedException e) {
throw new IllegalStateException("Unexpected interruption", e);
}
for (Thread stressorThread : stressors) {
for (Thread stressorThread : manager.getStressors()) {
try {
if (timeout > 0) {
long waitTime = getWaitTime(startTime);
long waitTime = getWaitTime(manager.getStartTime());
if (waitTime <= 0) throw new TestTimeoutException();
stressorThread.join(waitTime);
} else {
Expand All @@ -204,7 +216,6 @@ public void run() {
throw new TestTimeoutException(e);
}
}
return stressors;
}

protected Completion createCompletion() {
Expand Down Expand Up @@ -270,7 +281,7 @@ protected <T> List<T> gatherResults(List<LegacyStressor> stressors, ResultRetrie
}
}

private long getWaitTime(long startTime) {
protected long getWaitTime(long startTime) {
return startTime + timeout - TimeService.currentTimeMillis();
}

Expand Down Expand Up @@ -324,7 +335,7 @@ public boolean isTerminated() {

public void setTerminated() {
terminated = true;
finishCountDown.countDown();
stressorsManager.getFinishCountDown().countDown();
}

public Completion getCompletion() {
Expand Down Expand Up @@ -365,7 +376,7 @@ public Statistics merge(Statistics stats1, Statistics stats2) {
}
}

private class TestTimeoutException extends RuntimeException {
protected class TestTimeoutException extends RuntimeException {
public TestTimeoutException() {
}

Expand Down
@@ -0,0 +1,36 @@
package org.radargun.stages.test.legacy;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
* A class that holds additional information about stressors and makes is possible to
* pass stressors together with this information between stages. An arbitrary stage can wait
* for the stressors to finish and track their execution time.
*
* @author Martin Gencur
*/
public class StressorsManager {

private CountDownLatch finishCountDown;
private long startTime;
private List<LegacyStressor> stressors;

public StressorsManager(List<LegacyStressor> stressors, long startTime, CountDownLatch finishCountDown) {
this.stressors = stressors;
this.startTime = startTime;
this.finishCountDown = finishCountDown;
}

public long getStartTime() {
return startTime;
}

public List<LegacyStressor> getStressors() {
return stressors;
}

public CountDownLatch getFinishCountDown() {
return finishCountDown;
}
}
Expand Up @@ -18,15 +18,15 @@ public class TimeStressorCompletion extends AbstractCompletion {
private final long logFrequency = TimeUnit.SECONDS.toNanos(20);

/**
* @param duration Duration of the test in nanoseconds.
* @param duration Duration of the test in nanoseconds. When duration is 0 the test will run indefinitely.
*/
public TimeStressorCompletion(long duration) {
this.duration = TimeUnit.MILLISECONDS.toNanos(duration);
}

@Override
public boolean moreToRun() {
boolean moreToRun = TimeService.nanoTime() < startTime + duration;
boolean moreToRun = duration == 0 ? true : TimeService.nanoTime() < startTime + duration;
if (!moreToRun) {
runCompletionHandler();
}
Expand Down
@@ -0,0 +1,76 @@
package org.radargun.stages;

import java.util.List;
import org.radargun.DistStageAck;
import org.radargun.StageResult;
import org.radargun.config.Namespace;
import org.radargun.config.Stage;
import org.radargun.stages.test.legacy.Completion;
import org.radargun.stages.test.legacy.LegacyTestStage;
import org.radargun.stages.test.legacy.TimeStressorCompletion;

/**
* A stage for REST operations running in background.
*
* @author Martin Gencur
*/
@Namespace(LegacyTestStage.NAMESPACE)
@Stage(doc = "Stage for starting REST operations in the background")
public class BackgroundRESTOperationsStartStage extends RESTOperationsTestStage {

//Override Init method from BaseTestStage in order to handle duration specifically
@Override
public void check() {
if (duration > 0 || duration < 0) {
duration = 0;
log.warn("Parameter duration ignored in background stage. Stage will run indefinitely until " +
"it is manually stopped from " + BackgroundRESTOperationsStopStage.class.getSimpleName());
}
if (numOperations > 0) {
numOperations = 0;
log.warn("Parameter numOperations ignored in background stage.");
}
if (timeout > 0 || timeout < 0) {
timeout = 0;
log.warn("Parameter timeout ignored in background stage.");
}
}

@Override
protected Completion createCompletion() {
return new TimeStressorCompletion(duration);
}

@Override
public DistStageAck executeOnSlave() {
if (!isServiceRunning()) {
log.info("Not running test on this slave as service is not running.");
return successfulResponse();
}
try {
log.info("Starting test " + testName + " in the background.");
stressorsManager = setUpAndStartStressors();
slaveState.put(testName, this);
return successfulResponse();
} catch (Exception e) {
return errorResponse("Exception while initializing the test", e);
}
}

@Override
public StageResult processAckOnMaster(List<DistStageAck> acks) {
StageResult result = StageResult.SUCCESS;
logDurationInfo(acks);
for (DistStageAck ack : acks) {
if (ack.isError()) {
log.warn("Received error ack " + ack);
result = errorResult();
} else {
if (log.isTraceEnabled()) {
log.trace("Received success ack " + ack);
}
}
}
return result;
}
}
@@ -0,0 +1,73 @@
package org.radargun.stages;

import java.util.List;
import org.radargun.DistStageAck;
import org.radargun.StageResult;
import org.radargun.config.Namespace;
import org.radargun.config.Property;
import org.radargun.config.Stage;
import org.radargun.stages.test.legacy.LegacyTestStage;
import org.radargun.utils.TimeService;
import org.radargun.utils.Utils;

/**
* A test stage for stopping REST operations background stage.
*
* @author Martin Gencur
*/
@Namespace(LegacyTestStage.NAMESPACE)
@Stage(doc = "Stage for stopping REST operations running in the background")
public class BackgroundRESTOperationsStopStage extends RESTOperationsTestStage {

@Property(doc = "Name of the background operations to be stopped. Default is 'Test'.")
protected String testNameToStop = "Test";

@Override
public void check() {
if (duration > 0 || duration < 0) {
duration = 0;
log.warn("Parameter duration ignored in background stage.");
}
if (numOperations > 0) {
numOperations = 0;
log.warn("Parameter numOperations ignored in background stage.");
}
if (timeout > 0 || timeout < 0) {
timeout = 0;
log.warn("Parameter timeout ignored in background stage.");
}
}

@Override
public void init() {
//do not check any parameters
}

@Override
public DistStageAck executeOnSlave() {
if (!isServiceRunning()) {
log.info("Not running test on this slave as service is not running.");
return successfulResponse();
}
try {
BackgroundRESTOperationsStartStage startedStage = (BackgroundRESTOperationsStartStage) slaveState.get(testNameToStop);
if (startedStage == null) {
throw new RuntimeException("Unable to find the test in slaveState: " + testNameToStop);
}
log.info("Stopping test " + startedStage.testName + " running in the background.");

startedStage.setTerminated();

waitForStressorsToFinish(startedStage.getStressorsManager());
log.info("Finished test. Test duration is: " + Utils.getMillisDurationString(TimeService.currentTimeMillis() - startedStage.getStressorsManager().getStartTime()));
return newStatisticsAck(startedStage.getStressorsManager().getStressors());
} catch (Exception e) {
return errorResponse("Exception while initializing the test", e);
}
}

@Override
public StageResult processAckOnMaster(List<DistStageAck> acks) {
return processAckOnMaster(acks, testNameToStop);
}
}

0 comments on commit 471da10

Please sign in to comment.