Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a generic Executor trait #455

Merged
merged 1 commit into from
May 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions futures-cpupool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ use std::sync::mpsc;
use std::thread;

use futures::{IntoFuture, Future, Poll, Async};
use futures::future::lazy;
use futures::future::{lazy, Executor, ExecuteError};
use futures::sync::oneshot::{channel, Sender, Receiver};
use futures::executor::{self, Run, Executor};
use futures::executor::{self, Run, Executor as OldExecutor};

/// A thread pool intended to run CPU intensive work.
///
Expand Down Expand Up @@ -220,6 +220,15 @@ impl CpuPool {
}
}

impl<F> Executor<F> for CpuPool
where F: Future<Item = (), Error = ()> + Send + 'static,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
executor::spawn(future).execute(self.inner.clone());
Ok(())
}
}

impl Inner {
fn send(&self, msg: Message) {
self.tx.lock().unwrap().send(msg).unwrap();
Expand Down Expand Up @@ -255,7 +264,7 @@ impl Drop for CpuPool {
}
}

impl Executor for Inner {
impl OldExecutor for Inner {
fn execute(&self, run: Run) {
self.send(Message::Run(run))
}
Expand Down
87 changes: 87 additions & 0 deletions src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! This module contains the `Future` trait and a number of adaptors for this
//! trait. See the crate docs, and the docs for `Future`, for full detail.

use core::fmt;
use core::result;

// Primitive futures
Expand Down Expand Up @@ -957,3 +958,89 @@ pub trait FutureFrom<T>: Sized {
/// Consume the given value, beginning the conversion.
fn future_from(T) -> Self::Future;
}

/// A trait for types which can spawn fresh futures.
///
/// This trait is typically implemented for "executors", or those types which
/// can execute futures to completion. Futures passed to `Spawn::spawn`
/// typically get turned into a *task* and are then driven to completion.
///
/// On spawn, the executor takes ownership of the future and becomes responsible
/// to call `Future::poll()` whenever a readiness notification is raised.
pub trait Executor<F: Future<Item = (), Error = ()>> {
/// Spawns a future to run on this `Executor`, typically in the
/// "background".
///
/// This function will return immediately, and schedule the future `future`
/// to run on `self`. The details of scheduling and execution are left to
/// the implementations of `Executor`, but this is typically a primary point
/// for injecting concurrency in a futures-based system. Futures spawned
/// through this `execute` function tend to run concurrently while they're
/// waiting on events.
///
/// # Errors
///
/// Implementors of this trait are allowed to reject accepting this future
/// as well. This can happen for various reason such as:
///
/// * The executor is shut down
/// * The executor has run out of capacity to execute futures
///
/// The decision is left to the caller how to work with this form of error.
/// The error returned transfers ownership of the future back to the caller.
fn execute(&self, future: F) -> Result<(), ExecuteError<F>>;
}

/// Errors returned from the `Spawn::spawn` function.
pub struct ExecuteError<F> {
future: F,
kind: ExecuteErrorKind,
}

/// Kinds of errors that can be returned from the `Execute::spawn` function.
///
/// Executors which may not always be able to accept a future may return one of
/// these errors, indicating why it was unable to spawn a future.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ExecuteErrorKind {
/// This executor has shut down and will no longer accept new futures to
/// spawn.
Shutdown,

/// This executor has no more capacity to run more futures. Other futures
/// need to finish before this executor can accept another.
NoCapacity,

#[doc(hidden)]
__Nonexhaustive,
}

impl<F> ExecuteError<F> {
/// Create a new `ExecuteError`
pub fn new(kind: ExecuteErrorKind, future: F) -> ExecuteError<F> {
ExecuteError {
future: future,
kind: kind,
}
}

/// Returns the associated reason for the error
pub fn kind(&self) -> ExecuteErrorKind {
self.kind
}

/// Consumes self and returns the original future that was spawned.
pub fn into_future(self) -> F {
self.future
}
}

impl<F> fmt::Debug for ExecuteError<F> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.kind {
ExecuteErrorKind::Shutdown => "executor has shut down".fmt(f),
ExecuteErrorKind::NoCapacity => "executor has no more capacity".fmt(f),
ExecuteErrorKind::__Nonexhaustive => panic!(),
}
}
}
Loading