Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed May 16, 2016
1 parent 81a9256 commit f22676e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ public NetworkMetrics(MetricRegistry registry) {
/**
* Initializes few network metrics for the selector
* @param activeConnections count of current active connections
* @param pendingSslHandshakes List of {@link Transmission}s that are awaiting for handshake completion
* @param pendingSslHandshakes List of {@link SSLTransmission}s that are awaiting for handshake completion
*/
public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections,
final List<Transmission> pendingSslHandshakes) {
final List<SSLTransmission> pendingSslHandshakes) {
selectorActiveConnections = new Gauge<Long>() {
@Override
public Long getValue() {
Expand Down
60 changes: 29 additions & 31 deletions ambry-network/src/main/java/com.github.ambry.network/Selector.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class Selector implements Selectable {
private final List<NetworkReceive> completedReceives;
private final List<String> disconnected;
private final List<String> connected;
private final List<Transmission> pendingSslHandshakes;
private final List<SSLTransmission> pendingSslHandshakes;
private final Map<String, Long> sslHandshakeTimer;
private final Time time;
private final NetworkMetrics metrics;
Expand Down Expand Up @@ -319,12 +319,12 @@ public void poll(long timeoutMs, List<NetworkSend> sends)
Transmission transmission = getTransmission(key);
try {
if (key.isConnectable()) {
handleConnect(key, transmission);
transmission.finishConnect();
if (transmission.ready()) {
this.connected.add(transmission.getConnectionId());
this.metrics.selectorConnectionCreated.inc();
connected.add(transmission.getConnectionId());
metrics.selectorConnectionCreated.inc();
} else {
pendingSslHandshakes.add(transmission);
pendingSslHandshakes.add((SSLTransmission) transmission);
sslHandshakeTimer.put(transmission.getConnectionId(), System.currentTimeMillis());
}
}
Expand Down Expand Up @@ -358,30 +358,32 @@ public void poll(long timeoutMs, List<NetworkSend> sends)
close(key);
}
}

Iterator<Transmission> sslTransIter = pendingSslHandshakes.iterator();
while (sslTransIter.hasNext()) {
Transmission sslTransmission = sslTransIter.next();
if (sslTransmission != null && sslTransmission.ready()) {
connected.add(sslTransmission.getConnectionId());
this.metrics.selectorConnectionCreated.inc();
Long handshakeStartTime = sslHandshakeTimer.remove(sslTransmission.getConnectionId());
if (handshakeStartTime != null) {
this.metrics.selectorPerceivedSslHandshakeTime
.update(System.currentTimeMillis() - handshakeStartTime.longValue());
}
sslTransIter.remove();
} else if (sslTransmission == null || !keyMap.containsKey(sslTransmission.getConnectionId())) {
// transmission was closed for some reason
sslTransIter.remove();
}
}
completeSslHandshakes();
this.metrics.selectorIORate.inc();
}
long endIo = time.milliseconds();
this.metrics.selectorIOTime.update(endIo - endSelect);
}

/**
* Add those Ssl connections to connected list on handshake completion
*/
private void completeSslHandshakes() {
Iterator<SSLTransmission> sslTransIter = pendingSslHandshakes.iterator();
while (sslTransIter.hasNext()) {
Transmission sslTransmission = sslTransIter.next();
if (sslTransmission != null && sslTransmission.ready()) {
connected.add(sslTransmission.getConnectionId());
metrics.selectorConnectionCreated.inc();
Long handshakeStartTime = sslHandshakeTimer.remove(sslTransmission.getConnectionId());
if (handshakeStartTime != null) {
metrics.selectorPerceivedSslHandshakeTime.update(System.currentTimeMillis() - handshakeStartTime.longValue());
}
sslTransIter.remove();
}
}
}

/**
* Generate the description for a SocketChannel
*/
Expand Down Expand Up @@ -484,6 +486,10 @@ private void close(SelectionKey key) {
activeConnections.set(this.keyMap.size());
try {
transmission.close();
if (pendingSslHandshakes.contains(transmission.getConnectionId())) {
pendingSslHandshakes.remove(transmission.getConnectionId());
sslHandshakeTimer.remove(transmission.getConnectionId());
}
} catch (IOException e) {
logger.error("IOException thrown during closing of transmission with connectionId {} :",
transmission.getConnectionId(), e);
Expand Down Expand Up @@ -512,14 +518,6 @@ private SelectionKey keyForId(String id) {
return this.keyMap.get(id);
}

/**
* Process connections that have finished their handshake
*/
private void handleConnect(SelectionKey key, Transmission transmission)
throws IOException {
transmission.finishConnect();
}

/**
* Process reads from ready sockets
*/
Expand Down

0 comments on commit f22676e

Please sign in to comment.