Cannot use a listener container to receive from a DLT record published due to a deserialization exception #1271

Closed
garyrussell opened this issue Oct 17, 2019 · 1 comment

@garyrussell
Contributor

The ErrorHandlingDeserializer2 adds a Java-serialized DeserializationException to a record header; the container uses this header to throw the exception so that the record is routed to the error handler.

If the error handler publishes the record to a DLT, a listener container that consumes from that DLT detects the header again and rethrows the exception.

	@KafkaListener(id = "so58427447", topics = "so58427447")
	public void listen(String in) {
		System.out.println(in);
	}

	@KafkaListener(id = "so58427447.DLT", topics = "so58427447.DLT",
			properties = "value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer")
	public void listen2(ConsumerRecord<?, ?> in) throws ClassNotFoundException, IOException {
		Header exHeader = in.headers().lastHeader(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER);
		DeserializationException ex = (DeserializationException) new ObjectInputStream(
				new ByteArrayInputStream(exHeader.value())).readObject();
		System.out.println("DLT: " + new String(ex.getData()));
	}

listen2 never receives the record; the log shows it being republished to so58427447.DLT.DLT instead:

Successful dead-letter publication: SendResult [producerRecord=ProducerRecord(topic=so58427447.DLT, partition=0,...
...
Successful dead-letter publication: SendResult [producerRecord=ProducerRecord(topic=so58427447.DLT.DLT, partition=0,...
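
For readers unfamiliar with the header mechanism, here is a minimal JDK-only sketch of the round trip that listen2 relies on: an exception carrying the raw payload is Java-serialized into bytes (as a header value would be) and read back with an ObjectInputStream. `SketchDeserializationException` and `HeaderRoundTripSketch` are hypothetical stand-ins invented for this illustration; they are not spring-kafka classes, and the real DeserializationException has more state than shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for DeserializationException (assumption, not the real class):
// a serializable exception that keeps the bytes that could not be deserialized.
class SketchDeserializationException extends RuntimeException {
    private static final long serialVersionUID = 1L;
    private final byte[] data;

    SketchDeserializationException(String message, byte[] data) {
        super(message);
        this.data = data;
    }

    byte[] getData() {
        return data;
    }
}

class HeaderRoundTripSketch {

    // Java-serialize the exception into the bytes that would be stored as the header value.
    static byte[] toHeaderBytes(SketchDeserializationException ex) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(ex);
        } catch (IOException e) {
            throw new IllegalStateException("could not serialize exception", e);
        }
        return bytes.toByteArray();
    }

    // Read the exception back from the header bytes, as listen2 does with readObject().
    static SketchDeserializationException fromHeaderBytes(byte[] headerValue) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(headerValue))) {
            return (SketchDeserializationException) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not deserialize header", e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "bad-payload".getBytes(StandardCharsets.UTF_8);
        byte[] header = toHeaderBytes(new SketchDeserializationException("failed", payload));
        SketchDeserializationException ex = fromHeaderBytes(header);
        System.out.println("DLT: " + new String(ex.getData(), StandardCharsets.UTF_8));
    }
}
```

Because the original payload travels inside the serialized exception, a DLT listener can recover it from the header alone, which is why the DLT listener in the reproduction reads the header rather than the record value.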

garyrussell commented Oct 17, 2019

This is only a problem on 2.2.x, because the container only looks for the header when the record value is null.

In 2.3.x the value of the DLT record is set to the original value, so it is not null and the header is not examined.

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Oct 17, 2019
Resolves spring-projects#1271

Check for a DLT header before looking for a deserialization exception.

Not needed on master because the `value` is not `null` for DLT records.
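
The ordering described in the commit message can be modeled with a small JDK-only sketch. This is not the actual container code: `DltHeaderCheckSketch`, both header names, and the use of a plain `Map` for headers are assumptions made purely for illustration, and the sketch does not claim to know which DLT header the real fix inspects.

```java
import java.util.HashMap;
import java.util.Map;

class DltHeaderCheckSketch {

    // Hypothetical header names for this sketch only; the real constants live in spring-kafka.
    static final String DESER_EXCEPTION_HEADER = "sketch.deserializationException";
    static final String DLT_HEADER = "sketch.dltMarker";

    // Behaviour described for 2.2.x before the fix: the exception header is only
    // consulted when the value is null, and its presence is treated as a failure.
    static boolean failsBeforeFix(byte[] value, Map<String, byte[]> headers) {
        return value == null && headers.containsKey(DESER_EXCEPTION_HEADER);
    }

    // Behaviour after the fix: a record that carries a DLT header is delivered
    // to the listener without looking for a deserialization exception.
    static boolean failsAfterFix(byte[] value, Map<String, byte[]> headers) {
        if (headers.containsKey(DLT_HEADER)) {
            return false;
        }
        return failsBeforeFix(value, headers);
    }

    public static void main(String[] args) {
        // A 2.2.x-style DLT record: null value, exception header, plus a DLT header.
        Map<String, byte[]> dltHeaders = new HashMap<>();
        dltHeaders.put(DESER_EXCEPTION_HEADER, new byte[] {1});
        dltHeaders.put(DLT_HEADER, new byte[] {1});

        System.out.println("2.2.x DLT record, before fix fails: " + failsBeforeFix(null, dltHeaders));
        System.out.println("2.2.x DLT record, after fix fails:  " + failsAfterFix(null, dltHeaders));

        // A 2.3.x-style DLT record keeps the original value, so the null check never triggers.
        System.out.println("2.3.x DLT record fails:             "
                + failsBeforeFix(new byte[] {42}, dltHeaders));
    }
}
```

A record from the original topic (null value, exception header, no DLT header) still fails in both variants, so routing to the error handler is unaffected; only records already published to a DLT bypass the check.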
artembilan pushed a commit that referenced this issue Oct 17, 2019