Skip to content

Commit

Permalink
Add spawn_scoped macro and modify JoinHandle to take a task lifetime
Browse files Browse the repository at this point in the history
  • Loading branch information
udoprog committed Dec 20, 2019
1 parent 248bf21 commit ba7f91c
Show file tree
Hide file tree
Showing 23 changed files with 80 additions and 41 deletions.
2 changes: 1 addition & 1 deletion tokio/src/blocking/mod.rs
Expand Up @@ -4,7 +4,7 @@

cfg_blocking_impl! {
mod pool;
pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};
pub(crate) use pool::{spawn_blocking, spawn_scoped, BlockingPool, Spawner};

mod schedule;
mod task;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/fs/mod.rs
Expand Up @@ -99,5 +99,5 @@ mod sys {

// TODO: don't rename
pub(crate) use crate::runtime::spawn_blocking as run;
pub(crate) use crate::task::JoinHandle as Blocking;
pub(crate) type Blocking<T> = crate::task::JoinHandle<'static, T>;
}
4 changes: 2 additions & 2 deletions tokio/src/io/blocking.rs
Expand Up @@ -13,7 +13,7 @@ use self::State::*;

/// `T` should not implement _both_ Read and Write.
#[derive(Debug)]
pub(crate) struct Blocking<T> {
pub(crate) struct Blocking<T: 'static> {
inner: Option<T>,
state: State<T>,
/// true if the lower IO layer needs flushing
Expand All @@ -29,7 +29,7 @@ pub(crate) struct Buf {
pub(crate) const MAX_BUF: usize = 16 * 1024;

#[derive(Debug)]
enum State<T> {
enum State<T: 'static> {
Idle(Option<Buf>),
Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/mod.rs
Expand Up @@ -220,6 +220,6 @@ cfg_io_blocking! {
mod sys {
// TODO: don't rename
pub(crate) use crate::runtime::spawn_blocking as run;
pub(crate) use crate::task::JoinHandle as Blocking;
pub(crate) type Blocking<T> = crate::task::JoinHandle<'static, T>;
}
}
6 changes: 6 additions & 0 deletions tokio/src/macros/blocking.rs
@@ -0,0 +1,6 @@
/// Spawn a scoped blocking task, which can share references from the future it
/// is being spawned from.
#[macro_export]
macro_rules! spawn_scoped {
($func:expr) => { unsafe { crate::task::spawn_scoped($func) }.await }
}
5 changes: 5 additions & 0 deletions tokio/src/macros/mod.rs
Expand Up @@ -15,3 +15,8 @@ mod ready;

#[macro_use]
mod thread_local;

cfg_blocking! {
#[macro_use]
mod blocking;
}
2 changes: 1 addition & 1 deletion tokio/src/net/addr.rs
Expand Up @@ -233,7 +233,7 @@ pub(crate) mod sealed {
#[derive(Debug)]
pub enum MaybeReady {
Ready(Option<SocketAddr>),
Blocking(JoinHandle<io::Result<vec::IntoIter<SocketAddr>>>),
Blocking(JoinHandle<'static, io::Result<vec::IntoIter<SocketAddr>>>),
}

#[doc(hidden)]
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/runtime/basic_scheduler.rs
Expand Up @@ -78,7 +78,7 @@ where
}

/// Spawn a future onto the thread pool
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<'static, F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand Down Expand Up @@ -155,7 +155,7 @@ where

impl Spawner {
/// Spawn a future onto the thread pool
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<'static, F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand Down Expand Up @@ -221,7 +221,7 @@ impl SchedulerPriv {
///
/// Must be called from the same thread that holds the `BasicScheduler`
/// value.
pub(super) unsafe fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
pub(super) unsafe fn spawn<F>(&self, future: F) -> JoinHandle<'static, F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/blocking/mod.rs
Expand Up @@ -5,7 +5,7 @@

cfg_blocking_impl! {
mod pool;
pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};
pub(crate) use pool::{spawn_blocking, spawn_scoped, BlockingPool, Spawner};

mod schedule;
mod shutdown;
Expand Down
19 changes: 18 additions & 1 deletion tokio/src/runtime/blocking/pool.rs
Expand Up @@ -77,7 +77,7 @@ thread_local! {
const KEEP_ALIVE: Duration = Duration::from_secs(10);

/// Run the provided function on an executor dedicated to blocking operations.
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<'static, R>
where
F: FnOnce() -> R + Send + 'static,
{
Expand All @@ -93,6 +93,23 @@ where
})
}

/// Run the provided function on an executor dedicated to blocking operations.
pub(crate) unsafe fn spawn_scoped<'a, F, R>(func: F) -> JoinHandle<'a, R>
where
F: FnOnce() -> R + Send + Sync + 'a,
{
BLOCKING.with(|cell| {
let schedule = match cell.get() {
Some(ptr) => &*ptr,
None => panic!("not currently running on the Tokio runtime."),
};

let (task, handle) = task::joinable(BlockingTask::new(func));
schedule.schedule(task);
handle
})
}

// ===== impl BlockingPool =====

impl BlockingPool {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/global.rs
Expand Up @@ -25,7 +25,7 @@ thread_local! {
// ===== global spawn fns =====

/// Spawns a future on the default executor.
pub(crate) fn spawn<T>(future: T) -> JoinHandle<T::Output>
pub(crate) fn spawn<T>(future: T) -> JoinHandle<'static, T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/handle.rs
Expand Up @@ -71,7 +71,7 @@ cfg_rt_core! {
///
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
pub fn spawn<F>(&self, future: F) -> JoinHandle<'static, F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/mod.rs
Expand Up @@ -201,7 +201,7 @@ mod blocking;
use blocking::BlockingPool;

cfg_blocking_impl! {
pub(crate) use blocking::spawn_blocking;
pub(crate) use blocking::{spawn_blocking, spawn_scoped};
}

mod builder;
Expand Down Expand Up @@ -384,7 +384,7 @@ impl Runtime {
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
#[cfg(feature = "rt-core")]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
pub fn spawn<F>(&self, future: F) -> JoinHandle<'static, F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/spawner.rs
Expand Up @@ -36,7 +36,7 @@ impl Spawner {

cfg_rt_core! {
impl Spawner {
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<'static, F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/thread_pool/mod.rs
Expand Up @@ -81,7 +81,7 @@ impl ThreadPool {
}

/// Spawn a task
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<'static, F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/thread_pool/slice.rs
Expand Up @@ -55,7 +55,7 @@ impl Set {
}
}

pub(crate) fn spawn_typed<F>(&self, future: F) -> JoinHandle<F::Output>
pub(crate) fn spawn_typed<F>(&self, future: F) -> JoinHandle<'static, F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/thread_pool/spawner.rs
Expand Up @@ -28,7 +28,7 @@ impl Spawner {
}

/// Spawn a future onto the thread pool
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<'static, F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand Down
13 changes: 12 additions & 1 deletion tokio/src/task/blocking.rs
Expand Up @@ -55,11 +55,22 @@ cfg_blocking! {
/// # Ok(())
/// # }
/// ```
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<'static, R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
crate::runtime::spawn_blocking(f)
}


/// Run the provided closure on a thread where blocking is acceptable in a
/// scoped fashion.
pub unsafe fn spawn_scoped<'a, F, R>(f: F) -> JoinHandle<'a, R>
where
F: FnOnce() -> R + Send + Sync + 'a,
R: Send + 'static,
{
crate::runtime::spawn_scoped(f)
}
}
20 changes: 10 additions & 10 deletions tokio/src/task/join.rs
Expand Up @@ -76,27 +76,27 @@ doc_rt_core! {
/// [`task::spawn`]: crate::task::spawn()
/// [`task::spawn_blocking`]: crate::task::spawn_blocking
/// [`std::thread::JoinHandle`]: std::thread::JoinHandle
pub struct JoinHandle<T> {
pub struct JoinHandle<'a, T> {
raw: Option<RawTask>,
_p: PhantomData<T>,
_p: PhantomData<&'a T>,
}
}

unsafe impl<T: Send> Send for JoinHandle<T> {}
unsafe impl<T: Send> Sync for JoinHandle<T> {}
unsafe impl<'a, T: Send> Send for JoinHandle<'a, T> {}
unsafe impl<'a, T: Send> Sync for JoinHandle<'a, T> {}

impl<T> JoinHandle<T> {
pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
impl<'a, T> JoinHandle<'a, T> {
pub(super) fn new(raw: RawTask) -> JoinHandle<'a, T> {
JoinHandle {
raw: Some(raw),
_p: PhantomData,
}
}
}

impl<T> Unpin for JoinHandle<T> {}
impl<'a, T> Unpin for JoinHandle<'a, T> {}

impl<T> Future for JoinHandle<T> {
impl<'a, T> Future for JoinHandle<'a, T> {
type Output = super::Result<T>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down Expand Up @@ -135,7 +135,7 @@ impl<T> Future for JoinHandle<T> {
}
}

impl<T> Drop for JoinHandle<T> {
impl<'a, T> Drop for JoinHandle<'a, T> {
fn drop(&mut self) {
if let Some(raw) = self.raw.take() {
if raw.header().state.drop_join_handle_fast() {
Expand All @@ -147,7 +147,7 @@ impl<T> Drop for JoinHandle<T> {
}
}

impl<T> fmt::Debug for JoinHandle<T>
impl<'a, T> fmt::Debug for JoinHandle<'a, T>
where
T: fmt::Debug,
{
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/task/local.rs
Expand Up @@ -134,7 +134,7 @@ cfg_rt_util! {
/// }).await.unwrap();
/// });
/// ```
pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
pub fn spawn_local<F>(future: F) -> JoinHandle<'static, F::Output>
where
F: Future + 'static,
F::Output: 'static,
Expand Down Expand Up @@ -203,7 +203,7 @@ impl LocalSet {
/// });
/// ```
/// [`spawn_local`]: fn.spawn_local.html
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<'static, F::Output>
where
F: Future + 'static,
F::Output: 'static,
Expand Down
8 changes: 4 additions & 4 deletions tokio/src/task/mod.rs
Expand Up @@ -209,7 +209,7 @@
//! [`thread::yield_now`]: std::thread::yield_now
cfg_blocking! {
mod blocking;
pub use blocking::spawn_blocking;
pub use blocking::{spawn_blocking, spawn_scoped};

cfg_rt_threaded! {
pub use blocking::block_in_place;
Expand Down Expand Up @@ -305,9 +305,9 @@ cfg_rt_core! {
pub(crate) trait ScheduleSendOnly: Schedule + Send + Sync {}

/// Create a new task with an associated join handle
pub(crate) fn joinable<T, S>(task: T) -> (Task<S>, JoinHandle<T::Output>)
pub(crate) fn joinable<'a, T, S>(task: T) -> (Task<S>, JoinHandle<'a, T::Output>)
where
T: Future + Send + 'static,
T: Future + Send + 'a,
S: ScheduleSendOnly,
{
let raw = RawTask::new_joinable::<_, S>(task);
Expand All @@ -324,7 +324,7 @@ cfg_rt_core! {

cfg_rt_util! {
/// Create a new `!Send` task with an associated join handle
pub(crate) fn joinable_local<T, S>(task: T) -> (Task<S>, JoinHandle<T::Output>)
pub(crate) fn joinable_local<'a, T, S>(task: T) -> (Task<S>, JoinHandle<'a, T::Output>)
where
T: Future + 'static,
S: Schedule,
Expand Down
8 changes: 4 additions & 4 deletions tokio/src/task/raw.rs
Expand Up @@ -67,17 +67,17 @@ cfg_rt_util! {
}

impl RawTask {
pub(super) fn new_joinable<T, S>(task: T) -> RawTask
pub(super) fn new_joinable<'a, T, S>(task: T) -> RawTask
where
T: Future + Send + 'static,
T: Future + Send + 'a,
S: ScheduleSendOnly,
{
RawTask::new::<_, S>(task, State::new_joinable())
}

fn new<T, S>(task: T, state: State) -> RawTask
fn new<'a, T, S>(task: T, state: State) -> RawTask
where
T: Future + 'static,
T: Future + 'a,
S: Schedule,
{
let ptr = Box::into_raw(Cell::new::<S>(task, state));
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/task/spawn.rs
Expand Up @@ -118,7 +118,7 @@ doc_rt_core! {
/// ```text
/// error[E0391]: cycle detected when processing `main`
/// ```
pub fn spawn<T>(task: T) -> JoinHandle<T::Output>
pub fn spawn<T>(task: T) -> JoinHandle<'static, T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
Expand Down

0 comments on commit ba7f91c

Please sign in to comment.