-
Notifications
You must be signed in to change notification settings - Fork 275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixing the way Selector handles SSL connection handshakes #310
Changes from 2 commits
81a9256
f22676e
789341e
a95ad0a
5dbba15
1931fdf
d1532e6
f1e62ee
b996430
ebef578
ad1f146
23b0454
0b59f29
059e5ec
969f536
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,13 +38,15 @@ public class NetworkMetrics { | |
public final Histogram selectorSelectTime; | ||
public final Counter selectorIORate; | ||
public final Histogram selectorIOTime; | ||
public final Histogram selectorPerceivedSslHandshakeTime; | ||
public final Counter selectorNioCloseErrorCount; | ||
public final Counter selectorDisconnectedErrorCount; | ||
public final Counter selectorIOErrorCount; | ||
public final Counter selectorKeyOperationErrorCount; | ||
public final Counter selectorCloseKeyErrorCount; | ||
public final Counter selectorCloseSocketErrorCount; | ||
public Gauge<Long> selectorActiveConnections; | ||
public Gauge<Integer> selectorPendingHandshakes; | ||
public final Map<String, SelectorNodeMetric> selectorNodeMetricMap; | ||
|
||
// Plaintext metrics | ||
|
@@ -87,6 +89,8 @@ public NetworkMetrics(MetricRegistry registry) { | |
selectorIORate = registry.counter(MetricRegistry.name(Selector.class, "SelectorIORate")); | ||
selectorSelectTime = registry.histogram(MetricRegistry.name(Selector.class, "SelectorSelectTime")); | ||
selectorIOTime = registry.histogram(MetricRegistry.name(Selector.class, "SelectorIOTime")); | ||
selectorPerceivedSslHandshakeTime = | ||
registry.histogram(MetricRegistry.name(Selector.class, "SelectorSslHandshakeTime")); | ||
selectorNioCloseErrorCount = registry.counter(MetricRegistry.name(Selector.class, "SelectorNioCloseErrorCount")); | ||
selectorDisconnectedErrorCount = | ||
registry.counter(MetricRegistry.name(Selector.class, "SelectorDisconnectedErrorCount")); | ||
|
@@ -121,13 +125,25 @@ public NetworkMetrics(MetricRegistry registry) { | |
selectorNodeMetricMap = new HashMap<String, SelectorNodeMetric>(); | ||
} | ||
|
||
public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections) { | ||
/** | ||
* Initializes few network metrics for the selector | ||
* @param activeConnections count of current active connections | ||
* @param pendingSslHandshakes List of {@link SSLTransmission}s that are awaiting for handshake completion | ||
*/ | ||
public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this method is named There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
final List<SSLTransmission> pendingSslHandshakes) { | ||
selectorActiveConnections = new Gauge<Long>() { | ||
@Override | ||
public Long getValue() { | ||
return activeConnections.get(); | ||
} | ||
}; | ||
selectorPendingHandshakes = new Gauge<Integer>() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
@Override | ||
public Integer getValue() { | ||
return pendingSslHandshakes.size(); | ||
} | ||
}; | ||
} | ||
|
||
public void initializeSelectorNodeMetricIfRequired(String hostname, int port) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,6 +72,8 @@ public class Selector implements Selectable { | |
private final List<NetworkReceive> completedReceives; | ||
private final List<String> disconnected; | ||
private final List<String> connected; | ||
private final List<SSLTransmission> pendingSslHandshakes; | ||
private final Map<String, Long> sslHandshakeTimer; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not avoid the map completely? Just use a list of objects of a private class: private class PendingHandshakeTransmission {
SSLTransmission transmission;
long pendingSinceMs;
} |
||
private final Time time; | ||
private final NetworkMetrics metrics; | ||
private final AtomicLong IdGenerator; | ||
|
@@ -90,10 +92,12 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) | |
this.completedReceives = new ArrayList<NetworkReceive>(); | ||
this.connected = new ArrayList<String>(); | ||
this.disconnected = new ArrayList<String>(); | ||
this.pendingSslHandshakes = new ArrayList<>(); | ||
this.sslHandshakeTimer = new HashMap<String, Long>(); | ||
this.metrics = metrics; | ||
this.IdGenerator = new AtomicLong(0); | ||
this.activeConnections = new AtomicLong(0); | ||
this.metrics.initializeSelectorMetricsIfRequired(activeConnections); | ||
this.metrics.initializeSelectorMetricsIfRequired(activeConnections, pendingSslHandshakes); | ||
this.sslFactory = sslFactory; | ||
} | ||
|
||
|
@@ -315,7 +319,14 @@ public void poll(long timeoutMs, List<NetworkSend> sends) | |
Transmission transmission = getTransmission(key); | ||
try { | ||
if (key.isConnectable()) { | ||
handleConnect(key, transmission); | ||
transmission.finishConnect(); | ||
if (transmission.ready()) { | ||
connected.add(transmission.getConnectionId()); | ||
metrics.selectorConnectionCreated.inc(); | ||
} else { | ||
pendingSslHandshakes.add((SSLTransmission) transmission); | ||
sslHandshakeTimer.put(transmission.getConnectionId(), System.currentTimeMillis()); | ||
} | ||
} | ||
|
||
/* if channel is not ready, finish prepare */ | ||
|
@@ -347,12 +358,32 @@ public void poll(long timeoutMs, List<NetworkSend> sends) | |
close(key); | ||
} | ||
} | ||
completeSslHandshakes(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this call should really happen before line 321, shouldn't it? why bother checking those connections that were just added to the list? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me try to explain why I have it here. The ssl handshake would complete in either write() read() call for keys returned via nioSelector.selectedKeys(). Which means, at the end of the while loop (313 - 358), we will have some keys which have completed their handshakes. If we wait until next poll() call which the caller has to make, we add some more additional latency to it and also, we add one more additional poll() call since we haven't added these connections which have completed handshakes to the connected list. Hence the network client still thinks that the connection is not ready yet until the next poll(). For these reasons, I want to have it here instead of above the while loop. |
||
this.metrics.selectorIORate.inc(); | ||
} | ||
long endIo = time.milliseconds(); | ||
this.metrics.selectorIOTime.update(endIo - endSelect); | ||
} | ||
|
||
/** | ||
* Add those Ssl connections to connected list on handshake completion | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. think doc can be improved. What are "those"? |
||
*/ | ||
private void completeSslHandshakes() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method does not do what its name implies, could you rename it to |
||
Iterator<SSLTransmission> sslTransIter = pendingSslHandshakes.iterator(); | ||
while (sslTransIter.hasNext()) { | ||
Transmission sslTransmission = sslTransIter.next(); | ||
if (sslTransmission != null && sslTransmission.ready()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the null check is unnecessary and misleading. |
||
connected.add(sslTransmission.getConnectionId()); | ||
metrics.selectorConnectionCreated.inc(); | ||
Long handshakeStartTime = sslHandshakeTimer.remove(sslTransmission.getConnectionId()); | ||
if (handshakeStartTime != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. again, this is unnecessary, but I think you should simply use the other suggestion of using a single data structure. |
||
metrics.selectorPerceivedSslHandshakeTime.update(System.currentTimeMillis() - handshakeStartTime.longValue()); | ||
} | ||
sslTransIter.remove(); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Generate the description for a SocketChannel | ||
*/ | ||
|
@@ -455,6 +486,10 @@ private void close(SelectionKey key) { | |
activeConnections.set(this.keyMap.size()); | ||
try { | ||
transmission.close(); | ||
if (pendingSslHandshakes.contains(transmission.getConnectionId())) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
pendingSslHandshakes.remove(transmission.getConnectionId()); | ||
sslHandshakeTimer.remove(transmission.getConnectionId()); | ||
} | ||
} catch (IOException e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, looks like we might have an issue. even keyMap and activeConnections looks to be affected. Lets discuss in person. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From the documentation it looks like the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, I cannot think of why we would want to use this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. selector should never be used across threads There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, my bad. I thought the processNewResponses() in SocketServer() was a daemon thread. Hence, the close(SelectionKey) could be called simultaneously when selector poll() is happening. Looks like its called sequentially and selector.poll() follows it. So, I don't think we have a problem of multiple threads accessing it. |
||
logger.error("IOException thrown during closing of transmission with connectionId {} :", | ||
transmission.getConnectionId(), e); | ||
|
@@ -483,16 +518,6 @@ private SelectionKey keyForId(String id) { | |
return this.keyMap.get(id); | ||
} | ||
|
||
/** | ||
* Process connections that have finished their handshake | ||
*/ | ||
private void handleConnect(SelectionKey key, Transmission transmission) | ||
throws IOException { | ||
transmission.finishConnect(); | ||
this.connected.add(transmission.getConnectionId()); | ||
this.metrics.selectorConnectionCreated.inc(); | ||
} | ||
|
||
/** | ||
* Process reads from ready sockets | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Random; | ||
import junit.framework.Assert; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is this for? Don't we usually use |
||
import org.junit.After; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
|
@@ -49,9 +50,7 @@ public void setup() | |
SSLFactory clientSSLFactory = new SSLFactory(clientSSLConfig); | ||
this.server = new EchoServer(serverSSLFactory, 18383); | ||
this.server.start(); | ||
this.selector = | ||
new Selector(new NetworkMetrics(new MetricRegistry()), SystemTime.getInstance(), | ||
clientSSLFactory); | ||
this.selector = new Selector(new NetworkMetrics(new MetricRegistry()), SystemTime.getInstance(), clientSSLFactory); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. again, why the |
||
} | ||
|
||
@After | ||
|
@@ -203,6 +202,19 @@ public void testEmptyRequest() | |
assertEquals("", blockingRequest(connectionId, "")); | ||
} | ||
|
||
@Test | ||
public void testSSLConnect() | ||
throws IOException { | ||
String connectionId = | ||
selector.connect(new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE, PortType.SSL); | ||
Assert.assertFalse("Channel should not be ready by now (until handshake completes)", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible that a handshake completes very fast and becomes ready before the connectionId is added to connected? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that is not possible as handshake involves 6 interactions (read and write) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But is this all async? How can we rely on the fact that it won't complete? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this assumption might create a race condition. Also, is it really relevant to us that the channel is "not ready"? I think this test can be skipped. All you need to check is that it is ready when it is added to connected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, this test does not seem relevant. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this check I meant. the rest of it (line 212 and after) is a valid check. |
||
selector.isChannelReady(connectionId)); | ||
while (!selector.connected().contains(connectionId)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think you have resolved Gopal's comment. Could you talk to him in person if you are not convinced about his suggestion? |
||
selector.poll(10000L); | ||
} | ||
Assert.assertTrue("Channel should have been ready by now ", selector.isChannelReady(connectionId)); | ||
} | ||
|
||
private String blockingRequest(String connectionId, String s) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible to add a test case for handshake failure and cleanup part? Basically to cover all new code. |
||
throws Exception { | ||
selector.poll(1000L, asList(SelectorTest.createSend(connectionId, s))); | ||
|
@@ -224,10 +236,6 @@ private String blockingSSLConnect() | |
while (!selector.connected().contains(connectionId)) { | ||
selector.poll(10000L); | ||
} | ||
//finish the handshake as well | ||
while (!selector.isChannelReady(connectionId)) { | ||
selector.poll(10000L); | ||
} | ||
return connectionId; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"a few"