task: fix leak in LocalSet (#3978)
Darksonn authored Jul 22, 2021
1 parent ced7992 commit 998dc5a
Showing 3 changed files with 74 additions and 7 deletions.
47 changes: 47 additions & 0 deletions tokio/src/runtime/tests/loom_local.rs
@@ -0,0 +1,47 @@
use crate::runtime::tests::loom_oneshot as oneshot;
use crate::runtime::Builder;
use crate::task::LocalSet;

use std::task::Poll;

/// Waking a runtime will attempt to push a task into a queue of notifications
/// in the runtime; however, the tasks in such a queue usually have a reference
/// to the runtime itself. This means that if they are not properly removed at
/// runtime shutdown, this will cause a memory leak.
///
/// This test verifies that waking something during shutdown of a LocalSet does
/// not result in tasks lingering in the queue once shutdown is complete. This
/// is verified using loom's leak finder.
#[test]
fn wake_during_shutdown() {
    loom::model(|| {
        let rt = Builder::new_current_thread().build().unwrap();
        let ls = LocalSet::new();

        let (send, recv) = oneshot::channel();

        ls.spawn_local(async move {
            let mut send = Some(send);

            let () = futures::future::poll_fn(|cx| {
                if let Some(send) = send.take() {
                    send.send(cx.waker().clone());
                }

                Poll::Pending
            })
            .await;
        });

        let handle = loom::thread::spawn(move || {
            let waker = recv.recv();
            waker.wake();
        });

        ls.block_on(&rt, crate::task::yield_now());

        drop(ls);
        handle.join().unwrap();
        drop(rt);
    });
}
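Like the other modules in this directory, this model only compiles when the loom cfg is enabled (see the cfg_loom! block in tests/mod.rs below). A typical invocation, sketched from the usual loom workflow rather than taken from this commit (exact feature flags and test filter vary by setup), looks like:

    RUSTFLAGS="--cfg loom" cargo test --lib --release --features full -- wake_during_shutdown

Loom then explores the interleavings of the waker thread against the LocalSet shutdown, and its leak finder fails the model if any notification is still alive once the runtime has been dropped.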
1 change: 1 addition & 0 deletions tokio/src/runtime/tests/mod.rs
@@ -30,6 +30,7 @@ mod unowned_wrapper {

cfg_loom! {
    mod loom_basic_scheduler;
+    mod loom_local;
    mod loom_blocking;
    mod loom_oneshot;
    mod loom_pool;
33 changes: 26 additions & 7 deletions tokio/src/task/local.rs
@@ -241,7 +241,7 @@ struct Tasks {
/// LocalSet state shared between threads.
struct Shared {
    /// Remote run queue sender
-    queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>,
+    queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,

    /// Wake the `LocalSet` task
    waker: AtomicWaker,
@@ -339,7 +339,7 @@ impl LocalSet {
                    queue: VecDeque::with_capacity(INITIAL_CAPACITY),
                }),
                shared: Arc::new(Shared {
-                    queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
+                    queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
                    waker: AtomicWaker::new(),
                }),
            },
@@ -549,15 +549,23 @@ impl LocalSet {
                .shared
                .queue
                .lock()
-                .pop_front()
+                .as_mut()
+                .and_then(|queue| queue.pop_front())
                .or_else(|| self.context.tasks.borrow_mut().queue.pop_front())
        } else {
            self.context
                .tasks
                .borrow_mut()
                .queue
                .pop_front()
-                .or_else(|| self.context.shared.queue.lock().pop_front())
+                .or_else(|| {
+                    self.context
+                        .shared
+                        .queue
+                        .lock()
+                        .as_mut()
+                        .and_then(|queue| queue.pop_front())
+                })
        }
    }

@@ -627,7 +635,10 @@ impl Drop for LocalSet {
                task.shutdown();
            }

-            for task in self.context.shared.queue.lock().drain(..) {
+            // Take the queue from the Shared object to prevent pushing
+            // notifications to it in the future.
+            let queue = self.context.shared.queue.lock().take().unwrap();
+            for task in queue {
                task.shutdown();
            }

@@ -677,8 +688,16 @@ impl Shared {
                cx.tasks.borrow_mut().queue.push_back(task);
            }
            _ => {
-                self.queue.lock().push_back(task);
-                self.waker.wake();
+                // First check whether the queue is still there (if not, the
+                // LocalSet is dropped). Then push to it if so, and if not,
+                // do nothing.
+                let mut lock = self.queue.lock();
+
+                if let Some(queue) = lock.as_mut() {
+                    queue.push_back(task);
+                    drop(lock);
+                    self.waker.wake();
+                }
            }
        });
    }
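For reference, the fix boils down to one pattern: the cross-thread queue lives behind a Mutex<Option<VecDeque<_>>>; dropping the LocalSet takes the queue (so pending notifications can be shut down), and any later push observes None and drops the task instead of parking it in a queue that will never be drained. The sketch below is a minimal standalone illustration of that pattern, not tokio's actual types; SharedQueue and Job are invented names for the example.

    use std::collections::VecDeque;
    use std::sync::{Arc, Mutex};

    // Hypothetical stand-ins for this sketch; tokio's real task and queue types differ.
    type Job = Box<dyn FnOnce()>;

    struct SharedQueue {
        // `None` means the owner has shut down and no further pushes are accepted.
        queue: Mutex<Option<VecDeque<Job>>>,
    }

    impl SharedQueue {
        fn new() -> Arc<Self> {
            Arc::new(SharedQueue {
                queue: Mutex::new(Some(VecDeque::new())),
            })
        }

        // Analogue of the `Shared::schedule` change: push only while the queue is still there.
        fn push(&self, job: Job) -> bool {
            let mut lock = self.queue.lock().unwrap();
            match lock.as_mut() {
                Some(queue) => {
                    queue.push_back(job);
                    true
                }
                // The owner is already gone: drop the job here instead of leaving it
                // in a queue that nobody will ever drain.
                None => false,
            }
        }

        // Analogue of the `Drop` change: take the queue so later pushes see `None`,
        // then let the caller shut down whatever was still pending.
        fn close(&self) -> VecDeque<Job> {
            self.queue.lock().unwrap().take().unwrap_or_default()
        }
    }

    fn main() {
        let shared = SharedQueue::new();
        assert!(shared.push(Box::new(|| println!("ran before close"))));

        for job in shared.close() {
            job(); // shut down / run cleanup for work that was still queued
        }

        // A racing push after close is rejected instead of being leaked.
        assert!(!shared.push(Box::new(|| println!("never queued"))));
    }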
