From 471da10810d6d662e14a71f6614955057697855e Mon Sep 17 00:00:00 2001 From: Martin Gencur Date: Tue, 13 Dec 2016 18:22:13 +0100 Subject: [PATCH] Add REST stages that run in background * useful for resilience tests where we need to kill some servers from stages running in foreground --- .../radargun/stages/test/BaseTestStage.java | 4 + .../stages/test/legacy/LegacyTestStage.java | 45 ++++++----- .../stages/test/legacy/StressorsManager.java | 36 +++++++++ .../test/legacy/TimeStressorCompletion.java | 4 +- .../BackgroundRESTOperationsStartStage.java | 76 +++++++++++++++++++ .../BackgroundRESTOperationsStopStage.java | 73 ++++++++++++++++++ 6 files changed, 219 insertions(+), 19 deletions(-) create mode 100644 core/src/main/java/org/radargun/stages/test/legacy/StressorsManager.java create mode 100644 extensions/rest/src/main/java/org/radargun/stages/BackgroundRESTOperationsStartStage.java create mode 100644 extensions/rest/src/main/java/org/radargun/stages/BackgroundRESTOperationsStopStage.java diff --git a/core/src/main/java/org/radargun/stages/test/BaseTestStage.java b/core/src/main/java/org/radargun/stages/test/BaseTestStage.java index eed8a5015..f27a7fd29 100644 --- a/core/src/main/java/org/radargun/stages/test/BaseTestStage.java +++ b/core/src/main/java/org/radargun/stages/test/BaseTestStage.java @@ -61,6 +61,10 @@ public void check() { } protected Report.Test getTest(boolean allowExisting) { + return getTest(allowExisting, testName); + } + + protected Report.Test getTest(boolean allowExisting, String testName) { if (testName == null || testName.isEmpty()) { log.warn("No test name - results are not recorded"); return null; diff --git a/core/src/main/java/org/radargun/stages/test/legacy/LegacyTestStage.java b/core/src/main/java/org/radargun/stages/test/legacy/LegacyTestStage.java index c5264810c..a34abde9d 100644 --- a/core/src/main/java/org/radargun/stages/test/legacy/LegacyTestStage.java +++ b/core/src/main/java/org/radargun/stages/test/legacy/LegacyTestStage.java @@ -70,7 +70,6 @@ public abstract class LegacyTestStage extends BaseTestStage { @InjectTrait protected Transactional transactional; - private CountDownLatch finishCountDown; private Completion completion; private OperationSelector operationSelector; @@ -78,6 +77,12 @@ public abstract class LegacyTestStage extends BaseTestStage { protected volatile boolean finished = false; protected volatile boolean terminated = false; + protected StressorsManager stressorsManager; + + public StressorsManager getStressorsManager() { + return stressorsManager; + } + @Init public void init() { if (totalThreads <= 0 && numThreadsPerNode <= 0) @@ -96,10 +101,11 @@ public DistStageAck executeOnSlave() { try { long startNanos = TimeService.nanoTime(); log.info("Starting test " + testName); - List stressors = execute(); + stressorsManager = setUpAndStartStressors(); + waitForStressorsToFinish(stressorsManager); destroy(); log.info("Finished test. Test duration is: " + Utils.getNanosDurationString(TimeService.nanoTime() - startNanos)); - return newStatisticsAck(stressors); + return newStatisticsAck(stressorsManager.getStressors()); } catch (Exception e) { return errorResponse("Exception while initializing the test", e); } @@ -118,10 +124,14 @@ protected void destroy() { } public StageResult processAckOnMaster(List acks) { + return processAckOnMaster(acks, testName); + } + + protected StageResult processAckOnMaster(List acks, String testNameOverride) { StageResult result = super.processAckOnMaster(acks); if (result.isError()) return result; - Report.Test test = getTest(amendTest); + Report.Test test = getTest(amendTest, testNameOverride); testIteration = test == null ? 0 : test.getIterations().size(); // we cannot use aggregated = createStatistics() since with PeriodicStatistics the merge would fail List statisticsAcks = instancesOf(acks, StatisticsAck.class); @@ -147,10 +157,10 @@ public StageResult processAckOnMaster(List acks) { } } - public List execute() { + protected StressorsManager setUpAndStartStressors() { long startTime = TimeService.currentTimeMillis(); completion = createCompletion(); - finishCountDown = new CountDownLatch(1); + CountDownLatch finishCountDown = new CountDownLatch(1); completion.setCompletionHandler(new Runnable() { @Override public void run() { @@ -164,6 +174,7 @@ public void run() { operationSelector = wrapOperationSelector(createOperationSelector()); List stressors = startStressors(); + started = true; if (rampUp > 0) { try { @@ -172,29 +183,30 @@ public void run() { throw new IllegalStateException("Interrupted during ramp-up.", e); } } + return new StressorsManager(stressors, startTime, finishCountDown); + } - started = true; - + protected void waitForStressorsToFinish(StressorsManager manager) { try { if (timeout > 0) { - long waitTime = getWaitTime(startTime); + long waitTime = getWaitTime(manager.getStartTime()); if (waitTime <= 0) { throw new TestTimeoutException(); } else { - if (!finishCountDown.await(waitTime, TimeUnit.MILLISECONDS)) { + if (!manager.getFinishCountDown().await(waitTime, TimeUnit.MILLISECONDS)) { throw new TestTimeoutException(); } } } else { - finishCountDown.await(); + manager.getFinishCountDown().await(); } } catch (InterruptedException e) { throw new IllegalStateException("Unexpected interruption", e); } - for (Thread stressorThread : stressors) { + for (Thread stressorThread : manager.getStressors()) { try { if (timeout > 0) { - long waitTime = getWaitTime(startTime); + long waitTime = getWaitTime(manager.getStartTime()); if (waitTime <= 0) throw new TestTimeoutException(); stressorThread.join(waitTime); } else { @@ -204,7 +216,6 @@ public void run() { throw new TestTimeoutException(e); } } - return stressors; } protected Completion createCompletion() { @@ -270,7 +281,7 @@ protected List gatherResults(List stressors, ResultRetrie } } - private long getWaitTime(long startTime) { + protected long getWaitTime(long startTime) { return startTime + timeout - TimeService.currentTimeMillis(); } @@ -324,7 +335,7 @@ public boolean isTerminated() { public void setTerminated() { terminated = true; - finishCountDown.countDown(); + stressorsManager.getFinishCountDown().countDown(); } public Completion getCompletion() { @@ -365,7 +376,7 @@ public Statistics merge(Statistics stats1, Statistics stats2) { } } - private class TestTimeoutException extends RuntimeException { + protected class TestTimeoutException extends RuntimeException { public TestTimeoutException() { } diff --git a/core/src/main/java/org/radargun/stages/test/legacy/StressorsManager.java b/core/src/main/java/org/radargun/stages/test/legacy/StressorsManager.java new file mode 100644 index 000000000..b03ff9f16 --- /dev/null +++ b/core/src/main/java/org/radargun/stages/test/legacy/StressorsManager.java @@ -0,0 +1,36 @@ +package org.radargun.stages.test.legacy; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +/** + * A class that holds additional information about stressors and makes is possible to + * pass stressors together with this information between stages. An arbitrary stage can wait + * for the stressors to finish and track their execution time. + * + * @author Martin Gencur + */ +public class StressorsManager { + + private CountDownLatch finishCountDown; + private long startTime; + private List stressors; + + public StressorsManager(List stressors, long startTime, CountDownLatch finishCountDown) { + this.stressors = stressors; + this.startTime = startTime; + this.finishCountDown = finishCountDown; + } + + public long getStartTime() { + return startTime; + } + + public List getStressors() { + return stressors; + } + + public CountDownLatch getFinishCountDown() { + return finishCountDown; + } +} diff --git a/core/src/main/java/org/radargun/stages/test/legacy/TimeStressorCompletion.java b/core/src/main/java/org/radargun/stages/test/legacy/TimeStressorCompletion.java index 87e5bd33c..7515b14d6 100644 --- a/core/src/main/java/org/radargun/stages/test/legacy/TimeStressorCompletion.java +++ b/core/src/main/java/org/radargun/stages/test/legacy/TimeStressorCompletion.java @@ -18,7 +18,7 @@ public class TimeStressorCompletion extends AbstractCompletion { private final long logFrequency = TimeUnit.SECONDS.toNanos(20); /** - * @param duration Duration of the test in nanoseconds. + * @param duration Duration of the test in nanoseconds. When duration is 0 the test will run indefinitely. */ public TimeStressorCompletion(long duration) { this.duration = TimeUnit.MILLISECONDS.toNanos(duration); @@ -26,7 +26,7 @@ public TimeStressorCompletion(long duration) { @Override public boolean moreToRun() { - boolean moreToRun = TimeService.nanoTime() < startTime + duration; + boolean moreToRun = duration == 0 ? true : TimeService.nanoTime() < startTime + duration; if (!moreToRun) { runCompletionHandler(); } diff --git a/extensions/rest/src/main/java/org/radargun/stages/BackgroundRESTOperationsStartStage.java b/extensions/rest/src/main/java/org/radargun/stages/BackgroundRESTOperationsStartStage.java new file mode 100644 index 000000000..f1099e079 --- /dev/null +++ b/extensions/rest/src/main/java/org/radargun/stages/BackgroundRESTOperationsStartStage.java @@ -0,0 +1,76 @@ +package org.radargun.stages; + +import java.util.List; +import org.radargun.DistStageAck; +import org.radargun.StageResult; +import org.radargun.config.Namespace; +import org.radargun.config.Stage; +import org.radargun.stages.test.legacy.Completion; +import org.radargun.stages.test.legacy.LegacyTestStage; +import org.radargun.stages.test.legacy.TimeStressorCompletion; + +/** + * A stage for REST operations running in background. + * + * @author Martin Gencur + */ +@Namespace(LegacyTestStage.NAMESPACE) +@Stage(doc = "Stage for starting REST operations in the background") +public class BackgroundRESTOperationsStartStage extends RESTOperationsTestStage { + + //Override Init method from BaseTestStage in order to handle duration specifically + @Override + public void check() { + if (duration > 0 || duration < 0) { + duration = 0; + log.warn("Parameter duration ignored in background stage. Stage will run indefinitely until " + + "it is manually stopped from " + BackgroundRESTOperationsStopStage.class.getSimpleName()); + } + if (numOperations > 0) { + numOperations = 0; + log.warn("Parameter numOperations ignored in background stage."); + } + if (timeout > 0 || timeout < 0) { + timeout = 0; + log.warn("Parameter timeout ignored in background stage."); + } + } + + @Override + protected Completion createCompletion() { + return new TimeStressorCompletion(duration); + } + + @Override + public DistStageAck executeOnSlave() { + if (!isServiceRunning()) { + log.info("Not running test on this slave as service is not running."); + return successfulResponse(); + } + try { + log.info("Starting test " + testName + " in the background."); + stressorsManager = setUpAndStartStressors(); + slaveState.put(testName, this); + return successfulResponse(); + } catch (Exception e) { + return errorResponse("Exception while initializing the test", e); + } + } + + @Override + public StageResult processAckOnMaster(List acks) { + StageResult result = StageResult.SUCCESS; + logDurationInfo(acks); + for (DistStageAck ack : acks) { + if (ack.isError()) { + log.warn("Received error ack " + ack); + result = errorResult(); + } else { + if (log.isTraceEnabled()) { + log.trace("Received success ack " + ack); + } + } + } + return result; + } +} diff --git a/extensions/rest/src/main/java/org/radargun/stages/BackgroundRESTOperationsStopStage.java b/extensions/rest/src/main/java/org/radargun/stages/BackgroundRESTOperationsStopStage.java new file mode 100644 index 000000000..d5a27dcc9 --- /dev/null +++ b/extensions/rest/src/main/java/org/radargun/stages/BackgroundRESTOperationsStopStage.java @@ -0,0 +1,73 @@ +package org.radargun.stages; + +import java.util.List; +import org.radargun.DistStageAck; +import org.radargun.StageResult; +import org.radargun.config.Namespace; +import org.radargun.config.Property; +import org.radargun.config.Stage; +import org.radargun.stages.test.legacy.LegacyTestStage; +import org.radargun.utils.TimeService; +import org.radargun.utils.Utils; + +/** + * A test stage for stopping REST operations background stage. + * + * @author Martin Gencur + */ +@Namespace(LegacyTestStage.NAMESPACE) +@Stage(doc = "Stage for stopping REST operations running in the background") +public class BackgroundRESTOperationsStopStage extends RESTOperationsTestStage { + + @Property(doc = "Name of the background operations to be stopped. Default is 'Test'.") + protected String testNameToStop = "Test"; + + @Override + public void check() { + if (duration > 0 || duration < 0) { + duration = 0; + log.warn("Parameter duration ignored in background stage."); + } + if (numOperations > 0) { + numOperations = 0; + log.warn("Parameter numOperations ignored in background stage."); + } + if (timeout > 0 || timeout < 0) { + timeout = 0; + log.warn("Parameter timeout ignored in background stage."); + } + } + + @Override + public void init() { + //do not check any parameters + } + + @Override + public DistStageAck executeOnSlave() { + if (!isServiceRunning()) { + log.info("Not running test on this slave as service is not running."); + return successfulResponse(); + } + try { + BackgroundRESTOperationsStartStage startedStage = (BackgroundRESTOperationsStartStage) slaveState.get(testNameToStop); + if (startedStage == null) { + throw new RuntimeException("Unable to find the test in slaveState: " + testNameToStop); + } + log.info("Stopping test " + startedStage.testName + " running in the background."); + + startedStage.setTerminated(); + + waitForStressorsToFinish(startedStage.getStressorsManager()); + log.info("Finished test. Test duration is: " + Utils.getMillisDurationString(TimeService.currentTimeMillis() - startedStage.getStressorsManager().getStartTime())); + return newStatisticsAck(startedStage.getStressorsManager().getStressors()); + } catch (Exception e) { + return errorResponse("Exception while initializing the test", e); + } + } + + @Override + public StageResult processAckOnMaster(List acks) { + return processAckOnMaster(acks, testNameToStop); + } +}