Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various stuff lying around #505

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
7 changes: 7 additions & 0 deletions core/src/main/java/org/radargun/Operation.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ private Operation(int id, String name) {
this.name = name;
}

// This is useful when running report.sh
private Object readResolve() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method seems to be unused.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readResolve is Java serialization hook.

if (!byId.containsKey(id)) byId.put(id, this);
if (!byName.containsKey(name)) byName.put(name, this);
return this;
}

/**
* Remove all registered operations. WARNING: Should only be used when no clients access Operations class any longer (e.g. testing purposes)
*/
Expand Down
6 changes: 0 additions & 6 deletions core/src/main/java/org/radargun/RemoteSlaveConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -410,12 +410,6 @@ private void startServerSocket() throws IOException {
}
serverSocketChannel.socket().bind(address);
log.info("Master started and listening for connection on: " + address);
log.info("Waiting 5 seconds for server socket to open completely");
try {
Thread.sleep(5000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you removing this? Maybe I should ask why it was there before.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, the latter is the correct question :) It has been there after some early committer (back at Mircea's days) seemingly resolved some problem. Does not make much sense to me now and I was running happily without.

} catch (InterruptedException ex) {
// ignore
}
}

private int readInt(SocketChannel socketChannel) throws IOException {
Expand Down
16 changes: 12 additions & 4 deletions core/src/main/java/org/radargun/ServiceHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.HashMap;
import java.util.Map;

import org.radargun.config.Cluster;
import org.radargun.config.Definition;
import org.radargun.config.InitHelper;
import org.radargun.config.PropertyHelper;
Expand All @@ -22,6 +23,7 @@ public final class ServiceHelper {

private static String currentPlugin;
private static String currentConfigName;
private static Cluster currentCluster;
private static int currentSlaveIndex;

private ServiceHelper() {}
Expand All @@ -34,23 +36,29 @@ public static String getConfigName() {
return currentConfigName;
}

public static Cluster getCluster() {
return currentCluster;
}

public static int getSlaveIndex() {
return currentSlaveIndex;
}

/**
* As we expect only one service at time to be running on one node, this sets current
* plugin, configuration name and slave index that can be later retrieved, e.g. in some
* plugin, configuration name, cluster info and slave index that can be later retrieved, e.g. in some
* init method (annotated by {@link org.radargun.config.Init}) that would not be able
* to retrieve this information in another way.
*
*
* @param plugin
* @param configName
* @param cluster
* @param slaveIndex
*/
public static void setServiceContext(String plugin, String configName, int slaveIndex) {
public static void setServiceContext(String plugin, String configName, Cluster cluster, int slaveIndex) {
currentPlugin = plugin;
currentConfigName = configName;
currentCluster = cluster;
currentSlaveIndex = slaveIndex;
}

Expand All @@ -61,7 +69,7 @@ public static void setServiceContext(String plugin, String configName, int slave
* Then, sets up all properties declared on the service (and its superclasses).
* Finally calls any methods of the class annotated by {@link org.radargun.config.Init @Init}.
*
* Don't forget to call {@link #setServiceContext(String, String, int)} before calling this method.
* Don't forget to call {@link #setServiceContext(String, String, Cluster, int)} before calling this method.
*/
public static Object createService(String plugin, String service,
Map<String, Definition> properties, Map<String, String> extras) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/radargun/SlaveBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected void scenarioLoop() throws IOException {
state.setService(setup.service);
state.setTimeline(new Timeline(state.getSlaveIndex()));
Map<String, String> extras = getCurrentExtras(configuration, cluster);
ServiceHelper.setServiceContext(setup.plugin, configuration.name, state.getSlaveIndex());
ServiceHelper.setServiceContext(setup.plugin, configuration.name, cluster, state.getSlaveIndex());
Object service = ServiceHelper.createService(setup.plugin, setup.service, setup.getProperties(), extras);
Map<Class<?>, Object> traits = null;
try {
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/radargun/config/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ public Group(String name, int size) {
this.size = size;
}

@Override
public String toString() {
return name;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/radargun/stages/CommandStage.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.radargun.config.Property;
import org.radargun.config.Stage;
import org.radargun.utils.ArgsConverter;
import org.radargun.utils.Utils;

/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
Expand Down Expand Up @@ -58,6 +59,10 @@ public DistStageAck executeOnSlave() {
process = startProcess();
if (var != null) {
slaveState.put(var, process);
String pid = Utils.getProcessID(process);
if (pid != null) {
System.setProperty("process." + var + ".id", pid);
}
}
} else if (var != null) {
process = (Process) slaveState.get(var);
Expand Down
38 changes: 20 additions & 18 deletions core/src/main/java/org/radargun/stages/test/TestStage.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.radargun.stages.test;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -101,7 +102,7 @@ public DistStageAck executeOnSlave() {
try {
long startNanos = TimeService.nanoTime();
log.info("Starting test " + testName);
stressorsManager = setUpAndStartStressors();
setUpAndStartStressors();
waitForStressorsToFinish(stressorsManager);
destroy();
log.info("Finished test. Test duration is: " + Utils.getNanosDurationString(TimeService.nanoTime() - startNanos));
Expand Down Expand Up @@ -160,23 +161,23 @@ protected StageResult processAckOnMaster(List<DistStageAck> acks, String testNam
}
}

protected StressorsManager setUpAndStartStressors() {
protected void setUpAndStartStressors() {
long startTime = TimeService.currentTimeMillis();
completion = createCompletion();
CountDownLatch finishCountDown = new CountDownLatch(1);
completion.setCompletionHandler(new Runnable() {
@Override
public void run() {
//Stop collecting statistics for duration-based tests
if (duration > 0) {
finished = true;
}
finishCountDown.countDown();
CountDownLatch startCountDown = new CountDownLatch(getNumThreadsOn(slaveState.getSlaveIndex()));
completion.setCompletionHandler(() -> {
//Stop collecting statistics for duration-based tests
if (duration > 0) {
finished = true;
}
finishCountDown.countDown();
});
operationSelector = wrapOperationSelector(createOperationSelector());

List<Stressor> stressors = startStressors();
List<Stressor> stressors = createStressors(startCountDown);
stressorsManager = new StressorsManager(stressors, startTime, finishCountDown);
startStressors(stressors, startCountDown);
started = true;

if (rampUp > 0) {
Expand All @@ -186,7 +187,6 @@ public void run() {
throw new IllegalStateException("Interrupted during ramp-up.", e);
}
}
return new StressorsManager(stressors, startTime, finishCountDown);
}

protected void waitForStressorsToFinish(StressorsManager manager) {
Expand Down Expand Up @@ -245,24 +245,26 @@ protected OperationSelector wrapOperationSelector(OperationSelector operationSel
return operationSelector;
}

protected List<Stressor> startStressors() {
protected List<Stressor> createStressors(CountDownLatch startCountDown) {
int myFirstThread = getFirstThreadOn(slaveState.getSlaveIndex());
int myNumThreads = getNumThreadsOn(slaveState.getSlaveIndex());
CountDownLatch threadCountDown = new CountDownLatch(myNumThreads);

List<Stressor> stressors = new ArrayList<>();
for (int threadIndex = stressors.size(); threadIndex < myNumThreads; threadIndex++) {
Stressor stressor = new Stressor(this, getLogic(), myFirstThread + threadIndex, threadIndex, logTransactionExceptions, threadCountDown, delayBetweenRequests);
Stressor stressor = new Stressor(this, getLogic(), myFirstThread + threadIndex, threadIndex, logTransactionExceptions, startCountDown, delayBetweenRequests);
stressors.add(stressor);
stressor.start();
}
return stressors;
}

protected void startStressors(Collection<Stressor> stressors, CountDownLatch startCountDown) {
stressors.forEach(Stressor::start);
try {
threadCountDown.await();
startCountDown.await();
} catch (InterruptedException e) {
//FIXME implement me
}
log.info("Started " + stressors.size() + " stressor threads.");
return stressors;
}

protected DistStageAck newStatisticsAck(List<Stressor> stressors) {
Expand Down
19 changes: 19 additions & 0 deletions core/src/main/java/org/radargun/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,25 @@ public static SlaveConnectionInfo getSlaveConnectionInfo(int slaveIndex) throws
return connection;
}

public static String getProcessID(Process process) {
Class<?> clazz = process.getClass();
try {
if (clazz.getName().equals("java.lang.UNIXProcess")) {
Field pidField = clazz.getDeclaredField("pid");
pidField.setAccessible(true);
Object value = pidField.get(process);
if (value instanceof Integer) {
return String.valueOf(value);
}
} else {
throw new IllegalArgumentException("Only unix is supported as OS.");
}
} catch (Exception ex) {
log.errorf(ex, "Failure retrieving PID");
}
return null;
}

public static class JarFilenameFilter implements FilenameFilter {
public boolean accept(File dir, String name) {
String fileName = name.toUpperCase(Locale.ENGLISH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class BasicOperationsTestStage extends CacheOperationsTestStage {
@Property(doc = "Ratio of GET_AND_REMOVE requests. Default is 0.")
protected int getAndRemoveRatio = 0;

@InjectTrait
@InjectTrait(dependency = InjectTrait.Dependency.MANDATORY)
protected BasicOperations basicOperations;

@Init
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public DistStageAck executeOnSlave() {
}
try {
log.info("Starting test " + testName + " in the background.");
stressorsManager = setUpAndStartStressors();
setUpAndStartStressors();
slaveState.put(testName, this);
return successfulResponse();
} catch (Exception e) {
Expand Down