Skip to content

Commit

Permalink
gh-449 Enable getting slave addresses via properties
Browse files Browse the repository at this point in the history
  • Loading branch information
mgencur authored and rvansa committed Jan 13, 2017
1 parent 750c92d commit 681fd95
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 12 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/org/radargun/Master.java
Expand Up @@ -57,6 +57,7 @@ public void run() throws Exception {
try {
connection = new RemoteSlaveConnection(masterConfig.getMaxClusterSize(), masterConfig.getHost(), masterConfig.getPort());
connection.establish();
connection.receiveSlaveAddresses();
state.setMaxClusterSize(masterConfig.getMaxClusterSize());
// let's create reporters now to fail soon in case of misconfiguration
ArrayList<Reporter> reporters = new ArrayList<>();
Expand All @@ -78,10 +79,12 @@ public void run() throws Exception {
int clusterSize = cluster.getSize();
log.info("Starting scenario on " + cluster);
connection.sendCluster(cluster);
connection.sendSlaveAddresses();
connection.sendConfiguration(configuration);
// here we should restart, therefore, we have to send it again
connection.restartSlaves(clusterSize);
connection.sendCluster(cluster);
connection.sendSlaveAddresses();
connection.sendConfiguration(configuration);
connection.sendScenario(masterConfig.getScenario(), clusterSize);
state.setCluster(cluster);
Expand Down
11 changes: 5 additions & 6 deletions core/src/main/java/org/radargun/RemoteMasterConnection.java
Expand Up @@ -79,7 +79,6 @@ public InetAddress connectToMaster(int slaveIndex) throws IOException {
}
buffer.flip();
while (buffer.hasRemaining()) socketChannel.write(buffer);

return socketChannel.socket().getLocalAddress();
}

Expand Down Expand Up @@ -152,24 +151,24 @@ public Object receiveObject() throws IOException {

/**
* Send any serializable object to the master node.
* @param response
* @param obj
* @param nextUuid UUID of the next generation of slaves, or null if this slave will continue
* @throws IOException
*/
public void sendResponse(Serializable response, UUID nextUuid) throws IOException {
public void sendObject(Serializable obj, UUID nextUuid) throws IOException {
buffer.clear();
buffer = SerializationHelper.serializeObjectWithLength(response, buffer);
buffer = SerializationHelper.serializeObjectWithLength(obj, buffer);
if (nextUuid == null) {
buffer = SerializationHelper.appendLong(0, buffer);
buffer = SerializationHelper.appendLong(0, buffer);
} else {
buffer = SerializationHelper.appendLong(nextUuid.getMostSignificantBits(), buffer);
buffer = SerializationHelper.appendLong(nextUuid.getLeastSignificantBits(), buffer);
}
log.trace("Sending response to the master, response has " + buffer.position() + " bytes.");
log.trace("Sending a message to the master, message has " + buffer.position() + " bytes.");
buffer.flip();
while (buffer.hasRemaining()) socketChannel.write(buffer);
log.info("Response successfully sent to the master");
log.info("Message successfully sent to the master");
}

public void release() throws IOException {
Expand Down
44 changes: 41 additions & 3 deletions core/src/main/java/org/radargun/RemoteSlaveConnection.java
Expand Up @@ -24,6 +24,7 @@
import org.radargun.logging.Log;
import org.radargun.logging.LogFactory;
import org.radargun.reporting.Timeline;
import org.radargun.utils.SlaveConnectionInfo;
import org.radargun.utils.TimeService;

/**
Expand All @@ -45,6 +46,7 @@ public class RemoteSlaveConnection {

private ServerSocketChannel serverSocketChannel;
private SlaveRecord[] slaves;
private SlaveAddresses slaveAddresses;

private ByteBuffer mcastBuffer;
private Map<SocketChannel, ByteBuffer> writeBufferMap = new HashMap<SocketChannel, ByteBuffer>();
Expand All @@ -59,28 +61,49 @@ public class RemoteSlaveConnection {
private int port;

private static class SlaveRecord {
private int index; // slave id, should be equal to index in slaves field
private UUID uuid; // key unique for given series of generations of this slave
private SocketChannel channel;

public SlaveRecord(int index, UUID uuid, SocketChannel channel) {
this.index = index;
this.uuid = uuid;
this.channel = channel;
}
}

/**
* Holds information about interfaces and IP addresses of individual slaves.
* This information is collected from individual slaves and re-distributed to the other
* slaves in the cluster.
*/
public static class SlaveAddresses implements Serializable {
public Map<Integer, SlaveConnectionInfo> slaveConnections;

public SlaveAddresses() {
this.slaveConnections = new HashMap<>();
}

public void addSlaveAddresses(int index, SlaveConnectionInfo connectionInfo) {
slaveConnections.put(index, connectionInfo);
}

public SlaveConnectionInfo getSlaveAddresses(int index) {
return slaveConnections.get(index);
}
}

public RemoteSlaveConnection(int numSlaves, String host, int port) throws IOException {
this.host = host;
this.port = port > 0 && port < 65536 ? port : DEFAULT_PORT;
slaves = new SlaveRecord[numSlaves];
slaveAddresses = new SlaveAddresses();
communicationSelector = Selector.open();
startServerSocket();
}

public void establish() throws IOException {
discoverySelector = Selector.open();
serverSocketChannel.register(discoverySelector, SelectionKey.OP_ACCEPT);
mcastBuffer = ByteBuffer.allocate(DEFAULT_WRITE_BUFF_CAPACITY);
int slaveCount = 0;
long deadline = TimeService.currentTimeMillis() + CONNECT_TIMEOUT;
while (slaveCount < slaves.length) {
Expand All @@ -91,7 +114,6 @@ public void establish() throws IOException {
log.info("Awaiting registration from " + (slaves.length - slaveCount) + " slaves.");
slaveCount += connectSlaves(timeout);
}
mcastBuffer = ByteBuffer.allocate(DEFAULT_WRITE_BUFF_CAPACITY);
log.info("Connection established from " + slaveCount + " slaves.");
}

Expand Down Expand Up @@ -142,6 +164,7 @@ private int connectSlaves(long timeout) throws IOException {
}
writeInt(socketChannel, slaveIndex);
writeInt(socketChannel, slaves.length);

slaveCount++;
channel2Index.put(socketChannel, slaveIndex);
readBufferMap.put(socketChannel, ByteBuffer.allocate(DEFAULT_READ_BUFF_CAPACITY));
Expand Down Expand Up @@ -209,6 +232,21 @@ public List<Timeline> receiveTimelines(int numSlaves) throws IOException {
return Arrays.asList(responses.toArray(new Timeline[numSlaves]));
}

public void receiveSlaveAddresses() throws IOException {
responses.clear();
mcastObject(new SlaveConnectionInfo.Request(), slaves.length);
flushBuffers(slaves.length);
List<SlaveConnectionInfo> connections = Arrays.asList(responses.toArray(new SlaveConnectionInfo[slaves.length]));
for (SlaveConnectionInfo connectionInfo : connections) {
slaveAddresses.addSlaveAddresses(connectionInfo.getSlaveIndex(), connectionInfo);
}
}

public void sendSlaveAddresses() throws IOException {
mcastObject(slaveAddresses, slaves.length);
flushBuffers(0);
}

private void flushBuffers(int numResponses) throws IOException {
while (!writeBufferMap.isEmpty() || responses.size() < numResponses) {
communicationSelector.select();
Expand Down
19 changes: 16 additions & 3 deletions core/src/main/java/org/radargun/Slave.java
Expand Up @@ -16,6 +16,7 @@
import org.radargun.reporting.Timeline;
import org.radargun.utils.ArgsHolder;
import org.radargun.utils.RestartHelper;
import org.radargun.utils.SlaveConnectionInfo;
import org.radargun.utils.Utils;

/**
Expand Down Expand Up @@ -57,7 +58,7 @@ private void run(int slaveIndex) throws Exception {
envs.put(entry.getKey(), Evaluator.parseString(entry.getValue().toString()));
}
RestartHelper.spawnSlave(state.getSlaveIndex(), nextUuid, setup.plugin, vmArgs, envs);
connection.sendResponse(null, nextUuid);
connection.sendObject(null, nextUuid);
connection.release();
ShutDownHook.exit(0);
} else if (object instanceof Scenario) {
Expand All @@ -73,7 +74,11 @@ private void run(int slaveIndex) throws Exception {
} else if (object instanceof Cluster) {
cluster = (Cluster) object;
} else if (object instanceof Timeline.Request) {
connection.sendResponse(state.getTimeline(), null);
connection.sendObject(state.getTimeline(), null);
} else if (object instanceof SlaveConnectionInfo.Request) {
connection.sendObject(Utils.getSlaveConnectionInfo(state.getSlaveIndex()), null);
} else if (object instanceof RemoteSlaveConnection.SlaveAddresses) {
state.setSlaveAddresses((RemoteSlaveConnection.SlaveAddresses) object);
}
}
ShutDownHook.exit(0);
Expand Down Expand Up @@ -106,7 +111,7 @@ protected Map<String, Object> getNextMasterData() throws IOException {

@Override
protected void sendResponse(DistStageAck response) throws IOException {
connection.sendResponse(response, null);
connection.sendObject(response, null);
}

@Override
Expand All @@ -120,6 +125,14 @@ protected Map<String, String> getCurrentExtras(Configuration configuration, Clus
Cluster.Group group = cluster.getGroup(state.getSlaveIndex());
extras.put(Properties.PROPERTY_GROUP_NAME, group.name);
extras.put(Properties.PROPERTY_GROUP_SIZE, String.valueOf(group.size));
for (Cluster.Group g : cluster.getGroups()) {
for(int i = 0; i != g.size; i++){
SlaveConnectionInfo ifaces = state.getSlaveAddresses(cluster, g.name, i);
for (String s: ifaces.getInterfaceNames()) {
extras.put(Properties.PROPERTY_GROUP_PREFIX + g.name + "." + i + "." + s, ifaces.getAddressesAsString(s, ","));
}
}
}
for (Cluster.Group g : cluster.getGroups()) {
extras.put(Properties.PROPERTY_GROUP_PREFIX + g.name + Properties.PROPERTY_SIZE_SUFFIX, String.valueOf(group.size));
}
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/org/radargun/config/PropertyHelper.java
Expand Up @@ -304,6 +304,7 @@ private static void setPropertyFromString(Object target, String propName, Path p
ctor.setAccessible(true);
Converter converter = ctor.newInstance();
evaluated = Evaluator.parseString(propertyString);
log.tracef("Evaluated property %s to %s", propName, evaluated);
path.set(target, converter.convert(evaluated, path.getTargetGenericType()));
} catch (InstantiationException e) {
log.errorf(e, "Cannot instantiate converter %s for setting %s (%s)",
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/org/radargun/state/MasterState.java
Expand Up @@ -49,4 +49,5 @@ public void removeListener(MasterListener listener) {
public List<MasterListener> getListeners() {
return Collections.unmodifiableList(listeners);
}

}
16 changes: 16 additions & 0 deletions core/src/main/java/org/radargun/state/SlaveState.java
@@ -1,13 +1,16 @@
package org.radargun.state;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import org.radargun.RemoteSlaveConnection;
import org.radargun.config.Cluster;
import org.radargun.reporting.Timeline;
import org.radargun.utils.SlaveConnectionInfo;

/**
* State residing on slave, passed to each's {@link org.radargun.DistStage#initOnSlave(SlaveState)}
Expand All @@ -18,10 +21,14 @@ public class SlaveState extends StateBase {

private InetAddress localAddress;
private int slaveIndex = -1;

private String plugin;
private String serviceName;
private Cluster.Group group;

private RemoteSlaveConnection.SlaveAddresses slaveAddresses;
private int indexInGroup;

private Map<Class<?>, Object> traits;
private Timeline timeline;
private List<ServiceListener> serviceListeners = new CopyOnWriteArrayList<ServiceListener>();
Expand Down Expand Up @@ -111,4 +118,13 @@ public void removeServiceListener(ServiceListener listener) {
public Iterable<ServiceListener> getServiceListeners() {
return Collections.unmodifiableCollection(serviceListeners);
}

public void setSlaveAddresses(RemoteSlaveConnection.SlaveAddresses slaveAddresses) {
this.slaveAddresses = slaveAddresses;
}

public SlaveConnectionInfo getSlaveAddresses(Cluster cluster, String groupName, int indexInGroup) {
int indexTotal = (Integer) new ArrayList(cluster.getSlaves(groupName)).get(indexInGroup);
return slaveAddresses.getSlaveAddresses(indexTotal);
}
}
45 changes: 45 additions & 0 deletions core/src/main/java/org/radargun/utils/SlaveConnectionInfo.java
@@ -0,0 +1,45 @@
package org.radargun.utils;

import java.io.Serializable;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Holds the list of network interfaces and their IP addresses for a single slave.
*/
public class SlaveConnectionInfo implements Serializable {
private int slaveIndex;
private Map<String, ArrayList<InetAddress>> interfaceToAddrs = new HashMap<>();

public void addAddresses(int slaveIndex, String interfaceName, ArrayList<InetAddress> addresses) {
this.slaveIndex = slaveIndex;
interfaceToAddrs.put(interfaceName, addresses);
}

public List<InetAddress> getAddresses(String interfaceName) {
return interfaceToAddrs.get(interfaceName);
}

public String getAddressesAsString(String interfaceName, String delimiter) {
return String.join(delimiter, interfaceToAddrs.get(interfaceName).stream().map(addr -> addr.getHostAddress()).collect(Collectors.toList()));
}

public Set<String> getInterfaceNames() {
return interfaceToAddrs.keySet();
}

public int getSlaveIndex() {
return slaveIndex;
}

/**
* Used as a message ID for master-slave communication.
*/
public static class Request implements Serializable {
}
}
14 changes: 14 additions & 0 deletions core/src/main/java/org/radargun/utils/Utils.java
Expand Up @@ -7,6 +7,8 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.*;
Expand Down Expand Up @@ -202,6 +204,16 @@ public static int getProcessID() {
return 0;
}

public static SlaveConnectionInfo getSlaveConnectionInfo(int slaveIndex) throws SocketException {
Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
SlaveConnectionInfo connection = new SlaveConnectionInfo();
for (NetworkInterface netint : Collections.list(nets)) {
connection.addAddresses(slaveIndex, netint.getName(), Collections.list(netint.getInetAddresses()));
Collections.list(netint.getInetAddresses()).stream().forEach(x -> System.out.println(x.getHostAddress()));
}
return connection;
}

public static class JarFilenameFilter implements FilenameFilter {
public boolean accept(File dir, String name) {
String fileName = name.toUpperCase(Locale.ENGLISH);
Expand Down Expand Up @@ -634,4 +646,6 @@ public static <T> T findThrowableCauseByClass(Throwable t, Class<T> clazz) {
return findThrowableCauseByClass(t.getCause(), clazz);
}
}


}

0 comments on commit 681fd95

Please sign in to comment.