Skip to content

Commit

Permalink
ConsumeService: fix client closing causing `ConcurrentModificationExc…
Browse files Browse the repository at this point in the history
…eption`

## Problem

- calling `_baseConsumer.close()`, outside of the thread the consumer is running in, is invalid
- as docummented in _kafka consumer docs_[^1]

> The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application * making the call. It is the responsibility of the user to ensure that multi-threaded access * is properly synchronized. Un-synchronized access will result in ConcurrentModificationException`.

The exception thrown
```
2021/01/25 23:10:25.961 WARN [ConsumeService] [Thread-1] [kafka-monitoring] [] kac-lc/ConsumeService while trying to close consumer.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. competing thread is kac-lc consume-service
        at com.linkedin.kafka.clients.utils.KafkaConsumerLock.lock(KafkaConsumerLock.java:31) ~[li-apache-kafka-clients-1.0.59.jar:?]
        at com.linkedin.kafka.clients.utils.CloseableLock.<init>(CloseableLock.java:18) ~[li-apache-kafka-clients-1.0.59.jar:?]
        at com.linkedin.kafka.clients.consumer.LiKafkaInstrumentedConsumerImpl.close(LiKafkaInstrumentedConsumerImpl.java:716) ~[li-apache-kafka-clients-1.0.59.jar:?]
```

## Solution

The recommended solution[^1] is
- to use `consumer.wakeup();` method
- but the method is not yet adopted by the `KMBaseConsumer` interface
- so for now `_baseConsumer.close()` is moved into the thread
- calling stop now only sets `_running.compareAndSet(true, false)`, so the runloop exits

[^1]:[KafkaConsumer.java](https://github.com/apache/kafka/blob/7d61d4505a16f09b85f5eb37adeff9c3534ec02c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L467-L502)

## Testing Done

Increased `thread.join(5000)` timeout as this implementation is slower to stop due to not interrupting the consumer thread.

`- ./gradlew test`
  • Loading branch information
mhratson committed Mar 22, 2023
1 parent c4a9c71 commit 043db64
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public ConsumeService(String name,
}
}, name + " consume-service");
_consumeThread.setDaemon(true);
_consumeThread.setUncaughtExceptionHandler((t, e) -> {
LOG.error(name + "/ConsumeService error", e);
});
});

// In a blocking fashion, waits for this topicPartitionFuture to complete, and then returns its result.
Expand Down Expand Up @@ -211,6 +214,9 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> topicPartitionOffs
}
}
/* end of consume() while loop */
LOG.info("{}/ConsumeService/Consumer closing.", _name);
_baseConsumer.close();
LOG.info("{}/ConsumeService/Consumer stopped.", _name);
}

Metrics metrics() {
Expand Down Expand Up @@ -242,17 +248,18 @@ public synchronized void start() {
@Override
public synchronized void stop() {
if (_running.compareAndSet(true, false)) {
try {
_baseConsumer.close();
} catch (Exception e) {
LOG.warn(_name + "/ConsumeService while trying to close consumer.", e);
}
LOG.info("{}/ConsumeService stopped.", _name);
LOG.info("{}/ConsumeService stopping.", _name);
}
}

@Override
public void awaitShutdown(long timeout, TimeUnit unit) {
LOG.info("{}/ConsumeService shutdown awaiting…", _name);
try {
_consumeThread.join(unit.toMillis(timeout));
} catch (InterruptedException e) {
LOG.error(_name + "/ConsumeService interrupted", e);
}
LOG.info("{}/ConsumeService shutdown completed.", _name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void run() {
Thread.sleep(100);

consumeService.stop();
thread.join(500);
thread.join(5000);

Assert.assertFalse(thread.isAlive());
Assert.assertEquals(error.get(), null);
Expand Down

0 comments on commit 043db64

Please sign in to comment.