This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

KafkaMessageSource and concurrency #211

Closed
bruto0 opened this issue Aug 31, 2018 · 6 comments

Comments

@bruto0

bruto0 commented Aug 31, 2018

If you disable auto-ack in AcknowledgmentCallbackFactory and ack/requeue offsets on a thread other than the one the poller runs on, the following code can fail in several ways under sufficient load:

	Set<KafkaAckInfo<K, V>> candidates = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition());
	KafkaAckInfo<K, V> ackInfo = null;
	if (candidates.iterator().next().equals(this.ackInfo)) {
		// see if there are any pending acks for higher offsets
		List<KafkaAckInfo<K, V>> toCommit = new ArrayList<>();
		for (KafkaAckInfo<K, V> info : candidates) {

For example: ConcurrentModificationException, NoSuchElementException.
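
Both exceptions can be reproduced without threads, which may help when reasoning about the race. The sketch below is an illustration only: `UnsafeIterationDemo` and its methods are hypothetical names, plain `Long` offsets stand in for `KafkaAckInfo`, and a single-threaded modification stands in for the poller thread adding a record while the ack thread iterates.

```java
import java.util.ConcurrentModificationException;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeSet;

final class UnsafeIterationDemo {

    // Adding to a TreeSet while iterating it trips the fail-fast iterator.
    static boolean failsWhenModifiedDuringIteration() {
        Set<Long> offsets = new TreeSet<>();
        offsets.add(1L);
        offsets.add(2L);
        try {
            for (Long offset : offsets) {
                // Stands in for another thread adding a newly polled record.
                offsets.add(offset + 10);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // iterator().next() on an empty set throws NoSuchElementException,
    // which can happen if the set is drained between lookup and iteration.
    static boolean failsOnEmptySet() {
        Set<Long> offsets = new TreeSet<>();
        try {
            offsets.iterator().next();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWhenModifiedDuringIteration());
        System.out.println(failsOnEmptySet());
    }
}
```

In the real multi-threaded case the failure is timing-dependent, so it may show up only under load.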

Also, the acknowledged flag is set unconditionally in the finally block of KafkaAckCallback.acknowledge, although after a ConcurrentModificationException you would likely want to try ack'ing the offset again.
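
One way to keep a failed ack retryable is to set the flag only after the commit step returns normally. This is a minimal sketch under that assumption, not the library's code: `RetryableAck` and `commitAction` are hypothetical names, and the class does not guard against two threads acknowledging the same record at once.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class RetryableAck {

    private final AtomicBoolean acknowledged = new AtomicBoolean(false);

    // Hypothetical stand-in for whatever commits or defers the offset.
    private final Runnable commitAction;

    RetryableAck(Runnable commitAction) {
        this.commitAction = commitAction;
    }

    void acknowledge() {
        if (acknowledged.get()) {
            return; // already acknowledged, nothing to do
        }
        commitAction.run();     // may throw, e.g. ConcurrentModificationException
        acknowledged.set(true); // reached only if the commit step did not throw
    }

    boolean isAcknowledged() {
        return acknowledged.get();
    }
}
```

With the flag left unset after a failure, the caller can catch the exception and call `acknowledge()` again.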

And while we're at it, it'd be nice to be able to ack a bunch of offsets belonging to the same partition without synchronizing on the consumerMonitor for each one, but KafkaMessageSource would have to be exposed from the inbound channel adapter for that to work.
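
The batching idea can be sketched as a single monitor acquisition per batch instead of one per offset. Everything here is hypothetical (`BatchAckSketch`, `ackAll`, the counter); KafkaMessageSource exposes no such method in this thread, and the monitor below only mimics the role of consumerMonitor.

```java
import java.util.Collection;
import java.util.TreeSet;

final class BatchAckSketch {

    // Plays the role of the consumerMonitor mentioned above.
    private final Object consumerMonitor = new Object();
    private final TreeSet<Long> acked = new TreeSet<>();
    private int monitorEntries; // counts lock acquisitions, for illustration

    // One monitor acquisition per offset.
    void ack(long offset) {
        synchronized (consumerMonitor) {
            monitorEntries++;
            acked.add(offset);
        }
    }

    // One monitor acquisition for the whole batch.
    void ackAll(Collection<Long> offsets) {
        synchronized (consumerMonitor) {
            monitorEntries++;
            acked.addAll(offsets);
        }
    }

    int monitorEntries() {
        synchronized (consumerMonitor) {
            return monitorEntries;
        }
    }

    int ackedCount() {
        synchronized (consumerMonitor) {
            return acked.size();
        }
    }
}
```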

@bruto0
Author

bruto0 commented Aug 31, 2018

Putting this:

	KafkaAckInfo<K, V> ackInfo = new KafkaAckInfoImpl(record, topicPartition);
	AcknowledgmentCallback ackCallback = this.ackCallbackFactory.createCallback(ackInfo);
	this.inflightRecords.computeIfAbsent(topicPartition, tp -> new TreeSet<>()).add(ackInfo);

inside the synchronized block should help with the ConcurrentModificationException.
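
The suggestion amounts to mutating the in-flight map and its sets only while holding the same monitor that the ack path uses. A minimal sketch, assuming `String` partition keys and `Long` offsets in place of the real types; `InflightTracker` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

final class InflightTracker {

    private final Object consumerMonitor = new Object();
    private final Map<String, Set<Long>> inflightRecords = new HashMap<>();

    // Poller side: registration happens inside the synchronized block.
    void track(String topicPartition, long offset) {
        synchronized (consumerMonitor) {
            inflightRecords.computeIfAbsent(topicPartition, tp -> new TreeSet<>()).add(offset);
        }
    }

    // Ack side: takes the same monitor before touching the set.
    boolean ack(String topicPartition, long offset) {
        synchronized (consumerMonitor) {
            Set<Long> offsets = inflightRecords.get(topicPartition);
            return offsets != null && offsets.remove(offset);
        }
    }

    int inflightCount(String topicPartition) {
        synchronized (consumerMonitor) {
            Set<Long> offsets = inflightRecords.get(topicPartition);
            return offsets == null ? 0 : offsets.size();
        }
    }

    // Runs a tracking thread and an acking thread concurrently and returns
    // acked + still in flight, which should equal n if nothing was lost.
    static int stress(int n) {
        InflightTracker tracker = new InflightTracker();
        AtomicInteger acked = new AtomicInteger();
        Thread poller = new Thread(() -> {
            for (long o = 0; o < n; o++) {
                tracker.track("topic-0", o);
            }
        });
        Thread acker = new Thread(() -> {
            for (long o = 0; o < n; o++) {
                if (tracker.ack("topic-0", o)) {
                    acked.incrementAndGet();
                }
            }
        });
        poller.start();
        acker.start();
        try {
            poller.join();
            acker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acked.get() + tracker.inflightCount("topic-0");
    }
}
```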

@artembilan
Contributor

@bruto0,

It looks like you already have a good grasp of what's going on.
How about contributing a fix as a Pull Request?

Thanks

@bruto0
Author

bruto0 commented Nov 7, 2018

Moving a curly brace a few lines down isn't much of a PR :)
No idea how to tackle the other things, unfortunately

@garyrussell
Contributor

> ack/requeue offsets in a different thread

While thread-safety should be fixed, acking like that probably won't do what you need. If you hand the message off to another thread, the acks might be processed out of order, which is probably not what you want.

With Kafka, only a log offset is committed, each message is not "ack'd" per se. So if the record at offset 2 is ack'd first, the record at offset 1 is also "ack'd". Worse, when offset 1 is later committed (and let's say the app is restarted), you will get the record at offset 2 again.
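
The effect of out-of-order commits can be modelled with a single number per partition. This is a toy model, not Kafka client code: `CommittedOffsetModel` is a hypothetical name, and it relies on the Kafka convention that the committed value is the offset of the next record to consume (last processed offset + 1).

```java
final class CommittedOffsetModel {

    // Position from which consumption resumes after a restart.
    private long committedPosition;

    // Committing a record stores "offset + 1", overwriting whatever was there.
    void commitRecord(long recordOffset) {
        committedPosition = recordOffset + 1;
    }

    long resumeFrom() {
        return committedPosition;
    }

    public static void main(String[] args) {
        CommittedOffsetModel partition = new CommittedOffsetModel();
        partition.commitRecord(2); // implicitly covers offset 1 as well
        partition.commitRecord(1); // later commit moves the position backwards
        // After a restart, consumption resumes at offset 2, so that record is redelivered.
        System.out.println(partition.resumeFrom());
    }
}
```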

@bruto0
Author

bruto0 commented Nov 7, 2018

It does what I need :) I do know that Kafka is not an MQ and that seeking backwards means replayed messages; that's OK in my case.
It's also worth mentioning that KafkaMessageSource, being smart, keeps track of ack'ed record offsets to enforce the order of actual commits (but that can lead to bloated inflightRecords if abused).

My app is a hack job, but we do need the rewind (seek) option and record processing outside of the consumer thread, so I made KMS work for me rather than write something similar on top of spring-kafka myself :)

@garyrussell
Contributor

> With Kafka, only a log offset is committed, each message is not "ack'd" per se. So if the record at offset 2 is ack'd first, the record at offset 1 is also "ack'd". Worse, when offset 1 is later committed (and let's say the app is restarted), you will get the record at offset 2 again.

Actually, I forgot; we added code to defer "future" acks and only commit them when the "gaps" are filled in.
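
Deferring "future" acks can be sketched as committing only the longest acked prefix of the in-flight offsets. This is an illustration of the idea, not the KafkaMessageSource implementation: `DeferredAckTracker` and its methods are hypothetical, and a returned position of 0 simply means nothing has been committed yet.

```java
import java.util.TreeMap;

final class DeferredAckTracker {

    // In-flight offsets in order; the value records whether the offset was acked.
    private final TreeMap<Long, Boolean> inflight = new TreeMap<>();
    private long committedPosition; // next offset to consume after a restart

    synchronized void delivered(long offset) {
        inflight.put(offset, false);
    }

    // Marks the offset as acked, then commits every acked offset at the head
    // of the in-flight set. Acks behind a gap stay deferred.
    synchronized long ack(long offset) {
        if (inflight.containsKey(offset)) {
            inflight.put(offset, true);
        }
        while (!inflight.isEmpty() && inflight.firstEntry().getValue()) {
            committedPosition = inflight.pollFirstEntry().getKey() + 1;
        }
        return committedPosition;
    }

    synchronized int pending() {
        return inflight.size();
    }
}
```

If offset 1 is never acked, offsets 2 and 3 stay in the map indefinitely, which is the "bloated inflightRecords" risk mentioned earlier in the thread.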

I am fixing the concurrency now...

garyrussell added a commit to garyrussell/spring-integration-kafka that referenced this issue Nov 13, 2018
Fixes spring-attic#211

Use `ConcurrentHashMap` with synchronized set values to avoid concurrency issues
when acks are processed on multiple threads.
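
The approach named in the commit message can be sketched with standard JDK collections. This is not the committed code: `ConcurrentInflight` and its methods are hypothetical, and `String`/`Long` stand in for the real key and value types. Note that iterating a `Collections.synchronizedSet` still requires synchronizing on the set manually, as the JDK documentation states.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

final class ConcurrentInflight {

    private final Map<String, Set<Long>> inflight = new ConcurrentHashMap<>();

    // computeIfAbsent on ConcurrentHashMap is atomic, and add() on the
    // synchronized wrapper is safe to call from several threads.
    void add(String topicPartition, long offset) {
        inflight
            .computeIfAbsent(topicPartition, tp -> Collections.synchronizedSet(new TreeSet<Long>()))
            .add(offset);
    }

    // Returns the lowest in-flight offset, or -1 if there is none.
    long firstOrMinusOne(String topicPartition) {
        Set<Long> offsets = inflight.get(topicPartition);
        if (offsets == null) {
            return -1;
        }
        synchronized (offsets) { // the wrapper does not guard iteration by itself
            return offsets.isEmpty() ? -1 : offsets.iterator().next();
        }
    }
}
```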
artembilan pushed a commit that referenced this issue Nov 13, 2018
Fixes #211

artembilan pushed a commit that referenced this issue Nov 13, 2018
Fixes #211


# Conflicts:
#	src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java
garyrussell added a commit to garyrussell/spring-integration that referenced this issue Jun 24, 2020
Fixes spring-attic/spring-integration-kafka#211

artembilan pushed a commit to spring-projects/spring-integration that referenced this issue Jun 25, 2020
Fixes spring-attic/spring-integration-kafka#211
