Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: add Runtime::shutdown_timeout #2186

Merged
merged 7 commits into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions tokio/src/park/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,7 @@ pub(crate) struct ParkThread {
inner: Arc<Inner>,
}

/// Error returned by `ParkThread`
///
/// This currently is never returned, but might at some point in the future.
#[derive(Debug)]
pub(crate) struct ParkError {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reshuffling internal code. This used to be public, which is why ParkError is opaque.

_p: (),
}
pub(crate) type ParkError = ();

/// Unblocks a thread that was blocked by `ParkThread`.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -240,7 +234,7 @@ cfg_blocking_impl! {
F: FnOnce(&ParkThread) -> R,
{
CURRENT_PARKER.try_with(|inner| f(inner))
.map_err(|_| ParkError { _p: () })
.map_err(|_| ())
}
}

Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/blocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ cfg_blocking_impl! {

cfg_not_blocking_impl! {
use crate::runtime::Builder;
use std::time::Duration;

#[derive(Debug, Clone)]
pub(crate) struct BlockingPool {}
Expand All @@ -35,5 +36,8 @@ cfg_not_blocking_impl! {
pub(crate) fn spawner(&self) -> &BlockingPool {
self
}

pub(crate) fn shutdown(&mut self, _duration: Option<Duration>) {
}
carllerche marked this conversation as resolved.
Show resolved Hide resolved
}
}
19 changes: 15 additions & 4 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,30 @@ impl BlockingPool {
pub(crate) fn spawner(&self) -> &Spawner {
&self.spawner
}
}

impl Drop for BlockingPool {
fn drop(&mut self) {
pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
let mut shared = self.spawner.inner.shared.lock().unwrap();

// The function can be called multiple times. First, by explicitly
// calling `shutdown` then by the drop handler calling `shutdown`. This
// prevents shutting down twice.
if shared.shutdown {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function can be called multiple times. First, by explicitly calling shutdown then by the drop handler calling shutdown. This prevents shutting down twice.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good if there was a comment in the code stating that, as well as on the PR?

return;
}

shared.shutdown = true;
shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all();

drop(shared);

self.shutdown_rx.wait();
self.shutdown_rx.wait(timeout);
}
}

impl Drop for BlockingPool {
fn drop(&mut self) {
self.shutdown(None);
}
}

Expand Down
14 changes: 12 additions & 2 deletions tokio/src/runtime/blocking/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
use crate::loom::sync::Arc;
use crate::sync::oneshot;

use std::time::Duration;

#[derive(Debug, Clone)]
pub(super) struct Sender {
tx: Arc<oneshot::Sender<()>>,
Expand All @@ -26,7 +28,11 @@ pub(super) fn channel() -> (Sender, Receiver) {

impl Receiver {
/// Blocks the current thread until all `Sender` handles drop.
pub(crate) fn wait(&mut self) {
///
/// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
/// duration. If `timeout` is `None`, then the thread is blocked until the
/// shutdown signal is received.
pub(crate) fn wait(&mut self, timeout: Option<Duration>) {
carllerche marked this conversation as resolved.
Show resolved Hide resolved
use crate::runtime::enter::{enter, try_enter};

let mut e = if std::thread::panicking() {
Expand All @@ -43,6 +49,10 @@ impl Receiver {
// If blocking fails to wait, this indicates a problem parking the
// current thread (usually, shutting down a runtime stored in a
// thread-local).
let _ = e.block_on(&mut self.rx);
if let Some(timeout) = timeout {
let _ = e.block_on_timeout(&mut self.rx, timeout);
} else {
let _ = e.block_on(&mut self.rx);
}
}
}
39 changes: 39 additions & 0 deletions tokio/src/runtime/enter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R {

cfg_blocking_impl! {
use crate::park::ParkError;
use std::time::Duration;

impl Enter {
/// Blocks the thread on the specified future, returning the value with
Expand Down Expand Up @@ -104,6 +105,44 @@ cfg_blocking_impl! {
park.park()?;
}
}

/// Blocks the thread on the specified future for **at most** `timeout`
///
/// If the future completes before `timeout`, the result is returned. If
/// `timeout` elapses, then `Err` is returned.
pub(crate) fn block_on_timeout<F>(&mut self, mut f: F, timeout: Duration) -> Result<F::Output, ParkError>
where
F: std::future::Future,
{
use crate::park::{CachedParkThread, Park};
use std::pin::Pin;
use std::task::Context;
use std::task::Poll::Ready;
Comment on lines +117 to +120
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor style nit: some of these imports are now used by both block_on and block_on_timeout, should they be moved out of the individual functions?

use std::time::Instant;

let mut park = CachedParkThread::new();
let waker = park.get_unpark()?.into_waker();
let mut cx = Context::from_waker(&waker);

// `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can
// no longer be accessed, making the pinning safe.
let mut f = unsafe { Pin::new_unchecked(&mut f) };
let when = Instant::now() + timeout;

loop {
if let Ready(v) = f.as_mut().poll(&mut cx) {
return Ok(v);
}

let now = Instant::now();

if now >= when {
return Err(());
}

park.park_timeout(when - now)?;
}
}
}
}

Expand Down
43 changes: 43 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ cfg_rt_core! {
}

use std::future::Future;
use std::time::Duration;

/// The Tokio runtime.
///
Expand Down Expand Up @@ -441,4 +442,46 @@ impl Runtime {
pub fn handle(&self) -> &Handle {
&self.handle
}

/// Shutdown the runtime, waiting for at most `duration` for all spawned
/// task to shutdown.
///
/// Usually, dropping a `Runtime` handle is sufficient as tasks are able to
/// shutdown in a timely fashion. However, dropping a `Runtime` will wait
/// indefinitely for all tasks to terminate, and there are cases where a long
/// blocking task has been spawned which can block dropping `Runtime`.
///
/// In this case, calling `shutdown_timeout` with an explicit wait timeout
/// can work. The `shutdown_timeout` will signal all tasks to shutdown and
/// will wait for at most `duration` for all spawned tasks to terminate. If
/// `timeout` elapses before all tasks are dropped, the function returns and
/// outstanding tasks are potentially leaked.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
/// use tokio::task;
///
/// use std::thread;
/// use std::time::Duration;
///
/// fn main() {
/// let mut runtime = Runtime::new().unwrap();
///
/// runtime.block_on(async move {
/// task::spawn_blocking(move || {
/// thread::sleep(Duration::from_secs(10_000));
/// });
/// });
///
/// runtime.shutdown_timeout(Duration::from_millis(100));
/// }
/// ```
pub fn shutdown_timeout(self, duration: Duration) {
let Runtime {
mut blocking_pool, ..
} = self;
blocking_pool.shutdown(Some(duration));
}
}
17 changes: 17 additions & 0 deletions tokio/tests/rt_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,23 @@ rt_test! {
}
}

#[test]
fn shutdown_timeout() {
let (tx, rx) = oneshot::channel();
let mut runtime = rt();

runtime.block_on(async move {
task::spawn_blocking(move || {
tx.send(()).unwrap();
thread::sleep(Duration::from_secs(10_000));
});

rx.await.unwrap();
});

runtime.shutdown_timeout(Duration::from_millis(100));
}

#[test]
fn runtime_in_thread_local() {
use std::cell::RefCell;
Expand Down