Skip to content

Commit

Permalink
GH-3299: Fix client connectionId for TCP/NIO
Browse files Browse the repository at this point in the history
Resolves #3299

Connect before creating the `TcpNioConnection` object and publishing the
`TcpConnectionOpenEvent`.

This was a regression caused by supporting connect timout; which moved
the connect to after the object was created and event published, causing
the `connectionId` to start with `unknown`.

**cherry-pick to 5.3.x, 5.2.x**
  • Loading branch information
garyrussell authored and artembilan committed Jun 10, 2020
1 parent 3499cd6 commit 7ec1f5c
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
Expand Up @@ -87,6 +87,7 @@ protected TcpConnectionSupport buildNewConnection() {
try {
SocketChannel socketChannel = SocketChannel.open();
setSocketAttributes(socketChannel.socket());
connect(socketChannel);
TcpNioConnection connection =
this.tcpNioConnectionSupport.createNewConnection(socketChannel, false, isLookupHost(),
getApplicationEventPublisher(), getComponentName());
Expand All @@ -98,18 +99,6 @@ protected TcpConnectionSupport buildNewConnection() {
}
TcpConnectionSupport wrappedConnection = wrapConnection(connection);
initializeConnection(wrappedConnection, socketChannel.socket());
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(getHost(), getPort()));
boolean connected = socketChannel.finishConnect();
long timeLeft = getConnectTimeout().toMillis();
while (!connected && timeLeft > 0) {
Thread.sleep(50); // NOSONAR Magic #
connected = socketChannel.finishConnect();
timeLeft -= 50; // NOSONAR Magic #
}
if (!connected) {
throw new IOException("Not connected after connectTimeout");
}
if (getSoTimeout() > 0) {
connection.setLastRead(System.currentTimeMillis());
}
Expand All @@ -127,6 +116,21 @@ protected TcpConnectionSupport buildNewConnection() {
}
}

private void connect(SocketChannel socketChannel) throws IOException, InterruptedException {
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(getHost(), getPort()));
boolean connected = socketChannel.finishConnect();
long timeLeft = getConnectTimeout().toMillis();
while (!connected && timeLeft > 0) {
Thread.sleep(50); // NOSONAR Magic #
connected = socketChannel.finishConnect();
timeLeft -= 50; // NOSONAR Magic #
}
if (!connected) {
throw new IOException("Not connected after connectTimeout");
}
}

/**
* When set to true, connections created by this factory attempt
* to use direct buffers where possible.
Expand Down
Expand Up @@ -142,7 +142,12 @@ public void testWriteTimeout() throws Exception {
assertThat(latch.await(10000, TimeUnit.MILLISECONDS)).isTrue();
TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory("localhost",
serverSocket.get().getLocalPort());
factory.setApplicationEventPublisher(nullPublisher);
AtomicReference<String> connectionId = new AtomicReference<>();
factory.setApplicationEventPublisher(event -> {
if (event instanceof TcpConnectionOpenEvent) {
connectionId.set(((TcpConnectionOpenEvent) event).getConnectionId());
}
});
factory.setSoTimeout(100);
factory.start();
try {
Expand All @@ -157,6 +162,7 @@ public void testWriteTimeout() throws Exception {
done.countDown();
factory.stop();
serverSocket.get().close();
assertThat(connectionId.get()).startsWith("localhost");
}

@Test
Expand Down

0 comments on commit 7ec1f5c

Please sign in to comment.