Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConsumeService: fix client close() causing ConcurrentModificationExeption #393

Merged
merged 1 commit into from
Mar 22, 2023

Conversation

mhratson
Copy link
Collaborator

@mhratson mhratson commented Mar 22, 2023

Problem

  • calling _baseConsumer.close(), outside of the thread the consumer is running in, is invalid
  • as docummented in kafka consumer docs1

The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application * making the call. It is the responsibility of the user to ensure that multi-threaded access * is properly synchronized. Un-synchronized access will result in ConcurrentModificationException`.

The exception thrown

2021/01/25 23:10:25.961 WARN [ConsumeService] [Thread-1] [kafka-monitoring] [] kac-lc/ConsumeService while trying to close consumer.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. competing thread is kac-lc consume-service
        at com.linkedin.kafka.clients.utils.KafkaConsumerLock.lock(KafkaConsumerLock.java:31) ~[li-apache-kafka-clients-1.0.59.jar:?]
        at com.linkedin.kafka.clients.utils.CloseableLock.<init>(CloseableLock.java:18) ~[li-apache-kafka-clients-1.0.59.jar:?]
        at com.linkedin.kafka.clients.consumer.LiKafkaInstrumentedConsumerImpl.close(LiKafkaInstrumentedConsumerImpl.java:716) ~[li-apache-kafka-clients-1.0.59.jar:?]

Solution

The recommended solution1 is

  • to use consumer.wakeup(); method
  • but the method is not yet adopted by the KMBaseConsumer interface
  • so for now _baseConsumer.close() is moved into the thread
  • calling stop now only sets _running.compareAndSet(true, false), so the runloop exits

Testing Done

Increased thread.join(5000) timeout as this implementation is slower to stop due to not interrupting the consumer thread.

- ./gradlew test

Footnotes

  1. KafkaConsumer.java 2

@mhratson mhratson force-pushed the LIKAFKA-34677-fix-close-error branch from fddc465 to c9ffcb1 Compare March 22, 2023 02:18
@mhratson mhratson changed the title ConsumeService: fix client closing causing ConcurrentModificationExeption ConsumeService: fix client close() causing ConcurrentModificationExeption Mar 22, 2023
…eption`

## Problem

- calling `_baseConsumer.close()`, outside of the thread the consumer is running in, is invalid
- as docummented in _kafka consumer docs_[^1]

> The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application * making the call. It is the responsibility of the user to ensure that multi-threaded access * is properly synchronized. Un-synchronized access will result in ConcurrentModificationException`.

The exception thrown
```
2021/01/25 23:10:25.961 WARN [ConsumeService] [Thread-1] [kafka-monitoring] [] kac-lc/ConsumeService while trying to close consumer.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. competing thread is kac-lc consume-service
        at com.linkedin.kafka.clients.utils.KafkaConsumerLock.lock(KafkaConsumerLock.java:31) ~[li-apache-kafka-clients-1.0.59.jar:?]
        at com.linkedin.kafka.clients.utils.CloseableLock.<init>(CloseableLock.java:18) ~[li-apache-kafka-clients-1.0.59.jar:?]
        at com.linkedin.kafka.clients.consumer.LiKafkaInstrumentedConsumerImpl.close(LiKafkaInstrumentedConsumerImpl.java:716) ~[li-apache-kafka-clients-1.0.59.jar:?]
```

## Solution

The recommended solution[^1] is
- to use `consumer.wakeup();` method
- but the method is not yet adopted by the `KMBaseConsumer` interface
- so for now `_baseConsumer.close()` is moved into the thread
- calling stop now only sets `_running.compareAndSet(true, false)`, so the runloop exits

[^1]:[KafkaConsumer.java](https://github.com/apache/kafka/blob/7d61d4505a16f09b85f5eb37adeff9c3534ec02c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L467-L502)

## Testing Done

Increased `thread.join(5000)` timeout as this implementation is slower to stop due to not interrupting the consumer thread.

`- ./gradlew test`
@mhratson mhratson force-pushed the LIKAFKA-34677-fix-close-error branch from c9ffcb1 to 0da972f Compare March 22, 2023 02:24
@mhratson mhratson merged commit 043db64 into master Mar 22, 2023
@mhratson mhratson deleted the LIKAFKA-34677-fix-close-error branch March 22, 2023 21:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants