New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support STOMP receipts with the simple broker [SPR-17315] #21848
Comments
Rossen Stoyanchev commented Starting with /app/greetings creates a potential gap, while starting with /topic/greetings creates a potential overlap. So you have to start receiving the latest first via /topic/greetings, then get the history via /app/greetings, and filter out any overlap from the history. |
spencercw commented Hi Rossen. I understand that, but there doesn't seem to be any way to reliably subscribe to the topic first under the default configuration where the thread pool has more than one thread. I can send the subscriptions in order, but because of the race condition described, they may be processed out of order. In principle I could wait for the first subscription to complete before sending the second subscription, but, as far as I can tell, the simple broker doesn't have any support for acknowledging subscriptions. I can't just wait for the first real message on the topic channel because in practice, if nothing is changing, no messages will be posted to the channel. This would also add an extra round-trip of latency before the first data is rendered so is not great. |
Rossen Stoyanchev commented Okay I understand now. Indeed without a receipt from the topic subscription, the history may be incomplete. The simple broker does not support receipts, which is a STOMP specific header and frame, but it could be done through a I'll turn this into a ticket to support receipts with the simple broker for 5.2. In the mean time you can add the following to your configuration and that will generate RECEIPT frames: @Configuration
static class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private MessageChannel outChannel;
@Autowired
public WebSocketConfig(MessageChannel clientOutboundChannel) {
this.outChannel = clientOutboundChannel;
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ExecutorChannelInterceptor() {
@Override
public void afterMessageHandled(Message<?> inMessage,
MessageChannel inChannel, MessageHandler handler, Exception ex) {
StompHeaderAccessor inAccessor = StompHeaderAccessor.wrap(inMessage);
String receipt = inAccessor.getReceipt();
if (StringUtils.isEmpty(receipt)) {
return;
}
StompHeaderAccessor outAccessor = StompHeaderAccessor.create(StompCommand.RECEIPT);
outAccessor.setSessionId(inAccessor.getSessionId());
outAccessor.setReceiptId(receipt);
outAccessor.setLeaveMutable(true);
Message<byte[]> outMessage =
MessageBuilder.createMessage(new byte[0], outAccessor.getMessageHeaders());
outChannel.send(outMessage);
}
});
}
}
|
spencercw commented Subscription receipts would certainly be a welcome enhancement, but it doesn't feel like the best solution to this issue because it forces an extra round-trip to the server before anything is rendered (i.e., send subscription, wait for acknowledgement, send request for initial data). On a slow connection this could add a considerable amount of latency. I have spotted another ticket #20087 which sounds like basically the same problem on the outbound channels. The same solution could be used here if that were implemented. |
Rossen Stoyanchev commented Unlike the outbound side where there is only a single subscriber ( Serializing from the client side when needed seems like a more general solution. If the connection is to slow or hard to predict, you could introduce an intentional slowdown in the controller method with a predictable delay: @SubscribeMapping("/greetings")
public CompletableFuture<Integer> init() {
CompletableFuture<Integer> future = new CompletableFuture<>();
scheduler.schedule(() -> future.complete(value.get()), Instant.now().plusMillis(500));
return future;
} |
Rossen Stoyanchev commented Note to self to add extra guidance in the reference docs based on this discussion. |
spencercw commented I'm just pondering this a bit more and it feels like there are probably more issues here. All messages are received on a single thread and are then dispatched to the thread pool for handling where they may be processed in a different order to how they are received. What happens if a connect, subscribe message pair is processed as subscribe, connect. Or subscribe, unsubscribe on the same topic is processed as unsubscribe, subscribe (I could see someone doing this for the initial 'app' subscription on the expectation that the data will be delivered immediately upon subscription, so there's no point keeping it open). It might be safer to process 'control' (connection and subscription related) messages synchronously and dispatch regular 'data' messages to the thread pool. |
Rossen Stoyanchev commented You cannot do anything until connect-connected frames have been exchanged, and you cannot subscribe or send anything further until that's done. That's required to negotiate things like protocol version and heart beat intervals. The "/app" subscriptions only reach a controller method, which replies and the subscription is never stored, and never used again. Theoretically a subscribe-unsubscribe could be processed out of order but sending those at the same time doesn't make much sense either. |
spencercw opened SPR-17315 and commented
I'm trying to set up a project using WebSockets with the 'subscribe and snapshot' pattern (i.e., subscribing to a stream of updates and requesting a snapshot for initialisation).
From what I've been able to gather, the intended way to get the initial snapshot is to use
@SubscribeMapping
. I have put together a simple test class:My client first subscribes to /topic/greetings and then /app/greetings. By doing it in this order, updates may be delivered to the client before the initial snapshot (which is fine), but the reverse order would create a brief period where an update could be generated after the snapshot but not delivered to the client.
You can see in the log below, both subscription messages are received in thread 'http-nio-8080-exec-9'. From there they are passed to the 'clientInboundChannel' thread pool. The topic subscription is handled by 'clientInboundChannel-5' while the app subscription is handled by 'clientInboundChannel-7'.
There is a race condition here because the topic subscription could be delayed long enough for an update to be generated after the app subscription has completed but before the topic subscription has completed, which would not be delivered to the client. I can artificially induce this by putting a breakpoint in SimpleBrokerMessageHandler.handleMessageInternal() and suspending only that thread.
As far as I can tell, it should be safe if I limit the thread pool to one thread, but that's obviously not ideal. Am I doing something wrong; is there some better way to do this?
Affects: 5.0.9
The text was updated successfully, but these errors were encountered: