Skip to content

Commit

Permalink
KAFKA-6809: Count inbound connections in the connection-creation metr…
Browse files Browse the repository at this point in the history
…ic (apache#5301)

Previously, the connection-creation metric only accounted for opened connections from the broker. This change extends it to account for received connections.
  • Loading branch information
stanislavkozlovski authored and ying-zheng committed Jul 6, 2018
1 parent 7d63344 commit 632c277
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 0 deletions.
Expand Up @@ -265,6 +265,7 @@ private void configureSocketChannel(SocketChannel socketChannel, int sendBufferS
public void register(String id, SocketChannel socketChannel) throws IOException {
ensureNotRegistered(id);
registerChannel(id, socketChannel, SelectionKey.OP_READ);
this.sensors.connectionCreated.record();
}

private void ensureNotRegistered(String id) {
Expand Down
Expand Up @@ -17,8 +17,10 @@
package org.apache.kafka.common.network;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.memory.SimpleMemoryPool;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -49,6 +51,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.Optional;

import static org.easymock.EasyMock.createControl;
import static org.easymock.EasyMock.expect;
Expand Down Expand Up @@ -580,6 +583,49 @@ public void testConnectDisconnectDuringInSinglePoll() throws Exception {
control.verify();
}

@Test
public void testOutboundConnectionsCountInConnectionCreationMetric() throws Exception {
// create connections
int expectedConnections = 5;
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
for (int i = 0; i < expectedConnections; i++)
connect(Integer.toString(i), addr);

// Poll continuously, as we cannot guarantee that the first call will see all connections
int seenConnections = 0;
for (int i = 0; i < 10; i++) {
selector.poll(100L);
seenConnections += selector.connected().size();
if (seenConnections == expectedConnections)
break;
}

assertEquals((double) expectedConnections, getMetric("connection-creation-total").metricValue());
assertEquals((double) expectedConnections, getMetric("connection-count").metricValue());
}

@Test
public void testInboundConnectionsCountInConnectionCreationMetric() throws Exception {
int conns = 5;

try (ServerSocketChannel ss = ServerSocketChannel.open()) {
ss.bind(new InetSocketAddress(0));
InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();

for (int i = 0; i < conns; i++) {
Thread sender = createSender(serverAddress, randomPayload(1));
sender.start();
SocketChannel channel = ss.accept();
channel.configureBlocking(false);

selector.register(Integer.toString(i), channel);
}
}

assertEquals((double) conns, getMetric("connection-creation-total").metricValue());
assertEquals((double) conns, getMetric("connection-count").metricValue());
}

private String blockingRequest(String node, String s) throws IOException {
selector.send(createSend(node, s));
selector.poll(1000L);
Expand Down Expand Up @@ -675,4 +721,13 @@ else if (obj instanceof Map)
assertTrue("Field not empty: " + field + " " + obj, ((Map<?, ?>) obj).isEmpty());
}

private KafkaMetric getMetric(String name) throws Exception {
Optional<Map.Entry<MetricName, KafkaMetric>> metric = metrics.metrics().entrySet().stream()
.filter(entry -> entry.getKey().name().equals(name))
.findFirst();
if (!metric.isPresent())
throw new Exception(String.format("Could not find metric called %s", name));

return metric.get().getValue();
}
}

0 comments on commit 632c277

Please sign in to comment.