From 81a9256e2b9eee7dcfd21bad9d909377f2f70f2c Mon Sep 17 00:00:00 2001 From: sinaraya Date: Fri, 13 May 2016 10:37:33 -0700 Subject: [PATCH 01/15] Fixing Selector connected list for SSL connections --- .../NetworkMetrics.java | 18 +++++++++- .../com.github.ambry.network/Selector.java | 33 +++++++++++++++++-- .../SSLSelectorTest.java | 22 +++++++++---- 3 files changed, 62 insertions(+), 11 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index 3058c36e55..19e1b69ea1 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -38,6 +38,7 @@ public class NetworkMetrics { public final Histogram selectorSelectTime; public final Counter selectorIORate; public final Histogram selectorIOTime; + public final Histogram selectorPerceivedSslHandshakeTime; public final Counter selectorNioCloseErrorCount; public final Counter selectorDisconnectedErrorCount; public final Counter selectorIOErrorCount; @@ -45,6 +46,7 @@ public class NetworkMetrics { public final Counter selectorCloseKeyErrorCount; public final Counter selectorCloseSocketErrorCount; public Gauge selectorActiveConnections; + public Gauge selectorPendingHandshakes; public final Map selectorNodeMetricMap; // Plaintext metrics @@ -87,6 +89,8 @@ public NetworkMetrics(MetricRegistry registry) { selectorIORate = registry.counter(MetricRegistry.name(Selector.class, "SelectorIORate")); selectorSelectTime = registry.histogram(MetricRegistry.name(Selector.class, "SelectorSelectTime")); selectorIOTime = registry.histogram(MetricRegistry.name(Selector.class, "SelectorIOTime")); + selectorPerceivedSslHandshakeTime = + registry.histogram(MetricRegistry.name(Selector.class, "SelectorSslHandshakeTime")); selectorNioCloseErrorCount = registry.counter(MetricRegistry.name(Selector.class, "SelectorNioCloseErrorCount")); selectorDisconnectedErrorCount = registry.counter(MetricRegistry.name(Selector.class, "SelectorDisconnectedErrorCount")); @@ -121,13 +125,25 @@ public NetworkMetrics(MetricRegistry registry) { selectorNodeMetricMap = new HashMap(); } - public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections) { + /** + * Initializes few network metrics for the selector + * @param activeConnections count of current active connections + * @param pendingSslHandshakes List of {@link Transmission}s that are awaiting for handshake completion + */ + public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections, + final List pendingSslHandshakes) { selectorActiveConnections = new Gauge() { @Override public Long getValue() { return activeConnections.get(); } }; + selectorPendingHandshakes = new Gauge() { + @Override + public Integer getValue() { + return pendingSslHandshakes.size(); + } + }; } public void initializeSelectorNodeMetricIfRequired(String hostname, int port) { diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index d4750c1160..651e1825d9 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -72,6 +72,8 @@ public class Selector implements Selectable { private final List completedReceives; private final List disconnected; private final List connected; + private final List pendingSslHandshakes; + private final Map sslHandshakeTimer; private final Time time; private final NetworkMetrics metrics; private final AtomicLong IdGenerator; @@ -90,10 +92,12 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) this.completedReceives = new ArrayList(); this.connected = new ArrayList(); this.disconnected = new ArrayList(); + this.pendingSslHandshakes = new ArrayList<>(); + this.sslHandshakeTimer = new HashMap(); this.metrics = metrics; this.IdGenerator = new AtomicLong(0); this.activeConnections = new AtomicLong(0); - this.metrics.initializeSelectorMetricsIfRequired(activeConnections); + this.metrics.initializeSelectorMetricsIfRequired(activeConnections, pendingSslHandshakes); this.sslFactory = sslFactory; } @@ -316,6 +320,13 @@ public void poll(long timeoutMs, List sends) try { if (key.isConnectable()) { handleConnect(key, transmission); + if (transmission.ready()) { + this.connected.add(transmission.getConnectionId()); + this.metrics.selectorConnectionCreated.inc(); + } else { + pendingSslHandshakes.add(transmission); + sslHandshakeTimer.put(transmission.getConnectionId(), System.currentTimeMillis()); + } } /* if channel is not ready, finish prepare */ @@ -347,6 +358,24 @@ public void poll(long timeoutMs, List sends) close(key); } } + + Iterator sslTransIter = pendingSslHandshakes.iterator(); + while (sslTransIter.hasNext()) { + Transmission sslTransmission = sslTransIter.next(); + if (sslTransmission != null && sslTransmission.ready()) { + connected.add(sslTransmission.getConnectionId()); + this.metrics.selectorConnectionCreated.inc(); + Long handshakeStartTime = sslHandshakeTimer.remove(sslTransmission.getConnectionId()); + if (handshakeStartTime != null) { + this.metrics.selectorPerceivedSslHandshakeTime + .update(System.currentTimeMillis() - handshakeStartTime.longValue()); + } + sslTransIter.remove(); + } else if (sslTransmission == null || !keyMap.containsKey(sslTransmission.getConnectionId())) { + // transmission was closed for some reason + sslTransIter.remove(); + } + } this.metrics.selectorIORate.inc(); } long endIo = time.milliseconds(); @@ -489,8 +518,6 @@ private SelectionKey keyForId(String id) { private void handleConnect(SelectionKey key, Transmission transmission) throws IOException { transmission.finishConnect(); - this.connected.add(transmission.getConnectionId()); - this.metrics.selectorConnectionCreated.inc(); } /** diff --git a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java index af0e89c2a8..c016b787b9 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import junit.framework.Assert; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -49,9 +50,7 @@ public void setup() SSLFactory clientSSLFactory = new SSLFactory(clientSSLConfig); this.server = new EchoServer(serverSSLFactory, 18383); this.server.start(); - this.selector = - new Selector(new NetworkMetrics(new MetricRegistry()), SystemTime.getInstance(), - clientSSLFactory); + this.selector = new Selector(new NetworkMetrics(new MetricRegistry()), SystemTime.getInstance(), clientSSLFactory); } @After @@ -203,6 +202,19 @@ public void testEmptyRequest() assertEquals("", blockingRequest(connectionId, "")); } + @Test + public void testSSLConnect() + throws IOException { + String connectionId = + selector.connect(new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE, PortType.SSL); + Assert.assertFalse("Channel should not be ready by now (until handshake completes)", + selector.isChannelReady(connectionId)); + while (!selector.connected().contains(connectionId)) { + selector.poll(10000L); + } + Assert.assertTrue("Channel should have been ready by now ", selector.isChannelReady(connectionId)); + } + private String blockingRequest(String connectionId, String s) throws Exception { selector.poll(1000L, asList(SelectorTest.createSend(connectionId, s))); @@ -224,10 +236,6 @@ private String blockingSSLConnect() while (!selector.connected().contains(connectionId)) { selector.poll(10000L); } - //finish the handshake as well - while (!selector.isChannelReady(connectionId)) { - selector.poll(10000L); - } return connectionId; } } From f22676e10ce478c98454aedd63c5576b2943570e Mon Sep 17 00:00:00 2001 From: sinaraya Date: Mon, 16 May 2016 14:29:26 -0700 Subject: [PATCH 02/15] Addressing comments --- .../NetworkMetrics.java | 4 +- .../com.github.ambry.network/Selector.java | 60 +++++++++---------- 2 files changed, 31 insertions(+), 33 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index 19e1b69ea1..9203031d69 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -128,10 +128,10 @@ public NetworkMetrics(MetricRegistry registry) { /** * Initializes few network metrics for the selector * @param activeConnections count of current active connections - * @param pendingSslHandshakes List of {@link Transmission}s that are awaiting for handshake completion + * @param pendingSslHandshakes List of {@link SSLTransmission}s that are awaiting for handshake completion */ public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections, - final List pendingSslHandshakes) { + final List pendingSslHandshakes) { selectorActiveConnections = new Gauge() { @Override public Long getValue() { diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index 651e1825d9..042b524155 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -72,7 +72,7 @@ public class Selector implements Selectable { private final List completedReceives; private final List disconnected; private final List connected; - private final List pendingSslHandshakes; + private final List pendingSslHandshakes; private final Map sslHandshakeTimer; private final Time time; private final NetworkMetrics metrics; @@ -319,12 +319,12 @@ public void poll(long timeoutMs, List sends) Transmission transmission = getTransmission(key); try { if (key.isConnectable()) { - handleConnect(key, transmission); + transmission.finishConnect(); if (transmission.ready()) { - this.connected.add(transmission.getConnectionId()); - this.metrics.selectorConnectionCreated.inc(); + connected.add(transmission.getConnectionId()); + metrics.selectorConnectionCreated.inc(); } else { - pendingSslHandshakes.add(transmission); + pendingSslHandshakes.add((SSLTransmission) transmission); sslHandshakeTimer.put(transmission.getConnectionId(), System.currentTimeMillis()); } } @@ -358,30 +358,32 @@ public void poll(long timeoutMs, List sends) close(key); } } - - Iterator sslTransIter = pendingSslHandshakes.iterator(); - while (sslTransIter.hasNext()) { - Transmission sslTransmission = sslTransIter.next(); - if (sslTransmission != null && sslTransmission.ready()) { - connected.add(sslTransmission.getConnectionId()); - this.metrics.selectorConnectionCreated.inc(); - Long handshakeStartTime = sslHandshakeTimer.remove(sslTransmission.getConnectionId()); - if (handshakeStartTime != null) { - this.metrics.selectorPerceivedSslHandshakeTime - .update(System.currentTimeMillis() - handshakeStartTime.longValue()); - } - sslTransIter.remove(); - } else if (sslTransmission == null || !keyMap.containsKey(sslTransmission.getConnectionId())) { - // transmission was closed for some reason - sslTransIter.remove(); - } - } + completeSslHandshakes(); this.metrics.selectorIORate.inc(); } long endIo = time.milliseconds(); this.metrics.selectorIOTime.update(endIo - endSelect); } + /** + * Add those Ssl connections to connected list on handshake completion + */ + private void completeSslHandshakes() { + Iterator sslTransIter = pendingSslHandshakes.iterator(); + while (sslTransIter.hasNext()) { + Transmission sslTransmission = sslTransIter.next(); + if (sslTransmission != null && sslTransmission.ready()) { + connected.add(sslTransmission.getConnectionId()); + metrics.selectorConnectionCreated.inc(); + Long handshakeStartTime = sslHandshakeTimer.remove(sslTransmission.getConnectionId()); + if (handshakeStartTime != null) { + metrics.selectorPerceivedSslHandshakeTime.update(System.currentTimeMillis() - handshakeStartTime.longValue()); + } + sslTransIter.remove(); + } + } + } + /** * Generate the description for a SocketChannel */ @@ -484,6 +486,10 @@ private void close(SelectionKey key) { activeConnections.set(this.keyMap.size()); try { transmission.close(); + if (pendingSslHandshakes.contains(transmission.getConnectionId())) { + pendingSslHandshakes.remove(transmission.getConnectionId()); + sslHandshakeTimer.remove(transmission.getConnectionId()); + } } catch (IOException e) { logger.error("IOException thrown during closing of transmission with connectionId {} :", transmission.getConnectionId(), e); @@ -512,14 +518,6 @@ private SelectionKey keyForId(String id) { return this.keyMap.get(id); } - /** - * Process connections that have finished their handshake - */ - private void handleConnect(SelectionKey key, Transmission transmission) - throws IOException { - transmission.finishConnect(); - } - /** * Process reads from ready sockets */ From 789341eb616259a71f1cfa7256ed6e92ca6b036d Mon Sep 17 00:00:00 2001 From: sinaraya Date: Mon, 16 May 2016 17:30:40 -0700 Subject: [PATCH 03/15] Minor fixes --- .../NetworkMetrics.java | 6 +-- .../com.github.ambry.network/Selector.java | 44 ++++++++++--------- .../SSLSelectorTest.java | 17 +++++++ 3 files changed, 44 insertions(+), 23 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index 9203031d69..aa90d376f0 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -128,10 +128,10 @@ public NetworkMetrics(MetricRegistry registry) { /** * Initializes few network metrics for the selector * @param activeConnections count of current active connections - * @param pendingSslHandshakes List of {@link SSLTransmission}s that are awaiting for handshake completion + * @param pendingHandshakeTransmissions List of {@link Transmission}s that are awaiting for handshake completion */ public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections, - final List pendingSslHandshakes) { + final Map pendingHandshakeTransmissions) { selectorActiveConnections = new Gauge() { @Override public Long getValue() { @@ -141,7 +141,7 @@ public Long getValue() { selectorPendingHandshakes = new Gauge() { @Override public Integer getValue() { - return pendingSslHandshakes.size(); + return pendingHandshakeTransmissions.size(); } }; } diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index 042b524155..57dcb10746 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -72,8 +72,7 @@ public class Selector implements Selectable { private final List completedReceives; private final List disconnected; private final List connected; - private final List pendingSslHandshakes; - private final Map sslHandshakeTimer; + private final Map pendingHandshakeTransmissions; private final Time time; private final NetworkMetrics metrics; private final AtomicLong IdGenerator; @@ -92,12 +91,11 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) this.completedReceives = new ArrayList(); this.connected = new ArrayList(); this.disconnected = new ArrayList(); - this.pendingSslHandshakes = new ArrayList<>(); - this.sslHandshakeTimer = new HashMap(); + this.pendingHandshakeTransmissions = new HashMap<>(); this.metrics = metrics; this.IdGenerator = new AtomicLong(0); this.activeConnections = new AtomicLong(0); - this.metrics.initializeSelectorMetricsIfRequired(activeConnections, pendingSslHandshakes); + this.metrics.initializeSelectorMetricsIfRequired(activeConnections, pendingHandshakeTransmissions); this.sslFactory = sslFactory; } @@ -324,8 +322,8 @@ public void poll(long timeoutMs, List sends) connected.add(transmission.getConnectionId()); metrics.selectorConnectionCreated.inc(); } else { - pendingSslHandshakes.add((SSLTransmission) transmission); - sslHandshakeTimer.put(transmission.getConnectionId(), System.currentTimeMillis()); + pendingHandshakeTransmissions.put(transmission.getConnectionId(), + new PendingHandshakeTransmission((SSLTransmission) transmission)); } } @@ -369,17 +367,14 @@ public void poll(long timeoutMs, List sends) * Add those Ssl connections to connected list on handshake completion */ private void completeSslHandshakes() { - Iterator sslTransIter = pendingSslHandshakes.iterator(); - while (sslTransIter.hasNext()) { - Transmission sslTransmission = sslTransIter.next(); - if (sslTransmission != null && sslTransmission.ready()) { - connected.add(sslTransmission.getConnectionId()); + Iterator pendingSslTransIter = pendingHandshakeTransmissions.values().iterator(); + while (pendingSslTransIter.hasNext()) { + PendingHandshakeTransmission pendingSslTrans = pendingSslTransIter.next(); + if (pendingSslTrans.sslTransmission.ready()) { + connected.add(pendingSslTrans.sslTransmission.getConnectionId()); metrics.selectorConnectionCreated.inc(); - Long handshakeStartTime = sslHandshakeTimer.remove(sslTransmission.getConnectionId()); - if (handshakeStartTime != null) { - metrics.selectorPerceivedSslHandshakeTime.update(System.currentTimeMillis() - handshakeStartTime.longValue()); - } - sslTransIter.remove(); + metrics.selectorPerceivedSslHandshakeTime.update(System.currentTimeMillis() - pendingSslTrans.pendingSinceMs); + pendingHandshakeTransmissions.remove(pendingSslTrans.sslTransmission.getConnectionId()); } } } @@ -486,9 +481,8 @@ private void close(SelectionKey key) { activeConnections.set(this.keyMap.size()); try { transmission.close(); - if (pendingSslHandshakes.contains(transmission.getConnectionId())) { - pendingSslHandshakes.remove(transmission.getConnectionId()); - sslHandshakeTimer.remove(transmission.getConnectionId()); + if (pendingHandshakeTransmissions.get(transmission.getConnectionId()) != null) { + pendingHandshakeTransmissions.remove(transmission.getConnectionId()); } } catch (IOException e) { logger.error("IOException thrown during closing of transmission with connectionId {} :", @@ -573,3 +567,13 @@ private SocketChannel channel(SelectionKey key) { return (SocketChannel) key.channel(); } } + +class PendingHandshakeTransmission { + SSLTransmission sslTransmission; + long pendingSinceMs; + + PendingHandshakeTransmission(SSLTransmission sslTransmission) { + this.sslTransmission = sslTransmission; + pendingSinceMs = System.currentTimeMillis(); + } +} diff --git a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java index c016b787b9..7bb93b275f 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java @@ -215,6 +215,23 @@ public void testSSLConnect() Assert.assertTrue("Channel should have been ready by now ", selector.isChannelReady(connectionId)); } + @Test + public void testCloseDuringSslHandshake() + throws IOException { + String connectionId = + selector.connect(new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE, PortType.SSL); + Assert.assertFalse("Channel should not be ready by now (until handshake completes)", + selector.isChannelReady(connectionId)); + selector.poll(10000L); + Assert.assertFalse("Channel shouldn't have been added to connect list w/o handshake", + selector.connected().contains(connectionId)); + selector.close(connectionId); + Assert.assertFalse("Channel should not have been added to connected list ", + selector.connected().contains(connectionId)); + Assert.assertTrue("Channel should have been added to disconnected list", + selector.disconnected().contains(connectionId)); + } + private String blockingRequest(String connectionId, String s) throws Exception { selector.poll(1000L, asList(SelectorTest.createSend(connectionId, s))); From a95ad0a6e26abca67b21858007c82bf2c0dae235 Mon Sep 17 00:00:00 2001 From: sinaraya Date: Mon, 16 May 2016 21:39:38 -0700 Subject: [PATCH 04/15] Fixing transmission close --- .../java/com.github.ambry.network/Selector.java | 13 ++++--------- .../java/com.github.ambry.network/Transmission.java | 3 +-- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index 57dcb10746..e5a1e846b9 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -364,7 +364,7 @@ public void poll(long timeoutMs, List sends) } /** - * Add those Ssl connections to connected list on handshake completion + * Add Ssl connections to connected list on handshake completion */ private void completeSslHandshakes() { Iterator pendingSslTransIter = pendingHandshakeTransmissions.values().iterator(); @@ -479,14 +479,9 @@ private void close(SelectionKey key) { this.disconnected.add(transmission.getConnectionId()); this.keyMap.remove(transmission.getConnectionId()); activeConnections.set(this.keyMap.size()); - try { - transmission.close(); - if (pendingHandshakeTransmissions.get(transmission.getConnectionId()) != null) { - pendingHandshakeTransmissions.remove(transmission.getConnectionId()); - } - } catch (IOException e) { - logger.error("IOException thrown during closing of transmission with connectionId {} :", - transmission.getConnectionId(), e); + transmission.close(); + if (pendingHandshakeTransmissions.get(transmission.getConnectionId()) != null) { + pendingHandshakeTransmissions.remove(transmission.getConnectionId()); } } else { key.attach(null); diff --git a/ambry-network/src/main/java/com.github.ambry.network/Transmission.java b/ambry-network/src/main/java/com.github.ambry.network/Transmission.java index 93cdad9690..afe04b2a95 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Transmission.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Transmission.java @@ -132,8 +132,7 @@ public SocketAddress getRemoteSocketAddress() { /** * Close the connection for the socket channel */ - public abstract void close() - throws IOException; + public abstract void close(); public String getConnectionId() { return connectionId; From 5dbba154409dc2b0cc9a56d9b93c71c53196956f Mon Sep 17 00:00:00 2001 From: sinaraya Date: Tue, 17 May 2016 20:00:12 -0700 Subject: [PATCH 05/15] Fixing iterating over pending ssl transmissions --- .../NetworkMetrics.java | 4 +-- .../com.github.ambry.network/Selector.java | 29 ++++++++++--------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index aa90d376f0..4b9143987d 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -126,9 +126,9 @@ public NetworkMetrics(MetricRegistry registry) { } /** - * Initializes few network metrics for the selector + * Initializes a few network metrics for the selector * @param activeConnections count of current active connections - * @param pendingHandshakeTransmissions List of {@link Transmission}s that are awaiting for handshake completion + * @param pendingHandshakeTransmissions List of {@link Transmission}s that are awaiting handshake completion */ public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections, final Map pendingHandshakeTransmissions) { diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index e5a1e846b9..d211c98425 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -72,7 +72,7 @@ public class Selector implements Selectable { private final List completedReceives; private final List disconnected; private final List connected; - private final Map pendingHandshakeTransmissions; + private final Map pendingSslTrans; private final Time time; private final NetworkMetrics metrics; private final AtomicLong IdGenerator; @@ -91,11 +91,11 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) this.completedReceives = new ArrayList(); this.connected = new ArrayList(); this.disconnected = new ArrayList(); - this.pendingHandshakeTransmissions = new HashMap<>(); + this.pendingSslTrans = new HashMap<>(); this.metrics = metrics; this.IdGenerator = new AtomicLong(0); this.activeConnections = new AtomicLong(0); - this.metrics.initializeSelectorMetricsIfRequired(activeConnections, pendingHandshakeTransmissions); + this.metrics.initializeSelectorMetricsIfRequired(activeConnections, pendingSslTrans); this.sslFactory = sslFactory; } @@ -322,7 +322,7 @@ public void poll(long timeoutMs, List sends) connected.add(transmission.getConnectionId()); metrics.selectorConnectionCreated.inc(); } else { - pendingHandshakeTransmissions.put(transmission.getConnectionId(), + pendingSslTrans.put(transmission.getConnectionId(), new PendingHandshakeTransmission((SSLTransmission) transmission)); } } @@ -367,14 +367,16 @@ public void poll(long timeoutMs, List sends) * Add Ssl connections to connected list on handshake completion */ private void completeSslHandshakes() { - Iterator pendingSslTransIter = pendingHandshakeTransmissions.values().iterator(); + Iterator> pendingSslTransIter = + pendingSslTrans.entrySet().iterator(); while (pendingSslTransIter.hasNext()) { - PendingHandshakeTransmission pendingSslTrans = pendingSslTransIter.next(); - if (pendingSslTrans.sslTransmission.ready()) { - connected.add(pendingSslTrans.sslTransmission.getConnectionId()); + Map.Entry pendingSslTrans = pendingSslTransIter.next(); + if (pendingSslTrans.getValue().sslTransmission.ready()) { + connected.add(pendingSslTrans.getValue().sslTransmission.getConnectionId()); metrics.selectorConnectionCreated.inc(); - metrics.selectorPerceivedSslHandshakeTime.update(System.currentTimeMillis() - pendingSslTrans.pendingSinceMs); - pendingHandshakeTransmissions.remove(pendingSslTrans.sslTransmission.getConnectionId()); + metrics.selectorPerceivedSslHandshakeTime + .update(System.currentTimeMillis() - pendingSslTrans.getValue().pendingSinceMs); + pendingSslTransIter.remove(); } } } @@ -480,9 +482,7 @@ private void close(SelectionKey key) { this.keyMap.remove(transmission.getConnectionId()); activeConnections.set(this.keyMap.size()); transmission.close(); - if (pendingHandshakeTransmissions.get(transmission.getConnectionId()) != null) { - pendingHandshakeTransmissions.remove(transmission.getConnectionId()); - } + pendingSslTrans.remove(transmission.getConnectionId()); } else { key.attach(null); key.cancel(); @@ -563,6 +563,9 @@ private SocketChannel channel(SelectionKey key) { } } +/** + * Class used to store {@link SSLTransmission} which are yet to complete its handshake, along with start time of the same + */ class PendingHandshakeTransmission { SSLTransmission sslTransmission; long pendingSinceMs; From 1931fdfe82e47610528f7517e7f539d323c04c66 Mon Sep 17 00:00:00 2001 From: sinaraya Date: Wed, 18 May 2016 09:08:29 -0700 Subject: [PATCH 06/15] Fixing additional comments --- .../NetworkMetrics.java | 2 +- .../PlainTextTransmission.java | 2 +- .../com.github.ambry.network/Selector.java | 28 +++++++++---------- .../SSLSelectorTest.java | 5 +--- 4 files changed, 17 insertions(+), 20 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index 4b9143987d..dd3d3e2fef 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -131,7 +131,7 @@ public NetworkMetrics(MetricRegistry registry) { * @param pendingHandshakeTransmissions List of {@link Transmission}s that are awaiting handshake completion */ public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections, - final Map pendingHandshakeTransmissions) { + final Map pendingHandshakeTransmissions) { selectorActiveConnections = new Gauge() { @Override public Long getValue() { diff --git a/ambry-network/src/main/java/com.github.ambry.network/PlainTextTransmission.java b/ambry-network/src/main/java/com.github.ambry.network/PlainTextTransmission.java index 625fb141b2..d5ff4826e7 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/PlainTextTransmission.java +++ b/ambry-network/src/main/java/com.github.ambry.network/PlainTextTransmission.java @@ -26,7 +26,7 @@ * Transmission used to speak plain text to the underlying channel. */ public class PlainTextTransmission extends Transmission { - private static final Logger logger = LoggerFactory.getLogger(SSLTransmission.class); + private static final Logger logger = LoggerFactory.getLogger(PlainTextTransmission.class); public PlainTextTransmission(String connectionId, SocketChannel socketChannel, SelectionKey key, Time time, NetworkMetrics metrics) { diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index d211c98425..b2e4a2afe1 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -72,7 +72,7 @@ public class Selector implements Selectable { private final List completedReceives; private final List disconnected; private final List connected; - private final Map pendingSslTrans; + private final Map pendingSslTrans; private final Time time; private final NetworkMetrics metrics; private final AtomicLong IdGenerator; @@ -323,7 +323,7 @@ public void poll(long timeoutMs, List sends) metrics.selectorConnectionCreated.inc(); } else { pendingSslTrans.put(transmission.getConnectionId(), - new PendingHandshakeTransmission((SSLTransmission) transmission)); + new TransmissionPendingHandshake((SSLTransmission) transmission)); } } @@ -356,7 +356,7 @@ public void poll(long timeoutMs, List sends) close(key); } } - completeSslHandshakes(); + checkHandshakeStatus(); this.metrics.selectorIORate.inc(); } long endIo = time.milliseconds(); @@ -364,19 +364,19 @@ public void poll(long timeoutMs, List sends) } /** - * Add Ssl connections to connected list on handshake completion + * Check for handshake status and add to connected list on handshake completion */ - private void completeSslHandshakes() { - Iterator> pendingSslTransIter = + private void checkHandshakeStatus() { + Iterator> transmissionInfoIter = pendingSslTrans.entrySet().iterator(); - while (pendingSslTransIter.hasNext()) { - Map.Entry pendingSslTrans = pendingSslTransIter.next(); - if (pendingSslTrans.getValue().sslTransmission.ready()) { - connected.add(pendingSslTrans.getValue().sslTransmission.getConnectionId()); + while (transmissionInfoIter.hasNext()) { + Map.Entry transmissionInfo = transmissionInfoIter.next(); + if (transmissionInfo.getValue().sslTransmission.ready()) { + connected.add(transmissionInfo.getKey()); metrics.selectorConnectionCreated.inc(); metrics.selectorPerceivedSslHandshakeTime - .update(System.currentTimeMillis() - pendingSslTrans.getValue().pendingSinceMs); - pendingSslTransIter.remove(); + .update(System.currentTimeMillis() - transmissionInfo.getValue().pendingSinceMs); + transmissionInfoIter.remove(); } } } @@ -566,11 +566,11 @@ private SocketChannel channel(SelectionKey key) { /** * Class used to store {@link SSLTransmission} which are yet to complete its handshake, along with start time of the same */ -class PendingHandshakeTransmission { +class TransmissionPendingHandshake { SSLTransmission sslTransmission; long pendingSinceMs; - PendingHandshakeTransmission(SSLTransmission sslTransmission) { + TransmissionPendingHandshake(SSLTransmission sslTransmission) { this.sslTransmission = sslTransmission; pendingSinceMs = System.currentTimeMillis(); } diff --git a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java index 7bb93b275f..f3c0590c6f 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java @@ -220,10 +220,7 @@ public void testCloseDuringSslHandshake() throws IOException { String connectionId = selector.connect(new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE, PortType.SSL); - Assert.assertFalse("Channel should not be ready by now (until handshake completes)", - selector.isChannelReady(connectionId)); - selector.poll(10000L); - Assert.assertFalse("Channel shouldn't have been added to connect list w/o handshake", + Assert.assertFalse("Channel shouldn't have been added to connect list w/o completing handshake", selector.connected().contains(connectionId)); selector.close(connectionId); Assert.assertFalse("Channel should not have been added to connected list ", From d1532e6dec1f297dd0a1d94b0bc70ae77c3d3873 Mon Sep 17 00:00:00 2001 From: sinaraya Date: Wed, 18 May 2016 14:38:51 -0700 Subject: [PATCH 07/15] Minor fixes --- .../com.github.ambry.network/NetworkMetrics.java | 12 ++++++------ .../main/java/com.github.ambry.network/Selector.java | 10 ++++++++-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index dd3d3e2fef..869b35b222 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -46,7 +46,7 @@ public class NetworkMetrics { public final Counter selectorCloseKeyErrorCount; public final Counter selectorCloseSocketErrorCount; public Gauge selectorActiveConnections; - public Gauge selectorPendingHandshakes; + public Gauge selectorConnectionsPendingHandshake; public final Map selectorNodeMetricMap; // Plaintext metrics @@ -128,20 +128,20 @@ public NetworkMetrics(MetricRegistry registry) { /** * Initializes a few network metrics for the selector * @param activeConnections count of current active connections - * @param pendingHandshakeTransmissions List of {@link Transmission}s that are awaiting handshake completion + * @param connectionsPendingHandshakeCount Count of connections that are awaiting handshake completion */ public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections, - final Map pendingHandshakeTransmissions) { + final AtomicLong connectionsPendingHandshakeCount) { selectorActiveConnections = new Gauge() { @Override public Long getValue() { return activeConnections.get(); } }; - selectorPendingHandshakes = new Gauge() { + selectorConnectionsPendingHandshake = new Gauge() { @Override - public Integer getValue() { - return pendingHandshakeTransmissions.size(); + public Long getValue() { + return connectionsPendingHandshakeCount.get(); } }; } diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index b2e4a2afe1..01edf07d3b 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -77,6 +77,7 @@ public class Selector implements Selectable { private final NetworkMetrics metrics; private final AtomicLong IdGenerator; private AtomicLong activeConnections; + private AtomicLong connectionsPendingHandshakeCount; private final SSLFactory sslFactory; /** @@ -95,7 +96,8 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) this.metrics = metrics; this.IdGenerator = new AtomicLong(0); this.activeConnections = new AtomicLong(0); - this.metrics.initializeSelectorMetricsIfRequired(activeConnections, pendingSslTrans); + this.connectionsPendingHandshakeCount = new AtomicLong(0); + this.metrics.initializeSelectorMetricsIfRequired(activeConnections, connectionsPendingHandshakeCount); this.sslFactory = sslFactory; } @@ -324,6 +326,7 @@ public void poll(long timeoutMs, List sends) } else { pendingSslTrans.put(transmission.getConnectionId(), new TransmissionPendingHandshake((SSLTransmission) transmission)); + connectionsPendingHandshakeCount.incrementAndGet(); } } @@ -377,6 +380,7 @@ private void checkHandshakeStatus() { metrics.selectorPerceivedSslHandshakeTime .update(System.currentTimeMillis() - transmissionInfo.getValue().pendingSinceMs); transmissionInfoIter.remove(); + connectionsPendingHandshakeCount.decrementAndGet(); } } } @@ -482,7 +486,9 @@ private void close(SelectionKey key) { this.keyMap.remove(transmission.getConnectionId()); activeConnections.set(this.keyMap.size()); transmission.close(); - pendingSslTrans.remove(transmission.getConnectionId()); + if (pendingSslTrans.remove(transmission.getConnectionId()) != null) { + connectionsPendingHandshakeCount.decrementAndGet(); + } } else { key.attach(null); key.cancel(); From f1e62ee265c472060368a87a72fcc601fd5f88ba Mon Sep 17 00:00:00 2001 From: sinaraya Date: Thu, 19 May 2016 14:42:50 -0700 Subject: [PATCH 08/15] Renaming variables and some minor fixes --- .../NetworkMetrics.java | 14 +++---- .../com.github.ambry.network/Selector.java | 42 +++++++++---------- .../SSLSelectorTest.java | 20 ++++----- 3 files changed, 35 insertions(+), 41 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index 869b35b222..84f3efd0cf 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -45,8 +45,8 @@ public class NetworkMetrics { public final Counter selectorKeyOperationErrorCount; public final Counter selectorCloseKeyErrorCount; public final Counter selectorCloseSocketErrorCount; - public Gauge selectorActiveConnections; - public Gauge selectorConnectionsPendingHandshake; + public Gauge numActiveConnections; + public Gauge numConnectionsPendingHandhsake; public final Map selectorNodeMetricMap; // Plaintext metrics @@ -128,20 +128,20 @@ public NetworkMetrics(MetricRegistry registry) { /** * Initializes a few network metrics for the selector * @param activeConnections count of current active connections - * @param connectionsPendingHandshakeCount Count of connections that are awaiting handshake completion + * @param numConnectionsPendingHandhsake Count of connections that are awaiting handshake completion */ public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections, - final AtomicLong connectionsPendingHandshakeCount) { - selectorActiveConnections = new Gauge() { + final AtomicLong numConnectionsPendingHandhsake) { + numActiveConnections = new Gauge() { @Override public Long getValue() { return activeConnections.get(); } }; - selectorConnectionsPendingHandshake = new Gauge() { + this.numConnectionsPendingHandhsake = new Gauge() { @Override public Long getValue() { - return connectionsPendingHandshakeCount.get(); + return numConnectionsPendingHandhsake.get(); } }; } diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index 01edf07d3b..850a52357d 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -72,7 +72,7 @@ public class Selector implements Selectable { private final List completedReceives; private final List disconnected; private final List connected; - private final Map pendingSslTrans; + private final Map connectionPendingHandshake; private final Time time; private final NetworkMetrics metrics; private final AtomicLong IdGenerator; @@ -92,12 +92,12 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) this.completedReceives = new ArrayList(); this.connected = new ArrayList(); this.disconnected = new ArrayList(); - this.pendingSslTrans = new HashMap<>(); + this.connectionPendingHandshake = new HashMap<>(); this.metrics = metrics; this.IdGenerator = new AtomicLong(0); - this.activeConnections = new AtomicLong(0); - this.connectionsPendingHandshakeCount = new AtomicLong(0); - this.metrics.initializeSelectorMetricsIfRequired(activeConnections, connectionsPendingHandshakeCount); + activeConnections = new AtomicLong(0); + connectionsPendingHandshakeCount = new AtomicLong(0); + metrics.initializeSelectorMetricsIfRequired(activeConnections, connectionsPendingHandshakeCount); this.sslFactory = sslFactory; } @@ -324,8 +324,8 @@ public void poll(long timeoutMs, List sends) connected.add(transmission.getConnectionId()); metrics.selectorConnectionCreated.inc(); } else { - pendingSslTrans.put(transmission.getConnectionId(), - new TransmissionPendingHandshake((SSLTransmission) transmission)); + connectionPendingHandshake + .put(transmission.getConnectionId(), new UnreadyConnections((SSLTransmission) transmission)); connectionsPendingHandshakeCount.incrementAndGet(); } } @@ -359,7 +359,7 @@ public void poll(long timeoutMs, List sends) close(key); } } - checkHandshakeStatus(); + checkUnreadyConnectionsStatus(); this.metrics.selectorIORate.inc(); } long endIo = time.milliseconds(); @@ -367,14 +367,14 @@ public void poll(long timeoutMs, List sends) } /** - * Check for handshake status and add to connected list on handshake completion + * Check readiness for unready connections and add to completed list if ready */ - private void checkHandshakeStatus() { - Iterator> transmissionInfoIter = - pendingSslTrans.entrySet().iterator(); + private void checkUnreadyConnectionsStatus() { + Iterator> transmissionInfoIter = + connectionPendingHandshake.entrySet().iterator(); while (transmissionInfoIter.hasNext()) { - Map.Entry transmissionInfo = transmissionInfoIter.next(); - if (transmissionInfo.getValue().sslTransmission.ready()) { + Map.Entry transmissionInfo = transmissionInfoIter.next(); + if (transmissionInfo.getValue().transmission.ready()) { connected.add(transmissionInfo.getKey()); metrics.selectorConnectionCreated.inc(); metrics.selectorPerceivedSslHandshakeTime @@ -485,10 +485,10 @@ private void close(SelectionKey key) { this.disconnected.add(transmission.getConnectionId()); this.keyMap.remove(transmission.getConnectionId()); activeConnections.set(this.keyMap.size()); - transmission.close(); - if (pendingSslTrans.remove(transmission.getConnectionId()) != null) { + if (connectionPendingHandshake.remove(transmission.getConnectionId()) != null) { connectionsPendingHandshakeCount.decrementAndGet(); } + transmission.close(); } else { key.attach(null); key.cancel(); @@ -570,14 +570,14 @@ private SocketChannel channel(SelectionKey key) { } /** - * Class used to store {@link SSLTransmission} which are yet to complete its handshake, along with start time of the same + * Class used to store {@link Transmission} which are yet to complete its handshake, along with start time of the same */ -class TransmissionPendingHandshake { - SSLTransmission sslTransmission; +class UnreadyConnections { + Transmission transmission; long pendingSinceMs; - TransmissionPendingHandshake(SSLTransmission sslTransmission) { - this.sslTransmission = sslTransmission; + UnreadyConnections(Transmission transmission) { + this.transmission = transmission; pendingSinceMs = System.currentTimeMillis(); } } diff --git a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java index f3c0590c6f..c818c81ab9 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java @@ -22,8 +22,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; -import junit.framework.Assert; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -48,16 +48,16 @@ public void setup() TestSSLUtils.createSSLConfig("DC1,DC2,DC3", SSLFactory.Mode.CLIENT, trustStoreFile, "client"); SSLFactory serverSSLFactory = new SSLFactory(sslConfig); SSLFactory clientSSLFactory = new SSLFactory(clientSSLConfig); - this.server = new EchoServer(serverSSLFactory, 18383); - this.server.start(); - this.selector = new Selector(new NetworkMetrics(new MetricRegistry()), SystemTime.getInstance(), clientSSLFactory); + server = new EchoServer(serverSSLFactory, 18383); + server.start(); + selector = new Selector(new NetworkMetrics(new MetricRegistry()), SystemTime.getInstance(), clientSSLFactory); } @After public void teardown() throws Exception { - this.selector.close(); - this.server.close(); + selector.close(); + server.close(); } /** @@ -71,7 +71,7 @@ public void testServerDisconnect() assertEquals("hello", blockingRequest(connectionId, "hello")); // disconnect - this.server.closeConnections(); + server.closeConnections(); while (!selector.disconnected().contains(connectionId)) { selector.poll(1000L); } @@ -207,8 +207,6 @@ public void testSSLConnect() throws IOException { String connectionId = selector.connect(new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE, PortType.SSL); - Assert.assertFalse("Channel should not be ready by now (until handshake completes)", - selector.isChannelReady(connectionId)); while (!selector.connected().contains(connectionId)) { selector.poll(10000L); } @@ -220,11 +218,7 @@ public void testCloseDuringSslHandshake() throws IOException { String connectionId = selector.connect(new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE, PortType.SSL); - Assert.assertFalse("Channel shouldn't have been added to connect list w/o completing handshake", - selector.connected().contains(connectionId)); selector.close(connectionId); - Assert.assertFalse("Channel should not have been added to connected list ", - selector.connected().contains(connectionId)); Assert.assertTrue("Channel should have been added to disconnected list", selector.disconnected().contains(connectionId)); } From b996430d02a83f58373a57b34edd721fafc94061 Mon Sep 17 00:00:00 2001 From: sinaraya Date: Thu, 19 May 2016 18:04:14 -0700 Subject: [PATCH 09/15] Simplifying the patch further --- .../com.github.ambry.network/Selector.java | 38 ++++++------------- 1 file changed, 11 insertions(+), 27 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index 850a52357d..c075409e7d 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -72,7 +72,7 @@ public class Selector implements Selectable { private final List completedReceives; private final List disconnected; private final List connected; - private final Map connectionPendingHandshake; + private final Map connectionPendingHandshakeTimer; private final Time time; private final NetworkMetrics metrics; private final AtomicLong IdGenerator; @@ -92,7 +92,7 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) this.completedReceives = new ArrayList(); this.connected = new ArrayList(); this.disconnected = new ArrayList(); - this.connectionPendingHandshake = new HashMap<>(); + this.connectionPendingHandshakeTimer = new HashMap<>(); this.metrics = metrics; this.IdGenerator = new AtomicLong(0); activeConnections = new AtomicLong(0); @@ -324,8 +324,7 @@ public void poll(long timeoutMs, List sends) connected.add(transmission.getConnectionId()); metrics.selectorConnectionCreated.inc(); } else { - connectionPendingHandshake - .put(transmission.getConnectionId(), new UnreadyConnections((SSLTransmission) transmission)); + connectionPendingHandshakeTimer.put(transmission.getConnectionId(), System.currentTimeMillis()); connectionsPendingHandshakeCount.incrementAndGet(); } } @@ -370,16 +369,14 @@ public void poll(long timeoutMs, List sends) * Check readiness for unready connections and add to completed list if ready */ private void checkUnreadyConnectionsStatus() { - Iterator> transmissionInfoIter = - connectionPendingHandshake.entrySet().iterator(); - while (transmissionInfoIter.hasNext()) { - Map.Entry transmissionInfo = transmissionInfoIter.next(); - if (transmissionInfo.getValue().transmission.ready()) { - connected.add(transmissionInfo.getKey()); + Iterator> iterator = connectionPendingHandshakeTimer.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (isChannelReady(entry.getKey())) { + connected.add(entry.getKey()); metrics.selectorConnectionCreated.inc(); - metrics.selectorPerceivedSslHandshakeTime - .update(System.currentTimeMillis() - transmissionInfo.getValue().pendingSinceMs); - transmissionInfoIter.remove(); + metrics.selectorPerceivedSslHandshakeTime.update(System.currentTimeMillis() - entry.getValue()); + iterator.remove(); connectionsPendingHandshakeCount.decrementAndGet(); } } @@ -485,7 +482,7 @@ private void close(SelectionKey key) { this.disconnected.add(transmission.getConnectionId()); this.keyMap.remove(transmission.getConnectionId()); activeConnections.set(this.keyMap.size()); - if (connectionPendingHandshake.remove(transmission.getConnectionId()) != null) { + if (connectionPendingHandshakeTimer.remove(transmission.getConnectionId()) != null) { connectionsPendingHandshakeCount.decrementAndGet(); } transmission.close(); @@ -568,16 +565,3 @@ private SocketChannel channel(SelectionKey key) { return (SocketChannel) key.channel(); } } - -/** - * Class used to store {@link Transmission} which are yet to complete its handshake, along with start time of the same - */ -class UnreadyConnections { - Transmission transmission; - long pendingSinceMs; - - UnreadyConnections(Transmission transmission) { - this.transmission = transmission; - pendingSinceMs = System.currentTimeMillis(); - } -} From ebef578f4e6add5332c6481e7e644f5dc671bd3f Mon Sep 17 00:00:00 2001 From: sinaraya Date: Fri, 20 May 2016 10:27:05 -0700 Subject: [PATCH 10/15] Removing duplicate metric --- .../NetworkMetrics.java | 3 --- .../com.github.ambry.network/Selector.java | 21 ++++++++----------- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index 84f3efd0cf..cb400129ca 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -38,7 +38,6 @@ public class NetworkMetrics { public final Histogram selectorSelectTime; public final Counter selectorIORate; public final Histogram selectorIOTime; - public final Histogram selectorPerceivedSslHandshakeTime; public final Counter selectorNioCloseErrorCount; public final Counter selectorDisconnectedErrorCount; public final Counter selectorIOErrorCount; @@ -89,8 +88,6 @@ public NetworkMetrics(MetricRegistry registry) { selectorIORate = registry.counter(MetricRegistry.name(Selector.class, "SelectorIORate")); selectorSelectTime = registry.histogram(MetricRegistry.name(Selector.class, "SelectorSelectTime")); selectorIOTime = registry.histogram(MetricRegistry.name(Selector.class, "SelectorIOTime")); - selectorPerceivedSslHandshakeTime = - registry.histogram(MetricRegistry.name(Selector.class, "SelectorSslHandshakeTime")); selectorNioCloseErrorCount = registry.counter(MetricRegistry.name(Selector.class, "SelectorNioCloseErrorCount")); selectorDisconnectedErrorCount = registry.counter(MetricRegistry.name(Selector.class, "SelectorDisconnectedErrorCount")); diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index c075409e7d..a3a500c18e 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -72,7 +72,7 @@ public class Selector implements Selectable { private final List completedReceives; private final List disconnected; private final List connected; - private final Map connectionPendingHandshakeTimer; + private final List connectionPendingHandshakes; private final Time time; private final NetworkMetrics metrics; private final AtomicLong IdGenerator; @@ -92,11 +92,11 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) this.completedReceives = new ArrayList(); this.connected = new ArrayList(); this.disconnected = new ArrayList(); - this.connectionPendingHandshakeTimer = new HashMap<>(); this.metrics = metrics; this.IdGenerator = new AtomicLong(0); activeConnections = new AtomicLong(0); connectionsPendingHandshakeCount = new AtomicLong(0); + connectionPendingHandshakes = new ArrayList<>(); metrics.initializeSelectorMetricsIfRequired(activeConnections, connectionsPendingHandshakeCount); this.sslFactory = sslFactory; } @@ -324,7 +324,7 @@ public void poll(long timeoutMs, List sends) connected.add(transmission.getConnectionId()); metrics.selectorConnectionCreated.inc(); } else { - connectionPendingHandshakeTimer.put(transmission.getConnectionId(), System.currentTimeMillis()); + connectionPendingHandshakes.add(transmission.getConnectionId()); connectionsPendingHandshakeCount.incrementAndGet(); } } @@ -369,14 +369,13 @@ public void poll(long timeoutMs, List sends) * Check readiness for unready connections and add to completed list if ready */ private void checkUnreadyConnectionsStatus() { - Iterator> iterator = connectionPendingHandshakeTimer.entrySet().iterator(); + Iterator iterator = connectionPendingHandshakes.iterator(); while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (isChannelReady(entry.getKey())) { - connected.add(entry.getKey()); - metrics.selectorConnectionCreated.inc(); - metrics.selectorPerceivedSslHandshakeTime.update(System.currentTimeMillis() - entry.getValue()); + String connId = iterator.next(); + if (isChannelReady(connId)) { + connected.add(connId); iterator.remove(); + metrics.selectorConnectionCreated.inc(); connectionsPendingHandshakeCount.decrementAndGet(); } } @@ -482,9 +481,7 @@ private void close(SelectionKey key) { this.disconnected.add(transmission.getConnectionId()); this.keyMap.remove(transmission.getConnectionId()); activeConnections.set(this.keyMap.size()); - if (connectionPendingHandshakeTimer.remove(transmission.getConnectionId()) != null) { - connectionsPendingHandshakeCount.decrementAndGet(); - } + connectionPendingHandshakes.remove(transmission.getConnectionId()); transmission.close(); } else { key.attach(null); From ad1f1465255be6c90fc5ef1ad24991d069a18010 Mon Sep 17 00:00:00 2001 From: sinaraya Date: Fri, 20 May 2016 20:08:21 -0700 Subject: [PATCH 11/15] renaming variable --- .../NetworkMetrics.java | 10 ++++---- .../com.github.ambry.network/Selector.java | 24 +++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index cb400129ca..0bb5358b9d 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -45,7 +45,7 @@ public class NetworkMetrics { public final Counter selectorCloseKeyErrorCount; public final Counter selectorCloseSocketErrorCount; public Gauge numActiveConnections; - public Gauge numConnectionsPendingHandhsake; + public Gauge numConnectionsPendingHandshake; public final Map selectorNodeMetricMap; // Plaintext metrics @@ -125,20 +125,20 @@ public NetworkMetrics(MetricRegistry registry) { /** * Initializes a few network metrics for the selector * @param activeConnections count of current active connections - * @param numConnectionsPendingHandhsake Count of connections that are awaiting handshake completion + * @param connectionsPendingHandshake Count of connections that are awaiting handshake completion */ public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections, - final AtomicLong numConnectionsPendingHandhsake) { + final AtomicLong connectionsPendingHandshake) { numActiveConnections = new Gauge() { @Override public Long getValue() { return activeConnections.get(); } }; - this.numConnectionsPendingHandhsake = new Gauge() { + numConnectionsPendingHandshake = new Gauge() { @Override public Long getValue() { - return numConnectionsPendingHandhsake.get(); + return connectionsPendingHandshake.get(); } }; } diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index a3a500c18e..7e51220872 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -76,8 +76,8 @@ public class Selector implements Selectable { private final Time time; private final NetworkMetrics metrics; private final AtomicLong IdGenerator; - private AtomicLong activeConnections; - private AtomicLong connectionsPendingHandshakeCount; + private AtomicLong numActiveConnections; + private AtomicLong numConnectionsPendingHandshake; private final SSLFactory sslFactory; /** @@ -94,10 +94,10 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) this.disconnected = new ArrayList(); this.metrics = metrics; this.IdGenerator = new AtomicLong(0); - activeConnections = new AtomicLong(0); - connectionsPendingHandshakeCount = new AtomicLong(0); + numActiveConnections = new AtomicLong(0); + numConnectionsPendingHandshake = new AtomicLong(0); connectionPendingHandshakes = new ArrayList<>(); - metrics.initializeSelectorMetricsIfRequired(activeConnections, connectionsPendingHandshakeCount); + metrics.initializeSelectorMetricsIfRequired(numActiveConnections, numConnectionsPendingHandshake); this.sslFactory = sslFactory; } @@ -167,7 +167,7 @@ public String connect(InetSocketAddress address, int sendBufferSize, int receive } key.attach(transmission); this.keyMap.put(connectionId, key); - activeConnections.set(this.keyMap.size()); + numActiveConnections.set(this.keyMap.size()); return connectionId; } @@ -194,7 +194,7 @@ public String register(SocketChannel channel, PortType portType) } key.attach(transmission); this.keyMap.put(connectionId, key); - activeConnections.set(this.keyMap.size()); + numActiveConnections.set(this.keyMap.size()); return connectionId; } @@ -325,7 +325,7 @@ public void poll(long timeoutMs, List sends) metrics.selectorConnectionCreated.inc(); } else { connectionPendingHandshakes.add(transmission.getConnectionId()); - connectionsPendingHandshakeCount.incrementAndGet(); + numConnectionsPendingHandshake.incrementAndGet(); } } @@ -376,7 +376,7 @@ private void checkUnreadyConnectionsStatus() { connected.add(connId); iterator.remove(); metrics.selectorConnectionCreated.inc(); - connectionsPendingHandshakeCount.decrementAndGet(); + numConnectionsPendingHandshake.decrementAndGet(); } } } @@ -425,8 +425,8 @@ public List connected() { return this.connected; } - public long getActiveConnections() { - return activeConnections.get(); + public long getNumActiveConnections() { + return numActiveConnections.get(); } /** @@ -480,7 +480,7 @@ private void close(SelectionKey key) { logger.debug("Closing connection from {}", transmission.getConnectionId()); this.disconnected.add(transmission.getConnectionId()); this.keyMap.remove(transmission.getConnectionId()); - activeConnections.set(this.keyMap.size()); + numActiveConnections.set(this.keyMap.size()); connectionPendingHandshakes.remove(transmission.getConnectionId()); transmission.close(); } else { From 23b045403178a26ac1f2176e6e1ceb2d09e6c0f2 Mon Sep 17 00:00:00 2001 From: sinaraya Date: Sat, 21 May 2016 09:02:55 -0700 Subject: [PATCH 12/15] Addressing feedback --- .../NetworkMetrics.java | 2 +- .../com.github.ambry.network/Selector.java | 21 ++++++++++++------- .../Transmission.java | 3 ++- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index 0bb5358b9d..2346f58ef8 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -127,7 +127,7 @@ public NetworkMetrics(MetricRegistry registry) { * @param activeConnections count of current active connections * @param connectionsPendingHandshake Count of connections that are awaiting handshake completion */ - public void initializeSelectorMetricsIfRequired(final AtomicLong activeConnections, + public void initializeSelectorMetrics(final AtomicLong activeConnections, final AtomicLong connectionsPendingHandshake) { numActiveConnections = new Gauge() { @Override diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index 7e51220872..38090c5b31 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -72,7 +72,7 @@ public class Selector implements Selectable { private final List completedReceives; private final List disconnected; private final List connected; - private final List connectionPendingHandshakes; + private final List connectionsPendingHandshake; private final Time time; private final NetworkMetrics metrics; private final AtomicLong IdGenerator; @@ -96,8 +96,8 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) this.IdGenerator = new AtomicLong(0); numActiveConnections = new AtomicLong(0); numConnectionsPendingHandshake = new AtomicLong(0); - connectionPendingHandshakes = new ArrayList<>(); - metrics.initializeSelectorMetricsIfRequired(numActiveConnections, numConnectionsPendingHandshake); + connectionsPendingHandshake = new ArrayList<>(); + metrics.initializeSelectorMetrics(numActiveConnections, numConnectionsPendingHandshake); this.sslFactory = sslFactory; } @@ -324,7 +324,7 @@ public void poll(long timeoutMs, List sends) connected.add(transmission.getConnectionId()); metrics.selectorConnectionCreated.inc(); } else { - connectionPendingHandshakes.add(transmission.getConnectionId()); + connectionsPendingHandshake.add(transmission.getConnectionId()); numConnectionsPendingHandshake.incrementAndGet(); } } @@ -369,7 +369,7 @@ public void poll(long timeoutMs, List sends) * Check readiness for unready connections and add to completed list if ready */ private void checkUnreadyConnectionsStatus() { - Iterator iterator = connectionPendingHandshakes.iterator(); + Iterator iterator = connectionsPendingHandshake.iterator(); while (iterator.hasNext()) { String connId = iterator.next(); if (isChannelReady(connId)) { @@ -480,9 +480,14 @@ private void close(SelectionKey key) { logger.debug("Closing connection from {}", transmission.getConnectionId()); this.disconnected.add(transmission.getConnectionId()); this.keyMap.remove(transmission.getConnectionId()); - numActiveConnections.set(this.keyMap.size()); - connectionPendingHandshakes.remove(transmission.getConnectionId()); - transmission.close(); + numActiveConnections.set(keyMap.size()); + connectionsPendingHandshake.remove(transmission.getConnectionId()); + try { + transmission.close(); + } catch (IOException e) { + logger.error("IOException thrown during closing of transmission with connectionId {} :", + transmission.getConnectionId(), e); + } } else { key.attach(null); key.cancel(); diff --git a/ambry-network/src/main/java/com.github.ambry.network/Transmission.java b/ambry-network/src/main/java/com.github.ambry.network/Transmission.java index afe04b2a95..93cdad9690 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Transmission.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Transmission.java @@ -132,7 +132,8 @@ public SocketAddress getRemoteSocketAddress() { /** * Close the connection for the socket channel */ - public abstract void close(); + public abstract void close() + throws IOException; public String getConnectionId() { return connectionId; From 0b59f29530e32b5348f397cd56943d7e5e8feb96 Mon Sep 17 00:00:00 2001 From: sinaraya Date: Mon, 23 May 2016 09:07:26 -0700 Subject: [PATCH 13/15] Fixing docs --- .../NetworkMetrics.java | 11 +---------- .../com.github.ambry.network/Selector.java | 19 ++++++++----------- 2 files changed, 9 insertions(+), 21 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index 2346f58ef8..9f662cad64 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -45,7 +45,6 @@ public class NetworkMetrics { public final Counter selectorCloseKeyErrorCount; public final Counter selectorCloseSocketErrorCount; public Gauge numActiveConnections; - public Gauge numConnectionsPendingHandshake; public final Map selectorNodeMetricMap; // Plaintext metrics @@ -125,22 +124,14 @@ public NetworkMetrics(MetricRegistry registry) { /** * Initializes a few network metrics for the selector * @param activeConnections count of current active connections - * @param connectionsPendingHandshake Count of connections that are awaiting handshake completion */ - public void initializeSelectorMetrics(final AtomicLong activeConnections, - final AtomicLong connectionsPendingHandshake) { + public void initializeSelectorMetrics(final AtomicLong activeConnections) { numActiveConnections = new Gauge() { @Override public Long getValue() { return activeConnections.get(); } }; - numConnectionsPendingHandshake = new Gauge() { - @Override - public Long getValue() { - return connectionsPendingHandshake.get(); - } - }; } public void initializeSelectorNodeMetricIfRequired(String hostname, int port) { diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index 38090c5b31..50cfb771b4 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -26,6 +26,7 @@ import java.nio.channels.UnresolvedAddressException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -72,12 +73,11 @@ public class Selector implements Selectable { private final List completedReceives; private final List disconnected; private final List connected; - private final List connectionsPendingHandshake; + private final Set unreadycConnections; private final Time time; private final NetworkMetrics metrics; private final AtomicLong IdGenerator; private AtomicLong numActiveConnections; - private AtomicLong numConnectionsPendingHandshake; private final SSLFactory sslFactory; /** @@ -95,9 +95,8 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) this.metrics = metrics; this.IdGenerator = new AtomicLong(0); numActiveConnections = new AtomicLong(0); - numConnectionsPendingHandshake = new AtomicLong(0); - connectionsPendingHandshake = new ArrayList<>(); - metrics.initializeSelectorMetrics(numActiveConnections, numConnectionsPendingHandshake); + unreadycConnections = new HashSet<>(); + metrics.initializeSelectorMetrics(numActiveConnections); this.sslFactory = sslFactory; } @@ -324,8 +323,7 @@ public void poll(long timeoutMs, List sends) connected.add(transmission.getConnectionId()); metrics.selectorConnectionCreated.inc(); } else { - connectionsPendingHandshake.add(transmission.getConnectionId()); - numConnectionsPendingHandshake.incrementAndGet(); + unreadycConnections.add(transmission.getConnectionId()); } } @@ -369,14 +367,13 @@ public void poll(long timeoutMs, List sends) * Check readiness for unready connections and add to completed list if ready */ private void checkUnreadyConnectionsStatus() { - Iterator iterator = connectionsPendingHandshake.iterator(); + Iterator iterator = unreadycConnections.iterator(); while (iterator.hasNext()) { String connId = iterator.next(); if (isChannelReady(connId)) { connected.add(connId); iterator.remove(); metrics.selectorConnectionCreated.inc(); - numConnectionsPendingHandshake.decrementAndGet(); } } } @@ -396,7 +393,7 @@ private String socketDescription(SocketChannel channel) { } /** - * Returns true if channel is ready after completing handshake to accept reads/writes + * Returns {@code true} if channel is ready to send or receive data, {@code false} otherwise * @param connectionId upon which readiness is checked for * @return true if channel is ready to accept reads/writes, false otherwise */ @@ -481,7 +478,7 @@ private void close(SelectionKey key) { this.disconnected.add(transmission.getConnectionId()); this.keyMap.remove(transmission.getConnectionId()); numActiveConnections.set(keyMap.size()); - connectionsPendingHandshake.remove(transmission.getConnectionId()); + unreadycConnections.remove(transmission.getConnectionId()); try { transmission.close(); } catch (IOException e) { From 059e5ec6d4285531c93078cf738fc81aea69233e Mon Sep 17 00:00:00 2001 From: sinaraya Date: Mon, 23 May 2016 12:26:06 -0700 Subject: [PATCH 14/15] Addressing Priyesh's comment --- .../main/java/com.github.ambry.network/Selector.java | 10 +++++----- .../java/com.github.ambry.network/SSLSelectorTest.java | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index 50cfb771b4..e89636326f 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -73,7 +73,7 @@ public class Selector implements Selectable { private final List completedReceives; private final List disconnected; private final List connected; - private final Set unreadycConnections; + private final Set unreadyConnections; private final Time time; private final NetworkMetrics metrics; private final AtomicLong IdGenerator; @@ -95,7 +95,7 @@ public Selector(NetworkMetrics metrics, Time time, SSLFactory sslFactory) this.metrics = metrics; this.IdGenerator = new AtomicLong(0); numActiveConnections = new AtomicLong(0); - unreadycConnections = new HashSet<>(); + unreadyConnections = new HashSet<>(); metrics.initializeSelectorMetrics(numActiveConnections); this.sslFactory = sslFactory; } @@ -323,7 +323,7 @@ public void poll(long timeoutMs, List sends) connected.add(transmission.getConnectionId()); metrics.selectorConnectionCreated.inc(); } else { - unreadycConnections.add(transmission.getConnectionId()); + unreadyConnections.add(transmission.getConnectionId()); } } @@ -367,7 +367,7 @@ public void poll(long timeoutMs, List sends) * Check readiness for unready connections and add to completed list if ready */ private void checkUnreadyConnectionsStatus() { - Iterator iterator = unreadycConnections.iterator(); + Iterator iterator = unreadyConnections.iterator(); while (iterator.hasNext()) { String connId = iterator.next(); if (isChannelReady(connId)) { @@ -478,7 +478,7 @@ private void close(SelectionKey key) { this.disconnected.add(transmission.getConnectionId()); this.keyMap.remove(transmission.getConnectionId()); numActiveConnections.set(keyMap.size()); - unreadycConnections.remove(transmission.getConnectionId()); + unreadyConnections.remove(transmission.getConnectionId()); try { transmission.close(); } catch (IOException e) { diff --git a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java index c818c81ab9..36444f2a4c 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java @@ -214,7 +214,7 @@ public void testSSLConnect() } @Test - public void testCloseDuringSslHandshake() + public void testCloseAfterConnectCall() throws IOException { String connectionId = selector.connect(new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE, PortType.SSL); From 969f536d34d60a4551ee1ec9056182283fea63be Mon Sep 17 00:00:00 2001 From: sinaraya Date: Tue, 24 May 2016 00:59:04 -0700 Subject: [PATCH 15/15] Addressing gopal's comments --- .../src/main/java/com.github.ambry.network/NetworkMetrics.java | 2 +- .../src/main/java/com.github.ambry.network/Selector.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index 9f662cad64..63008f9212 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -125,7 +125,7 @@ public NetworkMetrics(MetricRegistry registry) { * Initializes a few network metrics for the selector * @param activeConnections count of current active connections */ - public void initializeSelectorMetrics(final AtomicLong activeConnections) { + void initializeSelectorMetrics(final AtomicLong activeConnections) { numActiveConnections = new Gauge() { @Override public Long getValue() { diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index e89636326f..452d3fbf90 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -77,7 +77,7 @@ public class Selector implements Selectable { private final Time time; private final NetworkMetrics metrics; private final AtomicLong IdGenerator; - private AtomicLong numActiveConnections; + private final AtomicLong numActiveConnections; private final SSLFactory sslFactory; /**