Skip to content

Commit

Permalink
Fix memory leak of HTTP server on bind failure (#2844)
Browse files Browse the repository at this point in the history
Fixes #2843 by closing channel on bind (and other) exception
  • Loading branch information
SgtSilvio committed Jun 29, 2023
1 parent c48f6a1 commit 625633e
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -221,11 +221,10 @@ static void setChannelOptions(Channel channel, Map<ChannelOption<?>, ?> options,
}
}

@SuppressWarnings("FutureReturnValueIgnored")
static void doConnect(
List<SocketAddress> addresses,
@Nullable Supplier<? extends SocketAddress> bindAddress,
ChannelPromise connectPromise,
MonoChannelPromise connectPromise,
int index) {
Channel channel = connectPromise.channel();
channel.eventLoop().execute(() -> {
Expand All @@ -249,9 +248,6 @@ static void doConnect(
connectPromise.setSuccess();
}
else {
// "FutureReturnValueIgnored" this is deliberate
channel.close();

Throwable cause = future.cause();
if (log.isDebugEnabled()) {
log.debug(format(channel, "Connect attempt to [" + remoteAddress + "] failed."), cause);
Expand Down Expand Up @@ -295,19 +291,7 @@ static Mono<Channel> doInitAndRegister(
}

MonoChannelPromise monoChannelPromise = new MonoChannelPromise(channel);

channel.unsafe().register(eventLoop, monoChannelPromise);
Throwable cause = monoChannelPromise.cause();
if (cause != null) {
if (channel.isRegistered()) {
// "FutureReturnValueIgnored" this is deliberate
channel.close();
}
else {
channel.unsafe().closeForcibly();
}
}

return monoChannelPromise;
}

Expand Down Expand Up @@ -389,8 +373,6 @@ static Mono<Channel> doResolveAndConnect(Channel channel, TransportConfig config
MonoChannelPromise monoChannelPromise = new MonoChannelPromise(channel);
resolveFuture.addListener((FutureListener<List<SocketAddress>>) future -> {
if (future.cause() != null) {
// "FutureReturnValueIgnored" this is deliberate
channel.close();
monoChannelPromise.tryFailure(future.cause());
}
else {
Expand Down Expand Up @@ -581,8 +563,16 @@ public ChannelPromise syncUninterruptibly() {
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public boolean tryFailure(Throwable cause) {
if (RESULT_UPDATER.compareAndSet(this, null, cause)) {
if (channel.isRegistered()) {
// "FutureReturnValueIgnored" this is deliberate
channel.close();
}
else {
channel.unsafe().closeForcibly();
}
if (actual != null) {
actual.onError(cause);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty.transport;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import org.junit.jupiter.api.Test;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpClientConfig;

import java.net.InetSocketAddress;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class TransportConnectorTest {

@Test
@SuppressWarnings("FutureReturnValueIgnored")
void bind_whenBindException_thenChannelIsUnregistered() {
final TcpClientConfig transportConfig = TcpClient.newConnection().configuration();
final Channel channel1 = TransportConnector.bind(
transportConfig,
new RecordingChannelInitializer(),
new InetSocketAddress("localhost", 0),
false).block();
final RecordingChannelInitializer channelInitializer = new RecordingChannelInitializer();
assertThatThrownBy(() -> TransportConnector.bind(
transportConfig,
channelInitializer,
new InetSocketAddress("localhost", ((InetSocketAddress) channel1.localAddress()).getPort()),
false).block());
final Channel channel2 = channelInitializer.channel;
assertThat(channel1.isRegistered()).isTrue();
assertThat(channel2.isRegistered()).isFalse();
channel1.close();
}

private static class RecordingChannelInitializer extends ChannelInitializer<Channel> {
Channel channel;
@Override
protected void initChannel(final Channel ch) {
channel = ch;
}
}
}

0 comments on commit 625633e

Please sign in to comment.