diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index b384e54c1db..c74b9643fca 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -9,6 +9,7 @@ rust_2018_idioms, unreachable_pub )] +#![deny(unused_must_use)] #![cfg_attr(docsrs, deny(broken_intra_doc_links))] #![doc(test( no_crate_inject, diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs index 51c8cb24001..3df7bba2944 100644 --- a/tokio/src/runtime/queue.rs +++ b/tokio/src/runtime/queue.rs @@ -124,9 +124,14 @@ impl Local { // There is capacity for the task break tail; } else if steal != real { - // Concurrently stealing, this will free up capacity, so - // only push the new task onto the inject queue - inject.push(task); + // Concurrently stealing, this will free up capacity, so only + // push the new task onto the inject queue + // + // If the task failes to be pushed on the injection queue, there + // is nothing to be done at this point as the task cannot be a + // newly spawned task. Shutting down this task is handled by the + // worker shutdown process. + let _ = inject.push(task); return; } else { // Push the current task and half of the queue into the @@ -507,7 +512,11 @@ impl Inject { } /// Pushes a value into the queue. - pub(super) fn push(&self, task: task::Notified) + /// + /// Returns `Err(task)` if pushing fails due to the queue being shutdown. + /// The caller is expected to call `shutdown()` on the task **if and only + /// if** it is a newly spawned task. + pub(super) fn push(&self, task: task::Notified) -> Result<(), task::Notified> where T: crate::runtime::task::Schedule, { @@ -515,11 +524,7 @@ impl Inject { let mut p = self.pointers.lock(); if p.is_closed { - // Drop the mutex to avoid a potential deadlock when - // re-entering. - drop(p); - task.shutdown(); - return; + return Err(task); } // safety: only mutated with the lock held @@ -538,6 +543,7 @@ impl Inject { p.tail = Some(task); self.len.store(len + 1, Release); + Ok(()) } pub(super) fn push_batch( diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 421b0786796..96312d34618 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -94,7 +94,13 @@ impl Spawner { F::Output: Send + 'static, { let (task, handle) = task::joinable(future); - self.shared.schedule(task, false); + + if let Err(task) = self.shared.schedule(task, false) { + // The newly spawned task could not be scheduled because the runtime + // is shutting down. The task must be explicitly shutdown at this point. + task.shutdown(); + } + handle } diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 348862c58fd..70cbddbd05e 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -709,16 +709,22 @@ impl task::Schedule for Arc { } fn schedule(&self, task: Notified) { - self.shared.schedule(task, false); + // Because this is not a newly spawned task, if scheduling fails due to + // the runtime shutting down, there is no special work that must happen + // here. + let _ = self.shared.schedule(task, false); } fn yield_now(&self, task: Notified) { - self.shared.schedule(task, true); + // Because this is not a newly spawned task, if scheduling fails due to + // the runtime shutting down, there is no special work that must happen + // here. + let _ = self.shared.schedule(task, true); } } impl Shared { - pub(super) fn schedule(&self, task: Notified, is_yield: bool) { + pub(super) fn schedule(&self, task: Notified, is_yield: bool) -> Result<(), Notified> { CURRENT.with(|maybe_cx| { if let Some(cx) = maybe_cx { // Make sure the task is part of the **current** scheduler. @@ -726,15 +732,16 @@ impl Shared { // And the current thread still holds a core if let Some(core) = cx.core.borrow_mut().as_mut() { self.schedule_local(core, task, is_yield); - return; + return Ok(()); } } } // Otherwise, use the inject queue - self.inject.push(task); + self.inject.push(task)?; self.notify_parked(); - }); + Ok(()) + }) } fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) { diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 19b381cb97a..9e76c4ed0b7 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -12,8 +12,8 @@ use std::future::Future; use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; -use std::sync::{mpsc, Arc}; -use std::task::{Context, Poll}; +use std::sync::{mpsc, Arc, Mutex}; +use std::task::{Context, Poll, Waker}; #[test] fn single_thread() { @@ -405,6 +405,74 @@ async fn hang_on_shutdown() { tokio::time::sleep(std::time::Duration::from_secs(1)).await; } +/// Demonstrates tokio-rs/tokio#3869 +#[test] +fn wake_during_shutdown() { + struct Shared { + waker: Option, + } + + struct MyFuture { + shared: Arc>, + put_waker: bool, + } + + impl MyFuture { + fn new() -> (Self, Self) { + let shared = Arc::new(Mutex::new(Shared { waker: None })); + let f1 = MyFuture { + shared: shared.clone(), + put_waker: true, + }; + let f2 = MyFuture { + shared, + put_waker: false, + }; + (f1, f2) + } + } + + impl Future for MyFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + let me = Pin::into_inner(self); + let mut lock = me.shared.lock().unwrap(); + println!("poll {}", me.put_waker); + if me.put_waker { + println!("putting"); + lock.waker = Some(cx.waker().clone()); + } + Poll::Pending + } + } + + impl Drop for MyFuture { + fn drop(&mut self) { + println!("drop {} start", self.put_waker); + let mut lock = self.shared.lock().unwrap(); + if !self.put_waker { + lock.waker.take().unwrap().wake(); + } + drop(lock); + println!("drop {} stop", self.put_waker); + } + } + + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + let (f1, f2) = MyFuture::new(); + + rt.spawn(f1); + rt.spawn(f2); + + rt.block_on(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await }); +} + fn rt() -> Runtime { Runtime::new().unwrap() }