Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing the way Selector handles SSL connection handshakes #310

Merged
merged 15 commits into from
May 24, 2016
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class NetworkMetrics {
public final Counter selectorKeyOperationErrorCount;
public final Counter selectorCloseKeyErrorCount;
public final Counter selectorCloseSocketErrorCount;
public Gauge<Long> selectorActiveConnections;
public Gauge<Long> numActiveConnections;
public final Map<String, SelectorNodeMetric> selectorNodeMetricMap;

// Plaintext metrics
Expand Down Expand Up @@ -121,8 +121,12 @@ public NetworkMetrics(MetricRegistry registry) {
selectorNodeMetricMap = new HashMap<String, SelectorNodeMetric>();
}

public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections) {
selectorActiveConnections = new Gauge<Long>() {
/**
* Initializes a few network metrics for the selector
* @param activeConnections count of current active connections
*/
public void initializeSelectorMetrics(final AtomicLong activeConnections) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need to be public?

numActiveConnections = new Gauge<Long>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't looked at the rest, but this Gauge is never registered as far as I can see.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to your change though, but we should get this fixed.

@Override
public Long getValue() {
return activeConnections.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* Transmission used to speak plain text to the underlying channel.
*/
public class PlainTextTransmission extends Transmission {
private static final Logger logger = LoggerFactory.getLogger(SSLTransmission.class);
private static final Logger logger = LoggerFactory.getLogger(PlainTextTransmission.class);

public PlainTextTransmission(String connectionId, SocketChannel socketChannel, SelectionKey key, Time time,
NetworkMetrics metrics) {
Expand Down
56 changes: 36 additions & 20 deletions ambry-network/src/main/java/com.github.ambry.network/Selector.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -72,10 +73,11 @@ public class Selector implements Selectable {
private final List<NetworkReceive> completedReceives;
private final List<String> disconnected;
private final List<String> connected;
private final Set<String> unreadyConnections;
private final Time time;
private final NetworkMetrics metrics;
private final AtomicLong IdGenerator;
private AtomicLong activeConnections;
private AtomicLong numActiveConnections;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final?

private final SSLFactory sslFactory;

/**
Expand All @@ -92,8 +94,9 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory)
this.disconnected = new ArrayList<String>();
this.metrics = metrics;
this.IdGenerator = new AtomicLong(0);
this.activeConnections = new AtomicLong(0);
this.metrics.initializeSelectorMetricsIfRequired(activeConnections);
numActiveConnections = new AtomicLong(0);
unreadyConnections = new HashSet<>();
metrics.initializeSelectorMetrics(numActiveConnections);
this.sslFactory = sslFactory;
}

Expand Down Expand Up @@ -163,7 +166,7 @@ public String connect(InetSocketAddress address, int sendBufferSize, int receive
}
key.attach(transmission);
this.keyMap.put(connectionId, key);
activeConnections.set(this.keyMap.size());
numActiveConnections.set(this.keyMap.size());
return connectionId;
}

Expand All @@ -190,7 +193,7 @@ public String register(SocketChannel channel, PortType portType)
}
key.attach(transmission);
this.keyMap.put(connectionId, key);
activeConnections.set(this.keyMap.size());
numActiveConnections.set(this.keyMap.size());
return connectionId;
}

Expand Down Expand Up @@ -315,7 +318,13 @@ public void poll(long timeoutMs, List<NetworkSend> sends)
Transmission transmission = getTransmission(key);
try {
if (key.isConnectable()) {
handleConnect(key, transmission);
transmission.finishConnect();
if (transmission.ready()) {
connected.add(transmission.getConnectionId());
metrics.selectorConnectionCreated.inc();
} else {
unreadyConnections.add(transmission.getConnectionId());
}
}

/* if channel is not ready, finish prepare */
Expand Down Expand Up @@ -347,12 +356,28 @@ public void poll(long timeoutMs, List<NetworkSend> sends)
close(key);
}
}
checkUnreadyConnectionsStatus();
this.metrics.selectorIORate.inc();
}
long endIo = time.milliseconds();
this.metrics.selectorIOTime.update(endIo - endSelect);
}

/**
* Check readiness for unready connections and add to completed list if ready
*/
private void checkUnreadyConnectionsStatus() {
Iterator<String> iterator = unreadyConnections.iterator();
while (iterator.hasNext()) {
String connId = iterator.next();
if (isChannelReady(connId)) {
connected.add(connId);
iterator.remove();
metrics.selectorConnectionCreated.inc();
}
}
}

/**
* Generate the description for a SocketChannel
*/
Expand All @@ -368,7 +393,7 @@ private String socketDescription(SocketChannel channel) {
}

/**
* Returns true if channel is ready after completing handshake to accept reads/writes
* Returns {@code true} if channel is ready to send or receive data, {@code false} otherwise
* @param connectionId upon which readiness is checked for
* @return true if channel is ready to accept reads/writes, false otherwise
*/
Expand Down Expand Up @@ -397,8 +422,8 @@ public List<String> connected() {
return this.connected;
}

public long getActiveConnections() {
return activeConnections.get();
public long getNumActiveConnections() {
return numActiveConnections.get();
}

/**
Expand Down Expand Up @@ -452,7 +477,8 @@ private void close(SelectionKey key) {
logger.debug("Closing connection from {}", transmission.getConnectionId());
this.disconnected.add(transmission.getConnectionId());
this.keyMap.remove(transmission.getConnectionId());
activeConnections.set(this.keyMap.size());
numActiveConnections.set(keyMap.size());
unreadyConnections.remove(transmission.getConnectionId());
try {
transmission.close();
} catch (IOException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. If a key can be closed in a different thread, would this part be thread safe?
  2. Not related to this PR.
    Transmission.close() will NOT throw an IOException if taking a look at its implementation in both PlainTextTransmission and SSLTransmission though the abstract method defines so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, looks like we might have an issue. even keyMap and activeConnections looks to be affected. Lets discuss in person.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the documentation it looks like the Selector isn't meant to be thread safe. The documentation of the class on line 64 says so. There are a lot of parts in this class that work on the fact that this class was not meant to be thread safe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I cannot think of why we would want to use this Selector across threads.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

selector should never be used across threads

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, my bad. I thought the processNewResponses() in SocketServer() was a daemon thread. Hence, the close(SelectionKey) could be called simultaneously when selector poll() is happening. Looks like its called sequentially and selector.poll() follows it. So, I don't think we have a problem of multiple threads accessing it.

Expand Down Expand Up @@ -483,16 +509,6 @@ private SelectionKey keyForId(String id) {
return this.keyMap.get(id);
}

/**
* Process connections that have finished their handshake
*/
private void handleConnect(SelectionKey key, Transmission transmission)
throws IOException {
transmission.finishConnect();
this.connected.add(transmission.getConnectionId());
this.metrics.selectorConnectionCreated.inc();
}

/**
* Process reads from ready sockets
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Random;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -47,18 +48,16 @@ public void setup()
TestSSLUtils.createSSLConfig("DC1,DC2,DC3", SSLFactory.Mode.CLIENT, trustStoreFile, "client");
SSLFactory serverSSLFactory = new SSLFactory(sslConfig);
SSLFactory clientSSLFactory = new SSLFactory(clientSSLConfig);
this.server = new EchoServer(serverSSLFactory, 18383);
this.server.start();
this.selector =
new Selector(new NetworkMetrics(new MetricRegistry()), SystemTime.getInstance(),
clientSSLFactory);
server = new EchoServer(serverSSLFactory, 18383);
server.start();
selector = new Selector(new NetworkMetrics(new MetricRegistry()), SystemTime.getInstance(), clientSSLFactory);
}

@After
public void teardown()
throws Exception {
this.selector.close();
this.server.close();
selector.close();
server.close();
}

/**
Expand All @@ -72,7 +71,7 @@ public void testServerDisconnect()
assertEquals("hello", blockingRequest(connectionId, "hello"));

// disconnect
this.server.closeConnections();
server.closeConnections();
while (!selector.disconnected().contains(connectionId)) {
selector.poll(1000L);
}
Expand Down Expand Up @@ -203,6 +202,27 @@ public void testEmptyRequest()
assertEquals("", blockingRequest(connectionId, ""));
}

@Test
public void testSSLConnect()
throws IOException {
String connectionId =
selector.connect(new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE, PortType.SSL);
while (!selector.connected().contains(connectionId)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you have resolved Gopal's comment. Could you talk to him in person if you are not convinced about his suggestion?

selector.poll(10000L);
}
Assert.assertTrue("Channel should have been ready by now ", selector.isChannelReady(connectionId));
}

@Test
public void testCloseAfterConnectCall()
throws IOException {
String connectionId =
selector.connect(new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE, PortType.SSL);
selector.close(connectionId);
Assert.assertTrue("Channel should have been added to disconnected list",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and the same holds for disconnected() or any of those methods that can be populated only after a poll() if I am not wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you have resolved these comments either. Please follow up with Gopal or me about this if you are not convinced.

selector.disconnected().contains(connectionId));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better also check pendingHandshakeTransmissions does not contain the connectionId.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a private member and internal to Selector

}

private String blockingRequest(String connectionId, String s)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add a test case for handshake failure and cleanup part? Basically to cover all new code.

throws Exception {
selector.poll(1000L, asList(SelectorTest.createSend(connectionId, s)));
Expand All @@ -224,10 +244,6 @@ private String blockingSSLConnect()
while (!selector.connected().contains(connectionId)) {
selector.poll(10000L);
}
//finish the handshake as well
while (!selector.isChannelReady(connectionId)) {
selector.poll(10000L);
}
return connectionId;
}
}