Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaMLC - make addOffsets() thread-safe #566

Closed
garyrussell opened this issue Feb 12, 2018 · 2 comments · Fixed by #619
Closed

KafkaMLC - make addOffsets() thread-safe #566

garyrussell opened this issue Feb 12, 2018 · 2 comments · Fixed by #619
Assignees
Labels
Milestone

Comments

@garyrussell
Copy link
Contributor

@garyrussell garyrussell added this to the 2.1.3 milestone Feb 12, 2018
@garyrussell garyrussell self-assigned this Feb 13, 2018
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Feb 13, 2018
Fixes spring-projects#566

In case the user listener hands over the `Acknowledgment` to another thread.
artembilan pushed a commit that referenced this issue Feb 13, 2018
Fixes #566

In case the user listener hands over the `Acknowledgment` to another thread.
@lhsm
Copy link

lhsm commented Mar 22, 2018

Hi, seems still not thread safe. Please, take a look into KafkaMessageListenerContainer.ListenerConsumer#buildCommits. While iterating offsets new offset may be added
The same in KafkaMessageListenerContainer.ListenerConsumer#commitPendingAcks

@garyrussell
Copy link
Contributor Author

Good catch; thanks.

@garyrussell garyrussell reopened this Mar 22, 2018
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Mar 22, 2018
Fixes spring-projects#566

If `Acknowledgment.acknowledge()` is called on a foreign thread, there is a concurrency
problem with the `offsets` field; the consumer thread might `clear()` unprocessed acks.

Furthermore, `AckMode.MANUAL_IMMEDIATE` cannot use the `Consumer` object if the ack
is called on a foreign thread - the `Consumer` is not thread-safe.

- Revert `offsets` to simple `HashMap`s
- Only reference `offsets` on the consumer thread
- enqueue foreign acks into the `acks` queue (even "immediate" acks)
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Mar 22, 2018
Fixes spring-projects#566

If `Acknowledgment.acknowledge()` is called on a foreign thread, there is a concurrency
problem with the `offsets` field; the consumer thread might `clear()` unprocessed acks.

Furthermore, `AckMode.MANUAL_IMMEDIATE` cannot use the `Consumer` object if the ack
is called on a foreign thread - the `Consumer` is not thread-safe.

- Revert `offsets` to simple `HashMap`s
- Only reference `offsets` on the consumer thread
- enqueue foreign acks into the `acks` queue (even "immediate" acks)

**Cherry pick to 2.0.x (and 1.3.x, fixing the lambda in the test and the diamond operators).**
artembilan pushed a commit that referenced this issue Mar 22, 2018
Fixes #566

If `Acknowledgment.acknowledge()` is called on a foreign thread, there is a concurrency
problem with the `offsets` field; the consumer thread might `clear()` unprocessed acks.

Furthermore, `AckMode.MANUAL_IMMEDIATE` cannot use the `Consumer` object if the ack
is called on a foreign thread - the `Consumer` is not thread-safe.

- Revert `offsets` to simple `HashMap`s
- Only reference `offsets` on the consumer thread
- enqueue foreign acks into the `acks` queue (even "immediate" acks)

**Cherry pick to 2.0.x (and 1.3.x, fixing the lambda in the test and the diamond operators).**
artembilan pushed a commit that referenced this issue Mar 22, 2018
Fixes #566

If `Acknowledgment.acknowledge()` is called on a foreign thread, there is a concurrency
problem with the `offsets` field; the consumer thread might `clear()` unprocessed acks.

Furthermore, `AckMode.MANUAL_IMMEDIATE` cannot use the `Consumer` object if the ack
is called on a foreign thread - the `Consumer` is not thread-safe.

- Revert `offsets` to simple `HashMap`s
- Only reference `offsets` on the consumer thread
- enqueue foreign acks into the `acks` queue (even "immediate" acks)

**Cherry pick to 2.0.x (and 1.3.x, fixing the lambda in the test and the diamond operators).**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
artembilan pushed a commit that referenced this issue Mar 22, 2018
Fixes #566

If `Acknowledgment.acknowledge()` is called on a foreign thread, there is a concurrency
problem with the `offsets` field; the consumer thread might `clear()` unprocessed acks.

Furthermore, `AckMode.MANUAL_IMMEDIATE` cannot use the `Consumer` object if the ack
is called on a foreign thread - the `Consumer` is not thread-safe.

- Revert `offsets` to simple `HashMap`s
- Only reference `offsets` on the consumer thread
- enqueue foreign acks into the `acks` queue (even "immediate" acks)

**Cherry pick to 2.0.x (and 1.3.x, fixing the lambda in the test and the diamond operators).**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
#	spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
denis554 added a commit to denis554/spring-kafka that referenced this issue Mar 27, 2019
Fixes spring-projects/spring-kafka#566

In case the user listener hands over the `Acknowledgment` to another thread.
denis554 added a commit to denis554/spring-kafka that referenced this issue Mar 27, 2019
Fixes spring-projects/spring-kafka#566

If `Acknowledgment.acknowledge()` is called on a foreign thread, there is a concurrency
problem with the `offsets` field; the consumer thread might `clear()` unprocessed acks.

Furthermore, `AckMode.MANUAL_IMMEDIATE` cannot use the `Consumer` object if the ack
is called on a foreign thread - the `Consumer` is not thread-safe.

- Revert `offsets` to simple `HashMap`s
- Only reference `offsets` on the consumer thread
- enqueue foreign acks into the `acks` queue (even "immediate" acks)

**Cherry pick to 2.0.x (and 1.3.x, fixing the lambda in the test and the diamond operators).**
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants