Reactor2StompTcpClient should use a shared EventLoopGroup [SPR-15035] #19601

Closed
spring-issuemaster opened this issue Dec 20, 2016 · 7 comments

Comments

@spring-issuemaster
Collaborator

commented Dec 20, 2016

Anton Shukalo opened SPR-15035 and commented

When the broker is down, the client tries to reconnect to it. Each connection attempt creates a new instance of NettyTcpClient, and each NettyTcpClient starts a thread. That thread never stops because the client is not shut down when the connection fails.


Affects: 4.3.4

Attachments:

Issue Links:

  • #18803 Reactor2TcpClient leaks threads on shutdown
@spring-issuemaster

Collaborator Author

commented Dec 20, 2016

Anton Shukalo commented

The cleanup task should shut down the tcpClient:

cleanupTask = new Runnable() {
    @Override
    public void run() {
        synchronized (tcpClients) {
            tcpClient.shutdown();  // <-- fix here
            tcpClients.remove(tcpClient);
        }
    }
};
@spring-issuemaster

Collaborator Author

commented Dec 20, 2016

Rossen Stoyanchev commented

I'm investigating on my end. I don't yet understand what's going on.

For one, we are using the Reconnect strategy from Reactor, so we should not be creating new instances of NettyTcpClient on reconnect. What are you actually seeing as evidence for this? We don't shut down the "system" TcpClient (for broadcasting messages from the server side to the broker), which has the reconnect logic, until the server shuts down. We just keep trying to reconnect. As for the connection itself, one way or another it should be getting closed after the failures and, as a result, also cleaned up (by Reactor), i.e. there is nothing special we need to do to clean up besides closing.

As I said, I'm investigating, but could you share some details on what you're seeing for the memory leak? If you are using a tool, are object counts growing?

Also, this looks related to #18803, and there were some fixes targeted at Reactor Net 2.0.9, which I now see was never released. Can you check with reactor-net 2.0.9.BUILD-SNAPSHOT?

@spring-issuemaster

Collaborator Author

commented Dec 20, 2016

Rossen Stoyanchev commented

So far I am unable to reproduce the issue. I am using the Spring WebSocket Portfolio sample with 4.3.5 snapshots (there shouldn't be any difference for this vs 4.3.4), Jetty 9.3.14.v20161028, and Yourkit for profiling. When the sample is up and running and I verify it through the browser, I stop RabbitMQ and verify in the logs:

15:28:10 [reactor-tcp-io-4] StompBrokerRelayMessageHandler[DEBUG] - TCP connection to broker closed in session _system_
15:28:10 [reactor-tcp-io-4] StompBrokerRelayMessageHandler[DEBUG] - Cleaning up connection state for session _system_
15:28:10 [reactor-tcp-io-4] StompBrokerRelayMessageHandler[INFO] - BrokerAvailabilityEvent[available=false, StompBrokerRelay[127.0.0.1:61613]]

I wait a while, then start RabbitMQ again and verify in the logs:

15:29:30 [reactor-tcp-io-1] StompBrokerRelayMessageHandler[DEBUG] - TCP connection opened in session=_system_
15:29:30 [reactor-tcp-io-1] StompBrokerRelayMessageHandler[DEBUG] - Received CONNECTED heart-beat=[10000, 10000] session=_system_
15:29:30 [reactor-tcp-io-1] StompBrokerRelayMessageHandler[INFO] - "System" session connected.
15:29:30 [reactor-tcp-io-1] StompBrokerRelayMessageHandler[INFO] - BrokerAvailabilityEvent[available=true, StompBrokerRelay[127.0.0.1:61613]]

Periodically in Yourkit I force garbage collection and take memory snapshots. Then I look at "Objects reachable from GC roots via strong references", filter by "reactor", and sort by object count. The highest object count for low-level types (like Signal and Tuple) is 4 while Reactor2TcpClient, NettyTcpClient, HashwheelTimer are at either 1 (for the "system" connection) or 2 when I'm also connected through the browser. Those counts remain stable. I don't see any accumulation.

@spring-issuemaster

Collaborator Author

commented Dec 23, 2016

Anton Shukalo commented

I created a simple application [^messagingapp.zip]. When I run this app with no broker running, I see thread and memory leaks. The app uses a spring-integration @ServiceActivator to read messages from a topic.

After 5 minutes I have more than 50 threads named "reactor-tcp-io-XX".
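A thread count like the one reported above can also be checked from inside the JVM, without a profiler. The following is a minimal diagnostic sketch using only the JDK; the class and method names are hypothetical, and the prefix is simply the thread-name prefix quoted in this issue.

```java
import java.util.Set;

// Diagnostic sketch (hypothetical helper, not part of Spring or Reactor):
// counts live threads whose names start with a given prefix.
class ReactorThreadCounter {

    // Prefix taken from the thread names reported in this issue.
    static final String REACTOR_IO_PREFIX = "reactor-tcp-io-";

    static long countThreadsWithPrefix(String prefix) {
        // getAllStackTraces() returns a snapshot of all live threads in the JVM.
        Set<Thread> threads = Thread.getAllStackTraces().keySet();
        long count = 0;
        for (Thread thread : threads) {
            if (thread.isAlive() && thread.getName().startsWith(prefix)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(REACTOR_IO_PREFIX + " threads: "
                + countThreadsWithPrefix(REACTOR_IO_PREFIX));
    }
}
```

Logging this count periodically while the broker is down would show whether the number keeps growing with each reconnect attempt or stays flat.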

@spring-issuemaster

Collaborator Author

commented Dec 23, 2016

Rossen Stoyanchev commented

Thanks for the sample. I was trying with the Spring Framework STOMP-over-WebSocket support which uses the Reactor2TcpClient directly in the StompBrokerRelayMessageHandler. You're actually using the Reactor2TcpStompClient which also delegates to Reactor2TcpClient and this is in a Spring Integration project with Spring Integration adding reconnect logic on top.

First, the memory leak. In Reactor 2 a new TcpClient is created per connection, with the expectation that a shared EventLoopGroup is managed externally. This is the case with the basic Reactor2TcpClient constructor (host, port, codec), but the Reactor2TcpStompClient doesn't use that in its own basic constructor (host, port) because it needs to customize the Reactor dispatching mechanism. So each time Spring Integration tries to reconnect, a new EventLoopGroup is created.
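The effect of creating a new thread group per connection attempt, compared with sharing one, can be illustrated with JDK stand-ins. This is only a sketch of the pattern: a single-threaded ExecutorService plays the role of the EventLoopGroup, and every name in it (SharedPoolSketch, newIoPool, attemptConnect) is hypothetical rather than Reactor or Netty API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: an ExecutorService stands in for Netty's EventLoopGroup.
class SharedPoolSketch {

    // Counts every I/O thread created by any pool in this sketch.
    static final AtomicInteger THREADS_CREATED = new AtomicInteger();

    static ExecutorService newIoPool() {
        return Executors.newFixedThreadPool(1, task -> {
            Thread t = new Thread(task, "sketch-io-" + THREADS_CREATED.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
    }

    // Hypothetical connect attempt: runs a trivial task on the pool and waits for it.
    static void attemptConnect(ExecutorService pool) {
        try {
            pool.submit(() -> { }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // A new pool per attempt: one extra thread for every attempt.
    static int threadsWithPoolPerAttempt(int attempts) {
        int before = THREADS_CREATED.get();
        List<ExecutorService> pools = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            ExecutorService pool = newIoPool();
            attemptConnect(pool);
            pools.add(pool); // in the leaky scenario nothing ever shuts these down
        }
        int created = THREADS_CREATED.get() - before;
        for (ExecutorService pool : pools) {
            pool.shutdownNow(); // cleanup so the sketch itself does not leak
        }
        return created;
    }

    // One shared pool for all attempts: the same thread is reused.
    static int threadsWithSharedPool(int attempts) {
        int before = THREADS_CREATED.get();
        ExecutorService shared = newIoPool();
        try {
            for (int i = 0; i < attempts; i++) {
                attemptConnect(shared);
            }
        } finally {
            shared.shutdown();
        }
        return THREADS_CREATED.get() - before;
    }

    public static void main(String[] args) {
        System.out.println("pool per attempt: " + threadsWithPoolPerAttempt(5) + " threads");
        System.out.println("shared pool:      " + threadsWithSharedPool(5) + " threads");
    }
}
```

With a pool per attempt the thread count grows linearly with the number of reconnect attempts, which matches the growing number of "reactor-tcp-io-XX" threads reported earlier in this thread.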

Fortunately Reactor2TcpStompClient has a second constructor. Here is a workaround to use:

@Bean
public Reactor2TcpStompClient stompClient() {
    Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), new StompDecoder());
    Reactor2TcpClient<byte[]> tcpClient = new Reactor2TcpClient<byte[]>(getTcpClientFactory(codec));
    Reactor2TcpStompClient stompClient = new Reactor2TcpStompClient(tcpClient);
    // ...
    return stompClient;
}

private <P> TcpClientFactory<Message<P>, Message<P>> getTcpClientFactory(final Codec<Buffer, Message<P>, Message<P>> codec) {

    final EventLoopGroup group = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
    final Environment env = new Environment();

    return new TcpClientFactory<Message<P>, Message<P>>() {

        public TcpClientSpec<Message<P>, Message<P>> apply(TcpClientSpec<Message<P>, Message<P>> spec) {
            return spec
                    .env(env)
                    .dispatcher(env.getDispatcher(Environment.WORK_QUEUE))
                    .connect(stompHost, stompPort)
                    .codec(codec)
                    .options(new NettyClientSocketOptions().eventLoopGroup(group));
        }
    };
}

Second, the Reactor2TcpClient actually has reconnect logic. There are two connect methods, and one takes a ReconnectStrategy. Currently, however, the Reactor2TcpStompClient does not expose that. I can add an extra method to accept a reconnect strategy, and that would eliminate the need for Spring Integration to do the reconnect itself.

Artem Bilan, would that help?
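For illustration, a reconnect strategy is essentially a function from the attempt number to a delay before the next attempt. The sketch below uses a local stand-in interface (ReconnectPolicy) modeled loosely on that idea; it is not the Spring ReconnectStrategy type itself, and the 0-based attempt numbering, the capped exponential backoff, and the use of null to mean "stop retrying" are all assumptions made for the example.

```java
// Sketch only: ReconnectPolicy is a hypothetical stand-in, not a Spring or Reactor type.
class ReconnectBackoffSketch {

    interface ReconnectPolicy {
        // Delay in milliseconds before the next attempt, or null to stop retrying.
        // Attempts are numbered from 0 in this sketch (an assumption).
        Long timeToNextAttempt(int attemptCount);
    }

    // Doubles the delay on each attempt, capped at maxMs; gives up after maxAttempts.
    static ReconnectPolicy cappedExponential(long initialMs, long maxMs, int maxAttempts) {
        return attemptCount -> {
            if (attemptCount >= maxAttempts) {
                return null;
            }
            long delay = initialMs;
            // Stop doubling once the cap is reached, which also avoids overflow.
            for (int i = 0; i < attemptCount && delay < maxMs; i++) {
                delay *= 2;
            }
            return Math.min(delay, maxMs);
        };
    }

    public static void main(String[] args) {
        ReconnectPolicy policy = cappedExponential(500, 5000, 10);
        for (int attempt = 0; attempt <= 10; attempt++) {
            System.out.println("attempt " + attempt + " -> " + policy.timeToNextAttempt(attempt));
        }
    }
}
```

A fixed-interval policy would be the simplest variant: return the same delay for every attempt and never return null.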

@spring-issuemaster

Collaborator Author

commented Dec 23, 2016

Artem Bilan commented

Anton Shukalo, thank you for sharing such a nasty problem!

Rossen Stoyanchev, thank you for that comprehensive investigation.

Yes, indeed ReconnectStrategy would help for the Reactor2TcpStompClient.

On the other hand, could you advise what we have for reconnect management in the WebSocketStompClient?

@spring-issuemaster

Collaborator Author

commented Jan 17, 2017

Rossen Stoyanchev commented

Anton Shukalo, this should now be fixed in the next 4.3.6 snapshot build. I confirmed the fix with your sample, but if you're able to try it, that'd be great.

Artem Bilan, indeed we would simply be exposing the reconnect feature of the Reactor TCP client. There is nothing built in on the WebSocketStompClient side.
