Skip to content

feat: introduce notifier to avoid busy wait.#417

Merged
k82cn merged 2 commits into
xflops:mainfrom
k82cn:flm_notifier
Apr 23, 2026
Merged

feat: introduce notifier to avoid busy wait.#417
k82cn merged 2 commits into
xflops:mainfrom
k82cn:flm_notifier

Conversation

@k82cn
Copy link
Copy Markdown
Contributor

@k82cn k82cn commented Apr 22, 2026

No description provided.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a centralized notification system (NotifyManager) to replace custom Future implementations for tracking state changes in tasks, sessions, and executors. While this refactoring simplifies the asynchronous logic, the review identifies several critical issues. Multiple race conditions were found in the wait_for_task, watch_task, and wait_for_session loops because the notification future is not obtained before checking the state conditions. Furthermore, the Notifier implementation lacks a pruning mechanism for its internal map, which will lead to a memory leak as it stores entries for every unique task ID. There is also a potential bug in duration calculation where negative values could result in unintended long waits.

Comment thread session_manager/src/notify.rs Outdated

#[derive(Clone)]
pub struct Notifier<K: Clone + Eq + Hash> {
notifiers: MutexPtr<HashMap<K, Arc<Notify>>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The notifiers HashMap will grow indefinitely, leading to a memory leak. Entries are added via get_or_create but are never removed (except via the unused remove method). Since TaskGID is used as a key and includes a unique TaskID, the map will accumulate entries for every task ever processed.

Consider using Weak<Notify> values in the HashMap so that Notify objects can be deallocated when no longer in use, or implement a pruning mechanism to remove entries for completed tasks/sessions.

Comment thread session_manager/src/notify.rs Outdated
Comment on lines +37 to +48
pub async fn wait(&self, key: &K) -> Result<(), FlameError> {
let notify = self.get_or_create(key)?;
notify.notified().await;
Ok(())
}

pub async fn wait_timeout(&self, key: &K, timeout: Duration) -> Result<bool, FlameError> {
let notify = self.get_or_create(key)?;
Ok(tokio::time::timeout(timeout, notify.notified())
.await
.is_ok())
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The wait and wait_timeout methods are prone to race conditions when used in a loop with a condition check. In tokio, the correct pattern for using Notify is to create the notified() future before checking the condition. Because these methods encapsulate the await, callers cannot follow this pattern, leading to a window where a notification can be missed between the condition check and the start of the wait.

I recommend exposing a method to retrieve the Arc<Notify> directly so callers can manage the future's lifetime correctly.

Comment thread session_manager/src/controller/mod.rs
Comment thread session_manager/src/controller/mod.rs
Comment on lines 211 to 235
loop {
{
let mut ssn_guard = lock_ptr!(ssn)?;
if let Some(task_ptr) = ssn_guard.pop_pending_task() {
return Ok(Some(task_ptr));
}
if ssn_guard.status.state == SessionState::Closed {
return Ok(None);
}
}

impl WaitForTaskFuture {
pub fn new(ssn: &SessionPtr, delay_release: Duration) -> Self {
Self {
ssn: ssn.clone(),
delay_release,
start_time: Utc::now(),
}
}
}
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Ok(None);
}

impl Future for WaitForTaskFuture {
type Output = Result<Option<TaskPtr>, FlameError>;

fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let mut ssn = lock_ptr!(self.ssn)?;

match ssn.pop_pending_task() {
None => {
let now = Utc::now();
let duration = now.signed_duration_since(self.start_time);
if duration.num_seconds() > self.delay_release.num_seconds()
|| ssn.status.state == SessionState::Closed
{
Poll::Ready(Ok(None))
} else {
ctx.waker().wake_by_ref();
Poll::Pending
}
if !self
.notifier
.sessions
.wait_timeout(ssn_id, remaining)
.await?
{
return Ok(None);
}
Some(task_ptr) => Poll::Ready(Ok(Some(task_ptr))),
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This loop is susceptible to a race condition. A task could be added to the session after the check at line 214 but before the call to wait_timeout at line 230. If this happens, the notification is missed, and the executor will wait until the next task is added or the timeout expires, leading to increased latency.

Comment thread session_manager/src/notify.rs Outdated
Ok(())
}

fn get_or_create(&self, key: &K) -> Result<Arc<Notify>, FlameError> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Make this method public so that callers can retrieve the Arc<Notify> and create the notification future before checking their respective conditions, thus avoiding race conditions.

Suggested change
fn get_or_create(&self, key: &K) -> Result<Arc<Notify>, FlameError> {
pub fn get_or_create(&self, key: &K) -> Result<Arc<Notify>, FlameError> {

Comment on lines +208 to +209
let deadline = tokio::time::Instant::now()
+ tokio::time::Duration::from_secs(delay_release.num_seconds() as u64);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Potential issue with negative durations. If delay_release is negative, num_seconds() returns a negative value, and casting it to u64 will result in a very large number, causing an unexpected long wait.

Suggested change
let deadline = tokio::time::Instant::now()
+ tokio::time::Duration::from_secs(delay_release.num_seconds() as u64);
let delay_secs = delay_release.num_seconds().max(0) as u64;
let deadline = tokio::time::Instant::now()
+ tokio::time::Duration::from_secs(delay_secs);

@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 22, 2026

Codecov Report

❌ Patch coverage is 39.00709% with 86 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
session_manager/src/controller/mod.rs 5.71% 66 Missing ⚠️
session_manager/src/controller/executors/bound.rs 0.00% 10 Missing ⚠️
session_manager/src/notify.rs 82.14% 10 Missing ⚠️

📢 Thoughts on this report? Let us know!

@k82cn k82cn force-pushed the flm_notifier branch 4 times, most recently from fd98d01 to 1d0ba8b Compare April 23, 2026 02:15
@k82cn
Copy link
Copy Markdown
Contributor Author

k82cn commented Apr 23, 2026

Addressed in the latest commit. The pattern now checks the condition before waiting:

loop {
    // Check condition FIRST
    {
        let task = lock_ptr!(task_ptr)?;
        if initial_state != task.state || task.is_completed() {
            return Ok((*task).clone());
        }
    }
    // THEN wait for notification
    self.notifier.tasks.wait(&gid.ssn_id, gid.task_id).await?;
}

This eliminates the race condition because:

  1. If the state already changed → return immediately (no wait needed)
  2. If notification fires before wait() → we don't care, loop back and find state changed
  3. If notification fires during wait() → wake up and recheck

The notification only needs to signal "something changed" - we always verify by rechecking actual state.

@k82cn k82cn force-pushed the flm_notifier branch 5 times, most recently from cc08f43 to 3ed21c3 Compare April 23, 2026 05:50
Replace custom busy-wait Future implementations with tokio::sync::watch
channels for efficient event-driven notifications.

Changes:
- Add NotifyManager with TaskNotifier and ExecutorNotifier using watch channels
- Replace WatchTaskFuture, WaitForTaskFuture, WaitForSsnFuture with watch-based loops
- Move wait_for_task logic from BoundState to Controller
- Add comprehensive tests for notification behavior

This eliminates CPU-wasting busy-wait polling and provides immediate
notification delivery with proper race-condition-free semantics.
Signed-off-by: Klaus Ma <klausm@nvidia.com>
@k82cn k82cn merged commit 22becd8 into xflops:main Apr 23, 2026
7 checks passed
@k82cn k82cn deleted the flm_notifier branch May 26, 2026 10:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant