Closed
Description
We are trying to implement manual offset commits with the Spring Integration Java DSL for Kafka, using the code below. We couldn't find any reference documentation for this.
We can set the consumer property "Auto.commit" to "False", but we want to commit the offset only after the message has been processed successfully. Can someone help with this?
@Bean
IntegrationFlow consumer() {
    KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka
            .inboundChannelAdapter(new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
            .consumerProperties(props -> props
                    .put("zookeeper.session.timeout.ms", "500")
                    .put("zookeeper.sync.time.ms", "250")
                    .put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
                    .put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
                    .put("auto.offset.reset", "smallest")
                    .put("auto.commit.interval.ms", "100"))
            .addConsumer(this.kafkaConfig.getConsumerGroup(),
                    metadata -> metadata.consumerTimeout(100)
                            .topicStreamMap(m -> m.put(this.kafkaConfig.getTopicRead(), 1))
                            .maxMessages(1));

    Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));

    return IntegrationFlows.from(messageSourceSpec, endpointConfigurer)
            .<Map<String, List<byte[]>>> handle((payload, headers) -> {
                payload.entrySet().forEach(e -> processMessage((ConcurrentHashMap<Integer, List<byte[]>>) e.getValue()));
                return null;
            })
            .get();
}
Version:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-java-dsl</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>
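To make the desired behavior concrete, here is a minimal sketch in plain Java of the "commit only after successful processing" pattern. It deliberately does not use the Spring Integration or Kafka APIs: `OffsetTracker`, `ManualCommitSketch`, and `processBatch` are hypothetical names invented for illustration, and the in-memory tracker only stands in for whatever offset store the real consumer uses.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a consumer's offset store; not a Kafka or Spring class.
final class OffsetTracker {
    private long committed = -1L; // offset of the last committed message, -1 if none yet

    void commit(long offset) {
        this.committed = offset;
    }

    long committed() {
        return this.committed;
    }
}

final class ManualCommitSketch {

    // Handles messages in order and commits an offset only after its handler
    // returns normally. Stops at the first failure, so the failed message and
    // everything after it stay uncommitted and can be redelivered later.
    static long processBatch(List<byte[]> messages, long firstOffset,
                             Consumer<byte[]> handler, OffsetTracker tracker) {
        long offset = firstOffset;
        for (byte[] message : messages) {
            try {
                handler.accept(message);
            } catch (RuntimeException ex) {
                break; // do not commit the failed message
            }
            tracker.commit(offset);
            offset++;
        }
        return tracker.committed();
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        List<byte[]> batch = Arrays.asList(
                "a".getBytes(), "b".getBytes(), "bad".getBytes(), "c".getBytes());

        // Hypothetical handler that rejects one payload to simulate a processing error.
        long committed = processBatch(batch, 0L, m -> {
            if ("bad".equals(new String(m))) {
                throw new IllegalStateException("processing failed");
            }
        }, tracker);

        System.out.println("last committed offset: " + committed);
    }
}
```

The trade-off in this sketch is at-least-once delivery: if the process stops after handling a message but before committing its offset, that message would be seen again on restart, so the handler should tolerate duplicates.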