diff --git a/futures-cpupool/src/lib.rs b/futures-cpupool/src/lib.rs
index 52aa2b50c7..5b94b3d193 100644
--- a/futures-cpupool/src/lib.rs
+++ b/futures-cpupool/src/lib.rs
@@ -46,9 +46,9 @@ use std::sync::mpsc;
 use std::thread;
 
 use futures::{IntoFuture, Future, Poll, Async};
-use futures::future::lazy;
+use futures::future::{lazy, Executor, ExecuteError};
 use futures::sync::oneshot::{channel, Sender, Receiver};
-use futures::executor::{self, Run, Executor};
+use futures::executor::{self, Run, Executor as OldExecutor};
 
 /// A thread pool intended to run CPU intensive work.
 ///
@@ -220,6 +220,15 @@ impl CpuPool {
     }
 }
 
+impl<F> Executor<F> for CpuPool
+    where F: Future<Item = (), Error = ()> + Send + 'static,
+{
+    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
+        executor::spawn(future).execute(self.inner.clone());
+        Ok(())
+    }
+}
+
 impl Inner {
     fn send(&self, msg: Message) {
         self.tx.lock().unwrap().send(msg).unwrap();
@@ -255,7 +264,7 @@ impl Drop for CpuPool {
     }
 }
 
-impl Executor for Inner {
+impl OldExecutor for Inner {
     fn execute(&self, run: Run) {
         self.send(Message::Run(run))
     }
diff --git a/src/future/mod.rs b/src/future/mod.rs
index 6888f1884a..7e278bac88 100644
--- a/src/future/mod.rs
+++ b/src/future/mod.rs
@@ -3,6 +3,7 @@
 //! This module contains the `Future` trait and a number of adaptors for this
 //! trait. See the crate docs, and the docs for `Future`, for full detail.
 
+use core::fmt;
 use core::result;
 
 // Primitive futures
@@ -957,3 +958,89 @@ pub trait FutureFrom<T>: Sized {
     /// Consume the given value, beginning the conversion.
     fn future_from(T) -> Self::Future;
 }
+
+/// A trait for types which can spawn fresh futures.
+///
+/// This trait is typically implemented for "executors", or those types which
+/// can execute futures to completion. Futures passed to `Executor::execute`
+/// typically get turned into a *task* and are then driven to completion.
+///
+/// On spawn, the executor takes ownership of the future and becomes responsible
+/// for calling `Future::poll()` whenever a readiness notification is raised.
+pub trait Executor<F: Future<Item = (), Error = ()>> {
+    /// Spawns a future to run on this `Executor`, typically in the
+    /// "background".
+    ///
+    /// This function will return immediately, and schedule the future `future`
+    /// to run on `self`. The details of scheduling and execution are left to
+    /// the implementations of `Executor`, but this is typically a primary point
+    /// for injecting concurrency in a futures-based system. Futures spawned
+    /// through this `execute` function tend to run concurrently while they're
+    /// waiting on events.
+    ///
+    /// # Errors
+    ///
+    /// Implementors of this trait are allowed to reject accepting this future.
+    /// This can happen for various reasons, such as:
+    ///
+    /// * The executor is shut down
+    /// * The executor has run out of capacity to execute futures
+    ///
+    /// How to handle this form of error is left to the caller. The error
+    /// returned transfers ownership of the future back to the caller.
+    fn execute(&self, future: F) -> Result<(), ExecuteError<F>>;
+}
+
+/// Errors returned from the `Executor::execute` function.
+pub struct ExecuteError<F> {
+    future: F,
+    kind: ExecuteErrorKind,
+}
+
+/// Kinds of errors that can be returned from the `Executor::execute` function.
+///
+/// Executors which may not always be able to accept a future may return one of
+/// these errors, indicating why it was unable to spawn a future.
+#[derive(Debug, Copy, Clone, PartialEq)] +pub enum ExecuteErrorKind { + /// This executor has shut down and will no longer accept new futures to + /// spawn. + Shutdown, + + /// This executor has no more capacity to run more futures. Other futures + /// need to finish before this executor can accept another. + NoCapacity, + + #[doc(hidden)] + __Nonexhaustive, +} + +impl ExecuteError { + /// Create a new `ExecuteError` + pub fn new(kind: ExecuteErrorKind, future: F) -> ExecuteError { + ExecuteError { + future: future, + kind: kind, + } + } + + /// Returns the associated reason for the error + pub fn kind(&self) -> ExecuteErrorKind { + self.kind + } + + /// Consumes self and returns the original future that was spawned. + pub fn into_future(self) -> F { + self.future + } +} + +impl fmt::Debug for ExecuteError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.kind { + ExecuteErrorKind::Shutdown => "executor has shut down".fmt(f), + ExecuteErrorKind::NoCapacity => "executor has no more capacity".fmt(f), + ExecuteErrorKind::__Nonexhaustive => panic!(), + } + } +} diff --git a/src/sync/oneshot.rs b/src/sync/oneshot.rs index 2aa55985a9..f383edb252 100644 --- a/src/sync/oneshot.rs +++ b/src/sync/oneshot.rs @@ -7,6 +7,7 @@ use std::error::Error; use std::fmt; use {Future, Poll, Async}; +use future::{lazy, Lazy, Executor, IntoFuture}; use lock::Lock; use task::{self, Task}; @@ -94,12 +95,7 @@ struct Inner { /// c.send(3).unwrap(); /// ``` pub fn channel() -> (Sender, Receiver) { - let inner = Arc::new(Inner { - complete: AtomicBool::new(false), - data: Lock::new(None), - rx_task: Lock::new(None), - tx_task: Lock::new(None), - }); + let inner = Arc::new(Inner::new()); let receiver = Receiver { inner: inner.clone(), }; @@ -109,64 +105,37 @@ pub fn channel() -> (Sender, Receiver) { (sender, receiver) } -impl Sender { - #[deprecated(note = "renamed to `send`", since = "0.1.11")] - #[doc(hidden)] - #[cfg(feature = "with-deprecated")] - pub fn complete(self, t: T) { - drop(self.send(t)); +impl Inner { + fn new() -> Inner { + Inner { + complete: AtomicBool::new(false), + data: Lock::new(None), + rx_task: Lock::new(None), + tx_task: Lock::new(None), + } } - /// Completes this oneshot with a successful result. - /// - /// This function will consume `self` and indicate to the other end, the - /// `Receiver`, that the value provided is the result of the computation this - /// represents. - /// - /// If the value is successfully enqueued for the remote end to receive, - /// then `Ok(())` is returned. If the receiving end was deallocated before - /// this function was called, however, then `Err` is returned with the value - /// provided. - pub fn send(self, t: T) -> Result<(), T> { - if self.inner.complete.load(SeqCst) { + fn send(&self, t: T) -> Result<(), T> { + if self.complete.load(SeqCst) { return Err(t) } // Note that this lock acquisition should always succeed as it can only // interfere with `poll` in `Receiver` which is only called when the // `complete` flag is true, which we're setting here. - let mut slot = self.inner.data.try_lock().unwrap(); + let mut slot = self.data.try_lock().unwrap(); assert!(slot.is_none()); *slot = Some(t); drop(slot); Ok(()) } - /// Polls this `Sender` half to detect whether the `Receiver` this has - /// paired with has gone away. - /// - /// This function can be used to learn about when the `Receiver` (consumer) - /// half has gone away and nothing will be able to receive a message sent - /// from `complete`. 
- /// - /// Like `Future::poll`, this function will panic if it's not called from - /// within the context of a task. In otherwords, this should only ever be - /// called from inside another future. - /// - /// If `Ready` is returned then it means that the `Receiver` has disappeared - /// and the result this `Sender` would otherwise produce should no longer - /// be produced. - /// - /// If `NotReady` is returned then the `Receiver` is still alive and may be - /// able to receive a message if sent. The current task, however, is - /// scheduled to receive a notification if the corresponding `Receiver` goes - /// away. - pub fn poll_cancel(&mut self) -> Poll<(), ()> { + fn poll_cancel(&self) -> Poll<(), ()> { // Fast path up first, just read the flag and see if our other half is // gone. This flag is set both in our destructor and the oneshot // destructor, but our destructor hasn't run yet so if it's set then the // oneshot is gone. - if self.inner.complete.load(SeqCst) { + if self.complete.load(SeqCst) { return Ok(Async::Ready(())) } @@ -184,20 +153,18 @@ impl Sender { // if it fails to acquire the lock it assumes that we'll see the flag // later on. So... we then try to see the flag later on! let handle = task::current(); - match self.inner.tx_task.try_lock() { + match self.tx_task.try_lock() { Some(mut p) => *p = Some(handle), None => return Ok(Async::Ready(())), } - if self.inner.complete.load(SeqCst) { + if self.complete.load(SeqCst) { Ok(Async::Ready(())) } else { Ok(Async::NotReady) } } -} -impl Drop for Sender { - fn drop(&mut self) { + fn drop_tx(&self) { // Flag that we're a completed `Sender` and try to wake up a receiver. // Whether or not we actually stored any data will get picked up and // translated to either an item or cancellation. @@ -218,58 +185,28 @@ impl Drop for Sender { // then it would not necessarily synchronize with `inner.complete` // and deadlock might be possible, as was observed in // https://github.com/alexcrichton/futures-rs/pull/219. - self.inner.complete.store(true, SeqCst); - if let Some(mut slot) = self.inner.rx_task.try_lock() { + self.complete.store(true, SeqCst); + if let Some(mut slot) = self.rx_task.try_lock() { if let Some(task) = slot.take() { drop(slot); task.notify(); } } } -} - -/// Error returned from a `Receiver` whenever the correponding `Sender` -/// is dropped. -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -pub struct Canceled; - -impl fmt::Display for Canceled { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "oneshot canceled") - } -} - -impl Error for Canceled { - fn description(&self) -> &str { - "oneshot canceled" - } -} -impl Receiver { - /// Gracefully close this receiver, preventing sending any future messages. - /// - /// Any `send` operation which happens after this method returns is - /// guaranteed to fail. Once this method is called the normal `poll` method - /// can be used to determine whether a message was actually sent or not. If - /// `Canceled` is returned from `poll` then no message was sent. - pub fn close(&mut self) { + fn close_rx(&self) { // Flag our completion and then attempt to wake up the sender if it's // blocked. 
See comments in `drop` below for more info - self.inner.complete.store(true, SeqCst); - if let Some(mut handle) = self.inner.tx_task.try_lock() { + self.complete.store(true, SeqCst); + if let Some(mut handle) = self.tx_task.try_lock() { if let Some(task) = handle.take() { drop(handle); task.notify() } } } -} -impl Future for Receiver { - type Item = T; - type Error = Canceled; - - fn poll(&mut self) -> Poll { + fn recv(&self) -> Poll { let mut done = false; // Check to see if some data has arrived. If it hasn't then we need to @@ -279,11 +216,11 @@ impl Future for Receiver { // the only situation where this can happen is during `Sender::drop` // when we are indeed completed already. If that's happening then we // know we're completed so keep going. - if self.inner.complete.load(SeqCst) { + if self.complete.load(SeqCst) { done = true; } else { let task = task::current(); - match self.inner.rx_task.try_lock() { + match self.rx_task.try_lock() { Some(mut slot) => *slot = Some(task), None => done = true, } @@ -297,8 +234,8 @@ impl Future for Receiver { // // If we're not done, and we're not complete, though, then we've // successfully blocked our task and we return `NotReady`. - if done || self.inner.complete.load(SeqCst) { - match self.inner.data.try_lock().unwrap().take() { + if done || self.complete.load(SeqCst) { + match self.data.try_lock().unwrap().take() { Some(data) => Ok(data.into()), None => Err(Canceled), } @@ -306,19 +243,17 @@ impl Future for Receiver { Ok(Async::NotReady) } } -} -impl Drop for Receiver { - fn drop(&mut self) { + fn drop_rx(&self) { // Indicate to the `Sender` that we're done, so any future calls to // `poll_cancel` are weeded out. - self.inner.complete.store(true, SeqCst); + self.complete.store(true, SeqCst); // If we've blocked a task then there's no need for it to stick around, // so we need to drop it. If this lock acquisition fails, though, then // it's just because our `Sender` is trying to take the task, so we // let them take care of that. - if let Some(mut slot) = self.inner.rx_task.try_lock() { + if let Some(mut slot) = self.rx_task.try_lock() { let task = slot.take(); drop(slot); drop(task); @@ -331,7 +266,7 @@ impl Drop for Receiver { // Note that the `try_lock` here may fail, but only if the `Sender` is // in the process of filling in the task. If that happens then we // already flagged `complete` and they'll pick that up above. - if let Some(mut handle) = self.inner.tx_task.try_lock() { + if let Some(mut handle) = self.tx_task.try_lock() { if let Some(task) = handle.take() { drop(handle); task.notify() @@ -339,3 +274,254 @@ impl Drop for Receiver { } } } + +impl Sender { + #[deprecated(note = "renamed to `send`", since = "0.1.11")] + #[doc(hidden)] + #[cfg(feature = "with-deprecated")] + pub fn complete(self, t: T) { + drop(self.send(t)); + } + + /// Completes this oneshot with a successful result. + /// + /// This function will consume `self` and indicate to the other end, the + /// `Receiver`, that the value provided is the result of the computation this + /// represents. + /// + /// If the value is successfully enqueued for the remote end to receive, + /// then `Ok(())` is returned. If the receiving end was deallocated before + /// this function was called, however, then `Err` is returned with the value + /// provided. + pub fn send(self, t: T) -> Result<(), T> { + self.inner.send(t) + } + + /// Polls this `Sender` half to detect whether the `Receiver` this has + /// paired with has gone away. 
+    ///
+    /// This function can be used to learn about when the `Receiver` (consumer)
+    /// half has gone away and nothing will be able to receive a message sent
+    /// from `complete`.
+    ///
+    /// Like `Future::poll`, this function will panic if it's not called from
+    /// within the context of a task. In other words, this should only ever be
+    /// called from inside another future.
+    ///
+    /// If `Ready` is returned then it means that the `Receiver` has disappeared
+    /// and the result this `Sender` would otherwise produce should no longer
+    /// be produced.
+    ///
+    /// If `NotReady` is returned then the `Receiver` is still alive and may be
+    /// able to receive a message if sent. The current task, however, is
+    /// scheduled to receive a notification if the corresponding `Receiver` goes
+    /// away.
+    pub fn poll_cancel(&mut self) -> Poll<(), ()> {
+        self.inner.poll_cancel()
+    }
+}
+
+impl<T> Drop for Sender<T> {
+    fn drop(&mut self) {
+        self.inner.drop_tx()
+    }
+}
+
+/// Error returned from a `Receiver` whenever the corresponding `Sender`
+/// is dropped.
+#[derive(Clone, Copy, PartialEq, Eq, Debug)]
+pub struct Canceled;
+
+impl fmt::Display for Canceled {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        write!(fmt, "oneshot canceled")
+    }
+}
+
+impl Error for Canceled {
+    fn description(&self) -> &str {
+        "oneshot canceled"
+    }
+}
+
+impl<T> Receiver<T> {
+    /// Gracefully close this receiver, preventing sending any future messages.
+    ///
+    /// Any `send` operation which happens after this method returns is
+    /// guaranteed to fail. Once this method is called the normal `poll` method
+    /// can be used to determine whether a message was actually sent or not. If
+    /// `Canceled` is returned from `poll` then no message was sent.
+    pub fn close(&mut self) {
+        self.inner.close_rx()
+    }
+}
+
+impl<T> Future for Receiver<T> {
+    type Item = T;
+    type Error = Canceled;
+
+    fn poll(&mut self) -> Poll<T, Canceled> {
+        self.inner.recv()
+    }
+}
+
+impl<T> Drop for Receiver<T> {
+    fn drop(&mut self) {
+        self.inner.drop_rx()
+    }
+}
+
+/// Handle returned from the `spawn` function.
+///
+/// This handle is a future representing the completion of a different future on
+/// a separate executor. Created through the `oneshot::spawn` function, this
+/// handle will resolve when the future provided to `spawn` resolves on the
+/// `Executor` instance provided to that function.
+///
+/// If this handle is dropped then the future will automatically no longer be
+/// polled and is scheduled to be dropped. This can be avoided with the
+/// `forget` function, however.
+pub struct SpawnHandle<T, E> {
+    rx: Arc<ExecuteInner<T, E>>,
+}
+
+struct ExecuteInner<T, E> {
+    inner: Inner<Result<T, E>>,
+    keep_running: AtomicBool,
+}
+
+/// Type of future which `Executor` instances below must be able to spawn.
+pub struct Execute<F: Future> {
+    future: F,
+    tx: Arc<ExecuteInner<F::Item, F::Error>>,
+}
+
+/// Spawns a `future` onto the instance of `Executor` provided, `executor`,
+/// returning a handle representing the completion of the future.
+///
+/// The `SpawnHandle` returned is a future that is a proxy for `future` itself.
+/// When `future` completes on `executor` then the `SpawnHandle` will itself be
+/// resolved. Internally `SpawnHandle` contains a `oneshot` channel and is
+/// thus safe to send across threads.
+///
+/// The `future` will be canceled if the `SpawnHandle` is dropped. If this is
+/// not desired then the `SpawnHandle::forget` function can be used to continue
+/// running the future to completion.
+///
+/// # Panics
+///
+/// This function will panic if the instance of `Executor` provided is unable to
+/// spawn the `future` provided.
+///
+/// If the provided instance of `Executor` does not actually run `future` to
+/// completion, then the returned handle may panic when polled. Typically this
+/// is not a problem, though, as most instances of `Executor` will run futures
+/// to completion.
+///
+/// Note that the returned future will likely panic if the `future` provided
+/// panics. If a future running on an executor panics, that typically means the
+/// executor drops the future, which essentially falls into the above case of
+/// not running the future to completion.
+pub fn spawn<F, E>(future: F, executor: &E) -> SpawnHandle<F::Item, F::Error>
+    where F: Future,
+          E: Executor<Execute<F>>,
+{
+    let data = Arc::new(ExecuteInner {
+        inner: Inner::new(),
+        keep_running: AtomicBool::new(true),
+    });
+    executor.execute(Execute {
+        future: future,
+        tx: data.clone(),
+    }).expect("failed to spawn future");
+    SpawnHandle { rx: data }
+}
+
+/// Spawns a function `f` onto the `Executor` instance provided, `executor`.
+///
+/// For more information see the `spawn` function in this module. This function
+/// is just a thin wrapper around `spawn` which will execute the closure on the
+/// executor provided and then complete the future that the closure returns.
+pub fn spawn_fn<F, R, E>(f: F, executor: &E) -> SpawnHandle<R::Item, R::Error>
+    where F: FnOnce() -> R,
+          R: IntoFuture,
+          E: Executor<Execute<Lazy<F, R>>>,
+{
+    spawn(lazy(f), executor)
+}
+
+impl<T, E> SpawnHandle<T, E> {
+    /// Drop this handle without canceling the underlying future.
+    ///
+    /// When `SpawnHandle` is dropped, the spawned future will be canceled as
+    /// well if the future hasn't already resolved. This function can be used
+    /// when you want to drop this handle but keep executing the underlying
+    /// future.
+    pub fn forget(self) {
+        self.rx.keep_running.store(false, SeqCst);
+    }
+}
+
+impl<T, E> Future for SpawnHandle<T, E> {
+    type Item = T;
+    type Error = E;
+
+    fn poll(&mut self) -> Poll<T, E> {
+        match self.rx.inner.recv() {
+            Ok(Async::Ready(Ok(t))) => Ok(t.into()),
+            Ok(Async::Ready(Err(e))) => Err(e),
+            Ok(Async::NotReady) => Ok(Async::NotReady),
+            Err(_) => panic!("future was canceled before completion"),
+        }
+    }
+}
+
+impl<T, E> fmt::Debug for SpawnHandle<T, E> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("SpawnHandle")
+            .finish()
+    }
+}
+
+impl<T, E> Drop for SpawnHandle<T, E> {
+    fn drop(&mut self) {
+        self.rx.inner.drop_rx();
+    }
+}
+
+impl<F: Future> Future for Execute<F> {
+    type Item = ();
+    type Error = ();
+
+    fn poll(&mut self) -> Poll<(), ()> {
+        // If we're canceled then we may want to bail out early.
+        //
+        // If the `forget` function was called, though, then we keep going.
+ if self.tx.inner.poll_cancel().unwrap().is_ready() { + if !self.tx.keep_running.load(SeqCst) { + return Ok(().into()) + } + } + + let result = match self.future.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(t)) => Ok(t), + Err(e) => Err(e), + }; + drop(self.tx.inner.send(result)); + Ok(().into()) + } +} + +impl fmt::Debug for Execute { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Execute") + .field("future", &self.future) + .finish() + } +} + +impl Drop for Execute { + fn drop(&mut self) { + self.tx.inner.drop_tx(); + } +} diff --git a/src/task_impl/std/mod.rs b/src/task_impl/std/mod.rs index 2e65c0084d..2350704719 100644 --- a/src/task_impl/std/mod.rs +++ b/src/task_impl/std/mod.rs @@ -274,6 +274,10 @@ impl Spawn { /// This method is not appropriate for all futures, and other kinds of /// executors typically provide a similar function with perhaps relaxed /// bounds as well. + /// + /// Note that this method is likely to be deprecated in favor of the + /// `futures::Executor` trait and `execute` method, but if this'd cause + /// difficulty for you please let us know! pub fn execute(self, exec: Arc) where F: Future + Send + 'static, { @@ -398,6 +402,10 @@ pub trait Unpark: Send + Sync { /// This trait is an argument to the `Spawn::execute` which is used to run a /// future to completion. An executor will receive requests to run a future and /// an executor is responsible for ensuring that happens in a timely fashion. +/// +/// Note that this trait is likely to be deprecated and/or renamed to avoid +/// clashing with the `future::Executor` trait. If you've got a use case for +/// this or would like to comment on the name please let us know! pub trait Executor: Send + Sync + 'static { /// Requests that `Run` is executed soon on the given executor. fn execute(&self, r: Run); diff --git a/src/unsync/oneshot.rs b/src/unsync/oneshot.rs index 0cd44bed4f..05eb7f182e 100644 --- a/src/unsync/oneshot.rs +++ b/src/unsync/oneshot.rs @@ -3,10 +3,12 @@ //! This channel is similar to that in `sync::oneshot` but cannot be sent across //! threads. -use std::cell::RefCell; +use std::cell::{Cell, RefCell}; +use std::fmt; use std::rc::{Rc, Weak}; use {Future, Poll, Async}; +use future::{Executor, IntoFuture, Lazy, lazy}; use task::{self, Task}; /// Creates a new futures-aware, one-shot channel. @@ -195,3 +197,140 @@ impl Drop for Receiver { self.close(); } } + +/// Handle returned from the `spawn` function. +/// +/// This handle is a future representing the completion of a different future on +/// a separate executor. Created through the `oneshot::spawn` function this +/// handle will resolve when the future provided to `spawn` resolves on the +/// `Executor` instance provided to that function. +/// +/// If this handle is dropped then the future will automatically no longer be +/// polled and is scheduled to be dropped. This can be canceled with the +/// `forget` function, however. +pub struct SpawnHandle { + rx: Receiver>, + keep_running: Rc>, +} + +/// Type of future which `Spawn` instances below must be able to spawn. +pub struct Execute { + future: F, + tx: Option>>, + keep_running: Rc>, +} + +/// Spawns a `future` onto the instance of `Executor` provided, `executor`, +/// returning a handle representing the completion of the future. +/// +/// The `SpawnHandle` returned is a future that is a proxy for `future` itself. +/// When `future` completes on `executor` then the `SpawnHandle` will itself be +/// resolved. 
Internally `SpawnHandle` contains a `oneshot` channel and is +/// thus not safe to send across threads. +/// +/// The `future` will be canceled if the `SpawnHandle` is dropped. If this is +/// not desired then the `SpawnHandle::forget` function can be used to continue +/// running the future to completion. +/// +/// # Panics +/// +/// This function will panic if the instance of `Spawn` provided is unable to +/// spawn the `future` provided. +/// +/// If the provided instance of `Spawn` does not actually run `future` to +/// completion, then the returned handle may panic when polled. Typically this +/// is not a problem, though, as most instances of `Spawn` will run futures to +/// completion. +pub fn spawn(future: F, executor: &E) -> SpawnHandle + where F: Future, + E: Executor>, +{ + let flag = Rc::new(Cell::new(true)); + let (tx, rx) = channel(); + executor.execute(Execute { + future: future, + tx: Some(tx), + keep_running: flag.clone(), + }).expect("failed to spawn future"); + SpawnHandle { + rx: rx, + keep_running: flag, + } +} + +/// Spawns a function `f` onto the `Spawn` instance provided `s`. +/// +/// For more information see the `spawn` function in this module. This function +/// is just a thin wrapper around `spawn` which will execute the closure on the +/// executor provided and then complete the future that the closure returns. +pub fn spawn_fn(f: F, executor: &E) -> SpawnHandle + where F: FnOnce() -> R, + R: IntoFuture, + E: Executor>>, +{ + spawn(lazy(f), executor) +} + +impl SpawnHandle { + /// Drop this future without canceling the underlying future. + /// + /// When `SpawnHandle` is dropped, the spawned future will be canceled as + /// well if the future hasn't already resolved. This function can be used + /// when to drop this future but keep executing the underlying future. + pub fn forget(self) { + self.keep_running.set(false); + } +} + +impl Future for SpawnHandle { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::Ready(Ok(t))) => Ok(t.into()), + Ok(Async::Ready(Err(e))) => Err(e), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => panic!("future was canceled before completion"), + } + } +} + +impl fmt::Debug for SpawnHandle { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("SpawnHandle") + .finish() + } +} + +impl Future for Execute { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + // If we're canceled then we may want to bail out early. + // + // If the `forget` function was called, though, then we keep going. + if self.tx.as_mut().unwrap().poll_cancel().unwrap().is_ready() { + if !self.keep_running.get() { + return Ok(().into()) + } + } + + let result = match self.future.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(t)) => Ok(t), + Err(e) => Err(e), + }; + drop(self.tx.take().unwrap().send(result)); + Ok(().into()) + } +} + +impl fmt::Debug for Execute { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Execute") + .field("future", &self.future) + .finish() + } +}
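
The sketches below are not part of the patch; they are illustrative usage examples for the APIs it adds. The first one shows what an implementation of the new `future::Executor` trait might look like. `ThreadExecutor` is a made-up name: a thread-per-future executor that mirrors the `Item = (), Error = ()` bound used by the `CpuPool` impl above.

// Hypothetical example, not part of this patch: a thread-per-future executor.
extern crate futures;

use std::thread;

use futures::Future;
use futures::future::{Executor, ExecuteError};

struct ThreadExecutor;

impl<F> Executor<F> for ThreadExecutor
    where F: Future<Item = (), Error = ()> + Send + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        // Drive each spawned future to completion on its own OS thread. A
        // real executor would normally reuse threads and might refuse work by
        // returning an `ExecuteError` instead of always succeeding.
        thread::spawn(move || {
            let _ = future.wait();
        });
        Ok(())
    }
}

fn main() {
    let exec = ThreadExecutor;
    exec.execute(futures::future::ok::<(), ()>(())).unwrap();
}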
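
When an executor refuses a future, `execute` hands it back inside the `ExecuteError`, so the caller keeps ownership. The following sketch shows one way a caller might react; `BusyExecutor` is an invented executor that always reports `NoCapacity`, used only to make the example self-contained.

// Hypothetical example: recovering a rejected future from an `ExecuteError`.
extern crate futures;

use futures::Future;
use futures::future::{self, Executor, ExecuteError, ExecuteErrorKind};

struct BusyExecutor;

impl<F> Executor<F> for BusyExecutor
    where F: Future<Item = (), Error = ()>,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        // Always reject, transferring the future back to the caller.
        Err(ExecuteError::new(ExecuteErrorKind::NoCapacity, future))
    }
}

fn main() {
    match BusyExecutor.execute(future::ok::<(), ()>(())) {
        Ok(()) => println!("spawned"),
        Err(e) => {
            // `kind` reports why the executor refused the future...
            assert_eq!(e.kind(), ExecuteErrorKind::NoCapacity);
            // ...and `into_future` gives the future back so it can be retried
            // or run somewhere else.
            let _rejected = e.into_future();
        }
    }
}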
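
The `CpuPool` impl and `sync::oneshot::spawn` are meant to combine as in the sketch below, assuming a build of `futures` with this patch applied plus the `futures-cpupool` crate. The `SpawnHandle` returned by `spawn` is itself a future for the spawned future's result.

// Usage sketch: running a future on a CpuPool through `oneshot::spawn`.
extern crate futures;
extern crate futures_cpupool;

use futures::Future;
use futures::future;
use futures::sync::oneshot;
use futures_cpupool::CpuPool;

fn main() {
    let pool = CpuPool::new(4);

    // `spawn` hands the future to the pool via the new `Executor` impl and
    // returns a `SpawnHandle` that resolves with the future's result.
    let handle = oneshot::spawn(future::ok::<u32, ()>(42), &pool);

    assert_eq!(handle.wait(), Ok(42));
}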
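
`spawn_fn` and `SpawnHandle::forget` might be used as follows, under the same assumptions as the previous sketch.

// Usage sketch: `oneshot::spawn_fn` and `SpawnHandle::forget`.
extern crate futures;
extern crate futures_cpupool;

use futures::Future;
use futures::sync::oneshot;
use futures_cpupool::CpuPool;

fn main() {
    let pool = CpuPool::new(1);

    // The closure is wrapped in a `Lazy` future and executed on the pool.
    let handle = oneshot::spawn_fn(|| Ok::<u32, ()>(2 + 2), &pool);
    assert_eq!(handle.wait(), Ok(4));

    // Dropping a `SpawnHandle` cancels the spawned future; calling `forget`
    // first detaches the handle so the work keeps running in the background.
    let handle = oneshot::spawn_fn(|| Ok::<(), ()>(()), &pool);
    handle.forget();
}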
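
Finally, the `unsync::oneshot::spawn` variant added at the end of the patch mirrors the sync one, but its channel is not `Send`, so it pairs with single-threaded executors. `InlineExecutor` below is a deliberately naive stand-in that simply blocks on each future during `execute`; a real program would queue the future on an actual single-threaded event loop instead.

// Usage sketch for `unsync::oneshot::spawn_fn` with a made-up inline executor.
extern crate futures;

use futures::Future;
use futures::future::{Executor, ExecuteError};
use futures::unsync::oneshot;

struct InlineExecutor;

impl<F> Executor<F> for InlineExecutor
    where F: Future<Item = (), Error = ()>,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        // Note: no `Send` bound is required here, which is what allows the
        // non-`Send` futures produced by `unsync::oneshot::spawn` to be run.
        let _ = future.wait();
        Ok(())
    }
}

fn main() {
    let exec = InlineExecutor;

    // The future runs to completion during the `execute` call, so the handle
    // resolves as soon as it is polled.
    let handle = oneshot::spawn_fn(|| Ok::<u32, ()>(7), &exec);
    assert_eq!(handle.wait(), Ok(7));
}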