Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a way to create an OffsetAndMetdata with metadata when using Spring-Kafka #2170

Closed
garyrussell opened this issue Mar 15, 2022 Discussed in #2169 · 0 comments
Closed

Comments

@garyrussell
Copy link
Contributor

Discussed in #2169

Originally posted by frosiere March 15, 2022
In some specific cases, it may make sense to set/customize the metadata field of the OffsetAndMetadata object with data coming from the consumer. When using Spring Kafka, it's impossible as this object is created internally in KafkaMessageListenerContainer.ListenerConsumer.

Due to this, a callback can be set on the container properties but the metadata will always be empty when calling the onComplete method of the callback.

This may help having a mechanism to set these metadata.

So, the proposal would be to either review the way to set a callback or to simply have an OffsetAndMetadataProvider as follow

public interface OffsetAndMetadataProvider {

   OffsetAndMetadata provide(Consumer<?, ?> consumer, long offset);
}

This provider would then be set on the container properties as show in the following portion of code

public class MyOffsetAndMetadataProvider  implements OffsetAndMetadataProvider {

   @Override   
   OffsetAndMetadata provide(Consumer<?, ?> consumer, long offset) {
      // in this case, metadata would be the consumer group id...
      return new OffsetAndMetadata(offset, consumer.groupMetadata().groupId());
   }
}

// at consumer factory level
...
getContainerProperties().setOffsetAndMetadataProvider(new MyOffsetAndMetadataProvider());
...

Internally, Spring Kafka would need to be slightly reworked to use the following kind of method instead of directly creating the OffsetAndMetadata object.

// in KafkaMessageListenerContainer.ListenerConsumer
// would be called 6 times
private OffsetAndMetadata createOffsetAndMetadata(long offset) {
   final OffsetAndMetadataProvider offsetAndMetadataProvider = containerProperties.getOffsetAndMetadataProvider();
   return offsetAndMetadataProvider == null
         ? new OffsetAndMetadata(offset)
         : offsetAndMetadataProvider.provide(consumer, offset)
}

Any other way to set metadata is more than welcome.

If we agree on the need, I can contribute on the implementation. This one or another one.

Thanks for your reply.

@garyrussell garyrussell added this to the 3.0 Backlog milestone Mar 15, 2022
frosiere added a commit to frosiere/spring-kafka that referenced this issue Mar 23, 2022
frosiere added a commit to frosiere/spring-kafka that referenced this issue Mar 23, 2022
frosiere added a commit to frosiere/spring-kafka that referenced this issue Mar 23, 2022
frosiere added a commit to frosiere/spring-kafka that referenced this issue Mar 23, 2022
frosiere added a commit to frosiere/spring-kafka that referenced this issue Mar 23, 2022
frosiere added a commit to frosiere/spring-kafka that referenced this issue Mar 24, 2022
frosiere added a commit to frosiere/spring-kafka that referenced this issue Mar 24, 2022
frosiere added a commit to frosiere/spring-kafka that referenced this issue Mar 30, 2022
@garyrussell garyrussell modified the milestones: 3.0 Backlog, 3.0.0-M4 Mar 30, 2022
garyrussell pushed a commit that referenced this issue Mar 30, 2022
Resolves #2170

Add a way to create a custom OffsetAndMetadata

GH-2170: Remove useless initializations
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment