From 7e6667ab0cd4fcfb183fd73bbd912c051add1ba8 Mon Sep 17 00:00:00 2001 From: Dennis Zou <1939115123@qq.com> Date: Fri, 9 Apr 2021 18:29:33 +0800 Subject: [PATCH 1/2] Add handleErrorFrame callback to separate normal business "MESSAGE" frame when the frame type is "ERROR" --- .../simp/stomp/DefaultStompSession.java | 6 +++- .../simp/stomp/StompFrameHandler.java | 2 ++ .../simp/stomp/DefaultStompSessionTests.java | 35 ++----------------- .../ReactorNettyTcpStompClientTests.java | 10 ++++++ .../WebSocketStompClientIntegrationTests.java | 18 +++++++++- 5 files changed, 36 insertions(+), 35 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java index 391325f443d7..d5e83d683e55 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java @@ -454,7 +454,7 @@ else if (StompCommand.CONNECTED.equals(command)) { this.sessionHandler.afterConnected(this, headers); } else if (StompCommand.ERROR.equals(command)) { - invokeHandler(this.sessionHandler, message, headers); + invokeErrorFrameHandler(this.sessionHandler, message, headers); } else if (!isHeartbeat && logger.isTraceEnabled()) { logger.trace("Message not handled."); @@ -466,6 +466,10 @@ else if (!isHeartbeat && logger.isTraceEnabled()) { } } + private void invokeErrorFrameHandler(StompSessionHandler handler, Message message, StompHeaders headers) { + handler.handleErrorFrame(headers, message.getPayload()); + } + private void invokeHandler(StompFrameHandler handler, Message message, StompHeaders headers) { if (message.getPayload().length == 0) { handler.handleFrame(headers, null); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompFrameHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompFrameHandler.java index a0a9448e1080..b1773e61f8ca 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompFrameHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompFrameHandler.java @@ -43,4 +43,6 @@ public interface StompFrameHandler { */ void handleFrame(StompHeaders headers, @Nullable Object payload); + void handleErrorFrame(StompHeaders headers, @Nullable byte[] payload); + } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java index 7f979b54213e..b0816d3a7d5a 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java @@ -230,6 +230,7 @@ public void handleErrorFrame() { StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); accessor.setContentType(new MimeType("text", "plain", StandardCharsets.UTF_8)); accessor.addNativeHeader("foo", "bar"); + accessor.setMessage("Error message"); accessor.setLeaveMutable(true); String payload = "Oops"; @@ -239,39 +240,7 @@ public void handleErrorFrame() { this.session.handleMessage(MessageBuilder.createMessage( payload.getBytes(StandardCharsets.UTF_8), accessor.getMessageHeaders())); - verify(this.sessionHandler).getPayloadType(stompHeaders); - verify(this.sessionHandler).handleFrame(stompHeaders, payload); - verifyNoMoreInteractions(this.sessionHandler); - } - - @Test - public void handleErrorFrameWithEmptyPayload() { - StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); - accessor.addNativeHeader("foo", "bar"); - accessor.setLeaveMutable(true); - this.session.handleMessage(MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders())); - - StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders()); - verify(this.sessionHandler).handleFrame(stompHeaders, null); - verifyNoMoreInteractions(this.sessionHandler); - } - - @Test - public void handleErrorFrameWithConversionException() { - StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); - accessor.setContentType(MimeTypeUtils.APPLICATION_JSON); - accessor.addNativeHeader("foo", "bar"); - accessor.setLeaveMutable(true); - byte[] payload = "{'foo':'bar'}".getBytes(StandardCharsets.UTF_8); - - StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders()); - given(this.sessionHandler.getPayloadType(stompHeaders)).willReturn(Map.class); - - this.session.handleMessage(MessageBuilder.createMessage(payload, accessor.getMessageHeaders())); - - verify(this.sessionHandler).getPayloadType(stompHeaders); - verify(this.sessionHandler).handleException(same(this.session), same(StompCommand.ERROR), - eq(stompHeaders), same(payload), any(MessageConversionException.class)); + verify(this.sessionHandler).handleErrorFrame(stompHeaders, payload.getBytes()); verifyNoMoreInteractions(this.sessionHandler); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClientTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClientTests.java index 90985f35d79b..d3c9167e2732 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClientTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClientTests.java @@ -137,6 +137,11 @@ public void handleException(StompSession session, StompCommand command, @Override public void handleFrame(StompHeaders headers, @Nullable Object payload) { + logger.error("STOMP frame " + headers + " payload=" + payload); + } + + @Override + public void handleErrorFrame(StompHeaders headers, byte[] payload) { logger.error("STOMP error frame " + headers + " payload=" + payload); } @@ -178,6 +183,11 @@ public Type getPayloadType(StompHeaders headers) { public void handleFrame(StompHeaders headers, @Nullable Object payload) { received.add((String) payload); } + + @Override + public void handleErrorFrame(StompHeaders headers, @Nullable byte[] payload) { + //nothing to do + } }); subscription.addReceiptTask(subscriptionLatch::countDown); } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java index 28d72b22a46f..ff627b1680d0 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java @@ -117,7 +117,7 @@ void publishSubscribe() throws Exception { this.stompClient.connect(url, testHandler); assertThat(testHandler.awaitForMessageCount(1, 5000)).isTrue(); - assertThat(testHandler.getReceived()).containsExactly("payload"); + assertThat(testHandler.getErrorReceived()).containsExactly("payload"); } @@ -149,6 +149,8 @@ private static class TestHandler extends StompSessionHandlerAdapter { private final List received = new ArrayList<>(); + private final List errorReceived = new ArrayList<>(); + public TestHandler(String topic, Object payload) { this.topic = topic; @@ -160,6 +162,10 @@ public List getReceived() { return this.received; } + public List getErrorReceived(){ + return this.errorReceived; + } + @Override public void afterConnected(StompSession session, StompHeaders connectedHeaders) { @@ -172,6 +178,11 @@ public Type getPayloadType(StompHeaders headers) { public void handleFrame(StompHeaders headers, @Nullable Object payload) { received.add((String) payload); } + + @Override + public void handleErrorFrame(StompHeaders headers, byte[] payload) { + errorReceived.add(String.valueOf(payload)); + } }); try { // Delay send since server processes concurrently @@ -210,6 +221,11 @@ public void handleFrame(StompHeaders headers, @Nullable Object payload) { logger.error("STOMP error frame " + headers + " payload=" + payload); } + @Override + public void handleErrorFrame(StompHeaders headers, byte[] payload) { + + } + @Override public void handleTransportError(StompSession session, Throwable exception) { logger.error(exception); From ad02d27b84269d5319c6d76f6810120ec47020a2 Mon Sep 17 00:00:00 2001 From: Dennis Zou <1939115123@qq.com> Date: Sun, 11 Apr 2021 23:45:02 +0800 Subject: [PATCH 2/2] Update stomp handler doc. --- .../messaging/simp/stomp/DefaultStompSession.java | 2 +- .../messaging/simp/stomp/StompFrameHandler.java | 7 +++++++ .../messaging/simp/stomp/DefaultStompSessionTests.java | 2 +- .../messaging/WebSocketStompClientIntegrationTests.java | 2 +- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java index d5e83d683e55..8a5e5e1203e5 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompFrameHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompFrameHandler.java index b1773e61f8ca..b0a1f1204f4d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompFrameHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompFrameHandler.java @@ -38,11 +38,18 @@ public interface StompFrameHandler { /** * Handle a STOMP frame with the payload converted to the target type returned * from {@link #getPayloadType(StompHeaders)}. + * Method {@link #handleErrorFrame(StompHeaders, byte[])} is invoked if the frame + * is an error frame. * @param headers the headers of the frame * @param payload the payload, or {@code null} if there was no payload */ void handleFrame(StompHeaders headers, @Nullable Object payload); + /** + * Handle a STOMP error frame received from server. + * @param headers the headers of the frame + * @param payload the payload, or {@code null} if there was no payload + */ void handleErrorFrame(StompHeaders headers, @Nullable byte[] payload); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java index b0816d3a7d5a..522b9d8b0c9e 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java index ff627b1680d0..35f385f65a4b 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.