Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sleuth Corrupts Spring Integration Error Message Channel Headers #761

Closed
garyrussell opened this issue Oct 27, 2017 · 4 comments
Closed
Assignees
Milestone

Comments

@garyrussell
Copy link

See https://stackoverflow.com/questions/46971098/spring-integration-error-channel-handling-broken-when-used-with-spring-cloud-sle

Error channel handling from a messaging gateway.

  1. The MessagingTemplate sets up reply/error channel headers and invokes the flow.
  2. The flow throws an exception.
  3. The template catches the exception and, if there's an error channel, constructs an ErrorMessage with failedMessage payload; new reply/error channels are set up for the error flow.
  4. The error flow returns a result and it is returned to the user.

The sleuth interceptor breaks this logic because it restores the "spent" reply/error channel and, when the error flow returns a reply, we get...

Reply message received but the receiving thread has exited due to an exception while sending the request message:GenericMessage [payload=OOPS...

It's not clear why sleuth is messing with framework headers; shouldn't it just add the span stuff?

Before interceptor on the error channel...

ErrorMessage [payload=org.springframework.integration.transformer.MessageTransformationException: Failed to transform Message; nested exception is org.springframework.messaging.MessageHandlingException: Expression evaluation failed: 1/0; nested exception is java.lang.ArithmeticException: / by zero, failedMessage=GenericMessage [payload=hi, headers={spanTraceId=a5bccb97d0471781, spanId=4562ba4f926027e2, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1002de2b, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1002de2b, messageSent=true, id=e374cde6-7a8d-26f7-1613-133f965e0c9e, spanSampled=0}], failedMessage=GenericMessage [payload=hi, headers={spanTraceId=a5bccb97d0471781, spanId=4562ba4f926027e2, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1002de2b, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1002de2b, messageSent=true, id=e374cde6-7a8d-26f7-1613-133f965e0c9e, spanSampled=0}], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1ec5225b, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1ec5225b, id=3d0bdb1c-3faa-e02d-604b-6c0a29d12987, timestamp=1509119092545}]

After interceptor on the error channel...

GenericMessage [payload=org.springframework.integration.transformer.MessageTransformationException: Failed to transform Message; nested exception is org.springframework.messaging.MessageHandlingException: Expression evaluation failed: 1/0; nested exception is java.lang.ArithmeticException: / by zero, failedMessage=GenericMessage [payload=hi, headers={spanTraceId=a5bccb97d0471781, spanId=4562ba4f926027e2, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1002de2b, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1002de2b, messageSent=true, id=e374cde6-7a8d-26f7-1613-133f965e0c9e, spanSampled=0}], failedMessage=GenericMessage [payload=hi, headers={spanTraceId=a5bccb97d0471781, spanId=4562ba4f926027e2, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1002de2b, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1002de2b, messageSent=true, id=e374cde6-7a8d-26f7-1613-133f965e0c9e, spanSampled=0}], headers={spanTraceId=a5bccb97d0471781, spanId=4562ba4f926027e2, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1002de2b, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1002de2b, messageSent=true, id=df7caa61-a3f7-a1f3-9eb5-52dac85de8fa, spanSampled=0}]

i.e. the error message channel headers are replaced with channels from the failed message; these channels are no longer valid and the result is the gateway receives no reply (or hangs forever if no timeout is specified).

@garyrussell
Copy link
Author

This code is unwrapping the ErrorMessage

Message<?> retrievedMessage = getMessage(message);
MessageBuilder<?> messageBuilder = MessageBuilder.fromMessage(retrievedMessage);

I can see why you might need the sleuth headers from that, but you should not be propagating any other headers to the "new" ErrorMessage.

@garyrussell
Copy link
Author

I added a work around to the stack overflow question (remove the expired headers from the failedMessage in another interceptor.

@garyrussell
Copy link
Author

garyrussell commented Oct 27, 2017

Here is a test case that illustrates the problem...

	@Test
	public void errorMessageHeadersRetained() {
		QueueChannel deadReplyChannel = new QueueChannel();
		QueueChannel errorsReplyChannel = new QueueChannel();
		Map<String, Object> errorChannelHeaders = new HashMap<>();
		errorChannelHeaders.put(MessageHeaders.REPLY_CHANNEL, errorsReplyChannel);
		errorChannelHeaders.put(MessageHeaders.ERROR_CHANNEL, errorsReplyChannel);
		this.tracedChannel.send(new ErrorMessage(
				new MessagingException(MessageBuilder.withPayload("hi")
						.setHeader(TraceMessageHeaders.TRACE_ID_NAME, Span.idToHex(10L))
						.setHeader(TraceMessageHeaders.SPAN_ID_NAME, Span.idToHex(20L))
						.setReplyChannel(deadReplyChannel)
						.setErrorChannel(deadReplyChannel)
						.build()	),
				errorChannelHeaders));
		then(this.message).isNotNull();

		String spanId = this.message.getHeaders().get(TraceMessageHeaders.SPAN_ID_NAME, String.class);
		then(spanId).isNotNull();
		long traceId = Span
				.hexToId(this.message.getHeaders().get(TraceMessageHeaders.TRACE_ID_NAME, String.class));
		then(traceId).isEqualTo(10L);
		then(spanId).isNotEqualTo(20L);
		then(this.accumulator.getSpans()).hasSize(1);

		then(this.message.getHeaders().getReplyChannel()).isSameAs(errorsReplyChannel);
		then(this.message.getHeaders().getErrorChannel()).isSameAs(errorsReplyChannel);
	}

I believe (if the Message is an ErrorMessage) the preSend() and beforeHandle() methods should return something like...

new ErrorMessage(message.getPayload(), mapOfOriginalErrorMessageHeadersPlusSpansFromFailedMessage)

...but I don't know enough about sleuth as to whether that's the whole picture.

@marcingrzejszczak
Copy link
Contributor

Thanks, @garyrussell for this analysis. It's super helpful. I'll try to fix this next week.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants