Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -454,7 +454,7 @@ else if (StompCommand.CONNECTED.equals(command)) {
this.sessionHandler.afterConnected(this, headers);
}
else if (StompCommand.ERROR.equals(command)) {
invokeHandler(this.sessionHandler, message, headers);
invokeErrorFrameHandler(this.sessionHandler, message, headers);
}
else if (!isHeartbeat && logger.isTraceEnabled()) {
logger.trace("Message not handled.");
Expand All @@ -466,6 +466,10 @@ else if (!isHeartbeat && logger.isTraceEnabled()) {
}
}

private void invokeErrorFrameHandler(StompSessionHandler handler, Message<byte[]> message, StompHeaders headers) {
handler.handleErrorFrame(headers, message.getPayload());
}

private void invokeHandler(StompFrameHandler handler, Message<byte[]> message, StompHeaders headers) {
if (message.getPayload().length == 0) {
handler.handleFrame(headers, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,18 @@ public interface StompFrameHandler {
/**
* Handle a STOMP frame with the payload converted to the target type returned
* from {@link #getPayloadType(StompHeaders)}.
* Method {@link #handleErrorFrame(StompHeaders, byte[])} is invoked if the frame
* is an error frame.
* @param headers the headers of the frame
* @param payload the payload, or {@code null} if there was no payload
*/
void handleFrame(StompHeaders headers, @Nullable Object payload);

/**
* Handle a STOMP error frame received from server.
* @param headers the headers of the frame
* @param payload the payload, or {@code null} if there was no payload
*/
void handleErrorFrame(StompHeaders headers, @Nullable byte[] payload);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

documentation seems be needed.


}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -230,6 +230,7 @@ public void handleErrorFrame() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
accessor.setContentType(new MimeType("text", "plain", StandardCharsets.UTF_8));
accessor.addNativeHeader("foo", "bar");
accessor.setMessage("Error message");
accessor.setLeaveMutable(true);
String payload = "Oops";

Expand All @@ -239,39 +240,7 @@ public void handleErrorFrame() {
this.session.handleMessage(MessageBuilder.createMessage(
payload.getBytes(StandardCharsets.UTF_8), accessor.getMessageHeaders()));

verify(this.sessionHandler).getPayloadType(stompHeaders);
verify(this.sessionHandler).handleFrame(stompHeaders, payload);
verifyNoMoreInteractions(this.sessionHandler);
}

@Test
public void handleErrorFrameWithEmptyPayload() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
accessor.addNativeHeader("foo", "bar");
accessor.setLeaveMutable(true);
this.session.handleMessage(MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders()));

StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
verify(this.sessionHandler).handleFrame(stompHeaders, null);
verifyNoMoreInteractions(this.sessionHandler);
}

@Test
public void handleErrorFrameWithConversionException() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
accessor.setContentType(MimeTypeUtils.APPLICATION_JSON);
accessor.addNativeHeader("foo", "bar");
accessor.setLeaveMutable(true);
byte[] payload = "{'foo':'bar'}".getBytes(StandardCharsets.UTF_8);

StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
given(this.sessionHandler.getPayloadType(stompHeaders)).willReturn(Map.class);

this.session.handleMessage(MessageBuilder.createMessage(payload, accessor.getMessageHeaders()));

verify(this.sessionHandler).getPayloadType(stompHeaders);
verify(this.sessionHandler).handleException(same(this.session), same(StompCommand.ERROR),
eq(stompHeaders), same(payload), any(MessageConversionException.class));
verify(this.sessionHandler).handleErrorFrame(stompHeaders, payload.getBytes());
verifyNoMoreInteractions(this.sessionHandler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ public void handleException(StompSession session, StompCommand command,

@Override
public void handleFrame(StompHeaders headers, @Nullable Object payload) {
logger.error("STOMP frame " + headers + " payload=" + payload);
}

@Override
public void handleErrorFrame(StompHeaders headers, byte[] payload) {
logger.error("STOMP error frame " + headers + " payload=" + payload);
}

Expand Down Expand Up @@ -178,6 +183,11 @@ public Type getPayloadType(StompHeaders headers) {
public void handleFrame(StompHeaders headers, @Nullable Object payload) {
received.add((String) payload);
}

@Override
public void handleErrorFrame(StompHeaders headers, @Nullable byte[] payload) {
//nothing to do
}
});
subscription.addReceiptTask(subscriptionLatch::countDown);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -117,7 +117,7 @@ void publishSubscribe() throws Exception {
this.stompClient.connect(url, testHandler);

assertThat(testHandler.awaitForMessageCount(1, 5000)).isTrue();
assertThat(testHandler.getReceived()).containsExactly("payload");
assertThat(testHandler.getErrorReceived()).containsExactly("payload");
}


Expand Down Expand Up @@ -149,6 +149,8 @@ private static class TestHandler extends StompSessionHandlerAdapter {

private final List<String> received = new ArrayList<>();

private final List<String> errorReceived = new ArrayList<>();


public TestHandler(String topic, Object payload) {
this.topic = topic;
Expand All @@ -160,6 +162,10 @@ public List<String> getReceived() {
return this.received;
}

public List<String> getErrorReceived(){
return this.errorReceived;
}


@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
Expand All @@ -172,6 +178,11 @@ public Type getPayloadType(StompHeaders headers) {
public void handleFrame(StompHeaders headers, @Nullable Object payload) {
received.add((String) payload);
}

@Override
public void handleErrorFrame(StompHeaders headers, byte[] payload) {
errorReceived.add(String.valueOf(payload));
}
});
try {
// Delay send since server processes concurrently
Expand Down Expand Up @@ -210,6 +221,11 @@ public void handleFrame(StompHeaders headers, @Nullable Object payload) {
logger.error("STOMP error frame " + headers + " payload=" + payload);
}

@Override
public void handleErrorFrame(StompHeaders headers, byte[] payload) {

}

@Override
public void handleTransportError(StompSession session, Throwable exception) {
logger.error(exception);
Expand Down