Skip to content

Commit

Permalink
Merge pull request #253 from rabbitmq/rabbitmq-perf-test-252-sni
Browse files Browse the repository at this point in the history
Add -sni option
  • Loading branch information
acogoluegnes committed Dec 1, 2020
2 parents 774d72f + 3c2cafd commit 17f09de
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 65 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/rabbitmq/perf/BaseMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.ArrayList;
import java.util.Collection;

import static com.rabbitmq.perf.PerfTest.strArg;
import static com.rabbitmq.perf.Utils.strArg;

/**
*
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/rabbitmq/perf/DatadogMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.Map;

import static com.rabbitmq.perf.PerfTest.hasOption;
import static com.rabbitmq.perf.PerfTest.strArg;
import static com.rabbitmq.perf.Utils.strArg;
import static java.lang.Boolean.valueOf;

/**
Expand Down
19 changes: 10 additions & 9 deletions src/main/java/com/rabbitmq/perf/PerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.function.Function;

import static com.rabbitmq.perf.OptionsUtils.forEach;
import static com.rabbitmq.perf.Utils.strArg;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -204,7 +205,8 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {

factory.setTopologyRecoveryEnabled(false);

RecoveryDelayHandler recoveryDelayHandler = Utils.getRecoveryDelayHandler(strArg(cmd, "cri", null));
RecoveryDelayHandler recoveryDelayHandler = Utils.getRecoveryDelayHandler(
strArg(cmd, "cri", null));
if (recoveryDelayHandler != null) {
factory.setRecoveryDelayHandler(recoveryDelayHandler);
}
Expand Down Expand Up @@ -277,6 +279,11 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
shutdownService.wrap(() -> nioExecutor.shutdownNow());
}

factory.setSocketConfigurator(Utils.socketConfigurator(cmd));
if (factory.getNioParams() != null) {
factory.getNioParams().setSslEngineConfigurator(Utils.sslEngineConfigurator(cmd));
}

MulticastParams p = new MulticastParams();
p.setAutoAck( autoAck);
p.setAutoDelete( autoDelete);
Expand Down Expand Up @@ -650,15 +657,9 @@ public static Options getOptions() {
+ "e.g. x-priority=10"));
options.addOption(new Option("cri", "connection-recovery-interval", true, "connection recovery interval in seconds. Default is 5 seconds. "
+ "Interval syntax, e.g. 30-60, is supported to specify an random interval between 2 values between each attempt."));
return options;
}

static String strArg(CommandLineProxy cmd, char opt, String def) {
return cmd.getOptionValue(opt, def);
}

static String strArg(CommandLineProxy cmd, String opt, String def) {
return cmd.getOptionValue(opt, def);
options.addOption(new Option("sni", "server-name-indication", true, "server names for Server Name Indication TLS parameter, separated by commas"));
return options;
}

static int intArg(CommandLineProxy cmd, char opt, int def) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/rabbitmq/perf/PrometheusMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.io.IOException;

import static com.rabbitmq.perf.PerfTest.intArg;
import static com.rabbitmq.perf.PerfTest.strArg;
import static com.rabbitmq.perf.Utils.strArg;

/**
*
Expand Down
184 changes: 132 additions & 52 deletions src/main/java/com/rabbitmq/perf/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,85 +19,165 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.RecoveryDelayHandler;
import com.rabbitmq.client.SocketConfigurator;
import com.rabbitmq.client.SocketConfigurators;
import com.rabbitmq.client.SslEngineConfigurator;
import com.rabbitmq.client.SslEngineConfigurators;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;

import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class Utils {

private static final ConnectionFactory CF = new ConnectionFactory();

static boolean isRecoverable(Connection connection) {
return connection instanceof AutorecoveringConnection;
}

static synchronized Address extract(String uri) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
CF.setUri(uri);
return new Address(CF.getHost(), CF.getPort());
}

/**
* @param argument
* @return
* @since 2.11.0
*/
static RecoveryDelayHandler getRecoveryDelayHandler(String argument) {
if (argument == null || argument.trim().isEmpty()) {
return null;
}
argument = argument.trim();
Pattern pattern = Pattern.compile("(\\d+)(-(\\d+))?");
Matcher matcher = pattern.matcher(argument);
if (!matcher.matches()) {
throw new IllegalArgumentException("Incorrect argument for connection recovery interval. Must be e.g. 30 or 30-60.");
}

RecoveryDelayHandler handler;
final long delay = Long.parseLong(matcher.group(1)) * 1000;
if (matcher.group(2) == null) {
handler = recoveryAttempts -> delay;
} else {
final long maxInput = Long.parseLong(matcher.group(2).replace("-", "")) * 1000;
if (maxInput <= delay) {
throw new IllegalArgumentException("Wrong interval min-max values: " + argument);
}
final long maxDelay = maxInput + 1000;
handler = recoveryAttempts -> ThreadLocalRandom.current().nextLong(delay, maxDelay);
}
return handler;
}

static final Future<?> NO_OP_FUTURE = new Future<Object>() {
static final Future<?> NO_OP_FUTURE =
new Future<Object>() {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return true;
return true;
}

@Override
public boolean isCancelled() {
return false;
return false;
}

@Override
public boolean isDone() {
return true;
return true;
}

@Override
public Object get() throws InterruptedException, ExecutionException {
return null;
return null;
}

@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
public Object get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
};
};
private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
private static final ConnectionFactory CF = new ConnectionFactory();

static boolean isRecoverable(Connection connection) {
return connection instanceof AutorecoveringConnection;
}

static synchronized Address extract(String uri)
throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
CF.setUri(uri);
return new Address(CF.getHost(), CF.getPort());
}

/**
* @param argument
* @return
* @since 2.11.0
*/
static RecoveryDelayHandler getRecoveryDelayHandler(String argument) {
if (argument == null || argument.trim().isEmpty()) {
return null;
}
argument = argument.trim();
Pattern pattern = Pattern.compile("(\\d+)(-(\\d+))?");
Matcher matcher = pattern.matcher(argument);
if (!matcher.matches()) {
throw new IllegalArgumentException(
"Incorrect argument for connection recovery interval. Must be e.g. 30 or 30-60.");
}

RecoveryDelayHandler handler;
final long delay = Long.parseLong(matcher.group(1)) * 1000;
if (matcher.group(2) == null) {
handler = recoveryAttempts -> delay;
} else {
final long maxInput = Long.parseLong(matcher.group(2).replace("-", "")) * 1000;
if (maxInput <= delay) {
throw new IllegalArgumentException("Wrong interval min-max values: " + argument);
}
final long maxDelay = maxInput + 1000;
handler = recoveryAttempts -> ThreadLocalRandom.current().nextLong(delay, maxDelay);
}
return handler;
}

static List<SNIServerName> sniServerNames(String argumentValue) {
if (argumentValue != null && !argumentValue.trim().isEmpty()) {
return Arrays.stream(argumentValue.split(","))
.map(s -> s.trim())
.map(s -> new SNIHostName(s))
.collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}

static SocketConfigurator socketConfigurator(CommandLineProxy cmd) {
List<SNIServerName> serverNames = sniServerNames(strArg(cmd, "sni", null));
if (serverNames.isEmpty()) {
return SocketConfigurators.defaultConfigurator();
} else {
SocketConfigurator socketConfigurator =
socket -> {
if (socket instanceof SSLSocket) {
SSLSocket sslSocket = (SSLSocket) socket;
SSLParameters sslParameters =
sslSocket.getSSLParameters() == null
? new SSLParameters()
: sslSocket.getSSLParameters();
sslParameters.setServerNames(serverNames);
sslSocket.setSSLParameters(sslParameters);
} else {
LOGGER.warn("SNI parameter set on a non-TLS connection");
}
};
return SocketConfigurators.defaultConfigurator().andThen(socketConfigurator);
}
}

static SslEngineConfigurator sslEngineConfigurator(CommandLineProxy cmd) {
List<SNIServerName> serverNames = sniServerNames(strArg(cmd, "sni", null));
if (serverNames.isEmpty()) {
return SslEngineConfigurators.DEFAULT;
} else {
SslEngineConfigurator sslEngineConfigurator =
sslEngine -> {
SSLParameters sslParameters =
sslEngine.getSSLParameters() == null
? new SSLParameters()
: sslEngine.getSSLParameters();
sslParameters.setServerNames(serverNames);
sslEngine.setSSLParameters(sslParameters);
};
return SslEngineConfigurators.defaultConfigurator().andThen(sslEngineConfigurator);
}
}

static String strArg(CommandLineProxy cmd, String opt, String def) {
return cmd.getOptionValue(opt, def);
}

static String strArg(CommandLineProxy cmd, char opt, String def) {
return cmd.getOptionValue(opt, def);
}
}
2 changes: 1 addition & 1 deletion src/test/java/com/rabbitmq/perf/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ static CommandLineProxy cmd(String commandLine) throws ParseException {
@CsvSource({"-d testId, testId", "--id testId, testId", "'', default-test-id"})
public void strArg(String commandLine, String expectedArgumentValue) throws ParseException {
CommandLineProxy cmd = cmd(commandLine);
String value = PerfTest.strArg(cmd, 'd', "default-test-id");
String value = Utils.strArg(cmd, 'd', "default-test-id");
assertEquals(expectedArgumentValue, value);
}

Expand Down

0 comments on commit 17f09de

Please sign in to comment.