SimpMessagingTemplate.convertAndSend causes hanging thread #26464

Closed
hurahurat opened this issue Jan 28, 2021 · 44 comments
Labels
for: external-project Needs a fix in external project in: messaging Issues in messaging modules (jms, messaging)

Comments

@hurahurat

hurahurat commented Jan 28, 2021

Affects: 5.3.3.RELEASE

I'm using Spring Boot 2.4.2 with RabbitMQ as the message broker and spring-boot-starter-jetty.
On my production server, a Jetty thread has been hanging for a very long time (almost 2 days). The thread dump is below:

"qtp682812632-42" Id=0x2a WAITING on java.util.concurrent.CompletableFuture$Signaller@1601acdf
          at java.base@11.0.9.1/jdk.internal.misc.Unsafe.park(Native Method)
          -  waiting on java.util.concurrent.CompletableFuture$Signaller@1601acdf
          at java.base@11.0.9.1/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
          at java.base@11.0.9.1/java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1796)
          at java.base@11.0.9.1/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
          at java.base@11.0.9.1/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1823)
          at java.base@11.0.9.1/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1998)
          at org.springframework.util.concurrent.CompletableToListenableFutureAdapter.get(CompletableToListenableFutureAdapter.java:99)
          at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$SystemSessionConnectionHandler.forward(StompBrokerRelayMessageHandler.java:1081)
          at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:599)
          at org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:262)
          at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:144)
          at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:100)
          at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:139)
          at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:125)
          at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
          at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
          at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
          at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
          at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151)
          at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:129)
          at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:122)

To send messages, I just use this code:

response.getMessageList().forEach(m -> messageTemplate.convertAndSend(m.getTopicUrl(), m.getMessageData()));
response.setMessageList(null); //Remove message list
return response;

Any help would be appreciated.

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged or decided on label Jan 28, 2021
@rstoyanchev
Contributor

rstoyanchev commented Jan 28, 2021

I can't tell what's going on but the Future it's waiting on should be from the send through the ReactorNettyTcpConnection. It sounds like the send through Reactor Netty TcpClient perhaps doesn't complete. Not sure why though.

@violetagg any idea what this might mean, i.e. why the Mono<Void> from NettyOutbound#send might not complete?

@rstoyanchev rstoyanchev added status: waiting-for-feedback We need additional information before we can continue in: messaging Issues in messaging modules (jms, messaging) labels Jan 28, 2021
@violetagg
Member

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Jan 28, 2021
@rstoyanchev rstoyanchev added status: waiting-for-feedback We need additional information before we can continue and removed status: feedback-provided Feedback has been provided labels Jan 28, 2021
@hurahurat
Author

Unfortunately, I had turned off all WebSocket logging at the time, so I cannot provide any helpful logs about this problem.
But I'm trying to reproduce it and will post any useful information here.
Thank you for your time.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Jan 29, 2021
@rstoyanchev rstoyanchev added status: waiting-for-feedback We need additional information before we can continue and removed status: feedback-provided Feedback has been provided labels Jan 29, 2021
@spring-projects-issues
Collaborator

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

@spring-projects-issues spring-projects-issues added the status: feedback-reminder We've sent a reminder that we need additional information before we can continue label Feb 5, 2021
@spring-projects-issues
Collaborator

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.

@spring-projects-issues spring-projects-issues removed status: waiting-for-feedback We need additional information before we can continue status: feedback-reminder We've sent a reminder that we need additional information before we can continue status: waiting-for-triage An issue we've not yet triaged or decided on labels Feb 12, 2021
@boginw

boginw commented May 8, 2021

We're using Spring Boot 2.4.5 with ActiveMQ as a message broker and we're experiencing similar issues. The fault happens at random for us, but running a test with @RepeatedTest(1000) is a pretty sure-fire way to trigger it. I've set the test to timeout after 20 seconds, and this is the relevant log:

2021-05-08 23:56:20.612 DEBUG 76770 --- [    Test worker] o.s.m.s.b.SimpleBrokerMessageHandler     : Processing MESSAGE destination=/topic/car/57/topic/car session=null payload={"type":"CarDTO","data":{"id":57,"state":"READY","comment":"car Comment","maximu...(truncated)
2021-05-08 23:56:20.612 DEBUG 76770 --- [    Test worker] o.s.m.s.s.StompBrokerRelayMessageHandler : Forwarding SEND /topic/car/57/topic/car session=_system_ application/json payload={"type":"CarDTO","data":{"id":57,"state":"READY","comment":"car Comment","maximu...(truncated)
2021-05-08 23:56:20.612 TRACE 76770 --- [    Test worker] o.s.messaging.simp.stomp.StompEncoder    : Encoding STOMP SEND, headers={destination=[/topic/car/57/topic/car], content-type=[application/json]}
2021-05-08 23:56:21.114 DEBUG 76770 --- [MessageBroker-1] o.s.w.s.s.t.h.DefaultSockJsService       : Closed 19 sessions: [3d453fbe54ed42feb7b8bbe6dd9e5dd4, 943715e8b6d041af9c3bc9b1d60e4f1a, 2e0e7c981cdb43bdade1e93cebaaf4f8, 35b3536656494b3db9f7b9ea4f4afa32, faadb8e5b4454a8383bac55593562eeb, 7c9e323f8e8b44998e7132bf9cc2fcd0, 26ba6026efea42828e3bbd5069a3bd0b, 12a54a195ca149dd9a41c74dffdc6e47, 03a2474c89ad43338163eda9a8f3c81e, 049e384493e04d5ba6a0cb372726e8fe, 2fefdd31025d4aab825a6daa9e5cc218, 33fb7ee0947e47018ef79063716e8678, 293b186593d84e4b9e425535e09df739, 28a4ed15d1bf4371926ce9d9ef1ff87c, 31bc4e710007466e97074f169fd8f39a, c6f62a3be4634eafaee2a369a7b2c4db, d511ef396ace4bcfb966feb3b4b6ab38, 4c3f4b2319844d879567db5df3eb64f6, 72204f8939aa4439a982869ed043e93c]
2021-05-08 23:56:21.592 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510981592; nextExpiration=1620510981317; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:22.592 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510982592; nextExpiration=1620510982592; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:23.594 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510983593; nextExpiration=1620510983592; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:24.595 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510984595; nextExpiration=1620510984593; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:25.596 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510985596; nextExpiration=1620510985595; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:26.114 DEBUG 76770 --- [MessageBroker-1] o.s.w.s.s.t.h.DefaultSockJsService       : Closed 37 sessions: [3f0727d4b1084730adde87071f2d685c, b8ad22b04f674c9d826f446da7d5fae5, 14d170f089ba4d059933c0159264b14e, a93b23d8d74a43878d0d956b36c1ef78, 7dac9256e6e14827a5727089987648c9, 2198aa4d1c7d496cb8f4f01b2f92bfd8, 300f5076e9134c3f84bffb4483104ae0, 596a0ba91b044517948630d212dbfae1, 2aa91eef14ab4df494a3efa8d1b9ab53, d392540bffb142b89221df3b84587c2b, dc24ff5b30d54858adc47c45a2e31b8d, 69610f88d03f47448bc2c70057de73de, f542886479d743b6862a20c1980343f6, 6c29c56d9d924159b80cf2612e601a0d, 508ce092d520434a9e7ae73d6db438a3, 4dbd63771ae6407b9683ddef8c57b1e0, 2f366039fa91472ebb6016fdaa6e95ab, cdbcddb22691471ba027548ce3e1113b, 06e7b116ae5c435b99d75549ea3e6a43, cc944545fc574310b229bc82118a6f3f, 1f77418f01454e2c87056cbe4498ee3e, 13bab02e317f4cf586d9972bc69cd246, 6962f28505a941d9bb43f10b0c6e96d7, 30501e7d344d4fd68cb6b52250e987f1, acc4896a94b649ed94031951314cfd91, 48150651b88b45eeba5e2e77504e5245, b16be0f53fe042a0b23df4542f8ccfda, b865c3782a0c48c0bca33c2cce9647c3, f7db3adfe5f34baa8968f69cc9e04452, f85a910a4e914cbc8155e89e15f099ea, c7ce08b89d4142b793a9e16e176579f7, 4522a33b49234e0eb709113a656db4e8, 5371e1e9c6e345b8973497c6f1c72000, d604505feeec45d79fda44e7f31a9ec0, e25f67e692a4434eb1ccb5645fb1c41a, 0b774c916eac442e9fc11fed8b698232, 8262bd03c44547eeb89ef046d679cfed]
2021-05-08 23:56:26.596 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510986596; nextExpiration=1620510986596; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:27.598 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510987597; nextExpiration=1620510987596; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:28.598 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510988598; nextExpiration=1620510988597; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:29.599 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510989599; nextExpiration=1620510989598; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:30.600 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510990600; nextExpiration=1620510990599; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:30.623 TRACE 76770 --- [ient-loop-nio-2] o.s.messaging.simp.stomp.StompEncoder    : Encoding heartbeat
2021-05-08 23:56:30.676 TRACE 76770 --- [ient-loop-nio-2] o.s.messaging.simp.stomp.StompDecoder    : Decoded heart-beat
2021-05-08 23:56:30.676 TRACE 76770 --- [ent-scheduler-3] o.s.m.s.s.StompBrokerRelayMessageHandler : Received heart-beat in session _system_
2021-05-08 23:56:31.601 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510991601; nextExpiration=1620510991600; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:32.603 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510992603; nextExpiration=1620510992601; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:32.747 DEBUG 76770 --- [l-1 housekeeper] com.zaxxer.hikari.pool.HikariPool        : HikariPool-1 - Pool stats (total=10, active=0, idle=10, waiting=0)
2021-05-08 23:56:32.747 DEBUG 76770 --- [l-1 housekeeper] com.zaxxer.hikari.pool.HikariPool        : HikariPool-1 - Fill pool skipped, pool is at sufficient level.
2021-05-08 23:56:33.604 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510993604; nextExpiration=1620510993603; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:34.604 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510994604; nextExpiration=1620510994604; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:35.606 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510995606; nextExpiration=1620510995604; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:36.607 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510996607; nextExpiration=1620510996606; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:37.608 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510997608; nextExpiration=1620510997607; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:38.608 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510998608; nextExpiration=1620510998608; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:39.610 DEBUG 76770 --- [-1-ClientPoller] org.apache.tomcat.util.net.NioEndpoint   : timeout completed: keys processed=2; now=1620510999610; nextExpiration=1620510999608; keyCount=0; hasEvents=false; eval=false
2021-05-08 23:56:40.562 TRACE 76770 --- [    Test worker] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.orm.jpa.EntityManagerHolder@68389336] for key [org.hibernate.internal.SessionFactoryImpl@4e8862e5] from thread [Test worker]
2021-05-08 23:56:40.562 DEBUG 76770 --- [    Test worker] o.j.s.OpenEntityManagerInViewInterceptor : Closing JPA EntityManager in OpenEntityManagerInViewInterceptor
2021-05-08 23:56:40.562 TRACE 76770 --- [    Test worker] org.hibernate.internal.SessionImpl       : Closing session [d6c54f09-b513-435e-b3d4-7eb24f1b6d03]
2021-05-08 23:56:40.562 TRACE 76770 --- [    Test worker] o.h.e.jdbc.internal.JdbcCoordinatorImpl  : Closing JDBC container [org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl@50972793]
2021-05-08 23:56:40.562 TRACE 76770 --- [    Test worker] o.h.r.j.i.ResourceRegistryStandardImpl   : Releasing JDBC resources
2021-05-08 23:56:40.562 TRACE 76770 --- [    Test worker] o.h.r.j.i.LogicalConnectionManagedImpl   : Closing logical connection
2021-05-08 23:56:40.562 TRACE 76770 --- [    Test worker] o.h.r.j.i.LogicalConnectionManagedImpl   : Logical connection closed
2021-05-08 23:56:40.568 TRACE 76770 --- [    Test worker] o.s.t.web.servlet.TestDispatcherServlet  : Failed to complete request

org.springframework.messaging.MessageDeliveryException: nested exception is java.lang.InterruptedException 
	at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$SystemSessionConnectionHandler.forward(StompBrokerRelayMessageHandler.java:1086) ~[spring-messaging-5.3.6.jar:5.3.6]
	at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:599) ~[spring-messaging-5.3.6.jar:5.3.6]
	at org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:281) ~[spring-messaging-5.3.6.jar:5.3.6]
	at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:144) ~[spring-messaging-5.3.6.jar:5.3.6]
	at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:100) ~[spring-messaging-5.3.6.jar:5.3.6]
	at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:139) ~[spring-messaging-5.3.6.jar:5.3.6]
	at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:125) ~[spring-messaging-5.3.6.jar:5.3.6]
	at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187) ~[spring-messaging-5.3.6.jar:5.3.6]
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162) ~[spring-messaging-5.3.6.jar:5.3.6]
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48) ~[spring-messaging-5.3.6.jar:5.3.6]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.6.jar:5.3.6]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151) ~[spring-messaging-5.3.6.jar:5.3.6]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:129) ~[spring-messaging-5.3.6.jar:5.3.6]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:122) ~[spring-messaging-5.3.6.jar:5.3.6]
	at ctrl.application.websocket.publisher.WebSocketPublisher.notifyCarWithId(WebSocketPublisher.java:31) ~[main/:na]

...

Caused by: java.lang.InterruptedException: null
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:385) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2069) ~[na:na]
	at org.springframework.util.concurrent.CompletableToListenableFutureAdapter.get(CompletableToListenableFutureAdapter.java:99) ~[spring-core-5.3.6.jar:5.3.6]
	at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$SystemSessionConnectionHandler.forward(StompBrokerRelayMessageHandler.java:1081) ~[spring-messaging-5.3.6.jar:5.3.6]
	... 220 common frames omitted

@violetagg what do you mean by wire logging? Are we talking Wireshark or?

@violetagg
Member

@boginw

boginw commented May 16, 2021

@violetagg I'm unsure how to enable the wire-logging you mentioned in a Spring Boot application. I have tried using what you mentioned in this other issue but to no avail. The project uses org.springframework.boot:spring-boot-starter-webflux and has the following component:

@Component
public class NettyWebServerCustomizer
    implements WebServerFactoryCustomizer<NettyReactiveWebServerFactory> {

    @Override
    public void customize(NettyReactiveWebServerFactory factory) {
        factory.addServerCustomizers(httpServer -> httpServer.wiretap(
            "cool-logger",
            LogLevel.DEBUG,
            AdvancedByteBufFormat.TEXTUAL
        ));
    }
}

and in my properties file, I have

logging.level.root=DEBUG
logging.level.reactor.netty=trace
logging.level.cool-logger=DEBUG

But my logs do not contain any matches for cool-logger.

@violetagg
Member

@boginw If this is not working please open an issue in Reactor Netty with a reproducible example. Thanks.

@alekol

alekol commented May 19, 2021

Same exact problem and thread dump here. Did anyone find any clues as to why this is happening?

The only suspicious thing in my case is that the thread trying to write to the socket is a RabbitMQ listener container thread, so it's not one of my app's threads but a Rabbit listener trying to send something back into Rabbit (via STOMP).

Here's the thread dump:

jdk.internal.misc.Unsafe.park(boolean, long)
java.util.concurrent.locks.LockSupport.park(java.lang.Object) (line: 194)
java.util.concurrent.CompletableFuture$Signaller.block() (line: 1796)
java.util.concurrent.ForkJoinPool.managedBlock(java.util.concurrent.ForkJoinPool$ManagedBlocker) (line: 3128)
java.util.concurrent.CompletableFuture.waitingGet(boolean) (line: 1823)
java.util.concurrent.CompletableFuture.get() (line: 1998)
org.springframework.util.concurrent.CompletableToListenableFutureAdapter.get() (line: 99)
org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$SystemSessionConnectionHandler.forward(org.springframework.messaging.Message, org.springframework.messaging.simp.stomp.StompHeaderAccessor) (line: 1081)
org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(org.springframework.messaging.Message) (line: 599)
org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(org.springframework.messaging.Message) (line: 262)
org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run() (line: 144)
org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(org.springframework.messaging.Message, long) (line: 100)
org.springframework.messaging.support.AbstractMessageChannel.send(org.springframework.messaging.Message, long) (line: 139)
org.springframework.messaging.support.AbstractMessageChannel.send(org.springframework.messaging.Message) (line: 125)
org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(org.springframework.messaging.Message) (line: 187)
org.springframework.messaging.simp.SimpMessagingTemplate.doSend(java.lang.String, org.springframework.messaging.Message) (line: 162)
org.springframework.messaging.simp.SimpMessagingTemplate.doSend(java.lang.Object, org.springframework.messaging.Message) (line: 48)
org.springframework.messaging.core.AbstractMessageSendingTemplate.send(java.lang.Object, org.springframework.messaging.Message) (line: 109)
org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(java.lang.Object, java.lang.Object, java.util.Map, org.springframework.messaging.core.MessagePostProcessor) (line: 151)
org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(java.lang.Object, java.lang.Object, java.util.Map) (line: 129)
org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(java.lang.Object, java.lang.Object) (line: 122)
com.halodx.itl.connectivity.connectors.sockjs.SockJSServer.sendMessageOut(com.halodx.itl.connectivity.model.ControlMessage) (line: 98)
connectivity.connectors.sockjs.SockJSServer.sendMessage(com.halodx.itl.connectivity.model.ControlMessage,

@alekol

alekol commented May 19, 2021

Just managed to get the exact same problem when a different thread writes on the wire, so it's not RabbitMQ related.

Is SimpMessagingTemplate (and all its dependencies down to Netty) thread-safe? WebsocketClient is certainly not, so if something in between is not synchronizing, this could be a possible reason for what we're seeing here.

Here's the second thread, with the exact same behavior (hanging on a future):

java.util.concurrent.locks.LockSupport.park(java.lang.Object) (line: 194)
java.util.concurrent.CompletableFuture$Signaller.block() (line: 1796)
java.util.concurrent.ForkJoinPool.managedBlock(java.util.concurrent.ForkJoinPool$ManagedBlocker) (line: 3128)
java.util.concurrent.CompletableFuture.waitingGet(boolean) (line: 1823)
java.util.concurrent.CompletableFuture.get() (line: 1998)
org.springframework.util.concurrent.CompletableToListenableFutureAdapter.get() (line: 99)
org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$SystemSessionConnectionHandler.forward(org.springframework.messaging.Message, org.springframework.messaging.simp.stomp.StompHeaderAccessor) (line: 1081)
org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(org.springframework.messaging.Message) (line: 599)
org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(org.springframework.messaging.Message) (line: 262)
org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run() (line: 144)
org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(org.springframework.messaging.Message, long) (line: 100)
org.springframework.messaging.support.AbstractMessageChannel.send(org.springframework.messaging.Message, long) (line: 139)
org.springframework.messaging.support.AbstractMessageChannel.send(org.springframework.messaging.Message) (line: 125)
org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(org.springframework.messaging.Message) (line: 187)
org.springframework.messaging.simp.SimpMessagingTemplate.doSend(java.lang.String, org.springframework.messaging.Message) (line: 162)
org.springframework.messaging.simp.SimpMessagingTemplate.doSend(java.lang.Object, org.springframework.messaging.Message) (line: 48)
org.springframework.messaging.core.AbstractMessageSendingTemplate.send(java.lang.Object, org.springframework.messaging.Message) (line: 109)
org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(java.lang.Object, java.lang.Object, java.util.Map, org.springframework.messaging.core.MessagePostProcessor) (line: 151)
org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(java.lang.Object, java.lang.Object, java.util.Map) (line: 129)
org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(java.lang.Object, java.lang.Object) (line: 122)
connectivity.connectors.sockjs.SockJSServer.sendMessageOut(com.halodx.itl.connectivity.model.ControlMessage) (line: 98)

@alekol

alekol commented May 19, 2021

The issue has nothing to do with the thread safety of SimpMessagingTemplate. I've funneled all writes to it through a dedicated thread, and the only difference is that now this thread hangs, waiting on the Netty future that never resolves.

It reproduces easily after a few tries with intense, simultaneous bidirectional traffic on the wire (server->client and client->server).

@rstoyanchev
Contributor

@alekol, see #26464 (comment).

@alekol

alekol commented May 19, 2021

Yeah. I spent an hour trying to get the Netty logs in place for a Spring Boot app with log4j2, so all the examples where the Netty client/server is manually instantiated are of no help.

At this point it's probably easier to build Netty from source and debug :/

I'll try to reach out to their community. It could be a known problem; it's just too easy to reproduce.

@alekol

alekol commented May 31, 2021

It turns out that SimpMessagingTemplate already supports sending with a timeout (SimpMessagingTemplate.setSendTimeout()), but this timeout does not appear to be taken into account in the implementation of StompBrokerRelayMessageHandler.SystemSessionConnectionHandler.forward().

With this in mind, it seems fair to expect SystemSessionConnectionHandler.forward() to respect this timeout.

Would you consider a fix? If needed, I could draft the merge request.
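For illustration, here is a minimal sketch of what a bounded wait on the send future could look like, using only the JDK. It is not the Spring implementation: the class `BoundedForward`, the method `awaitSend`, and the convention that a negative timeout means "wait indefinitely" are assumptions made for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of spring-messaging.
public class BoundedForward {

    /**
     * Waits for the send future to complete.
     * Returns true if it completed in time, false if the timeout elapsed.
     * A negative timeout (an assumption of this sketch) means wait indefinitely.
     */
    public static boolean awaitSend(CompletableFuture<Void> sendFuture, long timeoutMillis) {
        try {
            if (timeoutMillis < 0) {
                sendFuture.get(); // unbounded wait, as in the thread dumps above
            } else {
                sendFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            return true;
        } catch (TimeoutException ex) {
            return false; // caller decides whether to fail the send or retry
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for send", ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException("Send failed", ex.getCause());
        }
    }

    public static void main(String[] args) {
        // A future that nobody ever completes stands in for the stuck write.
        CompletableFuture<Void> neverCompletes = new CompletableFuture<>();
        System.out.println(awaitSend(neverCompletes, 100));

        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        System.out.println(awaitSend(done, 100));
    }
}
```

With a bound in place, the calling thread gets control back when the future never completes, although the underlying cause of the missing completion would still need to be found.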

@codergmc

codergmc commented Jun 1, 2021

@alekol I suspect ChannelFuture#get() takes longer because you send many large messages and saturate the network bandwidth. ChannelFuture#get() returns only once the channel has written the message on the Netty event loop thread.
I cannot reproduce this problem. Can you provide a complete example?

@alekol

alekol commented Jun 1, 2021

I cannot provide an isolated repro of this and I can't share the code of my project (IP and all) :/

What's causing the app-level deadlock is the unbounded future.get(), which does not respect the value set via template.setSendTimeout(). So a greater time delay is not going to solve anything for anyone.

I realize the issue is hard to reproduce and is probably beyond spring-messaging per se, but that doesn't make it any less real, nor does it make the need to address the unbounded future.get() any less pressing in this case.

@boginw

boginw commented Jun 1, 2021

Just for some context, we have a failure rate of 0.5%. But given that we have around 2,500 tests, then we are sure that at least one test fails almost every time we run them.
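A rough back-of-the-envelope check of that claim, assuming the 0.5% is a per-test failure probability and that tests fail independently (neither is stated above). The class and method names are illustrative only.

```java
// Hypothetical calculation, assuming independent tests with the same failure rate.
public class FlakyRunOdds {

    /** Probability that at least one of `tests` independent tests fails. */
    public static double probabilityOfAtLeastOneFailure(double perTestFailureRate, int tests) {
        // P(no failure) = (1 - rate)^tests, so P(at least one) = 1 - that.
        return 1.0 - Math.pow(1.0 - perTestFailureRate, tests);
    }

    public static void main(String[] args) {
        double p = probabilityOfAtLeastOneFailure(0.005, 2500);
        System.out.printf("P(at least one failure) = %.6f%n", p);
    }
}
```

Under those assumptions the expected number of failures per run is 0.005 × 2500 = 12.5, and the chance of a fully green run is far below 0.01%, which is consistent with seeing a failure on almost every run.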

@rstoyanchev
Contributor

rstoyanchev commented Jun 2, 2021

@alekol thanks for the wire logs. A couple of things I'm not able to make sense of.

You wrote the last good write is at 09:27:01,287 and the last good read at 09:27:03,532. First, those are in reverse, read and write respectively, but more importantly the READ is followed by READ_COMPLETE, the WRITE is followed by FLUSH, and there are many subsequent reads and writes on the same connection id 0xc2c6864e immediately after. In other words, the wire logs don't show any interruption for that connection. Are you sure you are correctly correlating the hanging send to the log messages?

You wrote that once the issue happens, the app keeps receiving messages, but it can't write due to the unresolved future and blocked thread. If messages are sent through the SimpMessagingTemplate they should go through the brokerChannel which should have at least as many threads as the number of cores, and while hanging can quickly exhaust the pool, it still shouldn't prevent the whole app from sending if only 1 thread hangs.

One suggestion for something to try. I'm wondering if the issue could be related to #25561 where the I/O works fine but the Future isn't getting completed correctly. One option is to check with Spring Framework 5.2.15 (Boot 2.3.11 I think). Alternatively, you could undo 7758ba3 locally, i.e. by forking MonoToListenableFutureAdapter to match how it was before the change, to see if it makes a difference.

As for the suggestion for sendTimeout, I think that's something we can consider but I'd like to try to find out more about the root cause.

@rstoyanchev rstoyanchev added the status: waiting-for-feedback We need additional information before we can continue label Jun 2, 2021
@xor22h

xor22h commented Jun 2, 2021

@rstoyanchev please look at my log, especially the last successful write of 83 bytes at 2021-05-24 08:23:56.236. Yes, it has a FLUSH after it, and the same connection still has later writes, but none for this 83-byte message. And that thread is no longer used. It's stuck forever.

Over a longer run, all available threads can end up hanging (no writes performed).

With Spring Boot 2.3.4 I had no issue with the same code. After upgrading to 2.4.x/2.5.x it started to get locked.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Jun 2, 2021
@rstoyanchev
Contributor

rstoyanchev commented Jun 2, 2021

@xor22h I did look at it but it doesn't have wire logging (see the log by @alekol) and I don't see anything unusual there. Just a sequence of writes and flushes. What other log messages are missing for the 83 bytes message?

> With SpringBoot 2.3.4 i had no issue with same code. After upgrading to 2.4.x/2.5.x it started to get locked.

This is good to know about 2.3.4 vs 2.4/2.5. Would you be able to try a more isolated change along the lines of what I suggested:

> you could undo 7758ba3 locally, i.e. by forking MonoToListenableFutureAdapter to match how it was before the change, to see if it makes a difference.

@alekol

alekol commented Jun 2, 2021

@rstoyanchev Thanks for looking into this

You are correct: the nio thread is not blocked and it keeps on sending/receiving keep-alives and other things. However, the writing thread is blocked because the future never gets completed (get() never returns), so the app's outbound traffic ultimately stalls in my case (all our wire-facing writes happen in a single thread).
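The effect of one stuck get() on a single writing thread can be sketched in isolation. This is an illustration only, built on a plain single-thread executor; SingleWriterStallSketch and secondWriteStalls are hypothetical names and nothing here is taken from the application in question.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: one write parked in get() starves every write queued behind it.
class SingleWriterStallSketch {

    /** Returns true if a second write, queued behind a stuck one, does not run within the wait. */
    static boolean secondWriteStalls(long waitMillis) {
        ExecutorService writer = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> neverCompleted = new CompletableFuture<>();
        try {
            // First write: blocks in get() because nobody completes the future.
            Callable<Void> stuckWrite = neverCompleted::get;
            writer.submit(stuckWrite);

            // Second write: queued on the same (only) thread.
            Callable<String> secondWrite = () -> "written";
            Future<String> second = writer.submit(secondWrite);
            try {
                second.get(waitMillis, TimeUnit.MILLISECONDS);
                return false;
            }
            catch (TimeoutException ex) {
                return true;
            }
            catch (Exception ex) {
                return false;
            }
        }
        finally {
            // Interrupts the parked get() so the worker thread can exit.
            writer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("second write stalled: " + secondWriteStalls(300));
    }
}
```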

For the suggestion to try out - I'll give it a whack tomorrow and post here

@rstoyanchev
Contributor

> For the suggestion to try out - I'll give it a whack tomorrow and post here

Okay, if I/O keeps working, then an issue in the Mono to Future adapter code could indeed help to explain the problem.

@xor22h

xor22h commented Jun 2, 2021

@rstoyanchev I'm missing log lines that contain [driver_location] getAll => ... These lines should appear every 2 seconds, but at some point they stop, as all the threads for @Scheduled get stuck.

Minimal project in which I can still reproduce it: https://we.tl/t-YNoeojTPZy Sometimes it happens very soon; sometimes it can take 10 minutes to happen.

@codergmc

codergmc commented Jun 3, 2021

@xor22h I changed @Scheduled to a shorter time interval and ran your project. It is hard to reproduce this problem.
I used the latest version of RabbitMQ with the default configuration and openjdk version "11.0.11".

@hurahurat
Author

	public CompletableToListenableFutureAdapter(CompletableFuture<T> completableFuture) {
		this.completableFuture = completableFuture;
		this.completableFuture.whenComplete((result, ex) -> {
			if (ex != null) {
				this.callbacks.failure(ex);
			}
			else {
				this.callbacks.success(result);
			}
		});
	}

In the CompletableToListenableFutureAdapter class, the whenComplete callback will never run if there is an exception in mono.toFuture() of the Mono class.
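The underlying behavior of whenComplete can be checked on its own: the callback runs only once the future is actually completed, normally or exceptionally. A minimal sketch, with the hypothetical names WhenCompleteSketch and callbackRuns (this is not the adapter's code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: whenComplete fires only when someone completes the future.
class WhenCompleteSketch {

    /** Registers a whenComplete callback and reports how many times it ran. */
    static int callbackRuns(boolean completeTheFuture) {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicInteger runs = new AtomicInteger();

        // Same shape as the adapter's constructor: one callback for both outcomes.
        future.whenComplete((result, ex) -> runs.incrementAndGet());

        if (completeTheFuture) {
            // An exceptional completion still triggers the callback.
            future.completeExceptionally(new IllegalStateException("send failed"));
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs when never completed: " + callbackRuns(false));
        System.out.println("runs when completed exceptionally: " + callbackRuns(true));
    }
}
```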

	@Override
	@SuppressWarnings("unchecked")
	public final void subscribe(Subscriber<? super T> actual) {
		CorePublisher publisher = Operators.onLastAssembly(this);
		CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

		try {
			if (publisher instanceof OptimizableOperator) {
				OptimizableOperator operator = (OptimizableOperator) publisher;
				while (true) {
					subscriber = operator.subscribeOrReturn(subscriber);
					if (subscriber == null) {
						// null means "I will subscribe myself", returning...
						return;
					}

					OptimizableOperator newSource = operator.nextOptimizableSource();
					if (newSource == null) {
						publisher = operator.source();
						break;
					}
					operator = newSource;
				}
			}

			publisher.subscribe(subscriber);
		}
		catch (Throwable e) {
			Operators.reportThrowInSubscribe(subscriber, e);
			return;
		}
	}

In the onError method of the MonoToCompletableFuture class, I think ref.getAndSet(null) returns null, so completeExceptionally is not called, which causes future.get() to never return.

	@Override
	public void onError(Throwable t) {
		if (ref.getAndSet(null) != null) {
			completeExceptionally(t);
		}
	}
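The suspected effect of that guard can be modeled without Reactor. The class below is a simplified, hypothetical stand-in (GuardedErrorSketch is not Reactor's MonoToCompletableFuture, and a plain Object takes the place of the Subscription): if no subscription has been recorded when onError arrives, the error is dropped and the future stays incomplete.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of a future guarded by a subscription reference.
class GuardedErrorSketch extends CompletableFuture<Void> {

    private final AtomicReference<Object> ref = new AtomicReference<>();

    void onSubscribe(Object subscription) {
        ref.set(subscription);
    }

    void onError(Throwable t) {
        // Same guard as in the snippet above: only act if a subscription was recorded.
        if (ref.getAndSet(null) != null) {
            completeExceptionally(t);
        }
    }

    /** Returns true if the error ends up completing the future exceptionally. */
    static boolean errorReachesFuture(boolean subscribeFirst) {
        GuardedErrorSketch future = new GuardedErrorSketch();
        if (subscribeFirst) {
            future.onSubscribe(new Object());
        }
        future.onError(new IllegalStateException("boom"));
        return future.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println("subscribed first: " + errorReachesFuture(true));
        System.out.println("no subscription:  " + errorReachesFuture(false));
    }
}
```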

@alekol

alekol commented Jun 4, 2021

nice find @hurahurat ! I'll try and see if this is indeed what's happening :)

@hurahurat
Author

I have created a simple project which can reproduce this problem easily. I will upload it here tomorrow because now I'm trying to find the root cause.

@hurahurat
Author

I put in some breakpoints to see whether it happened as I thought, but it did not. I don't want to fork spring-websocket and reactor-netty, so it's better to wait for you guys. I have confirmed that it works normally with Spring Boot versions before 2.4.x.
Please take a look at the attached file below. After visiting "localhost:9999/test", it takes about 7-10 minutes for a thread to hang on my machine. It may take longer depending on the machine spec.
websocket-demo.zip

@jbakoc1

jbakoc1 commented Jun 17, 2021

I confirm that on my machine the messages stop after ~1 sec with Spring Boot 2.5.1.
When I change to 2.3.12.RELEASE everything is OK.

@hurahurat
Author

I have reverted the change 7758ba3 in Spring Boot 2.5.1 (Spring Framework 5.3.8) and found that it worked fine, although the thread was occasionally in a sleeping state during the test.
So I think there is a concurrency problem in the CompletableToListenableFutureAdapter class under load.

@hurahurat
Author

I added some logs to check and found that this problem appears when the send() method in the ReactorNettyTcpConnection class is called from two threads at the same time.
As you can see in the image below, the http-nio-9999-exe thread hangs without the "forward successfully" log message that I added.
While the http-nio-9999-exe thread was waiting for the result in CompletableFuture.get(), the tcp-client-loop-nio-2 thread sent an empty-payload message (scheduled by scheduler.schedule(() -> this.connectionHandler.afterConnected(connection)), maybe to keep the TCP connection alive).
But the problem is that the tcp-client-loop-nio-2 thread was also the thread that completes the CompletableFuture of the http-nio-9999-exe thread.
At that moment it could complete only its own CompletableFuture, so the http-nio-9999-exe thread waits forever.

[image: websocket2222]

@hurahurat
Author

hurahurat commented Jun 18, 2021

After adding more logs in the reactor-core project, I think I have found the root cause of this issue. It is about the order of execution. In the MonoToCompletableFuture class, the onComplete event is fired before the onSubscribe event, as you can see in the log lines below.

2021-06-19 07:08:49.757  INFO 28220 --- [http-nio-9999-exec-1] o.s.m.t.r.ReactorNettyTcpConnection      : ReactorNettyTcpConnection send() in thread Thread[http-nio-9999-exec-1,5,main]
2021-06-19 07:08:49.757  INFO 28220 --- [http-nio-9999-exec-1] r.c.publisher.MonoToCompletableFuture    : onSubscribe prevSub value is null and new subscription is reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber@2abeaabc
2021-06-19 07:08:49.757  INFO 28220 --- [http-nio-9999-exec-1] r.c.publisher.MonoToCompletableFuture    : onSubscribe Operators.validate is true in Thread Thread[http-nio-9999-exec-1,5,main]
2021-06-19 07:08:49.758  INFO 28220 --- [http-nio-9999-exec-1] reactor.core.publisher.Mono              : ======Oops It will subscribe itself
2021-06-19 07:08:49.758  INFO 28220 --- [tcp-client-loop-nio-2] r.c.publisher.MonoToCompletableFuture    : onComplete prevSub value is reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber@2abeaabc
2021-06-19 07:08:49.758  INFO 28220 --- [tcp-client-loop-nio-2] r.c.publisher.MonoToCompletableFuture    : onComplete ref.getAndSet(null) != null is true in Thread Thread[tcp-client-loop-nio-2,5,main]
2021-06-19 07:08:49.758  INFO 28220 --- [http-nio-9999-exec-1] o.s.m.s.s.StompBrokerRelayMessageHandler : SystemSessionConnectionHandler forwarding
2021-06-19 07:08:49.758  INFO 28220 --- [http-nio-9999-exec-1] o.s.m.s.s.StompBrokerRelayMessageHandler : SystemSessionConnectionHandler forward successfully
2021-06-19 07:08:49.758  INFO 28220 --- [http-nio-9999-exec-1] c.t.websocket.controller.TestController  : Sending data in servertime: 2021-06-19T07:08:49.757105700
2021-06-19 07:08:49.758  INFO 28220 --- [http-nio-9999-exec-1] c.t.websocket.controller.TestController  : #########################################

2021-06-19 07:08:49.758  INFO 28220 --- [http-nio-9999-exec-1] o.s.m.t.r.ReactorNettyTcpConnection      : ReactorNettyTcpConnection send() in thread Thread[http-nio-9999-exec-1,5,main]
2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] r.c.publisher.MonoToCompletableFuture    : onSubscribe prevSub value is null and new subscription is reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber@23d2f242
2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] r.c.publisher.MonoToCompletableFuture    : onSubscribe Operators.validate is true in Thread Thread[http-nio-9999-exec-1,5,main]
2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] r.c.publisher.MonoToCompletableFuture    : onComplete prevSub value is reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber@23d2f242
2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] r.c.publisher.MonoToCompletableFuture    : onComplete ref.getAndSet(null) != null is true in Thread Thread[http-nio-9999-exec-1,5,main]
2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] reactor.core.publisher.Mono              : ======Oops It will subscribe itself
2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] o.s.m.s.s.StompBrokerRelayMessageHandler : SystemSessionConnectionHandler forwarding
2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] o.s.m.s.s.StompBrokerRelayMessageHandler : SystemSessionConnectionHandler forward successfully
2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] c.t.websocket.controller.TestController  : Sending data in servertime: 2021-06-19T07:08:49.758105300
2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] c.t.websocket.controller.TestController  : #########################################

2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] o.s.m.t.r.ReactorNettyTcpConnection      : ReactorNettyTcpConnection send() in thread Thread[http-nio-9999-exec-1,5,main]
2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] r.c.publisher.MonoToCompletableFuture    : onSubscribe prevSub value is null and new subscription is reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber@fc8721b
2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] r.c.publisher.MonoToCompletableFuture    : onSubscribe Operators.validate is true in Thread Thread[http-nio-9999-exec-1,5,main]
2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] reactor.core.publisher.Mono              : ======Oops It will subscribe itself
2021-06-19 07:08:49.759  INFO 28220 --- [http-nio-9999-exec-1] o.s.m.s.s.StompBrokerRelayMessageHandler : SystemSessionConnectionHandler forwarding
2021-06-19 07:08:49.759  INFO 28220 --- [tcp-client-loop-nio-2] r.c.publisher.MonoToCompletableFuture    : onComplete prevSub value is reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber@fc8721b
2021-06-19 07:08:49.760  INFO 28220 --- [tcp-client-loop-nio-2] r.c.publisher.MonoToCompletableFuture    : onComplete ref.getAndSet(null) != null is true in Thread Thread[tcp-client-loop-nio-2,5,main]
2021-06-19 07:08:49.760  INFO 28220 --- [http-nio-9999-exec-1] o.s.m.s.s.StompBrokerRelayMessageHandler : SystemSessionConnectionHandler forward successfully
2021-06-19 07:08:49.760  INFO 28220 --- [http-nio-9999-exec-1] c.t.websocket.controller.TestController  : Sending data in servertime: 2021-06-19T07:08:49.759106400
2021-06-19 07:08:49.760  INFO 28220 --- [http-nio-9999-exec-1] c.t.websocket.controller.TestController  : #########################################

2021-06-19 07:08:49.760  INFO 28220 --- [http-nio-9999-exec-1] o.s.m.t.r.ReactorNettyTcpConnection      : ReactorNettyTcpConnection send() in thread Thread[http-nio-9999-exec-1,5,main]
2021-06-19 07:08:49.761  INFO 28220 --- [tcp-client-loop-nio-2] r.c.publisher.MonoToCompletableFuture    : onComplete prevSub value is null
2021-06-19 07:08:49.761  INFO 28220 --- [tcp-client-loop-nio-2] r.c.publisher.MonoToCompletableFuture    : onComplete ref.getAndSet(null) != null is false in Thread Thread[tcp-client-loop-nio-2,5,main]
2021-06-19 07:08:49.761  INFO 28220 --- [http-nio-9999-exec-1] r.c.publisher.MonoToCompletableFuture    : onSubscribe prevSub value is null and new subscription is reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber@1e19d341
2021-06-19 07:08:49.761  INFO 28220 --- [http-nio-9999-exec-1] r.c.publisher.MonoToCompletableFuture    : onSubscribe Operators.validate is true in Thread Thread[http-nio-9999-exec-1,5,main]
2021-06-19 07:08:49.761  INFO 28220 --- [http-nio-9999-exec-1] reactor.core.publisher.Mono              : ======Oops It will subscribe itself
2021-06-19 07:08:49.761  INFO 28220 --- [http-nio-9999-exec-1] o.s.m.s.s.StompBrokerRelayMessageHandler : SystemSessionConnectionHandler forwarding
2021-06-19 07:08:59.764  INFO 28220 --- [tcp-client-loop-nio-2] o.s.m.t.r.ReactorNettyTcpConnection      : ReactorNettyTcpConnection send() in thread Thread[tcp-client-loop-nio-2,5,main]
2021-06-19 07:08:59.765  INFO 28220 --- [tcp-client-loop-nio-2] r.c.publisher.MonoToCompletableFuture    : onSubscribe prevSub value is null and new subscription is reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber@2cc4c08
2021-06-19 07:08:59.765  INFO 28220 --- [tcp-client-loop-nio-2] r.c.publisher.MonoToCompletableFuture    : onSubscribe Operators.validate is true in Thread Thread[tcp-client-loop-nio-2,5,main]
2021-06-19 07:08:59.765  INFO 28220 --- [tcp-client-loop-nio-2] r.c.publisher.MonoToCompletableFuture    : onComplete prevSub value is reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber@2cc4c08
2021-06-19 07:08:59.765  INFO 28220 --- [tcp-client-loop-nio-2] r.c.publisher.MonoToCompletableFuture    : onComplete ref.getAndSet(null) != null is true in Thread Thread[tcp-client-loop-nio-2,5,main]
2021-06-19 07:08:59.765  INFO 28220 --- [tcp-client-loop-nio-2] reactor.core.publisher.Mono              : ======Oops It will subscribe itself
2021-06-19 07:09:00.448  INFO 28220 --- [RMI TCP Connection(2)-127.0.0.1] inMXBeanRegistrar$SpringApplicationAdmin : Application shutdown requested.
2021-06-19 07:09:00.450  INFO 28220 --- [RMI TCP Connection(2)-127.0.0.1] o.s.m.s.s.StompBrokerRelayMessageHandler : Stopping...
2021-06-19 07:09:00.450  INFO 28220 --- [RMI TCP Connection(2)-127.0.0.1] o.s.m.s.s.StompBrokerRelayMessageHandler : BrokerAvailabilityEvent[available=false, StompBrokerRelay[ReactorNettyTcpClient[reactor.netty.tcp.TcpClientConnect@59bfbd41]]]
2021-06-19 07:09:00.456  INFO 28220 --- [RMI TCP Connection(2)-127.0.0.1] r.c.publisher.MonoToCompletableFuture    : onSubscribe prevSub value is null and new subscription is reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain@2a64d880
2021-06-19 07:09:00.456  INFO 28220 --- [RMI TCP Connection(2)-127.0.0.1] r.c.publisher.MonoToCompletableFuture    : onSubscribe Operators.validate is true in Thread Thread[RMI TCP Connection(2)-127.0.0.1,5,RMI Runtime]

I think it happened because CompletableFuture is not lazy by nature: the onComplete and onSubscribe events were executed by separate threads, and the onSubscribe event might take more time because of the message size and the encoding step.
If I remove the if check (ref.getAndSet(null) != null) in onComplete (and in onCancel as well), I confirm that it works fine. But I'm not sure whether that could cause side effects, since this class might be used by other applications and libraries.
The full log file is attached in case you need it.
spring.log
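The ordering seen in the last log block, where onComplete arrives before onSubscribe, can be replayed deterministically in a simplified model. This is a sketch under stated assumptions, not Reactor code: CompletionOrderSketch, its guarded flag, and doneAfterReorderedSignals are hypothetical, and a plain Object stands in for the Subscription. It contrasts the guarded onComplete with a variant that completes unconditionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model; not Reactor's MonoToCompletableFuture.
class CompletionOrderSketch extends CompletableFuture<Void> {

    private final AtomicReference<Object> ref = new AtomicReference<>();
    private final boolean guarded;

    CompletionOrderSketch(boolean guarded) {
        this.guarded = guarded;
    }

    void onSubscribe(Object subscription) {
        ref.set(subscription);
    }

    void onComplete() {
        boolean hadSubscription = ref.getAndSet(null) != null;
        // Guarded: complete only if a subscription was already recorded.
        // Unguarded: always complete; complete() has no effect on a finished future.
        if (!guarded || hadSubscription) {
            complete(null);
        }
    }

    /** Delivers onComplete before onSubscribe and reports whether the future is done. */
    static boolean doneAfterReorderedSignals(boolean guarded) {
        CompletionOrderSketch future = new CompletionOrderSketch(guarded);
        future.onComplete();               // terminal signal arrives first
        future.onSubscribe(new Object());  // subscription is recorded too late
        return future.isDone();
    }

    public static void main(String[] args) {
        System.out.println("guarded, done:   " + doneAfterReorderedSignals(true));
        System.out.println("unguarded, done: " + doneAfterReorderedSignals(false));
    }
}
```

In this model the guarded variant is left incomplete, so a caller blocked in get() would wait indefinitely, while the unguarded one completes. Whether dropping the guard is safe for other users of the real class is the open question raised above, and the sketch does not answer it.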

@rstoyanchev
Contributor

@hurahurat, thanks for persisting and finding the root cause! I'm closing this in favor of the Reactor Netty issue.

@rstoyanchev rstoyanchev added for: external-project Needs a fix in external project and removed status: feedback-provided Feedback has been provided status: waiting-for-triage An issue we've not yet triaged or decided on labels Jun 25, 2021