From 0ed8fc5dc08515cbec73f8980e0975f3b140654d Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 21 Apr 2017 12:29:14 -0700 Subject: [PATCH] Add a generic `Executor` trait This commit adds a new trait to the `future` module, `Executor`. This trait has only one method: trait Execute> { fn execute(&self, f: F) -> Result<(), ExecuteError>; } The purpose of this trait is to unify the various executors found throughout the ecosystem. Crates which require the ability to spawn futures will now have the option ot operate generically over all `Executor` instances instead of a particular executor. The two `oneshot` modules (sync and unsync) also grew `spawn` and `spawn_fn` functions which will spawn a future onto an executor, returning a handle to the resulting future. This can be used to spawn work onto a specific executor and still get notified when the work itself is completed. Finally, an implementation of the `Executor` trait was added to `CpuPool`. Due to the differences in unwinding/panic semantics, though, the `CpuPool::spawn` method which previously existed was not deprecated. Closes #313 --- futures-cpupool/src/lib.rs | 15 +- src/future/mod.rs | 87 +++++++++ src/sync/oneshot.rs | 380 +++++++++++++++++++++++++++---------- src/task_impl/std/mod.rs | 8 + src/unsync/oneshot.rs | 141 +++++++++++++- 5 files changed, 530 insertions(+), 101 deletions(-) diff --git a/futures-cpupool/src/lib.rs b/futures-cpupool/src/lib.rs index 52aa2b50c7..5b94b3d193 100644 --- a/futures-cpupool/src/lib.rs +++ b/futures-cpupool/src/lib.rs @@ -46,9 +46,9 @@ use std::sync::mpsc; use std::thread; use futures::{IntoFuture, Future, Poll, Async}; -use futures::future::lazy; +use futures::future::{lazy, Executor, ExecuteError}; use futures::sync::oneshot::{channel, Sender, Receiver}; -use futures::executor::{self, Run, Executor}; +use futures::executor::{self, Run, Executor as OldExecutor}; /// A thread pool intended to run CPU intensive work. /// @@ -220,6 +220,15 @@ impl CpuPool { } } +impl Executor for CpuPool + where F: Future + Send + 'static, +{ + fn execute(&self, future: F) -> Result<(), ExecuteError> { + executor::spawn(future).execute(self.inner.clone()); + Ok(()) + } +} + impl Inner { fn send(&self, msg: Message) { self.tx.lock().unwrap().send(msg).unwrap(); @@ -255,7 +264,7 @@ impl Drop for CpuPool { } } -impl Executor for Inner { +impl OldExecutor for Inner { fn execute(&self, run: Run) { self.send(Message::Run(run)) } diff --git a/src/future/mod.rs b/src/future/mod.rs index 6888f1884a..7e278bac88 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -3,6 +3,7 @@ //! This module contains the `Future` trait and a number of adaptors for this //! trait. See the crate docs, and the docs for `Future`, for full detail. +use core::fmt; use core::result; // Primitive futures @@ -957,3 +958,89 @@ pub trait FutureFrom: Sized { /// Consume the given value, beginning the conversion. fn future_from(T) -> Self::Future; } + +/// A trait for types which can spawn fresh futures. +/// +/// This trait is typically implemented for "executors", or those types which +/// can execute futures to completion. Futures passed to `Spawn::spawn` +/// typically get turned into a *task* and are then driven to completion. +/// +/// On spawn, the executor takes ownership of the future and becomes responsible +/// to call `Future::poll()` whenever a readiness notification is raised. +pub trait Executor> { + /// Spawns a future to run on this `Executor`, typically in the + /// "background". + /// + /// This function will return immediately, and schedule the future `future` + /// to run on `self`. The details of scheduling and execution are left to + /// the implementations of `Executor`, but this is typically a primary point + /// for injecting concurrency in a futures-based system. Futures spawned + /// through this `execute` function tend to run concurrently while they're + /// waiting on events. + /// + /// # Errors + /// + /// Implementors of this trait are allowed to reject accepting this future + /// as well. This can happen for various reason such as: + /// + /// * The executor is shut down + /// * The executor has run out of capacity to execute futures + /// + /// The decision is left to the caller how to work with this form of error. + /// The error returned transfers ownership of the future back to the caller. + fn execute(&self, future: F) -> Result<(), ExecuteError>; +} + +/// Errors returned from the `Spawn::spawn` function. +pub struct ExecuteError { + future: F, + kind: ExecuteErrorKind, +} + +/// Kinds of errors that can be returned from the `Execute::spawn` function. +/// +/// Executors which may not always be able to accept a future may return one of +/// these errors, indicating why it was unable to spawn a future. +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum ExecuteErrorKind { + /// This executor has shut down and will no longer accept new futures to + /// spawn. + Shutdown, + + /// This executor has no more capacity to run more futures. Other futures + /// need to finish before this executor can accept another. + NoCapacity, + + #[doc(hidden)] + __Nonexhaustive, +} + +impl ExecuteError { + /// Create a new `ExecuteError` + pub fn new(kind: ExecuteErrorKind, future: F) -> ExecuteError { + ExecuteError { + future: future, + kind: kind, + } + } + + /// Returns the associated reason for the error + pub fn kind(&self) -> ExecuteErrorKind { + self.kind + } + + /// Consumes self and returns the original future that was spawned. + pub fn into_future(self) -> F { + self.future + } +} + +impl fmt::Debug for ExecuteError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.kind { + ExecuteErrorKind::Shutdown => "executor has shut down".fmt(f), + ExecuteErrorKind::NoCapacity => "executor has no more capacity".fmt(f), + ExecuteErrorKind::__Nonexhaustive => panic!(), + } + } +} diff --git a/src/sync/oneshot.rs b/src/sync/oneshot.rs index 2aa55985a9..f383edb252 100644 --- a/src/sync/oneshot.rs +++ b/src/sync/oneshot.rs @@ -7,6 +7,7 @@ use std::error::Error; use std::fmt; use {Future, Poll, Async}; +use future::{lazy, Lazy, Executor, IntoFuture}; use lock::Lock; use task::{self, Task}; @@ -94,12 +95,7 @@ struct Inner { /// c.send(3).unwrap(); /// ``` pub fn channel() -> (Sender, Receiver) { - let inner = Arc::new(Inner { - complete: AtomicBool::new(false), - data: Lock::new(None), - rx_task: Lock::new(None), - tx_task: Lock::new(None), - }); + let inner = Arc::new(Inner::new()); let receiver = Receiver { inner: inner.clone(), }; @@ -109,64 +105,37 @@ pub fn channel() -> (Sender, Receiver) { (sender, receiver) } -impl Sender { - #[deprecated(note = "renamed to `send`", since = "0.1.11")] - #[doc(hidden)] - #[cfg(feature = "with-deprecated")] - pub fn complete(self, t: T) { - drop(self.send(t)); +impl Inner { + fn new() -> Inner { + Inner { + complete: AtomicBool::new(false), + data: Lock::new(None), + rx_task: Lock::new(None), + tx_task: Lock::new(None), + } } - /// Completes this oneshot with a successful result. - /// - /// This function will consume `self` and indicate to the other end, the - /// `Receiver`, that the value provided is the result of the computation this - /// represents. - /// - /// If the value is successfully enqueued for the remote end to receive, - /// then `Ok(())` is returned. If the receiving end was deallocated before - /// this function was called, however, then `Err` is returned with the value - /// provided. - pub fn send(self, t: T) -> Result<(), T> { - if self.inner.complete.load(SeqCst) { + fn send(&self, t: T) -> Result<(), T> { + if self.complete.load(SeqCst) { return Err(t) } // Note that this lock acquisition should always succeed as it can only // interfere with `poll` in `Receiver` which is only called when the // `complete` flag is true, which we're setting here. - let mut slot = self.inner.data.try_lock().unwrap(); + let mut slot = self.data.try_lock().unwrap(); assert!(slot.is_none()); *slot = Some(t); drop(slot); Ok(()) } - /// Polls this `Sender` half to detect whether the `Receiver` this has - /// paired with has gone away. - /// - /// This function can be used to learn about when the `Receiver` (consumer) - /// half has gone away and nothing will be able to receive a message sent - /// from `complete`. - /// - /// Like `Future::poll`, this function will panic if it's not called from - /// within the context of a task. In otherwords, this should only ever be - /// called from inside another future. - /// - /// If `Ready` is returned then it means that the `Receiver` has disappeared - /// and the result this `Sender` would otherwise produce should no longer - /// be produced. - /// - /// If `NotReady` is returned then the `Receiver` is still alive and may be - /// able to receive a message if sent. The current task, however, is - /// scheduled to receive a notification if the corresponding `Receiver` goes - /// away. - pub fn poll_cancel(&mut self) -> Poll<(), ()> { + fn poll_cancel(&self) -> Poll<(), ()> { // Fast path up first, just read the flag and see if our other half is // gone. This flag is set both in our destructor and the oneshot // destructor, but our destructor hasn't run yet so if it's set then the // oneshot is gone. - if self.inner.complete.load(SeqCst) { + if self.complete.load(SeqCst) { return Ok(Async::Ready(())) } @@ -184,20 +153,18 @@ impl Sender { // if it fails to acquire the lock it assumes that we'll see the flag // later on. So... we then try to see the flag later on! let handle = task::current(); - match self.inner.tx_task.try_lock() { + match self.tx_task.try_lock() { Some(mut p) => *p = Some(handle), None => return Ok(Async::Ready(())), } - if self.inner.complete.load(SeqCst) { + if self.complete.load(SeqCst) { Ok(Async::Ready(())) } else { Ok(Async::NotReady) } } -} -impl Drop for Sender { - fn drop(&mut self) { + fn drop_tx(&self) { // Flag that we're a completed `Sender` and try to wake up a receiver. // Whether or not we actually stored any data will get picked up and // translated to either an item or cancellation. @@ -218,58 +185,28 @@ impl Drop for Sender { // then it would not necessarily synchronize with `inner.complete` // and deadlock might be possible, as was observed in // https://github.com/alexcrichton/futures-rs/pull/219. - self.inner.complete.store(true, SeqCst); - if let Some(mut slot) = self.inner.rx_task.try_lock() { + self.complete.store(true, SeqCst); + if let Some(mut slot) = self.rx_task.try_lock() { if let Some(task) = slot.take() { drop(slot); task.notify(); } } } -} - -/// Error returned from a `Receiver` whenever the correponding `Sender` -/// is dropped. -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -pub struct Canceled; - -impl fmt::Display for Canceled { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "oneshot canceled") - } -} - -impl Error for Canceled { - fn description(&self) -> &str { - "oneshot canceled" - } -} -impl Receiver { - /// Gracefully close this receiver, preventing sending any future messages. - /// - /// Any `send` operation which happens after this method returns is - /// guaranteed to fail. Once this method is called the normal `poll` method - /// can be used to determine whether a message was actually sent or not. If - /// `Canceled` is returned from `poll` then no message was sent. - pub fn close(&mut self) { + fn close_rx(&self) { // Flag our completion and then attempt to wake up the sender if it's // blocked. See comments in `drop` below for more info - self.inner.complete.store(true, SeqCst); - if let Some(mut handle) = self.inner.tx_task.try_lock() { + self.complete.store(true, SeqCst); + if let Some(mut handle) = self.tx_task.try_lock() { if let Some(task) = handle.take() { drop(handle); task.notify() } } } -} -impl Future for Receiver { - type Item = T; - type Error = Canceled; - - fn poll(&mut self) -> Poll { + fn recv(&self) -> Poll { let mut done = false; // Check to see if some data has arrived. If it hasn't then we need to @@ -279,11 +216,11 @@ impl Future for Receiver { // the only situation where this can happen is during `Sender::drop` // when we are indeed completed already. If that's happening then we // know we're completed so keep going. - if self.inner.complete.load(SeqCst) { + if self.complete.load(SeqCst) { done = true; } else { let task = task::current(); - match self.inner.rx_task.try_lock() { + match self.rx_task.try_lock() { Some(mut slot) => *slot = Some(task), None => done = true, } @@ -297,8 +234,8 @@ impl Future for Receiver { // // If we're not done, and we're not complete, though, then we've // successfully blocked our task and we return `NotReady`. - if done || self.inner.complete.load(SeqCst) { - match self.inner.data.try_lock().unwrap().take() { + if done || self.complete.load(SeqCst) { + match self.data.try_lock().unwrap().take() { Some(data) => Ok(data.into()), None => Err(Canceled), } @@ -306,19 +243,17 @@ impl Future for Receiver { Ok(Async::NotReady) } } -} -impl Drop for Receiver { - fn drop(&mut self) { + fn drop_rx(&self) { // Indicate to the `Sender` that we're done, so any future calls to // `poll_cancel` are weeded out. - self.inner.complete.store(true, SeqCst); + self.complete.store(true, SeqCst); // If we've blocked a task then there's no need for it to stick around, // so we need to drop it. If this lock acquisition fails, though, then // it's just because our `Sender` is trying to take the task, so we // let them take care of that. - if let Some(mut slot) = self.inner.rx_task.try_lock() { + if let Some(mut slot) = self.rx_task.try_lock() { let task = slot.take(); drop(slot); drop(task); @@ -331,7 +266,7 @@ impl Drop for Receiver { // Note that the `try_lock` here may fail, but only if the `Sender` is // in the process of filling in the task. If that happens then we // already flagged `complete` and they'll pick that up above. - if let Some(mut handle) = self.inner.tx_task.try_lock() { + if let Some(mut handle) = self.tx_task.try_lock() { if let Some(task) = handle.take() { drop(handle); task.notify() @@ -339,3 +274,254 @@ impl Drop for Receiver { } } } + +impl Sender { + #[deprecated(note = "renamed to `send`", since = "0.1.11")] + #[doc(hidden)] + #[cfg(feature = "with-deprecated")] + pub fn complete(self, t: T) { + drop(self.send(t)); + } + + /// Completes this oneshot with a successful result. + /// + /// This function will consume `self` and indicate to the other end, the + /// `Receiver`, that the value provided is the result of the computation this + /// represents. + /// + /// If the value is successfully enqueued for the remote end to receive, + /// then `Ok(())` is returned. If the receiving end was deallocated before + /// this function was called, however, then `Err` is returned with the value + /// provided. + pub fn send(self, t: T) -> Result<(), T> { + self.inner.send(t) + } + + /// Polls this `Sender` half to detect whether the `Receiver` this has + /// paired with has gone away. + /// + /// This function can be used to learn about when the `Receiver` (consumer) + /// half has gone away and nothing will be able to receive a message sent + /// from `complete`. + /// + /// Like `Future::poll`, this function will panic if it's not called from + /// within the context of a task. In otherwords, this should only ever be + /// called from inside another future. + /// + /// If `Ready` is returned then it means that the `Receiver` has disappeared + /// and the result this `Sender` would otherwise produce should no longer + /// be produced. + /// + /// If `NotReady` is returned then the `Receiver` is still alive and may be + /// able to receive a message if sent. The current task, however, is + /// scheduled to receive a notification if the corresponding `Receiver` goes + /// away. + pub fn poll_cancel(&mut self) -> Poll<(), ()> { + self.inner.poll_cancel() + } +} + +impl Drop for Sender { + fn drop(&mut self) { + self.inner.drop_tx() + } +} + +/// Error returned from a `Receiver` whenever the correponding `Sender` +/// is dropped. +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct Canceled; + +impl fmt::Display for Canceled { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "oneshot canceled") + } +} + +impl Error for Canceled { + fn description(&self) -> &str { + "oneshot canceled" + } +} + +impl Receiver { + /// Gracefully close this receiver, preventing sending any future messages. + /// + /// Any `send` operation which happens after this method returns is + /// guaranteed to fail. Once this method is called the normal `poll` method + /// can be used to determine whether a message was actually sent or not. If + /// `Canceled` is returned from `poll` then no message was sent. + pub fn close(&mut self) { + self.inner.close_rx() + } +} + +impl Future for Receiver { + type Item = T; + type Error = Canceled; + + fn poll(&mut self) -> Poll { + self.inner.recv() + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + self.inner.drop_rx() + } +} + +/// Handle returned from the `spawn` function. +/// +/// This handle is a future representing the completion of a different future on +/// a separate executor. Created through the `oneshot::spawn` function this +/// handle will resolve when the future provided to `spawn` resolves on the +/// `Executor` instance provided to that function. +/// +/// If this handle is dropped then the future will automatically no longer be +/// polled and is scheduled to be dropped. This can be canceled with the +/// `forget` function, however. +pub struct SpawnHandle { + rx: Arc>>, +} + +struct ExecuteInner { + inner: Inner, + keep_running: AtomicBool, +} + +/// Type of future which `Execute` instances below must be able to spawn. +pub struct Execute { + future: F, + tx: Arc>>, +} + +/// Spawns a `future` onto the instance of `Executor` provided, `executor`, +/// returning a handle representing the completion of the future. +/// +/// The `SpawnHandle` returned is a future that is a proxy for `future` itself. +/// When `future` completes on `executor` then the `SpawnHandle` will itself be +/// resolved. Internally `SpawnHandle` contains a `oneshot` channel and is +/// thus safe to send across threads. +/// +/// The `future` will be canceled if the `SpawnHandle` is dropped. If this is +/// not desired then the `SpawnHandle::forget` function can be used to continue +/// running the future to completion. +/// +/// # Panics +/// +/// This function will panic if the instance of `Spawn` provided is unable to +/// spawn the `future` provided. +/// +/// If the provided instance of `Spawn` does not actually run `future` to +/// completion, then the returned handle may panic when polled. Typically this +/// is not a problem, though, as most instances of `Spawn` will run futures to +/// completion. +/// +/// Note that the returned future will likely panic if the `futures` provided +/// panics. If a future running on an executor panics that typically means that +/// the executor drops the future, which falls into the above case of not +/// running the future to completion essentially. +pub fn spawn(future: F, executor: &E) -> SpawnHandle + where F: Future, + E: Executor>, +{ + let data = Arc::new(ExecuteInner { + inner: Inner::new(), + keep_running: AtomicBool::new(true), + }); + executor.execute(Execute { + future: future, + tx: data.clone(), + }).expect("failed to spawn future"); + SpawnHandle { rx: data } +} + +/// Spawns a function `f` onto the `Spawn` instance provided `s`. +/// +/// For more information see the `spawn` function in this module. This function +/// is just a thin wrapper around `spawn` which will execute the closure on the +/// executor provided and then complete the future that the closure returns. +pub fn spawn_fn(f: F, executor: &E) -> SpawnHandle + where F: FnOnce() -> R, + R: IntoFuture, + E: Executor>>, +{ + spawn(lazy(f), executor) +} + +impl SpawnHandle { + /// Drop this future without canceling the underlying future. + /// + /// When `SpawnHandle` is dropped, the spawned future will be canceled as + /// well if the future hasn't already resolved. This function can be used + /// when to drop this future but keep executing the underlying future. + pub fn forget(self) { + self.rx.keep_running.store(false, SeqCst); + } +} + +impl Future for SpawnHandle { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll { + match self.rx.inner.recv() { + Ok(Async::Ready(Ok(t))) => Ok(t.into()), + Ok(Async::Ready(Err(e))) => Err(e), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => panic!("future was canceled before completion"), + } + } +} + +impl fmt::Debug for SpawnHandle { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("SpawnHandle") + .finish() + } +} + +impl Drop for SpawnHandle { + fn drop(&mut self) { + self.rx.inner.drop_rx(); + } +} + +impl Future for Execute { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + // If we're canceled then we may want to bail out early. + // + // If the `forget` function was called, though, then we keep going. + if self.tx.inner.poll_cancel().unwrap().is_ready() { + if !self.tx.keep_running.load(SeqCst) { + return Ok(().into()) + } + } + + let result = match self.future.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(t)) => Ok(t), + Err(e) => Err(e), + }; + drop(self.tx.inner.send(result)); + Ok(().into()) + } +} + +impl fmt::Debug for Execute { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Execute") + .field("future", &self.future) + .finish() + } +} + +impl Drop for Execute { + fn drop(&mut self) { + self.tx.inner.drop_tx(); + } +} diff --git a/src/task_impl/std/mod.rs b/src/task_impl/std/mod.rs index 2e65c0084d..2350704719 100644 --- a/src/task_impl/std/mod.rs +++ b/src/task_impl/std/mod.rs @@ -274,6 +274,10 @@ impl Spawn { /// This method is not appropriate for all futures, and other kinds of /// executors typically provide a similar function with perhaps relaxed /// bounds as well. + /// + /// Note that this method is likely to be deprecated in favor of the + /// `futures::Executor` trait and `execute` method, but if this'd cause + /// difficulty for you please let us know! pub fn execute(self, exec: Arc) where F: Future + Send + 'static, { @@ -398,6 +402,10 @@ pub trait Unpark: Send + Sync { /// This trait is an argument to the `Spawn::execute` which is used to run a /// future to completion. An executor will receive requests to run a future and /// an executor is responsible for ensuring that happens in a timely fashion. +/// +/// Note that this trait is likely to be deprecated and/or renamed to avoid +/// clashing with the `future::Executor` trait. If you've got a use case for +/// this or would like to comment on the name please let us know! pub trait Executor: Send + Sync + 'static { /// Requests that `Run` is executed soon on the given executor. fn execute(&self, r: Run); diff --git a/src/unsync/oneshot.rs b/src/unsync/oneshot.rs index 0cd44bed4f..05eb7f182e 100644 --- a/src/unsync/oneshot.rs +++ b/src/unsync/oneshot.rs @@ -3,10 +3,12 @@ //! This channel is similar to that in `sync::oneshot` but cannot be sent across //! threads. -use std::cell::RefCell; +use std::cell::{Cell, RefCell}; +use std::fmt; use std::rc::{Rc, Weak}; use {Future, Poll, Async}; +use future::{Executor, IntoFuture, Lazy, lazy}; use task::{self, Task}; /// Creates a new futures-aware, one-shot channel. @@ -195,3 +197,140 @@ impl Drop for Receiver { self.close(); } } + +/// Handle returned from the `spawn` function. +/// +/// This handle is a future representing the completion of a different future on +/// a separate executor. Created through the `oneshot::spawn` function this +/// handle will resolve when the future provided to `spawn` resolves on the +/// `Executor` instance provided to that function. +/// +/// If this handle is dropped then the future will automatically no longer be +/// polled and is scheduled to be dropped. This can be canceled with the +/// `forget` function, however. +pub struct SpawnHandle { + rx: Receiver>, + keep_running: Rc>, +} + +/// Type of future which `Spawn` instances below must be able to spawn. +pub struct Execute { + future: F, + tx: Option>>, + keep_running: Rc>, +} + +/// Spawns a `future` onto the instance of `Executor` provided, `executor`, +/// returning a handle representing the completion of the future. +/// +/// The `SpawnHandle` returned is a future that is a proxy for `future` itself. +/// When `future` completes on `executor` then the `SpawnHandle` will itself be +/// resolved. Internally `SpawnHandle` contains a `oneshot` channel and is +/// thus not safe to send across threads. +/// +/// The `future` will be canceled if the `SpawnHandle` is dropped. If this is +/// not desired then the `SpawnHandle::forget` function can be used to continue +/// running the future to completion. +/// +/// # Panics +/// +/// This function will panic if the instance of `Spawn` provided is unable to +/// spawn the `future` provided. +/// +/// If the provided instance of `Spawn` does not actually run `future` to +/// completion, then the returned handle may panic when polled. Typically this +/// is not a problem, though, as most instances of `Spawn` will run futures to +/// completion. +pub fn spawn(future: F, executor: &E) -> SpawnHandle + where F: Future, + E: Executor>, +{ + let flag = Rc::new(Cell::new(true)); + let (tx, rx) = channel(); + executor.execute(Execute { + future: future, + tx: Some(tx), + keep_running: flag.clone(), + }).expect("failed to spawn future"); + SpawnHandle { + rx: rx, + keep_running: flag, + } +} + +/// Spawns a function `f` onto the `Spawn` instance provided `s`. +/// +/// For more information see the `spawn` function in this module. This function +/// is just a thin wrapper around `spawn` which will execute the closure on the +/// executor provided and then complete the future that the closure returns. +pub fn spawn_fn(f: F, executor: &E) -> SpawnHandle + where F: FnOnce() -> R, + R: IntoFuture, + E: Executor>>, +{ + spawn(lazy(f), executor) +} + +impl SpawnHandle { + /// Drop this future without canceling the underlying future. + /// + /// When `SpawnHandle` is dropped, the spawned future will be canceled as + /// well if the future hasn't already resolved. This function can be used + /// when to drop this future but keep executing the underlying future. + pub fn forget(self) { + self.keep_running.set(false); + } +} + +impl Future for SpawnHandle { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::Ready(Ok(t))) => Ok(t.into()), + Ok(Async::Ready(Err(e))) => Err(e), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => panic!("future was canceled before completion"), + } + } +} + +impl fmt::Debug for SpawnHandle { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("SpawnHandle") + .finish() + } +} + +impl Future for Execute { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + // If we're canceled then we may want to bail out early. + // + // If the `forget` function was called, though, then we keep going. + if self.tx.as_mut().unwrap().poll_cancel().unwrap().is_ready() { + if !self.keep_running.get() { + return Ok(().into()) + } + } + + let result = match self.future.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(t)) => Ok(t), + Err(e) => Err(e), + }; + drop(self.tx.take().unwrap().send(result)); + Ok(().into()) + } +} + +impl fmt::Debug for Execute { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Execute") + .field("future", &self.future) + .finish() + } +}