-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19390: Call AbstractIndex.safeForceUnmap() in AbstractIndex.resize() on Linux to prevent stale index file memory mappings #19961
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
A label of 'needs-attention' was automatically added to this PR in order to raise the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Forest0923 : Thanks for the PR. Great fix! Left a couple of comments.
@@ -67,19 +70,23 @@ public TimeIndex(File file, long baseOffset, int maxIndexSize, boolean writable) | |||
|
|||
this.lastEntry = lastEntryFromIndexFile(); | |||
|
|||
log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = {}, entries = {}, lastOffset = {}, file position = {}", | |||
file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry.offset, mmap().position()); | |||
inLock(lock, () -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is lock needed here in the constructor? Ditto for TimeIndex.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don’t think it’s necessary in the constructor, so I’ll remove it.
@@ -95,7 +99,7 @@ public void sanityCheck() { | |||
* the pair (baseOffset, 0) is returned. | |||
*/ | |||
public OffsetPosition lookup(long targetOffset) { | |||
return maybeLock(lock, () -> { | |||
return inLock(lock, () -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, in Linux, we don't acquire the lock for lookup()
, which is used for fetch requests. Both fetch and produce requests are common. Acquiring the lock in the fetch path reduces read/write concurrency. The reader only truly needs to lock when the underlying mmap changes by resize(). Since resize() is an infrequent event, we could introduce a separate resize lock, which will be held by resize()
and all readers (currently calling maybeLock()
). This will help maintain the current level of concurrency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’d like to ask the following questions:
-
The JavaDoc for java.nio.Buffer says it is not thread-safe. In the case of reads, is it still safe for multiple threads to access the same buffer concurrently?
(See: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/nio/Buffer.html) -
To improve read-operation concurrency, are you suggesting the use of ReentrantReadWriteLock instead of ReentrantLock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Forest0923 : Good question. It's true that happens-before guarantee is not guaranteed according to javadoc. However, according to this, MappedByteBuffer is a thin wrapper and provides direct access to buffer cache of the OS. So, once you have written to it, every thread, and every process, will see the update. In addition to the index file, we read from the buffer cache for the log file without synchronization too. The implementation may change in the future, but probably unlikely.
So for now, it's probably better to use a separate resize lock to maintain the current read/write concurrency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the advice!
I have added resizeLock, could you please check?
Please let me know if I have misunderstood anything.
throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " | ||
+ "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp " | ||
+ timestamp(mmap(), 0)); | ||
inLock(lock, () -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch here!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Forest0923 : Thanks for the updated PR. A couple of more comments. Also, could you run some perf test (produce perf + consumer perf together) to make sure there is no degradation?
@@ -48,6 +51,7 @@ private enum SearchResultType { | |||
private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class); | |||
|
|||
protected final ReentrantLock lock = new ReentrantLock(); | |||
protected final ReentrantReadWriteLock resizeLock = new ReentrantReadWriteLock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we have two locks, could we add comments on how each one is being used? Also, remapLock is probably more accurate?
protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E { | ||
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) | ||
lock.lock(); | ||
protected final <T, E extends Exception> T inResizeLock(Lock lock, StorageAction<T, E> action) throws E { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps it's better to make this inResizeReadLock
and avoid passing in lock from the input. For consistency, we could also introduce a method like inLock
for the other lock. This way, we could make both locks private.
Here is the result of perf test:
pr (b035e4c)
trunk (7cd99ea)
|
https://issues.apache.org/jira/browse/KAFKA-19390
The AbstractIndex.resize() method does not release the old memory map for both index and time index files.
In some cases, Mixed GC may not run for a long time, which can cause the broker to crash when the vm.max_map_count limit is reached.
The root cause is that safeForceUnmap() is not being called on Linux within resize(), so we have changed the code to unmap old mmap on all operating systems.
The same problem was reported in KAFKA-7442, but the PR submitted at that time did not acquire all necessary locks around the mmap accesses and was closed without fixing the issue.