Skip to content

Commit

Permalink
Merge pull request #455 from alexcrichton/spawn-trait
Browse files Browse the repository at this point in the history
Add a generic `Executor` trait
  • Loading branch information
alexcrichton committed May 25, 2017
2 parents 878a9a8 + 0ed8fc5 commit b24dea1
Show file tree
Hide file tree
Showing 5 changed files with 530 additions and 101 deletions.
15 changes: 12 additions & 3 deletions futures-cpupool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ use std::sync::mpsc;
use std::thread;

use futures::{IntoFuture, Future, Poll, Async};
use futures::future::lazy;
use futures::future::{lazy, Executor, ExecuteError};
use futures::sync::oneshot::{channel, Sender, Receiver};
use futures::executor::{self, Run, Executor};
use futures::executor::{self, Run, Executor as OldExecutor};

/// A thread pool intended to run CPU intensive work.
///
Expand Down Expand Up @@ -220,6 +220,15 @@ impl CpuPool {
}
}

impl<F> Executor<F> for CpuPool
where F: Future<Item = (), Error = ()> + Send + 'static,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
executor::spawn(future).execute(self.inner.clone());
Ok(())
}
}

impl Inner {
fn send(&self, msg: Message) {
self.tx.lock().unwrap().send(msg).unwrap();
Expand Down Expand Up @@ -255,7 +264,7 @@ impl Drop for CpuPool {
}
}

impl Executor for Inner {
impl OldExecutor for Inner {
fn execute(&self, run: Run) {
self.send(Message::Run(run))
}
Expand Down
87 changes: 87 additions & 0 deletions src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! This module contains the `Future` trait and a number of adaptors for this
//! trait. See the crate docs, and the docs for `Future`, for full detail.

use core::fmt;
use core::result;

// Primitive futures
Expand Down Expand Up @@ -957,3 +958,89 @@ pub trait FutureFrom<T>: Sized {
/// Consume the given value, beginning the conversion.
fn future_from(T) -> Self::Future;
}

/// A trait for types which can spawn fresh futures.
///
/// This trait is typically implemented for "executors", or those types which
/// can execute futures to completion. Futures passed to `Spawn::spawn`
/// typically get turned into a *task* and are then driven to completion.
///
/// On spawn, the executor takes ownership of the future and becomes responsible
/// to call `Future::poll()` whenever a readiness notification is raised.
pub trait Executor<F: Future<Item = (), Error = ()>> {
/// Spawns a future to run on this `Executor`, typically in the
/// "background".
///
/// This function will return immediately, and schedule the future `future`
/// to run on `self`. The details of scheduling and execution are left to
/// the implementations of `Executor`, but this is typically a primary point
/// for injecting concurrency in a futures-based system. Futures spawned
/// through this `execute` function tend to run concurrently while they're
/// waiting on events.
///
/// # Errors
///
/// Implementors of this trait are allowed to reject accepting this future
/// as well. This can happen for various reason such as:
///
/// * The executor is shut down
/// * The executor has run out of capacity to execute futures
///
/// The decision is left to the caller how to work with this form of error.
/// The error returned transfers ownership of the future back to the caller.
fn execute(&self, future: F) -> Result<(), ExecuteError<F>>;
}

/// Errors returned from the `Spawn::spawn` function.
pub struct ExecuteError<F> {
future: F,
kind: ExecuteErrorKind,
}

/// Kinds of errors that can be returned from the `Execute::spawn` function.
///
/// Executors which may not always be able to accept a future may return one of
/// these errors, indicating why it was unable to spawn a future.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ExecuteErrorKind {
/// This executor has shut down and will no longer accept new futures to
/// spawn.
Shutdown,

/// This executor has no more capacity to run more futures. Other futures
/// need to finish before this executor can accept another.
NoCapacity,

#[doc(hidden)]
__Nonexhaustive,
}

impl<F> ExecuteError<F> {
/// Create a new `ExecuteError`
pub fn new(kind: ExecuteErrorKind, future: F) -> ExecuteError<F> {
ExecuteError {
future: future,
kind: kind,
}
}

/// Returns the associated reason for the error
pub fn kind(&self) -> ExecuteErrorKind {
self.kind
}

/// Consumes self and returns the original future that was spawned.
pub fn into_future(self) -> F {
self.future
}
}

impl<F> fmt::Debug for ExecuteError<F> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.kind {
ExecuteErrorKind::Shutdown => "executor has shut down".fmt(f),
ExecuteErrorKind::NoCapacity => "executor has no more capacity".fmt(f),
ExecuteErrorKind::__Nonexhaustive => panic!(),
}
}
}
Loading

0 comments on commit b24dea1

Please sign in to comment.