persistent logger - check fd before write in append() (dfs 2624) #9066
base: master
Conversation
Signed-off-by: Amit Prinz Setter <alphaprinz@gmail.com>
We have used file locks for that: producers hold a shared lock, and the consumer first renames the file and then takes an exclusive lock to ensure no more producers use the active file. Please sync with @tangledbytes on how this is handled for the glacier logs.
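For reference, a minimal sketch of that shared/exclusive lock protocol (this is not the actual noobaa code; `flock` here is a hypothetical helper, e.g. backed by fcntl/flock via a native binding, since Node's built-in fs module exposes no advisory file locks):

```js
const fs = require('fs/promises');

// Producers: open the active file and hold a SHARED lock only while appending.
async function producer_append(active_path, line, flock) {
    const fh = await fs.open(active_path, 'a');
    try {
        await flock(fh.fd, 'SHARED');   // many producers may hold this concurrently
        await fh.write(line + '\n');
    } finally {
        await flock(fh.fd, 'UNLOCK');
        await fh.close();
    }
}

// Consumer: rename first so new producers re-create a fresh active file,
// then take an EXCLUSIVE lock, which is granted only once every producer
// still holding an fd to the old file has dropped its shared lock.
async function consumer_process(active_path, processed_path, flock, process_file) {
    await fs.rename(active_path, processed_path);
    const fh = await fs.open(processed_path, 'r');
    try {
        await flock(fh.fd, 'EXCLUSIVE');
        await process_file(processed_path);
    } finally {
        await flock(fh.fd, 'UNLOCK');
        await fh.close();
    }
    await fs.unlink(processed_path);
}
```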
@coderabbitai review
✅ Actions performed: Review triggered.
Walkthrough
A retry mechanism was added to the persistent logger's append().
```js
for (let attempt = 0; attempt < APPEND_ATTEMPTS_LIMIT; ++attempt) {
    const fh = await this.init();
    //if another process has deleted the active file,
    //this process' _poll_active_file_change might have closed the fd
    //in that case fd is -1
    //in order to avoid inter-process locking, we just re-init
    //the fd to the new active file.
    if (fh.fd === -1) continue;
    await fh.write(this.fs_context, buf, buf.length);
    break;
}

this.local_size += buf.length;
}
```
🛠️ Refactor suggestion
Retry loop still loses data & can throw unhandled EBADF
- `local_size` is incremented even when no write was performed (e.g. all attempts hit a closed fd). This silently corrupts accounting and drops the log entry.
- Between the `fd === -1` check and `fh.write()` another thread can still close the handle, causing an `EBADF` that will bubble up and abort the whole caller despite the retry logic.
- `continue` without a short delay turns the loop into a busy-spin when the fd stays invalid.
- When all attempts fail, the method returns `undefined` as if everything succeeded – the caller has no way to know the line was lost.
A minimal, self-contained fix:
```diff
     const buf = Buffer.from(data + '\n', 'utf8');
-    for (let attempt = 0; attempt < APPEND_ATTEMPTS_LIMIT; ++attempt) {
-        const fh = await this.init();
-        //if another process has deleted the active file,
-        //this process' _poll_active_file_change might have closed the fd
-        //in that case fd is -1
-        //in order to avoid inter-process locking, we just re-init
-        //the fd to the new active file.
-        if (fh.fd === -1) continue;
-        await fh.write(this.fs_context, buf, buf.length);
-        break;
-    }
-
-    this.local_size += buf.length;
+    let written = false;
+    for (let attempt = 0; attempt < APPEND_ATTEMPTS_LIMIT; ++attempt) {
+        const fh = await this.init();
+
+        if (fh.fd === -1) {            // handle already closed
+            await P.delay(1);          // avoid busy-loop
+            continue;
+        }
+
+        try {
+            await fh.write(this.fs_context, buf, buf.length);
+            written = true;
+            break;
+        } catch (err) {
+            if (err.code !== 'EBADF') throw err;   // real IO error
+            // someone closed the fd after we checked – retry
+            await this.close();
+        }
+    }
+
+    if (!written) {
+        throw new Error(`append failed after ${APPEND_ATTEMPTS_LIMIT} attempts`);
+    }
+
+    this.local_size += buf.length;
```
This guarantees either:
• the line was durably appended and local_size reflects that, or
• an explicit error is surfaced to the caller.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```js
const buf = Buffer.from(data + '\n', 'utf8');
let written = false;
for (let attempt = 0; attempt < APPEND_ATTEMPTS_LIMIT; ++attempt) {
    const fh = await this.init();

    if (fh.fd === -1) {            // handle already closed
        await P.delay(1);          // avoid busy-loop
        continue;
    }

    try {
        await fh.write(this.fs_context, buf, buf.length);
        written = true;
        break;
    } catch (err) {
        if (err.code !== 'EBADF') throw err;   // real IO error
        // someone closed the fd after we checked – retry
        await this.close();
    }
}

if (!written) {
    throw new Error(`append failed after ${APPEND_ATTEMPTS_LIMIT} attempts`);
}

this.local_size += buf.length;
```
🤖 Prompt for AI Agents
In src/util/persistent_logger.js around lines 111 to 124, the retry loop
increments local_size even if no write occurs, can throw unhandled EBADF errors
due to race conditions, busy-spins without delay on invalid fd, and silently
fails without notifying the caller. Fix this by incrementing local_size only
after a successful write, adding a short delay before retrying on invalid fd,
catching and handling EBADF errors to retry properly, and throwing an explicit
error if all attempts fail to ensure the caller knows the write did not succeed.
@alphaprinz the way we handle this on the tape ops is that the writers must hold shared locks while a reader must acquire an exclusive lock. Basically the
```js
for (let attempt = 0; attempt < APPEND_ATTEMPTS_LIMIT; ++attempt) {
    const fh = await this.init();
    //if another process has deleted the active file,
```
The other process should not be able to delete the file if there is >= 1 process that exists as a writer. The reason is that the delete would happen only after `_process` has finished processing the active log file, and to finish that, `collect_and_process` must have completed, which requires acquiring an `EXCLUSIVE` lock before doing so.
It seems that we are deleting the log file after releasing the lock. The following sequence can cause the issue:
cc: @alphaprinz @guymguym
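The concrete sequence from that comment is not preserved in this excerpt; the sketch below is only a hedged illustration of one interleaving that fits the description (function names are taken from the comments above, the ordering is an assumption):

```js
// Hedged illustration only - not the reviewer's exact sequence.
//
// writer:  fh = await this.init();        // fd points at the active log file
// reader:  collect_and_process() takes the EXCLUSIVE lock, processes, releases it
// reader:  unlink(active_file);           // delete happens *after* the lock is released
// writer:  _poll_active_file_change() sees the file is gone and closes the fd
// writer:  fh.write(...);                 // EBADF on the closed fd
//
// Deleting (or renaming) the file before releasing the exclusive lock narrows this
// gap, since no writer holds a shared lock at that moment.
```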
Describe the Problem
Scenario is:
There are two processes.
One is the nsfs s3 endpoint server. This process writes to the active log file.
Second is the "notificator". This process reads the active file and deletes it.
While handling an s3 op, nsfs calls PersistentLogger.append().
The fd is inited to the active file.
After fd is inited, but before write() is invoked, the notificator processes and deletes the active file.
nsfs process' _poll_active_file_change() closes the fd.
When nsfs invokes write() on the closed fd, an error is thrown and nsfs panics.
Note that this is not a likely scenario; it correlates with the heavy load used for dfs2624.
Explain the Changes
Check fd before writing. If closed, reinit.
Issues: Fixed #xxx / Gap #xxx
Testing Instructions:
Because the above scenario is unlikely, I was able to reproduce it by looping the write() ~100k times while simultaneously running the notificator.
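A hedged sketch of that loop-based reproduction (it assumes `logger` is an already-constructed PersistentLogger instance with constructor arguments omitted, and that the notificator, "manage_nsfs notification", runs concurrently in a separate process):

```js
// Hammer append() to widen the race window between fd init and write().
async function hammer_append(logger) {
    for (let i = 0; i < 100000; ++i) {
        // each append writes one line to the active log file
        await logger.append(JSON.stringify({ seq: i, t: Date.now() }));
    }
}
```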
Another option is with a debugger:
- add an await timer between fd init and write
- break the nsfs process at the timer
- run the notificator ("manage_nsfs notification")
- wait for the notificator to end
- wait for the _poll_active_file_change interval (default is 10 seconds)
- step over the await timer
- the async _poll_active_file_change should run and close the fd
- now the fd is closed and the next line for nsfs is the write
Summary by CodeRabbit