Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #2641, #2535, and shutdown_timeout blocking for no reason #2649

Merged
merged 18 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ process = [
"winapi/winerror",
]
# Includes basic task execution capabilities
rt-core = []
rt-core = ["slab"]
rt-util = []
rt-threaded = [
"num_cpus",
Expand Down Expand Up @@ -129,7 +129,7 @@ proptest = "0.9.4"
tempfile = "3.1.0"

[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.3.4", features = ["futures", "checkpoint"] }
loom = { version = "0.3.5", features = ["futures", "checkpoint"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ impl Park for Driver {
self.turn(Some(duration))?;
Ok(())
}

fn shutdown(&mut self) {}
}

impl fmt::Debug for Driver {
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/park/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ where
Either::B(b) => b.park_timeout(duration).map_err(Either::B),
}
}

fn shutdown(&mut self) {
match self {
Either::A(a) => a.shutdown(),
Either::B(b) => b.shutdown(),
}
}
}

impl<A, B> Unpark for Either<A, B>
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/park/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ pub(crate) trait Park {
/// an implementation detail. Refer to the documentation for the specific
/// `Park` implementation
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;

/// Release all resources holded by the parker for proper leak-free shutdown
fn shutdown(&mut self);
}

/// Unblock a thread blocked by the associated `Park` instance.
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/park/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ impl Park for ParkThread {
self.inner.park_timeout(duration);
Ok(())
}

fn shutdown(&mut self) {
self.inner.shutdown();
}
}

// ==== impl Inner ====
Expand Down Expand Up @@ -188,6 +192,10 @@ impl Inner {

self.condvar.notify_one()
}

fn shutdown(&self) {
self.condvar.notify_all();
}
}

impl Default for ParkThread {
Expand Down Expand Up @@ -259,6 +267,10 @@ cfg_block_on! {
self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
Ok(())
}

fn shutdown(&mut self) {
let _ = self.with_current(|park_thread| park_thread.inner.shutdown());
}
}


Expand Down
34 changes: 28 additions & 6 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};

use slab::Slab;

use std::collections::VecDeque;
use std::fmt;
use std::time::Duration;
Expand Down Expand Up @@ -41,6 +43,7 @@ struct Inner {
/// Call before a thread stops
before_stop: Option<Callback>,

// Maximum number of threads
thread_cap: usize,
}

Expand All @@ -51,6 +54,7 @@ struct Shared {
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
worker_threads: Slab<thread::JoinHandle<()>>,
}

type Task = task::Notified<NoopSchedule>;
Expand Down Expand Up @@ -96,6 +100,7 @@ impl BlockingPool {
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
worker_threads: Slab::new(),
}),
condvar: Condvar::new(),
thread_name: builder.thread_name.clone(),
Expand Down Expand Up @@ -126,10 +131,15 @@ impl BlockingPool {
shared.shutdown = true;
shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all();
let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new());

drop(shared);

self.shutdown_rx.wait(timeout);
if self.shutdown_rx.wait(timeout) {
for handle in workers.drain() {
let _ = handle.join();
}
}
}
}

Expand Down Expand Up @@ -187,13 +197,23 @@ impl Spawner {
};

if let Some(shutdown_tx) = shutdown_tx {
self.spawn_thread(shutdown_tx, rt);
let mut shared = self.inner.shared.lock().unwrap();
let entry = shared.worker_threads.vacant_entry();

let handle = self.spawn_thread(shutdown_tx, rt, entry.key());

entry.insert(handle);
}

Ok(())
}

fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
fn spawn_thread(
&self,
shutdown_tx: shutdown::Sender,
rt: &Handle,
worker_id: usize,
) -> thread::JoinHandle<()> {
let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());

if let Some(stack_size) = self.inner.stack_size {
Expand All @@ -207,16 +227,16 @@ impl Spawner {
// Only the reference should be moved into the closure
let rt = &rt;
rt.enter(move || {
rt.blocking_spawner.inner.run();
rt.blocking_spawner.inner.run(worker_id);
drop(shutdown_tx);
})
})
.unwrap();
.unwrap()
}
}

impl Inner {
fn run(&self) {
fn run(&self, worker_id: usize) {
if let Some(f) = &self.after_start {
f()
}
Expand Down Expand Up @@ -252,6 +272,8 @@ impl Inner {
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
shared.worker_threads.remove(worker_id);

break 'main;
}

Expand Down
11 changes: 7 additions & 4 deletions tokio/src/runtime/blocking/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,21 @@ impl Receiver {
/// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
/// duration. If `timeout` is `None`, then the thread is blocked until the
/// shutdown signal is received.
pub(crate) fn wait(&mut self, timeout: Option<Duration>) {
///
/// If the timeout has elapsed, it returns `false`, otherwise it returns `true`.
pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
use crate::runtime::enter::try_enter;

if timeout == Some(Duration::from_nanos(0)) {
return;
return true;
}

let mut e = match try_enter(false) {
Some(enter) => enter,
_ => {
if std::thread::panicking() {
// Don't panic in a panic
return;
return false;
} else {
panic!(
"Cannot drop a runtime in a context where blocking is not allowed. \
Expand All @@ -60,9 +62,10 @@ impl Receiver {
// current thread (usually, shutting down a runtime stored in a
// thread-local).
if let Some(timeout) = timeout {
let _ = e.block_on_timeout(&mut self.rx, timeout);
e.block_on_timeout(&mut self.rx, timeout).is_ok()
} else {
let _ = e.block_on(&mut self.rx);
true
}
}
}
9 changes: 4 additions & 5 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,11 +542,10 @@ impl Runtime {
/// runtime.shutdown_timeout(Duration::from_millis(100));
/// }
/// ```
pub fn shutdown_timeout(self, duration: Duration) {
let Runtime {
mut blocking_pool, ..
} = self;
blocking_pool.shutdown(Some(duration));
pub fn shutdown_timeout(mut self, duration: Duration) {
// Wakeup and shutdown all the worker threads
self.handle.spawner.shutdown();
self.blocking_pool.shutdown(Some(duration));
}

/// Shutdown the runtime, without waiting for any spawned tasks to shutdown.
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/runtime/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ impl Park for Parker {
Ok(())
}
}

fn shutdown(&mut self) {
self.inner.shutdown();
}
}

impl Unpark for Unparker {
Expand Down Expand Up @@ -242,4 +246,12 @@ impl Inner {
fn unpark_driver(&self) {
self.shared.handle.unpark();
}

fn shutdown(&self) {
if let Some(mut driver) = self.shared.driver.try_lock() {
driver.shutdown();
}

self.condvar.notify_all();
}
}
11 changes: 11 additions & 0 deletions tokio/src/runtime/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ pub(crate) enum Spawner {
ThreadPool(thread_pool::Spawner),
}

impl Spawner {
pub(crate) fn shutdown(&mut self) {
#[cfg(feature = "rt-threaded")]
{
if let Spawner::ThreadPool(spawner) = self {
spawner.shutdown();
}
}
}
}

cfg_rt_core! {
impl Spawner {
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
Expand Down
6 changes: 5 additions & 1 deletion tokio/src/runtime/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl fmt::Debug for ThreadPool {

impl Drop for ThreadPool {
fn drop(&mut self) {
self.spawner.shared.close();
self.spawner.shutdown();
}
}

Expand All @@ -108,6 +108,10 @@ impl Spawner {
self.shared.schedule(task, false);
handle
}

pub(crate) fn shutdown(&mut self) {
self.shared.close();
}
}

impl fmt::Debug for Spawner {
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/thread_pool/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,8 @@ impl Core {

// Drain the queue
while self.next_local_task().is_some() {}

park.shutdown();
}

fn drain_pending_drop(&mut self, worker: &Worker) {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/time/driver/atomic_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Iterator for AtomicStackEntries {
type Item = Arc<Entry>;

fn next(&mut self) -> Option<Self::Item> {
if self.ptr.is_null() {
if self.ptr.is_null() || self.ptr == SHUTDOWN {
return None;
}

Expand Down
27 changes: 23 additions & 4 deletions tokio/src/time/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use std::{cmp, fmt};
/// [timeout]: crate::time::Timeout
/// [interval]: crate::time::Interval
#[derive(Debug)]
pub(crate) struct Driver<T> {
pub(crate) struct Driver<T: Park> {
/// Shared state
inner: Arc<Inner>,

Expand All @@ -94,6 +94,9 @@ pub(crate) struct Driver<T> {

/// Source of "now" instances
clock: Clock,

/// True if the driver is being shutdown
is_shutdown: bool,
}

/// Timer state shared between `Driver`, `Handle`, and `Registration`.
Expand Down Expand Up @@ -135,6 +138,7 @@ where
wheel: wheel::Wheel::new(),
park,
clock,
is_shutdown: false,
}
}

Expand Down Expand Up @@ -303,10 +307,12 @@ where

Ok(())
}
}

impl<T> Drop for Driver<T> {
fn drop(&mut self) {
fn shutdown(&mut self) {
if self.is_shutdown {
return;
}

use std::u64;

// Shutdown the stack of entries to process, preventing any new entries
Expand All @@ -319,6 +325,19 @@ impl<T> Drop for Driver<T> {
while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
entry.error(Error::shutdown());
}

self.park.shutdown();

self.is_shutdown = true;
}
}

impl<T> Drop for Driver<T>
where
T: Park,
{
fn drop(&mut self) {
self.shutdown();
}
}

Expand Down
Loading