Skip to content

Commit

Permalink
GH-1021: Fix @SendTo after error is handled
Browse files Browse the repository at this point in the history
Fixes #1021

Sending the result from a `RabbitListenerErrorHandler` was broken
for class-level `@RabbitListener` because the send to expression
was lost.

**cherry-pick to 2.1.x**

* * Also capture the generic return type after the error is handled
  • Loading branch information
garyrussell authored and artembilan committed Jun 14, 2019
1 parent cf030af commit 3e451e6
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,14 @@ public boolean hasDefaultHandler() {
return this.defaultHandler != null;
}

@Nullable
public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) {
InvocableHandlerMethod handler = findHandlerForPayload(inboundPayload.getClass());
if (handler != null) {
return new InvocationResult(result, this.handlerSendTo.get(handler),
handler.getMethod().getGenericReturnType());
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.amqp.rabbit.listener.adapter;

import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;

Expand Down Expand Up @@ -94,5 +95,14 @@ public Object getBean() {
}
}

@Nullable
public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) {
if (this.invokerHandlerMethod != null) {
return new InvocationResult(result, null, this.invokerHandlerMethod.getMethod().getGenericReturnType());
}
else {
return this.delegatingHandler.getInvocationResultFor(result, inboundPayload);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel
.build();
Object errorResult = this.errorHandler.handleError(amqpMessage, message, e);
if (errorResult != null) {
handleResult(new InvocationResult(errorResult, null, null), amqpMessage, channel, message);
handleResult(this.handlerAdapter.getInvocationResultFor(errorResult, message.getPayload()),
amqpMessage, channel, message);
}
else {
logger.trace("Error handler returned no result");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ public void multiListener() {
rabbitTemplate.convertAndSend("multi.exch", "multi.rk", bar);
rabbitTemplate.setReceiveTimeout(10000);
assertThat(this.rabbitTemplate.receiveAndConvert("sendTo.replies")).isEqualTo("BAR: bar");
bar.field = "crash";
rabbitTemplate.convertAndSend("multi.exch", "multi.rk", bar);
assertThat(this.rabbitTemplate.receiveAndConvert("sendTo.replies"))
.isEqualTo("CRASHCRASH Test reply from error handler");
bar.field = "bar";
Baz baz = new Baz();
baz.field = "baz";
assertThat(rabbitTemplate.convertSendAndReceive("multi.exch", "multi.rk", baz)).isEqualTo("BAZ: baz");
Expand Down Expand Up @@ -1550,14 +1555,22 @@ public MyService myService() {

@Bean
public RabbitListenerErrorHandler alwaysBARHandler() {
return (m, sm, e) -> "BAR";
return (msg, springMsg, ex) -> "BAR";
}

@Bean
public RabbitListenerErrorHandler upcaseAndRepeatErrorHandler() {
return (msg, springMsg, ex) -> {
String payload = ((Bar) springMsg.getPayload()).field.toUpperCase();
return payload + payload + " " + ex.getCause().getMessage();
};
}

@Bean
public RabbitListenerErrorHandler throwANewException() {
return (m, sm, e) -> {
this.errorHandlerChannel = sm.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
throw new RuntimeException("from error handler", e.getCause());
return (msg, springMsg, ex) -> {
this.errorHandlerChannel = springMsg.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
throw new RuntimeException("from error handler", ex.getCause());
};
}

Expand Down Expand Up @@ -1637,7 +1650,7 @@ public TxClassLevel txClassLevel() {

@Bean
public org.springframework.amqp.core.Queue sendToReplies() {
return new org.springframework.amqp.core.Queue(sendToRepliesBean(), false, false, true);
return new org.springframework.amqp.core.Queue(sendToRepliesBean(), false, false, false);
}

@Bean
Expand Down Expand Up @@ -1672,12 +1685,15 @@ public DirectExchange internal() {
@RabbitListener(bindings = @QueueBinding
(value = @Queue,
exchange = @Exchange(value = "multi.exch", autoDelete = "true"),
key = "multi.rk"))
key = "multi.rk"), errorHandler = "upcaseAndRepeatErrorHandler")
static class MultiListenerBean {

@RabbitHandler
@SendTo("${foo.bar:#{sendToRepliesBean}}")
public String bar(@NonNull Bar bar) {
if (bar.field.equals("crash")) {
throw new RuntimeException("Test reply from error handler");
}
return "BAR: " + bar.field;
}

Expand Down

0 comments on commit 3e451e6

Please sign in to comment.