@StreamListener with the Kafka Binder should handle Kafka tombstone records #455

Closed
adershrp opened this issue Sep 26, 2018 · 3 comments

@adershrp adershrp commented Sep 26, 2018

Hi,
When using @StreamListener, is there any way to handle a null (KafkaNull) payload?
I tried @Payload(required = false) on the method argument, but that does not work. I believe it works with @KafkaListener.

Another option I tried is enableDlq for the consumer; the message then goes to the DLQ with a null payload, but it is of no use to me there.

Is there a better approach? Please suggest.

Regards,
Adersh

Contributor

@olegz olegz commented Sep 26, 2018

No, there can never be a Message with a null payload. That won't be allowed by the underlying framework (i.e., Spring Integration). However, KafkaNull is not null. It's an object that "...represents NULL Kafka payload...". This means that, in theory, you should be able to receive a Message<KafkaNull>.
That said...
Could you elaborate on your use case? I am not clear on the value of a null payload in general.
Also, please use Stack Overflow to ask questions. We use GitHub for reporting issues only.
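
The distinction between null and an object that stands for null can be sketched in plain Java. This is only an illustration of the sentinel pattern under stated assumptions: `NullMarker`, `SimpleMessage`, and `SentinelSketch` are hypothetical stand-ins written for this example, not the Spring Messaging or spring-kafka classes.

```java
import java.util.Objects;

// Hypothetical stand-in for a sentinel such as KafkaNull: a singleton object
// that represents "no payload" without being null itself.
final class NullMarker {
    static final NullMarker INSTANCE = new NullMarker();

    private NullMarker() { }
}

// Hypothetical stand-in for a message type that rejects null payloads.
final class SimpleMessage<T> {
    private final T payload;

    SimpleMessage(T payload) {
        this.payload = Objects.requireNonNull(payload, "payload must not be null");
    }

    T getPayload() {
        return payload;
    }
}

final class SentinelSketch {
    // A handler can branch on the sentinel to recognise a tombstone.
    static String describe(SimpleMessage<?> message) {
        Object payload = message.getPayload();
        if (payload instanceof NullMarker) {
            return "tombstone";
        }
        return "value: " + payload;
    }
}
```

With these stand-ins, a message carrying `NullMarker.INSTANCE` is a valid, non-null message, and the handler decides what a tombstone means for the application.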

@olegz olegz closed this Sep 26, 2018
Contributor

@garyrussell garyrussell commented Oct 1, 2018

Sorry for the delay; we were at SpringOne Platform last week.

Kafka records can have a null payload - this is termed a tombstone record for compacted logs (indicating the key has been deleted). Since a Spring-Messaging Message<?> can't have a null payload, we represent this condition with KafkaNull.INSTANCE.

In the @KafkaListener we have a customized payload resolver, which allows (required=false) on the payload...

    argumentResolvers.add(new PayloadArgumentResolver(messageConverter) {

        @Override
        protected boolean isEmptyPayload(Object payload) {
            return payload == null || payload instanceof KafkaNull;
        }

    });

We should add similar logic to @StreamListener.
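
As a rough model of what such a resolver does with (required=false), the sketch below reduces the decision to two plain-Java methods. It is a simplification under stated assumptions: `TombstoneMarker` and `PayloadResolutionSketch` are hypothetical names for this example, the `required` flag stands in for the annotation attribute, and the exception type is chosen for illustration rather than taken from the framework.

```java
// Hypothetical stand-in for KafkaNull.
final class TombstoneMarker {
    static final TombstoneMarker INSTANCE = new TombstoneMarker();

    private TombstoneMarker() { }
}

final class PayloadResolutionSketch {
    // Same check as the override quoted above: null and the sentinel both count as empty.
    static boolean isEmptyPayload(Object payload) {
        return payload == null || payload instanceof TombstoneMarker;
    }

    // Simplified model of argument resolution:
    // empty payload with required=false yields a null argument,
    // empty payload with required=true is rejected,
    // anything else is passed through unchanged.
    static Object resolveArgument(Object payload, boolean required) {
        if (isEmptyPayload(payload)) {
            if (required) {
                throw new IllegalArgumentException("Payload value must not be empty");
            }
            return null;
        }
        return payload;
    }
}
```

Under this model, a listener method whose payload parameter is optional would see `null` for a tombstone record instead of the sentinel object.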

The outbound channel adapter already converts KafkaNull to null when passing to Kafka.
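
The two directions of that mapping can be sketched as a pair of plain-Java functions. This is an illustration only: `RecordNull` and `NullMappingSketch` are hypothetical names, and the real conversion happens inside the framework's inbound and outbound adapters rather than in user code like this.

```java
// Hypothetical stand-in for KafkaNull.
final class RecordNull {
    static final RecordNull INSTANCE = new RecordNull();

    private RecordNull() { }
}

final class NullMappingSketch {
    // Inbound: a null record value becomes the sentinel so the message payload is never null.
    static Object toMessagePayload(Object recordValue) {
        return recordValue == null ? RecordNull.INSTANCE : recordValue;
    }

    // Outbound: the sentinel becomes a null record value, i.e. a tombstone for the key.
    static Object toRecordValue(Object payload) {
        return payload instanceof RecordNull ? null : payload;
    }
}
```

The round trip preserves ordinary values and turns a null record value back into null on the way out.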

@garyrussell garyrussell reopened this Oct 1, 2018
@garyrussell garyrussell changed the title from "How to Handle payload=org.springframework.kafka.support.KafkaNull?" to "@StreamListener with the Kafka Binder should handle Kafka tombstone records" Oct 1, 2018
@garyrussell garyrussell self-assigned this Oct 1, 2018
Contributor

@garyrussell garyrussell commented Oct 3, 2018

Actually, since Spring Cloud Stream (SCSt) uses Spring Integration's HandlerMethodArgumentResolversHolder, the fix could be made there so that Spring Integration (SI) apps can also benefit from it.

We'll just need a doc change here.

garyrussell added a commit to garyrussell/spring-cloud-stream-binder-kafka that referenced this issue Oct 4, 2018
garyrussell added a commit to garyrussell/spring-cloud-stream-binder-kafka that referenced this issue Oct 4, 2018
@garyrussell garyrussell added the "in pr" label and removed the "in progress" label Oct 4, 2018
sobychacko added a commit that referenced this issue Oct 4, 2018
@sobychacko sobychacko removed the "in pr" label Oct 4, 2018
@sabbyanandan sabbyanandan added this to the 2.1.0.RC1 milestone Oct 9, 2018