Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 3 additions & 52 deletions driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
Expand All @@ -47,10 +46,8 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -908,7 +905,6 @@ public static class Builder {

public static final String RANDOM_PORT = "__RANDOM_PORT__";
private static final Pattern RANDOM_PORT_PATTERN = Pattern.compile(RANDOM_PORT);
private static final int RANDOMIZE_PORT_MAX_RETRIES = 20;

private String ipPrefix = TestUtils.IP_PREFIX;
int[] nodes = {1};
Expand All @@ -921,7 +917,6 @@ public static class Builder {
private final Map<String, Object> cassandraConfiguration = Maps.newLinkedHashMap();
private final Map<String, Object> dseConfiguration = Maps.newLinkedHashMap();
private final Map<Integer, Workload[]> workloads = new HashMap<Integer, Workload[]>();
private final HashSet<Integer> selectedPorts = new HashSet<Integer>();

private Builder() {
cassandraConfiguration.put("start_rpc", false);
Expand Down Expand Up @@ -1074,14 +1069,6 @@ public CCMBridge build() {
cassandraVersion = this.version;
}

selectedPorts.clear();
// Preemptively banning every number that looks like explicitly provided port
selectedPorts.addAll(collectPorts(this.cassandraConfiguration));
if (dseVersion != null) {
selectedPorts.addAll(collectPorts(this.dseConfiguration));
}
selectedPorts.addAll(Ints.asList(jmxPorts));

Map<String, Object> cassandraConfiguration = randomizePorts(this.cassandraConfiguration);
int storagePort = Integer.parseInt(cassandraConfiguration.get("storage_port").toString());
int thriftPort = Integer.parseInt(cassandraConfiguration.get("rpc_port").toString());
Expand All @@ -1097,7 +1084,7 @@ public CCMBridge build() {
int[] generatedJmxPorts = new int[numNodes];
for (int i = 0; i < numNodes; i++) {
if (i >= jmxPorts.length) {
generatedJmxPorts[i] = selectAvailablePort();
generatedJmxPorts[i] = TestUtils.findAvailablePort();
} else {
generatedJmxPorts[i] = jmxPorts[i];
}
Expand Down Expand Up @@ -1255,7 +1242,7 @@ private void updateNodeConf(CCMBridge ccm) {
int nodesInDc = nodes[dc - 1];
for (int i = 0; i < nodesInDc; i++) {
int jmxPort = ccm.jmxAddressOfNode(n).getPort();
int debugPort = selectAvailablePort();
int debugPort = TestUtils.findAvailablePort();
logger.trace(
"Node {} in cluster {} using JMX port {} and debug port {}",
n,
Expand Down Expand Up @@ -1320,48 +1307,12 @@ private String randomizePorts(CharSequence str) {
Matcher matcher = RANDOM_PORT_PATTERN.matcher(str);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
matcher.appendReplacement(sb, Integer.toString(selectAvailablePort()));
matcher.appendReplacement(sb, Integer.toString(TestUtils.findAvailablePort()));
}
matcher.appendTail(sb);
return sb.toString();
}

// Collects port numbers from configuration map. May catch some non-issue false positives.
private static Collection<Integer> collectPorts(Map<String, Object> config) {
HashSet<Integer> set = new HashSet<Integer>();
for (Object value : config.values()) {
try {
int result = Integer.parseInt(value.toString());
if ((1 <= result) && (result <= 65535)) {
set.add(result);
}
} catch (NumberFormatException e) {
}
}
return set;
}

private int findUnselectedPort() {
int port, cnt = 0;
do {
cnt++;
port = TestUtils.findAvailablePort();
} while (selectedPorts.contains(port) && (cnt < RANDOMIZE_PORT_MAX_RETRIES));

if (cnt >= RANDOMIZE_PORT_MAX_RETRIES) {
throw new RuntimeException(
"Couldn't assign random ports. "
+ "This may happen when you're trying to exhaust nearly all available ports.");
}
return port;
}

private int selectAvailablePort() {
int port = findUnselectedPort();
selectedPorts.add(port);
return port;
}

@Override
@SuppressWarnings("SimplifiableIfStatement")
public boolean equals(Object o) {
Expand Down
25 changes: 21 additions & 4 deletions driver-core/src/test/java/com/datastax/driver/core/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
/** A number of static fields/methods handy for tests. */
public abstract class TestUtils {

private static final Map<Integer, Long> recentPorts = new HashMap<Integer, Long>();
private static final long RECENT_PORT_TTL = 4 * 60 * (long) (1e9); // nanoseconds
private static final int MAX_FIND_PORT_RETRIES = 20;
public static final String IP_PREFIX;

static {
Expand Down Expand Up @@ -760,10 +763,22 @@ public static String generateIdentifier(String prefix) {
public static synchronized int findAvailablePort() throws RuntimeException {
ServerSocket ss = null;
try {
// let the system pick an ephemeral port
ss = new ServerSocket(0);
ss.setReuseAddress(true);
return ss.getLocalPort();
int retries = 0;
while (retries++ < MAX_FIND_PORT_RETRIES) {
// let the system pick an ephemeral port
ss = new ServerSocket(0);
ss.setReuseAddress(true);
long time = System.nanoTime();
int port = ss.getLocalPort();
Long last = recentPorts.get(port);
if (last == null || time - last > RECENT_PORT_TTL) {
recentPorts.put(port, time);
logger.info("Found available port: {}", port);
return port;
} else {
ss.close();
}
}
} catch (IOException e) {
throw Throwables.propagate(e);
} finally {
Expand All @@ -775,6 +790,8 @@ public static synchronized int findAvailablePort() throws RuntimeException {
}
}
}
throw new RuntimeException(
"Couldn't find available port. Max retries (" + MAX_FIND_PORT_RETRIES + ") exceeded.");
}

private static final Predicate<InetSocketAddress> PORT_IS_UP =
Expand Down